Writing Data Plugins

Data plugins are simple, but they do follow a format and expect certain inputs and outputs. The data functions are executed as new data comes in and will be executed over and over again. Unlike, for instance, ingress plugins that are only executed once to set up the flow of data.

The data plugin takes 2 functions, both are coroutines. The first function, prepare takes the name of the pipe and the inbound data as a list. The prepare coroutine function needs to return a list of arrays that can be used by a model.

Prepare Function

This example shows how to prepare data formatted in a python dict for the salt event bus. Remember that Umbra uses pop as the plugin system so the hub needs to be accepted as the first argument.

async def prepare(hub, pipe, inbound):
    '''
    Takes the raw data loaded from a salt event stream, this data is transformed
    into data that can be loaded in to the model and we save the string map dicts
    for future translation
    '''
    ret = []
    d_count = 0
    for event in inbound:
        if not hub.P[pipe]['tmap_populated']:
            for key in event['data']:
                if not isinstance(event['data'][key], str):
                    # TODO: Make this able to handle more than just strings
                    continue
                if key == '_stamp':
                    continue
                hub.P[pipe]['tmap'].append(key)
            hub.P[pipe]['tmap_populated'] = True
        x = [0 for n in hub.P[pipe]['tmap']]
        for key in event['data']:
            if not isinstance(event['data'][key], str):
                # TODO: Be able to handle more than just strings here
                continue
            if key == '_stamp':
                continue
            x[hub.P[pipe]['tmap'].index(key)] = hub.data.init.word_map(
                event['data'][key],
                hub.P[pipe]['words'],
                hub.P[pipe]['r_words'])
        ret.append(x)
    return ret

Since the data preparations need to persist data across runs, this is one of the places in umbra that we need to use the hub which has been given to us by pop.

The hub is a namespaced hierarchy used to store plugin references and variables. The hub makes it easy to persist data in a clean way across the entire application. In this case we use the hub.P dict that has already been prepared for you. Under hub.P there is a dict for the pipe we are running in, and this is the place to store data about this pipe.

With the hub already prepared we can store information that we need in future runs, like the words and reverse words dicts and we use the tmap to line up the correct keys from the events in the correct locations in the returned number array.

Refine Function

After creating the prepare function the refine coroutine is also required. The refine coroutine function is the opposite as the prepare coroutine function. It takes the data emitted by the model and reconstitutes it back into the same type of data that was originally received. This makes it easy to pipeline data in and out.

Here is the refine function that goes along with this prepare function:

async def refine(hub, pipe, data, preds):
    '''
    Take the data from the model and refine it back into salt event format
    '''
    dmap = hub.P[pipe]
    rets = []
    for ind in range(len(preds)):
        if not preds[ind]:
            continue
        ret = {}
        for t_ind in range(len(dmap['tmap'])):
            ret[dmap['tmap'][t_ind]] = dmap['r_words'][data[ind][t_ind]]
        rets.append(ret)
    return rets

The refine function takes the pipe, data and predictions. The predictions are the numbers that map to the dataset. So now we can just go over the dataset, line up the outliers and restore the data to what it was originally.