Getting Started with List Processing

Introduction

The ArcaPix PixStor Python API provides effective methods for analysing PixStor filesystems, by combining the ease and flexibility of Python with the speed of PixStor’s parallelised data scanning.

With the ArcaPix API and as few as 4 lines of Python, it is simple to determine the amount of filesystem space used by temporary files, analyse high data usage by users, compare snapshots to find new or deleted files, report the average age of data, and more.

In addition, the ArcaPix PixStor Python API can combine list processing with other (third-party) libraries enabling tasks such as creating file analytics graphs (E.G. via MatPlotLib).

Background

The PixStor policy engine generates lists of the files on a filesystem and their attributes. PixStor can interrogate the filesystem metadata across many nodes in parallel, with many concurrent threads on each node, to scan files faster than a conventional (‘tree-walk’) recursive scan.

Traditional list processing with PixStor requires the creation of PixStor policies with LIST and EXTERNAL LIST rules. The file lists generated by the policy engine are passed to external scripts for post-processing.

The ArcaPix PixStor Python API simplifies this process.

Firstly, policies can be generated and run directly from the Python, removing the need for external policy files or standalone scripts.

Secondly, the API combines the actions of a list rule and an external list rule into a single object - the ListProcessingRule. Moreover, the API allows the definition of a (Python) processing function, which is passed to the list processing rule in place of an external script.

Thirdly, the API takes care of the boilerplate - loading and parsing the file lists that PixStor generates. In effect programmer’s only concern is the returned information.


Before You Start…

List Processing with the ArcaPix API requires only a basic knowledge of Python. An awareness of generators, serialisation (’pickling’) and lambda expressions is highly useful, but non-essential.

You can find more information on these Python topics in our ArcaPix API Almanac.



List Processing In Action

Say we want to know how much of our filesystem’s space is being used by temporary files. To begin, we create a management policy object

>>> from arcapix.fs.gpfs import ManagementPolicy

>>> p = ManagementPolicy()

To work out the total amount of space used by the files matched by the policy engine, we use processing function which loops over those files and adds up their sizes

>>> def total_size(file_list):
...    result = 0
...    for f in file_list:
...       result = result + f.filesize
...    return result

The processing function will be given a generator (a collection) of a special type of object called an AttrDict. Each AttrDict object corresponds to a file returned by the policy engine, and its attributes can be looked up as f.name, f.filesize, etc.

Now we create a list processing rule to apply the function

>>> from arcapix.fs.gpfs import ListProcessingRule

>>> r = p.rules.new(ListProcessingRule,
...                 listname  = 'size_list',
...                 processor = total_size )

Notice that we set the rule’s ‘processor’ equal to the function we just defined, but without its brackets. In Python, functions are objects, which aren’t executed until ‘called’ - that is, referenced with brackets, such as total_size(file_list)

If we were to run the policy rule now, it would work out the total size for all files on the filesystem. To restrict the processing to only temporary files (files with the ‘.tmp’ extension), we take advantage of rule criteria

>>> from arcapix.fs.gpfs import Criteria

>>> r.criteria.new(Criteria.like('name', '*.tmp'))

Now we can run our policy. To run the policy on filesystem mmfs1, we simply call

>>> p.run('mmfs1')

{'size_list': 123432}

This returns a dictionary with the rule’s ‘listname’ as the key, and the result of the list processing as the value. A policy with multiple list processing rules (executing multiple processing rules in parallel) returns a (key: value) entry for each rule.

MapReduce

We’ve seen how simple the ArcaPix API makes list processing. But the previous example doesn’t take full advantage of PixStor’s parallelism.

If the policy engine matches a large number of files, multiple sub-lists will be generated - each containing up to 8000 files, by default. The previous, simple list processing example waits for all of these lists to be generated before any processing is done. But with the ArcaPix API, each list can be processed as it is generated by taking advantage of the MapReduceRule functionality.

Breaking Up Functions

MapReduce rules break the processing function into two parts

Map:Takes an AttrDict object and returns a value - often a particular file attribute or header
Reduce:Combines each value returned by Map with a running ‘result’ - typically a simple addition operation is used

Ignoring parallelism, the effective MapReduce process works as follows:

1
2
3
for f in file_list:
    result = reducefn(result, mapfn(f))
return result

In the basic List Processing example (above), the total_size function looks up the ‘filesize’ attribute for each file in the lists, then adds up those values to produce a total.

This same process can be split into two parts:

1
2
3
4
5
6
7
def mapfn(f):
   # look up the filesize
   return f.filesize

def reducefn(val1, val2):
   # add up two values
   return val1 + val2

But why bother doing this?

Processing in Parallel

In the basic List Processing example, the total_size function needs all the lists to be generated before it can loop over them.

