Policy

PixStor provides two types of Policy:

  • Placement Policy: tells its associated Filesystem where to place new files
  • Management Policy: can be used to move, delete, etc. files in a Filesystem or directory

All policies are made up of Rules.

Description

class arcapix.fs.gpfs.policy.PlacementPolicy(filesystem)

A placement policy is a set of rules for defining how to handle new files on a filesystem

Parameters:filesystem (Filesystem or str) – The filesystem that the PlacementPolicy applies to
id

Returns the identifier for this policy

Synonym for filesystem

Return type:str
filesystem

Returns the name of the filesystem the policy is applied to

Return type:str
save(testOnly=False, cleanup=True)

Save (apply) the policy to the filesystem (using mmchpolicy)

Parameters:
  • testOnly (bool) – Runs mmchpolicy in test mode (default=False)
  • cleanup (bool) – Remove temporary policy file on completion (default=True)

Warning

In order to save changes to the placement policy, a temporary file needs to be written in your /<filesystem_default_mount>/.policytmp folder.

If you don’t have write permission for this folder, an error will be raised.

You can change the temporary directory to one you do have permission for using filesystem.setTempFilePath

validate()

Checks that the policy is valid.

Note

This won’t necessarily catch all errors that GPFS might throw.

export_to_file(filename, **kwargs)

Export the policy to a file, e.g. for backup purposes.

Parameters:
  • filename (str) – The file to export the policy to.
  • overwrite (bool) – If the specified filename already exists, pass overwrite=True to replace it, else an error will be raised
  • excludeDisabled (bool) – Don’t write disabled (commented out) rules to file (default=False)
  • excludeComments (bool) – Don’t write comments to file (default=False)
import_from_file(filename)

Import the contents of an existing file into the policy.

Overwrites any existing content of the PlacementPolicy object with the contents of the file.

Parameters:filename (str) – The file to import from
name

Returns the name of the policy

Either the filename or the filesystem it’s defined on, depending on the policy type.

Return type:str
rule_tree(raise_unreadable=True)

Recursive iterator over policy rules.

If an Include is present, it will be loaded and its rules will also be yielded.

Parameters:raise_unreadable (bool) – if an Include can’t be loaded and this setting is True, the exception will be raised. If False, the Include itself will be yielded instead.

Can be used to create a policy from another with includes resolved recursively - e.g.

>>> for r in p.rule_tree():
...     q.rules.insert(r)
rules

Returns the collection of policy rules

Return type:Rules
sameas(tocompare)

Checks whether the policy has been modified compared with the given file.

Parameters:tocompare – name of policy file to compare this policy object to
Raises:Exception if the file does not exist.
Returns:True if the current policy is the same as the file (NB the comparison ignores whitespace, etc.)
Return type:bool
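Since the comparison ignores whitespace, two files containing the same rules with different spacing compare equal. A minimal sketch of what whitespace-insensitive comparison means here (an illustration only, not the library's actual implementation; the normalise helper is hypothetical):

```python
# Illustration only: sameas() compares policy text ignoring whitespace
# differences. This hypothetical normalise() mimics that behaviour.
def normalise(policy_text):
    # Collapse runs of whitespace to single spaces and strip the ends
    return " ".join(policy_text.split())

a = "RULE 'default'  SET POOL   'system'"
b = "RULE 'default' SET POOL 'system'"

print(normalise(a) == normalise(b))  # True - same rule, different spacing
```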
class arcapix.fs.gpfs.policy.ManagementPolicy(filename=None)

A management policy is a set of rules for manipulating existing files on a filesystem or directory

Parameters:filename (str) – Name of a file from which to load an existing policy, or to which you’d like to export the policy.
id

Returns the ‘id’ of the object, which must be unique for that type.

Typically the name or snapshot ID

filename

Name of the file the policy is read from/written to.

Synonym for id

Return type:str
summary(target=None, refresh=False, **kwargs)

The response from running a management policy.

This is parsed into a dictionary summarising the action and applicability of the policy to the target filesystem or directory

This includes information like the number of files chosen by each rule in the policy or how pool occupancy will be affected by the policy.

Parameters:
  • target (str) – filesystem or directory to run policy against
  • refresh (bool) – reload the summary, e.g. if the policy has changed since the last call to ‘run’ or ‘summary’ (default=False)
Return type:dict

stats()

Return profiling stats for list processing rules in the policy.

To generate stats, you need to either enable profiling on one or more of the list processing rules (rule.change(profiling=True)) or run the policy inside the APProfile context manager

>>> with APProfile():
...     policy.run('mmfs1')
...
>>> policy.stats()

Note - If the policy is run multiple times, this will only return stats for the most recent run.

Returns:dict of {rule.listname: pstats.Stats}
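The returned values are standard-library pstats.Stats objects keyed by list name. A sketch of consuming that mapping, using a locally generated Stats object as a stand-in since a real stats() result needs a policy run against a filesystem:

```python
import cProfile
import pstats

def example_list_function(files):
    # Stand-in for a list processing rule function
    return [f for f in files if f.endswith('.tmp')]

# Build a stand-in pstats.Stats object in place of a real policy run
profiler = cProfile.Profile()
profiler.enable()
example_list_function(['a.tmp', 'b.txt', 'c.tmp'])
profiler.disable()

# The documented return shape of ManagementPolicy.stats():
# {rule.listname: pstats.Stats}
stats = {'tmpfiles': pstats.Stats(profiler)}

for listname, st in stats.items():
    print('--- rule list:', listname)
    st.sort_stats('cumulative').print_stats(5)  # top five entries
```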
save(**kwargs)

Export the policy object to a (text) file, e.g. for backup purposes

Parameters:
  • filename (str) – Name of the file to save to
  • overwrite (bool) – If filename already exists, specify True to force overwrite else an error will be raised (default=False)
  • excludeDisabled (bool) – Don’t write disabled (commented out) rules to file (default=False)
  • excludeComments (bool) – Don’t write comments to file (default=False)
run(target, **kwargs)

Run the policy on the specified target (using mmapplypolicy)

Parameters:
  • target (str) – target filesystem or directory
  • nodes (list of str) – list of names of nodes to run on (default=defaultHelperNodes)
  • snapshot (str) – run the policy against the named snapshot
  • scope (str) – limit the policy scan to one of filesystem, inodespace, fileset
  • inputFileList (str) – run the policy against files and directories listed in the input file
  • action (str) – One of yes, defer, test, prepare (default=yes)
  • localWorkDirectory (str) – Specifies the directory to be used for temporary storage (default='<filesystem_default_mount>/.policytmp')
  • globalWorkDirectory (str) – Specifies a global directory to be used for temporary storage (must be within the shared filesystem)
  • threadLevel (int) – Number of threads used in the policy execution phase (default=24).
  • dirThreadLevel (int) – Number of threads used during the directory scan phase (default=24).
  • iscanBuckets (int) – number of buckets of inode numbers to be created by the parallel directory scan
  • iscanThreads (int) – number of threads and sort pipelines each node will run during the parallel inode scan (default=2)
  • maxFiles (int) – Specifies how many files are passed for each invocation of an EXEC script (default=8000).
  • asTime (datetime) – evaluate policy rules as if it were running on the date and time specified
  • substitution (dict) – string substitutions that will be made in the text of the policy rules before the rules are interpreted
  • fileListPrefix (str) – directory or filename-prefix for lists generated by external list rules with action prepare or defer
  • fileListPathname (str) – run the policy with files generated by action='prepare'
  • infoLevel (int) – controls policy output level, 0 to 6 (default=1). Higher value = more verbose. Note, output goes to python logging.
  • singleInstance (bool) – Only one policy with the singleInstance option can execute at one time on a filesystem (default=False)
  • reevaluate (bool) – re‐evaluate and revalidate the policy before running. For use with action='prepare' + fileListPathname
  • roundRobin (bool) – dispatch files from inputFilePathname in a round-robin fashion.
  • splitFileListByWeight (bool) – specifies that each of the generated file lists contain elements with the same WEIGHT value
  • choiceAlgorithm (str) – algorithm for candidate selection. One of best, exact, fast
  • sortBufferSize (str) – buffer size that is passed to the sort command (default=8%)
  • sortCommand (str) – fully‐qualified path name for a Posix‐compliant sort command to be used instead of the os default
  • splitMargin (float) – percentage the fast‐choice algorithm is allowed to deviate from LIMIT and THRESHOLD targets (default=0.2)
  • maxMergeFiles (int) – maximum number of files to be passed as input to the sort command (default=12)
  • maxSortBytes (str) – maximum number of bytes to be passed as input files to the sort command (default=411 MB)
  • otherSortOptions (str) – Options to pass through to the sort command
  • qosClass (str) – Quality of Service class the policy scan is assigned to. One of maintenance, other
  • maxDepth (int) – how deeply in the directory hierarchy to apply the policy scan
  • cleanup (bool) – remove any temporary policy file after completion (default=True)
  • output_processor (callable) – function to process lines of output (stdout/stderr) generated by ExternalListRule scripts and list processing rule functions. See Output Processing for more info.
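The effect of the substitution parameter is that placeholder strings in the rule text are replaced before the rules are interpreted. A rough illustration of that effect (the placeholder names and rule text below are hypothetical, and this is not the library's internal mechanism):

```python
# Illustration of the 'substitution' kwarg's effect: placeholders in the
# rule text are replaced before the rules are interpreted.
rule_text = ("RULE 'migrate' MIGRATE FROM POOL 'PLACEHOLDER_SRC' "
             "TO POOL 'PLACEHOLDER_DST'")

# Hypothetical placeholder names, for illustration only
substitution = {'PLACEHOLDER_SRC': 'pool_1', 'PLACEHOLDER_DST': 'nearline'}

for key, value in substitution.items():
    rule_text = rule_text.replace(key, value)

print(rule_text)
# RULE 'migrate' MIGRATE FROM POOL 'pool_1' TO POOL 'nearline'
```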

Warning

If you haven’t saved your policy to a file, it will be saved to a temporary file in your /<filesystem_default_mount>/.policytmp folder.

If you don’t have write permission for this folder, an error will be raised.

In this case you should call save() before calling run().

Alternatively, you can change the temporary directory to one you do have permission for using filesystem.setTempFilePath

name

Returns the name of the policy

Either the filename or the filesystem it’s defined on, depending on the policy type.

Return type:str
rule_tree(raise_unreadable=True)

Recursive iterator over policy rules.

If an Include is present, it will be loaded and its rules will also be yielded.

Parameters:raise_unreadable (bool) – if an Include can’t be loaded and this setting is True, the exception will be raised. If False, the Include itself will be yielded instead.

Can be used to create a policy from another with includes resolved recursively - e.g.

>>> for r in p.rule_tree():
...     q.rules.insert(r)
rules

Returns the collection of policy rules

Return type:Rules
sameas(tocompare)

Checks whether the policy has been modified compared with the given file.

Parameters:tocompare – name of policy file to compare this policy object to
Raises:Exception if the file does not exist.
Returns:True if the current policy is the same as the file (NB the comparison ignores whitespace, etc.)
Return type:bool
validate()

Checks that the policy is valid.

Note

This won’t necessarily catch all errors that GPFS might throw.

Output Processing

When a management policy that includes any ExternalListRule or list processing rules is run, any output generated to stdout or stderr - such as print statements or logging - will be captured.

An output processor can be passed to ManagementPolicy.run to process lines of output as they’re generated.

Note

This output only includes sub-process messages, not the full mmapplypolicy output.

The payload passed to the processor includes fields:

  • type: the type of payload - either message or traceback
  • identifier: a numerical identifier used to identify which node generated the message - e.g. <1>
  • node: the actual name of the node which generated the message
  • message: the message extracted from the output line(s)
  • raw: the raw output line(s), including identifier and trailing newline

Note - the local node (from which the policy was started) will be identified as "node": "localhost"

Anything returned by the processor function will be ignored.

def printer(payload):
    print("{node}: {message}".format(**payload))

policy.run('mmfs1', output_processor=printer)
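Processors can also aggregate rather than print. For instance, collecting messages per node - exercised here with hand-built payload dicts of the documented shape, since a real run needs a filesystem:

```python
from collections import defaultdict

messages_by_node = defaultdict(list)

def collect(payload):
    # Group message payloads by originating node; ignore tracebacks
    if payload['type'] == 'message':
        messages_by_node[payload['node']].append(payload['message'])

# Hand-built payloads with the documented fields, for illustration
collect({'type': 'message', 'identifier': '<1>', 'node': 'localhost',
         'message': 'processing batch 1', 'raw': '<1> processing batch 1\n'})
collect({'type': 'message', 'identifier': '<2>', 'node': 'node2',
         'message': 'processing batch 2', 'raw': '<2> processing batch 2\n'})

print(dict(messages_by_node))
```

In a real run, this would be passed as policy.run('mmfs1', output_processor=collect).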

Limitations:

  • stdout and stderr get merged - the source of a given message can’t be determined.
  • it’s not possible to distinguish between messages generated by separate threads (threadLevel) on the same node.
  • if there are multiple rules in the policy, it’s not possible to determine which rule generated a given message.

By default, if no processor is specified, the output will be logged at DEBUG level.
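To surface that default DEBUG output, enable DEBUG logging before the run. The logger name isn't documented here, so this sketch simply enables DEBUG on the root logger:

```python
import logging

# Enable DEBUG output globally so the captured sub-process messages,
# which are logged at DEBUG level by default, become visible.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
```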

If you’re not interested in the output at all, a ‘null’ processor can be used

def ignore(payload):
    pass

p.run('mmfs1', output_processor=ignore)

Examples

Defining a default Placement Policy

>>> from arcapix.fs.gpfs import Filesystem, SetPoolRule
>>>
>>> # Get the placement policy object for Filesystem 'mmfs1'
... mypolicy = Filesystem('mmfs1').policy
>>>
>>> # Create a new 'Set Pool' rule
... mypolicy.rules.new(SetPoolRule, name='default', target='system')
>>>
>>> # Validate
... mypolicy.validate()
>>>
>>> # Apply policy to the filesystem
... mypolicy.save()

Load the running Placement Policy and export to file

>>> from arcapix.fs.gpfs import Filesystem
>>>
>>> # Get the placement policy object for Filesystem 'mmfs1'
... mypolicy = Filesystem('mmfs1').policy
>>>
>>> # Print default placement rule string
... print(mypolicy.rules['default'].toGpfsString())

"RULE 'default' SET POOL 'system'"

>>>
>>> # Export policy to file
... mypolicy.export_to_file('mmfs1-running.pol')

Create and run a Management Policy to delete temporary files

>>> from arcapix.fs.gpfs import Cluster, ManagementPolicy, Criteria, DeleteRule
>>>
>>> # Create a management policy object
... mypolicy = ManagementPolicy()
>>>
>>> # Create a new 'Delete' rule
... r = mypolicy.rules.new(DeleteRule, source='pool_1')
>>>
>>> # Add criteria that select temporary files
... r.criteria.new(Criteria.like('name', '*.tmp'))
>>>
>>> # Get target filesystem mmfs1
... mmfs1 = Cluster().filesystems['mmfs1']
>>>
>>> # Run the policy on the filesystem
... Cluster().runPolicy(mypolicy, mmfs1)

Load and modify an existing Management Policy file

>>> from arcapix.fs.gpfs import ManagementPolicy, Criteria, ExcludeRule
>>>
>>> # Load a management policy object from file
... mypolicy = ManagementPolicy('tidyup.pol')
>>>
>>> # Create a new 'Exclude' rule
... r = mypolicy.rules.new(ExcludeRule)
>>>
>>> # Add criteria to select files less than 30 days old for exclusion
... r.criteria.new(Criteria.lt('creation', 30))
>>>
>>> # Save changes to file
... mypolicy.save()

Resolve Includes

>>> from arcapix.fs.gpfs import ManagementPolicy
>>>
>>> # load an existing policy with includes
... p = ManagementPolicy('example.pol')
>>>
>>> # create a new policy
... q = ManagementPolicy()
>>>
>>> # iterate over rules in existing policy
... # including rule in includes (if possible)
... for r in p.rule_tree(raise_unreadable=False):
...     # and add them to the new policy
...     q.rules.insert(r)
...
>>> # save the new policy
... q.save(filename='example-inc.pol')

Capture MapReduceRule Tracebacks

>>> # define an output processor to collect tracebacks
>>>
>>> # we use a 'set' to only collect unique tracebacks
>>> tracebacks = set()
>>>
>>> def collector(payload):
...     if payload['type'] == 'traceback':
...         tracebacks.add(payload['message'])
...
>>> # try to run the policy
>>> try:
...     res = policy.run('mmfs1', output_processor=collector)
... except Exception:
...     # if policy fails, print the captured tracebacks
...     for tb in tracebacks:
...         print(tb)
...     raise
...