Policy
PixStor provides two types of Policy:
- Placement Policy: tells its associated Filesystem where to place new files
- Management Policy: can be used to move, delete, etc. files in a Filesystem or directory
All policies are made up of Rules.
Description
-
class arcapix.fs.gpfs.policy.PlacementPolicy(filesystem)
A placement policy is a set of rules for defining how to handle new files on a filesystem.
Parameters: filesystem (Filesystem or str) – The filesystem that the PlacementPolicy applies to
-
save(testOnly=False, cleanup=True)
Save (apply) the policy to the filesystem (using mmchpolicy)
Parameters:
Warning
In order to save changes to the placement policy, a temporary file needs to be written in your /<filesystem_default_mount>/.policytmp folder. If you don’t have write permission for this folder, an error will be raised.
You can change the temporary directory to one you do have permission for using filesystem.setTempFilePath
-
validate()
Checks that the policy is valid.
Note
This won’t necessarily catch all errors that GPFS might throw.
-
export_to_file(filename, **kwargs)
Export the policy to a file, e.g. for backup purposes.
Parameters:
- filename (str) – The file to export the policy to.
- overwrite (bool) – If the specified filename already exists, pass overwrite=True; otherwise an error will be raised
- excludeDisabled (bool) – Don’t write disabled (commented out) rules to file (default=False)
- excludeComments (bool) – Don’t write comments to file (default=False)
-
import_from_file(filename)
Import the contents of an existing file into the policy.
Overwrites any existing content of the PlacementPolicy object with the contents of the file.
Parameters: filename (str) – The file to import from
-
name
Returns the name of the policy.
Either the filename or the filesystem it’s defined on, depending on the policy type.
Return type: str
-
rule_tree(raise_unreadable=True)
Recursive iterator over policy rules.
If an Include is present, it will be loaded and its rules will be yielded also.
Parameters: raise_unreadable (bool) – if an Include can’t be loaded and this setting is True, the exception will be raised. If False, the Include itself will be yielded instead.
Can be used to create a policy from another with includes resolved recursively, e.g.
>>> for r in p.rule_tree():
...     q.rules.insert(r)
-
sameas(tocompare)
Verifies whether the policy has been modified vs the file given.
Parameters: tocompare – name of policy file to compare this policy object to
Raises: Exception if the file does not exist.
Returns: True if the current policy is the same as the file (NB. the comparison ignores white space etc.)
Return type: bool
-
-
class arcapix.fs.gpfs.policy.ManagementPolicy(filename=None)
A management policy is a set of rules for manipulating existing files on a filesystem or directory
Parameters: filename (str) – Name of a file from which to load an existing policy, or to which you’d like to export the policy.
-
id
Returns the ‘id’ of the object, which must be unique for that type.
Typically the name or snapshot ID
-
summary(target=None, refresh=False, **kwargs)
The response from running a management policy.
This is parsed into a dictionary summarising the action and applicability of the policy to the target filesystem or directory.
This includes information like the number of files chosen by each rule in the policy or how pool occupancy will be affected by the policy.
Parameters:
Return type:
-
stats()
Return profiling stats for list processing rules in the policy.
To generate stats, you need to either enable profiling on one or more of the list processing rules
rule.change(profiling=True)
or run the policy inside the APProfile context manager
>>> with APProfile():
...     policy.run('mmfs1')
...
>>> policy.stats()
Note - If the policy is run multiple times, this will only return stats for the most recent run.
Returns: dict of {rule.listname: pstats.Stats}
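Since the returned values are standard-library pstats.Stats objects, they can be inspected with the usual pstats calls. The sketch below shows one way to turn such a dict into text reports. summarise_stats and _profile_sample are hypothetical helpers, and the sample dict only stands in for the value that policy.stats() would return.

```python
import cProfile
import io
import pstats


def summarise_stats(stats_by_list, top=5):
    """Return {listname: report text} for a dict of pstats.Stats objects."""
    reports = {}
    for listname, st in stats_by_list.items():
        buf = io.StringIO()
        st.stream = buf  # send print_stats() output to the buffer
        st.sort_stats("cumulative").print_stats(top)
        reports[listname] = buf.getvalue()
    return reports


def _work():
    # trivial workload so the profiler has something to record
    return sum(i * i for i in range(1000))


def _profile_sample():
    """Build a Stats object standing in for one entry of policy.stats()."""
    profiler = cProfile.Profile()
    profiler.runcall(_work)
    return pstats.Stats(profiler)


# 'mylist' is a made-up list name used only for this illustration
reports = summarise_stats({"mylist": _profile_sample()})
print(reports["mylist"])
```

Sorting by cumulative time tends to surface the functions that dominate a list processing rule's run time, but any sort key accepted by sort_stats can be used instead.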
-
save(**kwargs)
Export the policy object to a (text) file, e.g. for backup purposes
Parameters: - filename (str) – Name of the file to save to
- overwrite (bool) – If filename already exists, specify True to force overwrite else an error will be raised (default=False)
- excludeDisabled (bool) – Don’t write disabled (commented out) rules to file (default=False)
- excludeComments (bool) – Don’t write comments to file (default=False)
-
run(target, **kwargs)
Run the policy on the specified target (using mmapplypolicy)
Parameters: - target (str) – target filesystem or directory
- nodes (list of str) – list of names of nodes to run on (default=defaultHelperNodes)
- snapshot (str) – run the policy against the named snapshot
- scope (str) – limit the policy scan to one of filesystem, inodespace, fileset
- inputFileList (str) – run the policy against files and directories listed in the input file
- action (str) – One of yes, defer, test, prepare (default=yes)
- localWorkDirectory (str) – Specifies the directory to be used for temporary storage (default=’<filesystem_default_mount>/.policytmp’)
- globalWorkDirectory (str) – Specifies a global directory to be used for temporary storage (must be within the shared filesystem)
- threadLevel (int) – Number of threads used in the policy execution phase (default=24).
- dirThreadLevel (int) – Number of threads used during the directory scan phase (default=24).
- iscanBuckets (int) – number of buckets of inode numbers to be created by the parallel directory scan
- iscanThreads (int) – number of threads and sort pipelines each node will run during the parallel inode scan (default=2)
- maxFiles (int) – Specifies how many files are passed for each invocation of an EXEC script (default=8000).
- asTime (datetime) – evaluate policy rules as if it were running on the date and time specified
- substitution (dict) – string substitutions that will be made in the text of the policy rules before the rules are interpreted
- fileListPrefix (str) – directory or filename-prefix for lists generated by external list rules with action prepare or defer
- fileListPathname (str) – run the policy with files generated by action='prepare'
- infoLevel (int) – controls policy output level, 0 to 6 (default=1). Higher value = more verbose. Note, output goes to python logging.
- singleInstance (bool) – Only one policy with the singleInstance option can execute at one time on a filesystem (default=False)
- reevaluate (bool) – re-evaluate and revalidate the policy before running. For use with action='prepare' + fileListPathname
- roundRobin (bool) – dispatch files from inputFilePathname in a round-robin fashion.
- splitFileListByWeight (bool) – specifies that each of the generated file lists contain elements with the same WEIGHT value
- choiceAlgorithm (str) – algorithm for candidate selection. One of best, exact, fast
- sortBufferSize (str) – buffer size that is passed to the sort command (default=8%)
- sortCommand (str) – fully‐qualified path name for a Posix‐compliant sort command to be used instead of the os default
- splitMargin (float) – percentage the fast‐choice algorithm is allowed to deviate from LIMIT and THRESHOLD targets (default=0.2)
- maxMergeFiles (int) – maximum number of files to be passed as input to the sort command (default=12)
- maxSortBytes (str) – maximum number of bytes to be passed as input files to the sort command (default=411 MB)
- otherSortOptions (str) – Options to pass through to the sort command
- qosClass (str) – Quality of Service class the policy scan is assigned to. One of maintenance, other
- maxDepth (int) – how deeply in the directory hierarchy to apply the policy scan
- cleanup (bool) – remove any temporary policy file after completion (default=True)
- output_processor (callable) – function to process lines of output (stdout/stderr) generated by ExternalListRule scripts and list processing rule functions. See Output Processing for more info.
Warning
If you haven’t saved your policy to a file, it will be saved to a temporary file in your /<filesystem_default_mount>/.policytmp folder. If you don’t have write permission for this folder, an error will be raised.
In this case you should call save() before calling run().
Alternatively, you can change the temporary directory to one you do have permission for using filesystem.setTempFilePath
-
name
Returns the name of the policy.
Either the filename or the filesystem it’s defined on, depending on the policy type.
Return type: str
-
rule_tree(raise_unreadable=True)
Recursive iterator over policy rules.
If an Include is present, it will be loaded and its rules will be yielded also.
Parameters: raise_unreadable (bool) – if an Include can’t be loaded and this setting is True, the exception will be raised. If False, the Include itself will be yielded instead.
Can be used to create a policy from another with includes resolved recursively, e.g.
>>> for r in p.rule_tree():
...     q.rules.insert(r)
-
sameas(tocompare)
Verifies whether the policy has been modified vs the file given.
Parameters: tocompare – name of policy file to compare this policy object to
Raises: Exception if the file does not exist.
Returns: True if the current policy is the same as the file (NB. the comparison ignores white space etc.)
Return type: bool
-
validate()
Checks that the policy is valid.
Note
This won’t necessarily catch all errors that GPFS might throw.
-
Output Processing
When a management policy that includes any ExternalListRule or list processing rules is run, any output generated to stdout or stderr, such as print statements or logging, will be captured.
An output processor can be passed to ManagementPolicy.run to process lines of output as they’re generated.
Note
This output only includes sub-process messages, not the full mmapplypolicy output.
The payload passed to the processor includes the following fields:
- type: the type of payload, either message or traceback
- identifier: a numerical identifier used to identify which node generated the message, e.g. <1>
- node: the actual name of the node which generated the message
- message: the message extracted from the output line(s)
- raw: the raw output line(s), including identifier and trailing newline
Note - the local node (from which the policy was started) will be identified as "node": "localhost"
Anything returned by the processor function will be ignored.
def printer(payload):
    print("{node}: {message}".format(**payload))

policy.run('mmfs1', output_processor=printer)
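A processor can also branch on the payload type. The following sketch routes tracebacks to the ERROR level and everything else to INFO using the standard logging module. The logger name policy.output and the function name log_payload are arbitrary choices for this illustration, and only the payload fields described above are used.

```python
import logging

# arbitrary logger name chosen for this illustration
logger = logging.getLogger("policy.output")


def log_payload(payload):
    """Output processor: log tracebacks as errors, other messages as info."""
    if payload["type"] == "traceback":
        level = logging.ERROR
    else:
        level = logging.INFO
    logger.log(level, "[%s] %s", payload["node"], payload["message"])


# It would be passed to run() in the same way as the printer above:
# policy.run('mmfs1', output_processor=log_payload)
```

Routing by level lets existing logging configuration (handlers, filters) decide what happens to tracebacks, rather than hard-coding that in the processor.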
Limitations:
- stdout and stderr get merged, so the source of a given message can’t be determined.
- it’s not possible to distinguish between messages generated by separate threads (threadLevel) on the same node.
- if there are multiple rules in the policy, it’s not possible to determine which rule generated a given message.
By default, if no processor is specified, the output will be logged at DEBUG level.
If you’re not interested in the output at all, a ‘null’ processor can be used:
def ignore(payload):
    pass

p.run('mmfs1', output_processor=ignore)
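Because the processor only needs to be callable, it can also be an object that keeps state between calls. The sketch below groups message text by node name. NodeCollector is a hypothetical class for illustration, and the sample payloads are made up to match the field list above.

```python
from collections import defaultdict


class NodeCollector(object):
    """Callable output processor that groups message text by node name."""

    def __init__(self):
        self.by_node = defaultdict(list)

    def __call__(self, payload):
        self.by_node[payload["node"]].append(payload["message"])


collector = NodeCollector()

# Made-up payloads, fed in by hand to show the resulting structure.
# In real use the instance would be passed as output_processor=collector.
collector({"type": "message", "identifier": "<1>", "node": "localhost",
           "message": "started", "raw": "<1> started\n"})
collector({"type": "message", "identifier": "<2>", "node": "node2",
           "message": "started", "raw": "<2> started\n"})
collector({"type": "message", "identifier": "<1>", "node": "localhost",
           "message": "finished", "raw": "<1> finished\n"})

for node in sorted(collector.by_node):
    print(node, collector.by_node[node])
```

Given the limitations listed above, grouping by node is about as fine-grained as the payload allows, since threads and rules cannot be told apart.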
Examples
Defining a default Placement Policy
>>> from arcapix.fs.gpfs import Filesystem, SetPoolRule
>>>
>>> # Get the placement policy object for Filesystem 'mmfs1'
... mypolicy = Filesystem('mmfs1').policy
>>>
>>> # Create a new 'Set Pool' rule
... mypolicy.rules.new(SetPoolRule, name='default', target='system')
>>>
>>> # Validate
... mypolicy.validate()
>>>
>>> # Apply policy to the filesystem
... mypolicy.save()
Load the running Placement Policy and export to file
>>> from arcapix.fs.gpfs import Filesystem
>>>
>>> # Get the placement policy object for Filesystem 'mmfs1'
... mypolicy = Filesystem('mmfs1').policy
>>>
>>> # Print default placement rule string
... print(mypolicy.rules['default'].toGpfsString())
"RULE 'default' SET POOL 'system'"
>>>
>>> # Export policy to file
... mypolicy.export_to_file('mmfs1-running.pol')
Create and run a Management Policy to delete temporary files
>>> from arcapix.fs.gpfs import Cluster, ManagementPolicy, Criteria, DeleteRule
>>>
>>> # Create a management policy object
... mypolicy = ManagementPolicy()
>>>
>>> # Create a new 'Delete' rule
... r = mypolicy.rules.new(DeleteRule, source='pool_1')
>>>
>>> # Add criteria that select temporary files
... r.criteria.new(Criteria.like('name', '*.tmp'))
>>>
>>> # Get target filesystem mmfs1
... mmfs1 = Cluster().filesystems['mmfs1']
>>>
>>> # Run the policy on the filesystem
... Cluster().runPolicy(mypolicy, mmfs1)
Load and modify an existing Management Policy file
>>> from arcapix.fs.gpfs import ManagementPolicy, Criteria, ExcludeRule
>>>
>>> # Load a management policy object from file
... mypolicy = ManagementPolicy('tidyup.pol')
>>>
>>> # Create a new 'Exclude' rule
... r = mypolicy.rules.new(ExcludeRule)
>>>
>>> # Add criteria to select files less than 30 days old for exclusion
... r.criteria.new(Criteria.lt('creation', 30))
>>>
>>> # Save changes to file
... mypolicy.save()
Resolve Includes
>>> from arcapix.fs.gpfs import ManagementPolicy
>>>
>>> # load an existing policy with includes
... p = ManagementPolicy('example.pol')
>>>
>>> # create a new policy
... q = ManagementPolicy()
>>>
>>> # iterate over rules in existing policy
... # including rules in includes (if possible)
... for r in p.rule_tree(raise_unreadable=False):
...     # and add them to the new policy
...     q.rules.insert(r)
...
>>> # save the new policy
... q.save(filename='example-inc.pol')
Capture MapReduceRule Tracebacks
>>> # define an output processor to collect tracebacks
>>>
>>> # we use a 'set' to only collect unique tracebacks
>>> tracebacks = set()
>>>
>>> def collector(payload):
...     if payload['type'] == 'traceback':
...         tracebacks.add(payload['message'])
...
>>> # try to run the policy
>>> try:
...     res = policy.run('mmfs1', output_processor=collector)
... except Exception:
...     # if policy fails, print the captured tracebacks
...     for tb in tracebacks:
...         print(tb)
...     raise
...