With separate map and reduce functions, each sub-list can be processed individually to give an intermediate result - and more importantly, multiple sub-lists can be processed at once in parallel.

Once all the sub-lists have been generated and processed individually, their intermediate results can be reduced once more to produce a single, final result for all the files found by the policy engine.

ArcaPix PixStor Python API MapReduce Diagram

As an example - if we have 80,000 files, the basic list processing has to loop over (iterate) all these files one by one. The MapReduce methodology, however, would loop over 10 lists of 8,000 files concurrently, to produce 10 intermediate results. Each intermediate result is then looped over to produce the single final result.

That means basic list processing does 80,000 sequential operations, compared with an effective 8000+10 sequential operations for the equivalent MapReduce processing - an almost 10x speed increase.

Using PixStor’s default list processing settings, upto 24 lists can be processed concurrently, providing a best case speed increase of almost 20x, compared with the equivalent basic list processing.

MapReduce In Action

MapReduceRules are implemented in much the same way as ListProcessingRules. A management policy is created. Processing functions are defined. A rule is added to the policy. The policy is run.

>>> from arcapix.fs.gpfs import ManagementPolicy, MapReduceRule, Criteria

>>> p = ManagementPolicy()

>>> r = p.rules.new(MapReduceRule,
...                 listname = 'size_list2',
...                 mapfn    = lambda f: f.filesize,
...                 reducefn = lambda val1, val2: val1+val2)

>>> r.criteria.new(Criteria.like('name', '*.tmp'))

>>> p.run('mmfs1')

{'size_list': 123432}

Notice that the functions for map and reduce weren’t separately - we defined them ‘in-line’, using the special ‘lambda’ syntax. For simple functions, the lambda syntax lets us write more compact code; but writing the functions separately is equally valid.

Otherwise, the only difference between creating a ListProcessingRule and creating a MapReduceRule is how the processing function(s) are defined.

See the API Alamanc for further discussion.

Post-Processing MapReduce

In most cases, a processing function can be refactored into map and reduce functions. Often the reduce function will performs a simple addition.

But occasionally, map and reduce alone can’t produce the required result. To work out an average of file sizes, it would be incorrect to use the reduce function

>>> def reducefn(x, y):
...     return (x + y) / 2

E.G. calculating the average of the list [1, 2, 3] by reducing with this function would give an incorrect result after the first iteration

>>> result = reducefn(1, 2)  # 1.5
>>> result = reducefn(result, 3)  # 2.25

Instead, an output function can be used to post-process the results of a MapReduce operation.

To work out the average of file size, we have the map and reduce functions produce the sum of file sizes and the total file count

1
2
3
4
5
def mapfn(f):
   return (f.filesize, 1)

def reducefn(val1, val2):
   return (val1[0] + val2[0], val1[1] + val2[1])

Then an output function is used to divide the size by the count

6
7
def output(result):
   return result[0] / result[1]

Putting it together, and using lambda-style functions, the resulting MapReduce implementation is

1
2
3
4
5
p.rules.new(MapReduceRule,
            listname = 'averagesize',
            mapfn    = lambda x   : (x.filesize, 1),
            reducefn = lambda x,y : (x[0] + y[0], x[1] + y[1]),
            output   = lambda x   :  x[0] / x[1])

To take full advantage of the MapReduce paradigm, the map and reduce functions should do as much of the processing work as possible, and post-processing (output) should do as little as possible.

Tips and Tricks

Building Collections

To build a list of filenames, the initial approach might be to use Python’s standard list.append functionality. However, due to the method by which the MapReduce rules work, a better approach exists.

By having the map function return each value as a single-membered list, the reduce function can simply be an addition as in the following implementation

1
2
3
4
5
def mapfn(f):
   return [f.filename]

def reducefn(x, y):
   return x + y

Various collection types can similarly constructed.

To build a set of unique files owners, the following implementation could be used

1
2
3
4
5
def mapfn(f):
   return set([f.userid])

def reducefn(x, y):
   return x | y

In all cases, the map function is required to return a single-membered collection of the appropriate type.

Counters

List processing provides an excellent method for enumerating file attributes. To aid in this, the Counter object from the Python collections library can be used.

>>> from collections import Counter

To count how many files are owned by each user on a filesystem, a processor function can be defined:

1
2
3
4
5
def count_users(file_list):
   c = Counter()
   for f in file_list:
      c.update([f.userid])
   return c

Alternatively, this can be implemented using MapReduce:

1
2
mapfn    = lambda f   : Counter([f.userid])
reducefn = lambda x,y : x + y

Counters can also be used to find total sizes for different categories, E.G

To determine the amount of space used by different file types:

1
2
3
4
5
6
def type_sizes(file_list):
    c = Counter()
    for f in file_list:
        ext = os.path.splitext(f.pathname)
        c.update({ext: f.filesize})
    return c

