Asynchronous Proxy Plugin Walk-Through

In this guide, we’ll walk through developing a PixStor Search plugin which generates proxies using the asynchronous job engine.

Introduction

The plugin we’ll develop will be a proxy plugin, which will generate proxies for HTML files.

A proxy is a sample representation of a file, for example a thumbnail or preview image.

This guide will cover:

  • writing a proxy plugin
  • submitting asynchronous jobs
  • validating the plugin
  • installing and enabling the plugin
  • performing an ingest with the plugin

This is a relatively high level, practical guide. See the Plugin Development Overview for a more detailed description of plugin development.

See also Plugin Development Walk-through for a description of developing a (synchronous) metadata plugin.

Plugin Template

We’ll start with a plugin template.

Create a new file in the user plugins directory

$ mkdir -p /opt/arcapix/usr/share/apsearch/plugins/available/user
$ vim /opt/arcapix/usr/share/apsearch/plugins/available/user/html_preview.py

We’ve put the plugin under available, not enabled - we’re not ready to enable it yet.

The plugin can’t be used in ingests until it is enabled.

Paste the following code into the new file

import os

from arcapix.config import config

from arcapix.search.metadata.helpers import Proxy
from arcapix.search.metadata.plugins.base import PluginStatus, ProxyPlugin


class SampleAsyncPreview(ProxyPlugin):

    def namespace(self):
        """Return the namespace for the metadata for this plugin."""
        return 'sample'

    def is_async(self):
        """Return whether this plugin does any operations asynchronously."""
        return True

    def handles(self, ext=None, mimetype=None):
        """Return whether the plugin can handle a given file, based on extension/mimetype

        :param ext: file extension, including a leading '.'
        :param mimetype: file mime type, e.g. image/png

        :returns: True if the plugin needs to process this file, false otherwise
        """
        return False

    def schema(self):
        """Return the schema for the metadata produced by this plugin.

        All metadata will be validated against this before inserting into the database.
        """
        return []

    def _make_proxy(self, source_path, proxy_size):
        """Private worker function to generate a proxy for a file.

        :param source_path: file to generate proxy for
        :param proxy_size: target size of proxy, tuple of (width, height)

        :return: path to the generated proxy
        """
        raise NotImplementedError("TODO")

    def process_async(self, id_, source_path, proxy_size):
        """Make the proxy and add it to the proxy store/db

        This function will be submitted to the async job engine
        """
        proxy_filename = self._make_proxy(source_path, proxy_size)

        try:
            Proxy(id_, self).ingest(proxy_filename, 'preview', 'preview.png', 'image/png')
        finally:
            # clean up any left-over temporary file
            if os.path.exists(proxy_filename):
                os.remove(proxy_filename)

    def process(self, id_, file_, fileinfo=None):
        """Generate a preview for a given file and submit an update.

        :param id_: black box identifier which shall be passed to the metadata update functions
        :param file_: full path to the file
        :param fileinfo: dict of information which may have already been gathered and may be of use

        :returns: PluginStatus indicating success/failure
        """
        try:
            self.logger.info("Submitting async proxy generation job for '%s'", file_)
            self._submit(self.process_async, args=[id_, file_, config['arcapix.search.proxies.preview.size']])
            return PluginStatus.INPROGRESS

        except Exception as e:
            self.logger.error(
                "An exception was raised while processing '%s' (%s) with %s - %s: %s",
                file_, id_, self.__class__.__name__, e.__class__.__name__, e)
            self.logger.debug("Traceback while processing %r (%s)", file_, id_, exc_info=True)
            return PluginStatus.FATAL
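
The try/finally in process_async is what guarantees the temporary proxy file is removed, even if the ingest step fails. The following is a minimal standalone sketch of that pattern, not the plugin itself: make_temp_png and failing_ingest are hypothetical stand-ins for self._make_proxy and Proxy(id_, self).ingest, and no PixStor Search modules are used.

```python
import os
import tempfile


def process_with_cleanup(make_proxy, ingest):
    """Generate a proxy, ingest it, and always remove the temporary file.

    make_proxy and ingest are hypothetical callables standing in for
    self._make_proxy(...) and Proxy(id_, self).ingest(...).
    """
    proxy_filename = make_proxy()
    try:
        ingest(proxy_filename)
    finally:
        # runs whether ingest() returned normally or raised
        if os.path.exists(proxy_filename):
            os.remove(proxy_filename)


created = []


def make_temp_png():
    # stand-in for _make_proxy: create an empty temporary file
    fd, path = tempfile.mkstemp(suffix='.png')
    os.close(fd)
    created.append(path)
    return path


def failing_ingest(path):
    # stand-in for a failed ingest
    raise RuntimeError("simulated ingest failure for %s" % path)


try:
    process_with_cleanup(make_temp_png, failing_ingest)
except RuntimeError:
    pass

# the exception still propagated, but the temporary file is gone
leftover = os.path.exists(created[0])
```

Note that the exception is not swallowed by the finally block; it still reaches the caller after the clean-up has run.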

Plugin check tool

As we develop this plugin, we’ll use the searchctl plugins check tool to check for issues.

Let’s try it now. To specify our plugin, we give it the full path and the plugin (class) name

For brevity, let’s define an environment variable for the plugin path

$ export PLUGIN_PATH=/opt/arcapix/usr/share/apsearch/plugins/available/user/html_preview.py
$ searchctl plugins check $PLUGIN_PATH::SampleAsyncPreview
...
SampleAsyncPreview :: schema
 ❗ plugin doesn't define a schema
plugin
 ❗ plugin doesn't appear to be installed in plugin path

As we can see, the check tool reports two issues: the plugin doesn’t define a schema, and it isn’t installed. We’ll add a schema shortly. Since we’re not ready to enable the plugin yet, we can ignore the second issue for now.

Filling in the template

Now we can start to fill in the plugin template.

First we’ll rename the plugin class to HTMLPreview, and set the namespace to image. Setting the namespace to image indicates that the preview itself is an image (not the source file).

This is an asynchronous plugin, so we can leave the is_async method returning True.

class HTMLPreview(ProxyPlugin):

    def namespace(self):
        return 'image'

    def is_async(self):
        return True

    # etc.

(docstrings have been omitted for clarity).

Handles

For handles we want to match files with mime type text/html and extensions .html or .htm

So let’s add these to our handles method

EXTENSIONS = ['.html', '.htm']
MIME_TYPES = ['text/html']

class HTMLPreview(ProxyPlugin):

    def handles(self, ext=None, mimetype=None):
        return ext in EXTENSIONS or mimetype in MIME_TYPES
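
To see how this check behaves, here is a standalone copy of the same logic that can be run outside of PixStor Search. It is only a sketch for experimenting; the sample extensions and mime types passed in are made up for illustration.

```python
EXTENSIONS = ['.html', '.htm']
MIME_TYPES = ['text/html']


def handles(ext=None, mimetype=None):
    """Standalone copy of the plugin's handles logic."""
    return ext in EXTENSIONS or mimetype in MIME_TYPES


results = {
    # matched on extension alone
    'by extension': handles(ext='.html', mimetype=None),
    # matched on mime type, even though the extension is not listed
    'by mimetype': handles(ext='.txt', mimetype='text/html'),
    # neither extension nor mime type is listed
    'png': handles(ext='.png', mimetype='image/png'),
    # the comparison is case-sensitive, so this is not matched
    'upper case': handles(ext='.HTML', mimetype=None),
}
```

The last case is worth noting: the list membership test is case-sensitive. Whether the extension is normalised before handles is called isn’t covered in this guide, so if upper-case extensions matter to you, it is worth testing with the check tool.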

Schema

There is a standard schema for ‘preview’ proxies

def schema(self):
    return [{
        "name": "preview",
        "prompt": "HTML preview",
        "value": {
            "datatype": "Proxy"
        }
    }]
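
Before running the check tool, a quick structural sanity check can catch typos in the schema. The helper below is a hypothetical sketch: it only checks for the keys used in the schema above (name, prompt, value and datatype) and is not the validation that PixStor Search itself performs.

```python
SCHEMA = [{
    "name": "preview",
    "prompt": "HTML preview",
    "value": {
        "datatype": "Proxy"
    }
}]


def schema_problems(schema):
    """Return a list of human-readable problems found in a schema list."""
    problems = []
    for index, field in enumerate(schema):
        # every field is expected to carry these three top-level keys
        for key in ("name", "prompt", "value"):
            if key not in field:
                problems.append("field %d is missing %r" % (index, key))
        # the value block is expected to declare a datatype
        if "datatype" not in field.get("value", {}):
            problems.append("field %d has no value datatype" % index)
    return problems


ok = schema_problems(SCHEMA)
broken = schema_problems([{"name": "preview"}])
```

Here ok is an empty list, while broken lists the missing prompt, value and datatype entries.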

Review

Before we move on to implementing proxy generation, let’s review what we have so far.

Here’s the full, updated plugin

import os

from arcapix.config import config

from arcapix.search.metadata.helpers import Proxy
from arcapix.search.metadata.plugins.base import PluginStatus, ProxyPlugin

EXTENSIONS = ['.html', '.htm']
MIME_TYPES = ['text/html']


class HTMLPreview(ProxyPlugin):

    def namespace(self):
        """Return the namespace for the metadata for this plugin."""
        return 'image'

    def is_async(self):
        """Return whether this plugin does any operations asynchronously."""
        return True

    def handles(self, ext=None, mimetype=None):
        """Return whether the plugin can handle a given file, based on extension/mimetype

        :param ext: file extension, including a leading '.'
        :param mimetype: file mime type, e.g. image/png

        :returns: True if the plugin needs to process this file, false otherwise
        """
        return ext in EXTENSIONS or mimetype in MIME_TYPES

    def schema(self):
        """Return the schema for the metadata produced by this plugin.

        All metadata will be validated against this before inserting into the database.
        """
        return [{
            "name": "preview",
            "prompt": "HTML preview",
            "value": {
                "datatype": "Proxy"
            }
        }]

    def _make_proxy(self, source_path, proxy_size):
        """Private worker function to generate a proxy for a file.

        :param source_path: file to generate proxy for
        :param proxy_size: target size of proxy, tuple of (width, height)

        :return: path to the generated proxy
        """
        raise NotImplementedError("TODO")

    def process_async(self, id_, source_path, proxy_size):
        """Make the proxy and add it to the proxy store/db

        This function will be submitted to the async job engine
        """
        proxy_filename = self._make_proxy(source_path, proxy_size)

        try:
            Proxy(id_, self).ingest(proxy_filename, 'preview', 'preview.png', 'image/png')
        finally:
            # clean up any left-over temporary file
            if os.path.exists(proxy_filename):
                os.remove(proxy_filename)

    def process(self, id_, file_, fileinfo=None):
        """Generate a preview for a given file and submit an update.

        :param id_: black box identifier which shall be passed to the metadata update functions
        :param file_: full path to the file
        :param fileinfo: dict of information which may have already been gathered and may be of use

        :returns: PluginStatus indicating success/failure
        """
        try:
            self.logger.info("Submitting async proxy generation job for '%s'", file_)
            self._submit(self.process_async, args=[id_, file_, config['arcapix.search.proxies.preview.size']])
            return PluginStatus.INPROGRESS

        except Exception as e:
            self.logger.error(
                "An exception was raised while processing '%s' (%s) with %s - %s: %s",
                file_, id_, self.__class__.__name__, e.__class__.__name__, e)
            self.logger.debug("Traceback while processing %r (%s)", file_, id_, exc_info=True)
            return PluginStatus.FATAL

Let’s check the plugin with searchctl - remember that we renamed the plugin class

$ searchctl plugins check $PLUGIN_PATH::HTMLPreview
...
plugin
❗ plugin doesn't appear to be installed in plugin path

So far, so good; we haven’t broken anything.

Let’s check it against a sample file - here we’ve saved a copy of the example.com web page

$ searchctl plugins check $PLUGIN_PATH::HTMLPreview /mmfs1/data/example.html
WARNING:arcapix.search.metadata.plugins.validation:plugin: plugin doesn't appear to be installed in plugin path
WARNING:arcapix.search.metadata.plugins.validation:Skipping inline check of async process, some plugin issues might be missed.
plugin
❗ plugin doesn't appear to be installed in plugin path

The plugin check tool won’t submit any asynchronous jobs. To properly validate the proxy generation, we can use the --inline flag to force the check tool to perform the proxy generation synchronously.

$ searchctl plugins check $PLUGIN_PATH::HTMLPreview /mmfs1/data/example.html --inline
...
WARNING:arcapix.search.metadata.plugins.validation:Plugin returned PluginStatus.FATAL
ERROR:arcapix.search.plugins.html_preview.HTMLPreview:An exception was raised while processing '/mmfs1/data/example.html' (0) with HTMLPreview - NotImplementedError: TODO
PluginStatus.FATAL
ERROR:arcapix.search.metadata.plugins.validation:An error occured while processing "/mmfs1/data/example.html"
Traceback (most recent call last):
  File "/home/coates/development/repos/git/workspace/easysearch/src/arcapix/search/metadata/plugins/validation.py", line 710, in _check_process_no_errors_raised
    raising_process(self.plugin, MetadataId.from_string(0), file_)
  File "process", line 12, in process
  File "/home/coates/development/repos/git/workspace/easysearch/src/arcapix/search/metadata/utils.py", line 184, in submit_inline
    return async_function(*args)
  File "/opt/arcapix/usr/share/apsearch/plugins/available/user/html_preview.py", line 60, in process_async
    proxy_filename = self._make_proxy(source_path, proxy_size)
  File "/opt/arcapix/usr/share/apsearch/plugins/available/user/html_preview.py", line 53, in _make_proxy
    raise NotImplementedError("TODO")
NotImplementedError: TODO
ERROR:arcapix.search.metadata.plugins.validation:process: exception NotImplementedError: "TODO" was caught during process
HTMLPreview :: /mmfs1/data/example.html :: process
❌ exception NotImplementedError: "TODO" was caught during process

As expected, the check fails with the NotImplementedError raised by our placeholder _make_proxy method. Now we can write the proxy generation code

Generating proxies

Installing dependencies

To generate proxies, we’re going to use a third-party Python library: imgkit

imgkit is a wrapper around the wkhtmltoimage tool, which is distributed as part of wkhtmltopdf, so we will need to install wkhtmltopdf first. Instructions for installing on PixStor (CentOS 7) can be found here. We also need to install the imgkit library in the PixStor Search virtual environment

$ /usr/share/arcapix/apsearch/bin/pip install imgkit
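
Before writing the proxy code, it can be useful to confirm that both dependencies are visible. The script below is a small sketch that assumes Python 3; check_dependencies is a hypothetical helper name. It looks for the imgkit module and for the wkhtmltoimage executable on the PATH, without importing or running either.

```python
import importlib.util
import shutil


def check_dependencies():
    """Report whether imgkit and wkhtmltoimage can be found."""
    return {
        # is the imgkit package importable from this interpreter?
        'imgkit': importlib.util.find_spec('imgkit') is not None,
        # is the wkhtmltoimage executable on the PATH?
        'wkhtmltoimage': shutil.which('wkhtmltoimage') is not None,
    }


status = check_dependencies()
for name, found in sorted(status.items()):
    print('%-15s %s' % (name, 'found' if found else 'MISSING'))
```

For the result to be meaningful, run it with the Python interpreter from the same virtual environment as the pip command above (usually the python executable alongside pip, though that location is an assumption here).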

Proxy generation

Replace the _make_proxy method with the following

import imgkit  # add this to the imports at the top of the plugin file

def _make_proxy(self, source_path, proxy_size):
    proxy_filename = self.generate_temp_filename(suffix='.png')

    imgkit.from_file(
        source_path,
        proxy_filename,
        options={
            'width': proxy_size[0],
            'height': proxy_size[1],
            'zoom': '0.5',
        }
    )

    return proxy_filename

The generate_temp_filename method returns a temporary filename within a special temporary work directory in the PixStor Search proxy store. By default this work directory is located at /mmfs1/apsearch/proxies/.proxytmp/ (or under the equivalent filesystem mount point).
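
To illustrate the idea of a unique temporary filename inside a dedicated work directory, here is a rough standalone equivalent built on the standard tempfile module. It assumes Python 3, and sketch_temp_filename and the demo directory are hypothetical; this is not how generate_temp_filename is implemented in PixStor Search.

```python
import os
import tempfile


def sketch_temp_filename(work_dir, suffix=''):
    """Return a unique temporary file path inside work_dir."""
    os.makedirs(work_dir, exist_ok=True)
    # mkstemp creates the file, so the name cannot clash with another job
    fd, path = tempfile.mkstemp(suffix=suffix, dir=work_dir)
    os.close(fd)
    return path


# a throw-away directory for the demo, not the real proxy store
work_dir = os.path.join(tempfile.gettempdir(), 'proxytmp-demo')

first = sketch_temp_filename(work_dir, suffix='.png')
second = sketch_temp_filename(work_dir, suffix='.png')

# tidy up the demo files
os.remove(first)
os.remove(second)
```

Each call returns a different path ending in .png, which is why the proxy_path differs between runs of the check tool.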

Validate the proxy

Let’s run the check tool again

$ searchctl plugins check $PLUGIN_PATH::HTMLPreview /mmfs1/data/example.html --inline
plugin
❗ plugin doesn't appear to be installed in plugin path

Still no issues! (except that the plugin still isn’t installed).

Let’s increase the logging level to notify, so we can see what it’s actually generating

$ APLOGLEVEL=notify searchctl plugins check $PLUGIN_PATH::HTMLPreview /mmfs1/data/example.html --inline
...
NOTIFY:arcapix.search.metadata.plugins.validation:Checking file "/mmfs1/data/example.html"
NOTIFY:arcapix.search.metadata.plugins.validation:File "/mmfs1/data/example.html" with extension ".html" and mimetype "text/html" is handled by HTMLPreview
NOTIFY:arcapix.search.metadata.plugins.validation:Plugin returned <PluginStatus.INPROGRESS: 3>
NOTIFY:arcapix.search.metadata.plugins.validation:Plugin process returned <PluginStatus.INPROGRESS: 3>
NOTIFY:arcapix.search.metadata.plugins.validation:Plugin processing took 0.459143 seconds
NOTIFY:arcapix.search.metadata.plugins.validation:Generated proxies:
[
    {
        "proxy_path": "/mmfs1/apsearch/proxies/.proxytmp/tmpcszkQM.png",
        "mimetype": "image/png",
        "filename": "preview.png",
        "typeidentifier": "preview"
    }
]

Looks reasonable. But what about the actual preview?

By default, the generated proxy will be cleaned up. To keep it, so that we can inspect the generated preview, we can run the check tool with the --keep-proxies flag

$ APLOGLEVEL=notify searchctl plugins check $PLUGIN_PATH::HTMLPreview /mmfs1/data/example.html --inline --keep-proxies
...
NOTIFY:arcapix.search.metadata.plugins.validation:Generated proxies:
[
    {
        "proxy_path": "/mmfs1/apsearch/proxies/.proxytmp/tmpKqt6Nn.png",
        "mimetype": "image/png",
        "filename": "preview.png",
        "typeidentifier": "preview"
    }
]

The output is mostly the same as before (except for a different proxy_path)

We can now find the generated preview image at the reported proxy_path - /mmfs1/apsearch/proxies/.proxytmp/tmpKqt6Nn.png (your path will differ)

[Image: html_preview.png - the generated preview of example.html]

Looks good to me.
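
If you can’t easily open the image (for example, on a headless server), a rough check is to confirm that the kept file really is a PNG and to read its dimensions. The helper below is a standard-library-only sketch; png_size is a hypothetical name, and it reads only the PNG signature and the IHDR header rather than decoding the image.

```python
import struct

PNG_SIGNATURE = b'\x89PNG\r\n\x1a\n'


def png_size(path):
    """Return (width, height) of a PNG file, or raise ValueError."""
    with open(path, 'rb') as f:
        # 8-byte signature, 4-byte chunk length, 4-byte chunk type,
        # then the 4-byte width and 4-byte height of the IHDR chunk
        header = f.read(24)
    if (len(header) < 24
            or header[:8] != PNG_SIGNATURE
            or header[12:16] != b'IHDR'):
        raise ValueError('%s does not look like a PNG file' % path)
    width, height = struct.unpack('>II', header[16:24])
    return width, height
```

Call png_size with the proxy_path reported by the check tool and compare the result with the preview size you expect.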

Complete Plugin

Here’s the complete plugin, updated to include the proxy generation

import os

import imgkit

from arcapix.config import config

from arcapix.search.metadata.helpers import Proxy
from arcapix.search.metadata.plugins.base import PluginStatus, ProxyPlugin

EXTENSIONS = ['.html', '.htm']
MIME_TYPES = ['text/html']


class HTMLPreview(ProxyPlugin):

    def namespace(self):
        """Return the namespace for the metadata for this plugin."""
        return 'image'

    def is_async(self):
        """Return whether this plugin does any operations asynchronously."""
        return True

    def handles(self, ext=None, mimetype=None):
        """Return whether the plugin can handle a given file, based on extension/mimetype

        :param ext: file extension, including a leading '.'
        :param mimetype: file mime type, e.g. image/png

        :returns: True if the plugin needs to process this file, false otherwise
        """
        return ext in EXTENSIONS or mimetype in MIME_TYPES

    def schema(self):
        """Return the schema for the metadata produced by this plugin.

        All metadata will be validated against this before inserting into the database.
        """
        return [{
            "name": "preview",
            "prompt": "HTML preview",
            "value": {
                "datatype": "Proxy"
            }
        }]

    def _make_proxy(self, source_path, proxy_size):
        """Private worker function to generate a proxy for a file.

        :param source_path: file to generate proxy for
        :param proxy_size: target size of proxy, tuple of (width, height)

        :return: path to the generated proxy
        """
        proxy_filename = self.generate_temp_filename(suffix='.png')

        imgkit.from_file(
            source_path,
            proxy_filename,
            options={
                'width': proxy_size[0],
                'height': proxy_size[1],
                'zoom': '0.5',
            }
        )

        return proxy_filename

    def process_async(self, id_, source_path, proxy_size):
        """Make the proxy and add it to the proxy store/db

        This function will be submitted to the async job engine
        """
        proxy_filename = self._make_proxy(source_path, proxy_size)

        try:
            Proxy(id_, self).ingest(proxy_filename, 'preview', 'preview.png', 'image/png')
        finally:
            # clean up any left-over temporary file
            if os.path.exists(proxy_filename):
                os.remove(proxy_filename)

    def process(self, id_, file_, fileinfo=None):
        """Generate a preview for a given file and submit an update.

        :param id_: black box identifier which shall be passed to the metadata update functions
        :param file_: full path to the file
        :param fileinfo: dict of information which may have already been gathered and may be of use

        :returns: PluginStatus indicating success/failure
        """
        try:
            self.logger.info("Submitting async proxy generation job for '%s'", file_)
            self._submit(self.process_async, args=[id_, file_, config['arcapix.search.proxies.preview.size']])
            return PluginStatus.INPROGRESS

        except Exception as e:
            self.logger.error(
                "An exception was raised while processing '%s' (%s) with %s - %s: %s",
                file_, id_, self.__class__.__name__, e.__class__.__name__, e)
            self.logger.debug("Traceback while processing %r (%s)", file_, id_, exc_info=True)
            return PluginStatus.FATAL

Enabling the plugin

To enable a plugin, first we symlink it into the enabled directory

$ mkdir -p /opt/arcapix/usr/share/apsearch/plugins/enabled/user
$ ln -s /opt/arcapix/usr/share/apsearch/plugins/available/user/html_preview.py /opt/arcapix/usr/share/apsearch/plugins/enabled/user
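
A symlink only helps if it points at a file that exists. The following sketch shows one way to verify that; is_enabled is a hypothetical helper, and the demo builds a throw-away available/enabled directory pair rather than touching the real plugin directories.

```python
import os
import shutil
import tempfile


def is_enabled(plugin_filename, enabled_dir):
    """Return True if enabled_dir holds a working symlink for the plugin."""
    link = os.path.join(enabled_dir, plugin_filename)
    # islink: it is a symlink; exists: the link target is present
    return os.path.islink(link) and os.path.exists(link)


# throw-away directory layout for the demo
base = tempfile.mkdtemp()
available = os.path.join(base, 'available', 'user')
enabled = os.path.join(base, 'enabled', 'user')
os.makedirs(available)
os.makedirs(enabled)

source = os.path.join(available, 'html_preview.py')
open(source, 'w').close()

before = is_enabled('html_preview.py', enabled)   # no link yet
os.symlink(source, os.path.join(enabled, 'html_preview.py'))
after = is_enabled('html_preview.py', enabled)    # link in place
os.remove(source)
dangling = is_enabled('html_preview.py', enabled)  # target removed

shutil.rmtree(base)
```

Against a real installation, you would pass the enabled/user directory used in the commands above.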

Next, as the plugin check keeps reminding us, we need to restart the Search server to register the plugin

$ systemctl restart apsearch-middleware

Registering the plugin updates the Search database with the plugin’s namespace and schema.

To check if it’s successfully installed, we can list all the installed plugins

$ searchctl plugins list
stat::StatPlugin
gpfs::GPFSPolicyPlugin
html_preview::HTMLPreview    # <--- that's our plugin
image::ImagePlugin
...

And one last check - this time, we can use the short plugin name listed above

$ searchctl plugins check html_preview::HTMLPreview
 ✔️ No issues found!

Perfect!

Perform an ingest

Now we can ingest some files using our plugin.

Let’s try ingesting our test file

$ APLOGLEVEL=info searchctl add-file /mmfs1/data/example.html --plugins html_preview::HTMLPreview
...
INFO:arcapix.search.metadata.broker:Starting ingest of '/mmfs1/data/example.html' (3927216287062101581)
INFO:arcapix.search.metadata.broker:/mmfs1/data/example.html (3927216287062101581) default::DefaultPlugin SUCCESS
INFO:arcapix.search.metadata.broker:/mmfs1/data/example.html (3927216287062101581) html_preview::HTMLPreview INPROGRESS
NOTIFY:arcapix.search.metadata.broker:Ingested '/mmfs1/data/example.html' (3927216287062101581) with 2 plugins - 2 plugins were successful, 0 failed

We’re using the --plugins flag so that the ingest only uses our HTML preview plugin. (default::DefaultPlugin is a special internal plugin which is always run.)

Notice that our preview reports status INPROGRESS, since it has submitted an asynchronous job. We can check the progress of our job with the condor_q command.

Once the async job completes, open up the PixStor Search UI, and verify that the preview image appears in the preview pane for our test file.

Now we want to ingest any other HTML files on the file system

$ searchctl ingest mmfs1 --include '*.html,*.htm' --plugins html_preview::HTMLPreview

Here, we’re using an --include to only match files with the extensions our plugin handles.
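
To get a feel for which files a pattern list like this selects, the sketch below applies shell-style patterns to a few made-up paths. It rests on assumptions: that the patterns are comma-separated globs matched case-sensitively against the file name. How searchctl actually evaluates --include isn’t described here, and matches_include is a hypothetical helper.

```python
import os
from fnmatch import fnmatchcase


def matches_include(path, include):
    """Return True if the file name matches any comma-separated pattern."""
    name = os.path.basename(path)
    patterns = [p.strip() for p in include.split(',') if p.strip()]
    return any(fnmatchcase(name, pattern) for pattern in patterns)


# made-up sample paths
paths = [
    '/mmfs1/data/example.html',
    '/mmfs1/data/index.htm',
    '/mmfs1/data/notes.txt',
    '/mmfs1/data/page.html.bak',
]

selected = [p for p in paths if matches_include(p, '*.html,*.htm')]
```

Under these assumptions only example.html and index.htm are selected; page.html.bak is not, because the pattern has to match the end of the name.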

If you have scheduled ingests enabled, and search is in ‘rich’ mode, then the plugin should be used automatically next time the ingest is run.

Summary

In this walk-through we developed a plugin which generates preview images for HTML files.

We used searchctl plugins check to validate our progress as we went along.

Finally, we installed and enabled our plugin, and ingested some files with it.

For a discussion of more advanced topics, including developing plugins for extracting metadata, see Plugin Development Overview and Plugin Development Walk-through

Appendix: Asynchronous jobs

For this walk-through, we glossed over how the asynchronous mechanism works.

If we look at the process method we can see a call to self._submit

def process(self, id_, file_, fileinfo=None):

    try:
        self.logger.info("Submitting async proxy generation job for '%s'", file_)
        self._submit(self.process_async, args=[id_, file_, config['arcapix.search.proxies.preview.size']])
        return PluginStatus.INPROGRESS

    except Exception as e:
        # ...
        return PluginStatus.FATAL

The _submit method, provided by the Plugin base class, handles submitting a Python function to the job engine, HTCondor.

The specific Python function we’re submitting for asynchronous processing is process_async. This is itself a wrapper around the _make_proxy method we implemented: it generates the proxy, then performs the necessary database updates for the generated proxy.
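
The submit-and-return pattern can be illustrated without HTCondor. In the sketch below, a thread pool from the standard library stands in for the job engine; SketchPlugin, its _submit method, the PluginStatus enum and the (400, 300) proxy size are all hypothetical stand-ins, not the PixStor Search classes or configuration.

```python
import enum
from concurrent.futures import ThreadPoolExecutor


class PluginStatus(enum.Enum):
    # stand-in for the real PluginStatus; values are arbitrary
    INPROGRESS = 'inprogress'
    FATAL = 'fatal'


class SketchPlugin(object):

    def __init__(self, executor):
        self._executor = executor
        self.ingested = []

    def _submit(self, func, args):
        # stand-in for handing the function over to the job engine
        self._executor.submit(func, *args)

    def process_async(self, id_, source_path, proxy_size):
        # the real method generates the proxy and updates the database;
        # here we just record that the job ran
        self.ingested.append((id_, source_path, proxy_size))

    def process(self, id_, file_):
        try:
            self._submit(self.process_async, args=[id_, file_, (400, 300)])
            # returned as soon as the job is queued, not when it finishes
            return PluginStatus.INPROGRESS
        except Exception:
            return PluginStatus.FATAL


with ThreadPoolExecutor(max_workers=1) as executor:
    plugin = SketchPlugin(executor)
    status = plugin.process(0, '/mmfs1/data/example.html')

# leaving the with-block waits for the queued job to complete
completed = plugin.ingested
```

The point to take away is that process reports INPROGRESS as soon as the job has been handed over, while the actual work happens later in process_async.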