To determine which user is using the most space on a filesystem:

1
2
3
mapfn    = lambda f      : Counter({f.userid: f.filesize})
reducefn = lambda x,y    : x + y
output   = lambda result : result.most_common()

Here, the Counter object’s most_common() function is used to return the items in the Counter in descending count (size) order.

Determining Usernames

List processing returns file owners by numeric userid. To convert this value to a user name, the Python pwd library can be used.

>>> pwd.getpwuid(f.userid)[0]  # f.userid = 0
'root'

Similarly, to look-up group names from a groupid, the Python grp Library can be used.

It is advised to cache the names, otherwise performance is likely to be affected by repeated lookups.

Time-Based Attributes

File access time, creation time, and modification time are automatically converted to Python datetime objects.

To calculate the age of a file (in days), a method can be used as follows:

>>> from datetime import datetime
>>> (datetime.today() - f.creation).days
13

The year a file was last accessed can be determined as:

>>> f.access.year
2016

Refer to the Python datetime documentation page for more information.

Directories Plus

The default setting for the policy engine ensures only ‘regular’ file objects are returned. To include directories and symlinks in search results, enable the ‘directories plus’ option with the Rule change method

>>> r.change(directories_plus=True)

Run Options

PixStor’s default policy execution values specify processing of 100 files per thread and 24 threads per node.

In most cases the PixStor maxFiles parameter is vastly conservative and should be increased by orders of magnitude. For that reason, the default maxFiles has been changed to 8000 files per thread in the ArcaPix API.

But, while greater processing performance will be observed with higher numbers of files and threads, if the cluster performance is impeded by policy processing it is advised to decrease the threadLevel parameter.

It is advised to start with a smaller value for threadLevel and increase until the optimum balance between List processing and performance is determined.

These options can be changed as part of the policy.run function:

>>> p.run('mmfs1', maxFiles=250, threadLevel=10)

Use the nodes parameter to specify one or more nodes, or a nodeclass on which to run policies. By default policies are run on the local node only.

Use the cleanup parameter to specify whether the temporary workfiles are removed after policy execution. Keeping workfiles can be useful for debugging.

Refer to the main Policy documentation page for more information.

Warnings and Caveats

API Policy Dependecies

The API can be used to export policies as PixStor-compatible text files in order to backup or share them, as the API can easily reload these files.

However using a policy produced via the API natively (I.E. via the mmapplypolicy command) will fail to execute if the API isn’t installed.

An example of an API policy is given below; it invokes the policydriver on any specified node(s) upon which the API is installed.

RULE 'mr_unnamed_c11f7868' EXTERNAL LIST snapshot-search EXEC '/usr/bin/python /opt/arcapix/pythonapi/arcapix/fs/gpfs/policydriver.py'
OPTS 'eJyFjbtOwzAYRn+nQIO5lNvEwtoOydBW7VJ1gc1ShyCPJTKOCyaO048mKqBGYim8NrcHYD1HR+c90NqVdRavzLNVzr6pypY+/mVLq3NneLqwzqWL2usfxUHd
/5NC5SZd5eav42DamzXXZWY4gq5gIhAtcS0vH4g2RDfzDjWMNoyeiO4CQptu0ZptsSPD77nxqjDY3WJPshe0JbMIP7AvryZ2+Vo9lj6yfllX0WgQjbUeDLUZjkf9
/hRchhOnivtMTXEgmCQc9noVjhIcC2rQ+cRJgtMGZ7M5zhtcVEn8BT4OVJo=e1ea6883ad53e6a57c565a9b110a65d4 snaptest-c11f7868'

RULE 'unnamed_c11f7868' LIST 'snapshot-search'
 SHOW(' pathname=' || varchar(PATH_NAME))

In such instances where the policy is invoked manually, the policydriver creates workfiles which require further processing to produce a result.

$ cat -v snapshot-search-c11f7868-15774.F94FF09C.1.out
M-^@^Bccollections
Counter
q^@}q^A(U^Q/mmfs1/.policytmpq^BMM-)2U^F/mmfs1q^CMM-)2uM-^Eq^DRq^E.

It is therefore recommended to utilise the API directly, since it provides all such functionality.

Temporary Files

All temporary files are witten to /<filesystem_mount_point>/.policytmp by default. If you do not have permission to write to this directory, your policies will fail.

To change where temporary policy files are written to, use the filesystem.setTempFilePath method, E.G.

>>> from arcapix.fs.gpfs.filesystem import setTempFilePath
>>> setTempFilePath('/tmp')

This will change the temporary file path for the particular session in which it was called.

To change where temporary work files are written to, use the localWorkDirectory or globalWorkDirectory options with ManagementPolicy.run, E.G.

>>> p.run('mmfs1', localWorkDirectory='/tmp')

Initial Values

MapReduce rules can be specified with an initial value. In most cases specifying an initial value is not necessary. If the initial value is omitted, the value returned by the first call to ‘map’ will be utilised as the initial value.

However, if the policy engine matches no files and initial value is omitted the result of the MapReduce is None.

None Values

The API converts Files or folders with an attribute value of NULL (such as the creation time for a filesystem’s root folder) to a value of None.

Similarly, if the policy engine matches no rules and an initial MapReduce value has not been specified the return value will be None.

Code should be designed to be able to handle these None values, or else errors may result.

Escaping

The escape option can be used to sanities file names which contain unusual characters. For most cases, the escape string '%/+@#= .:' works well

>>> rule = p.rules.new(MapReduceRule, 'files', lambda f: [f.path])
>>> rule.change(escape='%/+@#= .:')

The unqote function (from urllib in python2 and urlparse in python3) can then be used to ‘un-escape’ the results. For a MapReduceRule, a good place to do this is in the output function.

>>> rule.change(output=lambda x: [unquote(f) for f in x])

Note - When using the escape option, be sure to always include the space character (’ ‘), or else you will get parsing errors during list processing execution.

Examples

The below examples are presented in their simplest working form. It is left as an exercise for the user to extend the examples appropriately. E.G. implement Exceptions to catch values of ‘None’. For further information refer to Warnings and Caveats.

Find the Difference Between Two Snapshots

By default, a list processing rule will not match data contained within filesystem snapshots (files in the .snapshots folder).

>>> p.run('mmfs1')

In order to match files from a snapshot, the policy needs needs to be run with the snapshot flag:

>>> p.run('mmfs1', snapshot='global-snapshot')

To determine the difference between two snapshots, a list processing rule can be used to scan the snapshots and return file lists in the form of Python sets

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from arcapix.fs.gpfs import ManagementPolicy, MapReduceRule

# list processing helper function
def helper(f):
    # compare files by name / path name / creation time
    # pathname.split(...) removes the snapshot directory part of the path
    return [(f.name, f.pathname.split('/', 4)[-1], f.creation)]

p = ManagementPolicy()
p.rules.new(MapReduceRule, 'snap', mapfn=helper, output=set)
p.rules.change(directories_plus=True) # include folders, etc.

# find files from snap 1 - CHANGE THIS NAME to an existing snap
oldSnap = p.run("mmfs1", snapshot="old-snap")['snap']

# find files from snap 2 - CHANGE THIS NAME to an existing snap
newSnap = p.run("mmfs1", snapshot="new-snap")['snap']

We can then find differences between the snapshots as

18
19
deleted = sorted(i[0] for i in (oldSnap - newSnap))
created = sorted(i[0] for i in (newSnap - oldSnap))

In the above example (oldSnap-newSnap) returns the files and directories in set oldSnap minus any files and directories that are present in both oldSnap and newSnap. Therefore the values returned to list deleted are files and directories that have been deleted from the filesytem between the creation of snapshots old-snap and new-snap.

Similarly, (newSnap-oldSnap) returns the files and directories in set newSnap minus any files and directories that are present in both newSnap and oldSnap. Therefore the values returned to list created are files and directories that have been added to the filesytem between the creation of snapshots old-snap and new-snap.

Plot a Pie Chart of Space Usage by File Type

To plot a pie chart displaying the disk space used by different file types, a ListProcessingRule can be used with the third-party library MatPlotLib

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from arcapix.fs.gpfs import ManagementPolicy, ListProcessingRule
import matplotlib.pyplot as plt
from collections import Counter
from os.path import splitext

p = ManagementPolicy()

def type_sizes(file_list):
    c = Counter()
    for f in file_list:
        c.update({splitext(f.name)[1]: f.filesize})
    return c

r = p.rules.new(ListProcessingRule, 'types', type_sizes)

result = p.run('mmfs1')['types']

plt.pie(result.values(), labels=result.keys(), autopct='%1.1f%%')
plt.axis('equal')

plt.show()
Example File Sizes Pie Chart

Plot a Histogram of File Ages

To plot a histogram displaying the distribution of the ages of files on the system using a MapReduceRule and MatPlotLib:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from arcapix.fs.gpfs import ManagementPolicy, MapReduceRule
import matplotlib.pyplot as plt
from datetime import datetime

p = ManagementPolicy()

def file_age(gen):
   return [(datetime.today() - f.creation).days]

p.rules.new(MapReduceRule, listname='age', mapfn=file_age)

h = p.run('mmfs1')['age']

plt.hist(h, max(h), facecolor='green')

plt.xlabel('File Age (days)')
plt.ylabel('Count')
plt.grid(True)
plt.show()
Example File Ages Histogram