User Guide

Introduction

The Slurry microframework is a foundation for building reactive data processing applications. Slurry provides building blocks that allow you to take one or more asynchronous event-based data sources, and process those sources using a variety of configurable components, called sections, and feeding the processed data to one or more consumers. One of the key features of Slurry, is that processing occurs asynchronously in a task based fashion. This allows advanced time-based components to be build, like for example sections that group input based on timers.

Slurry is inspired by a mix of different ideas from programming paradigms such as functional programming, IoT data processing frameworks like Node-RED, and graphical data science frameworks like KNIME and Alteryx. From the python world, Slurry takes inspiration from iteration libraries like the built-in module itertools. When combined with asynchronous features, those libraries has lead to more advanced tools like aiostream, eventkit and asyncitertools. Especially aiostream deserves credit as an inspiration for a lot of the basic stream processing sections, that Slurry provides out of the box.

Slurry is build on top of the Trio asynchronous concurrency library. Most data processing components are expected to use the new async/await features of python 3.5, and coroutines run inside a Trio task. However, functionality is provided that allows synchronous blocking operations to occur, either in threads or in separate processes (with some standard limitations).

Slurry does not try to extend the python syntax by overloading operators. Everything is plain python with async/await syntax.

Oh, one final thing.. Maybe you are wondering - ‘Why Slurry?’ Well, wikipedia says - “A slurry is a mixture of solids denser than water suspended in liquid, usually water”, and if you think about it, data is kind of like solids of different shapes and sizes, and it flows through pipelines, kind of like a slurry, so there we go.

Pipelines

The Pipeline class is a composable stream processor. It consists of a chain of PipelineSection compatible objects, which each handle a single stream processing operation. Slurry contains a helper function called weld that takes care of composing (welding) a pipeline out of individual sections. A PipelineSection is any object that is valid input to the weld function. This currently includes the following types:

AsyncIterables

Async iterables are valid only as the very first PipelineSection. Subsequent sections will use this async iterable as input source. Placing an AsyncIterable into the middle of a sequence of pipeline sections, will cause a ValueError.

Sections

Any Section abc subclass is a valid PipelineSection, at any position in the pipeline.

Tuples

Pipeline sections can be nested to any level by supplying a Tuple[PipelineSection, ...] containing one or more pipeline section-compatible objects. Output from upstream sections are automatically used as input to the nested sequence of pipeline sections.

Note

The weld function is part of the developer api. See slurry.sections.weld.weld() for more information.

The stream processing results are accessed by calling Pipeline.tap() to create an output channel. Each pipeline can have multiple open taps, each receiving a copy of the output stream.

The pipeline can also be extended dynamically with new pipeline sections with Pipeline.extend(), adding additional processing.

class slurry.Pipeline(*sections: Section | Tuple[PipelineSection, ...], nursery: Nursery, enabled: Event)

The main Slurry Pipeline class.

Note

Do not instantiate a Pipeline class manually. Use create() instead. It returns an async context manager which manages the pipeline lifetime.

Fields:

  • sections: The sequence of pipeline sections contained in the pipeline.

  • nursery: The trio.Nursery that is executing the pipeline.

classmethod create(*sections: Section | Tuple[PipelineSection, ...]) AsyncGenerator[Pipeline, None]

Creates a new pipeline context and adds the given section sequence to it.

Parameters:

*sections (PipelineSection) – One or more PipelineSection compatible objects.

extend(*sections: Section | Tuple[PipelineSection, ...], start: bool = False) Pipeline

Extend this pipeline into a new pipeline.

An extension will add a tap to the existing pipeline and use this tap as input to the newly added pipeline.

Extensions can be added dynamically during runtime. The data feed will start at the current position. Old events won’t be replayed.

Parameters:
  • *sections (PipelineSection) – One or more pipeline sections.

  • start (bool) – Start processing when adding this extension. (default: False)

tap(*, max_buffer_size: int = 0, timeout: float = inf, retrys: int = 0, start: bool = True) MemoryReceiveChannel[Any]

Create a new output channel for this pipeline.

Multiple channels can be opened and will receive a copy of the output data.

If all open taps are closed, the immidiate upstream section or iterable will be closed as well, and no further items can be sent, from that point on.

Note

The output is sent by reference, so if the output is a mutable type and a consumer changes it, other consumers will see the changed output.

Parameters:
  • max_buffer_size (int) – Although not recommended in general, it is possible to set a buffer on the output channel. (default 0) See Buffering in channels for further advice.

  • timeout (float) – Timeout in seconds when attempting to send an item. (default math.inf)

  • retrys (int) – Number of times to retry sending, if the initial attempt fails. (default 0)

  • start (bool) – Start processesing when opening this tap. (default True)

Returns:

A trio MemoryReceiveChannel from which pipeline output can be pulled.

Sections

Sections are individual processing steps that can be applied to an asynchronous stream of data. Sections receive input from the previous section, or from an asynchronous iterable, processes it and sends it to the section output. To use Slurry, all the user has to do is to configure these sections, and decide on the ordering of each step in the pipeline. The Pipeline takes care of wiring together sections, so the output gets routed to the input of the subsequent section.

Behind the scenes, data is send between sections using message passing via trio memory channels. Each section is executed as a Trio task and, from the user perspective, are in principle completely non-blocking and independent of each other.

Slurry includes a library of ready made sections, with functionality inspired by other reactive frameworks. They are documented below. For more information about sections, and how to build your own sections, read the Developer Guide.

Most ready made sections support an optional source parameter. In most cases this is semantically identical to using that source as an async interable input to the pipeline, however using the source parameter instead, may sometimes be more readable.

Some sections, like slurry.sections.Zip, support multiple inputs, which must be supplied as parameters.

Transforming input

Sections for transforming an input into a different output.

class slurry.sections.Map(func, source: AsyncIterable[Any] | None = None)

Maps over an asynchronous sequence.

Map can be used as a starting section, if a source is provided.

Parameters:
  • func (Callable[[Any], Any]) – Mapping function.

  • source (Optional[AsyncIterable[Any]]) – Source if used as a starting section.

Note

Although individual sections can be thought of as running independently, this is not a guarantee. Slurry may now, or at any later time, chose to apply certain optimizations, like merging a sequence of strictly functional operations like slurry.sections.Map into a single operation.

Filtering input

Pipeline sections that filters the incoming items.

class slurry.sections.Skip(count: int, source: AsyncIterable[Any] | None = None)

Skips the first count items in an asynchronous sequence.

Skip can be used as a starting section if a source is given.

Parameters:
  • count (int) – Number of items to skip

  • source (Optional[AsyncIterable[Any]]) – Input source if starting section.

class slurry.sections.SkipWhile(pred, source: AsyncIterable[Any] | None = None)

Skips items until a predicate function evaluates to false, after which all subsequent items are passed.

The predicate function must take an item. If the return value evaluates as true, the item is skipped. Otherwise the item is passed and the predicate function is ignored from then on.

SkipWhile can be used as a starting section if a source is given.

Parameters:
  • pred (Callable[[Any], bool]) – Predicate function.

  • source (Optional[AsyncIterable[Any]]) – Input source if starting section.

class slurry.sections.Filter(func, source: AsyncIterable[Any] | None = None)

Outputs items that passes a filter function.

The filter function must take an item. If the return value evaluates as true, the item is sent, otherwise the item is discarded.

Filter can be used as a starting section, if a source is provided.

Parameters:
  • func (Callable[[Any], bool]) – Matching function.

  • source (Optional[AsyncIterable[Any]]) – Source if used as a starting section.

class slurry.sections.Changes(source: AsyncIterable[Any] | None = None)

Outputs items that are different from the last item output.

The generator stores a reference to the last outputted item. Whenever a new item arrives, it is compared to the last outputted item. If they are equal, the new item is discarded. If not, the new item is output and becomes the new reference. The first item received is always output.

Changes can be used as a starting section, if a source is provided.

Note

Items are compared using the != operator.

Parameters:

source (Optional[AsyncIterable[Any]]) – Source if used as a starting section.

class slurry.sections.RateLimit(interval, source: AsyncIterable[Any] | None = None, *, subject: Hashable | Callable[[Any], Hashable] | None = None)

Limits data rate of an input to a certain interval.

The first item received is transmitted and triggers a timer. Any other items received while the timer is active are discarded. After the timer runs out, the cycle can repeat.

Per subject rate limiting is supported by supplying either a hashable value or a callable as subject. In case of a hashable value, each received item is assumed to be a mapping, with the subject indicating the key containing the value that should be used for rate limiting. If a callable is supplied, it will be called with the item as argument and should return a hashable value.

Parameters:
  • interval (float) – Minimum number of seconds between each sent item.

  • source (Optional[AsyncIterable[Any]]) – Input when used as first section.

  • subject (Optional[]) – Subject for per subject rate limiting.

Buffering input

Pipeline sections with age- and volume-based buffers.

class slurry.sections.Window(max_size: int, source: AsyncIterable[Any] | None = None, *, max_age: float = inf, min_size: int = 1)

Window buffer with size and age limits.

Window iterates an asynchronous sequence and stores each received item in a buffer. Each time another item is received, the buffer is filtered by dumping the oldest items first, until the configured window conditions for the buffer size and item age are satisfied. After filtering, the whole buffer is output as a tuple, with the oldest item first, and the newest item last.

Note

Items are added to the right side and are removed from the left side of the buffer.

All items remain in the buffer, unless they are removed by one of the window conditions and any item can be output more than once.

Parameters:
  • max_size (int) – The maximum buffer size.

  • source (Optional[AsyncIterable[Any]]) – Input when used as first section.

  • max_age (float) – Maximum item age in seconds. (default: unlimited)

  • min_size (int) – Minimum amount of items in the buffer to trigger an output.

class slurry.sections.Group(interval: float, source: AsyncIterable[Any] | None = None, *, max_size: float = inf, mapper: Callable[[Any], Any] | None = None, reducer: Callable[[Sequence[Any]], Any] | None = None)

Groups received items by time based interval.

Group awaits an item to arrive from source, adds it to a buffer and sets a timer based on the interval parameter. While the timer is active, additional items received are added to the buffer. When the timer runs out, or if the buffer size equals max_size, the buffer is sent down the pipeline and a new empty buffer is created.

Note

The buffer is not sent at regular intervals. The timer is triggered when an item is is received into an empty buffer.

An output buffer will always contain at least one item.

The items in the buffer can optionally be mapped over, by supplying a mapper function and be reduced to a single value, by supplying a reducer function.

Parameters:
  • interval (float) – Time in seconds from when an item arrives until the buffer is sent.

  • source (Optional[AsyncIterable[Any]]) – Input when used as first section.

  • max_size (int) – Maximum number of items in buffer, which when reached, will cause the buffer to be sent.

  • mapper (Optional[Callable[[Any], Any]]) – Optional mapping function used to transform each received item.

  • reducer (Optional[Callable[[Sequence[Any]], Any]]) – Optional reducer function used to transform the buffer to a single value.

class slurry.sections.Delay(interval: float, source: AsyncIterable[Any] | None = None)

Delays transmission of each item received by an interval.

Received items are temporarily stored in an unbounded queue, along with a timestamp, using a background task. The foreground task takes items from the queue, and waits until the item is older than the given interval and then transmits it.

Parameters:
  • interval (float) – Number of seconds that each item is delayed.

  • source (Optional[AsyncIterable[Any]]) – Input when used as first section.

Generating new output

Pipeline sections that produce data streams.

class slurry.sections.Repeat(interval: float, *args, **kwargs)

Yields a single item repeatedly at regular intervals.

If used as a middle section, the input can be used to set the value that is sent. When an input is received, it is sent immidiately, and the internal timer resets.

When an input is used, closure of the input stream will cause the repeater to close as well.

Parameters:
  • interval (float) – Delay between each transmission.

  • default (Any) – Default item to send. If not supplied, will wait for an input.

Raises:

RuntimeError – If used as a first section and no default is provided.

class slurry.sections.Metronome(interval: float, *args, **kwargs)

Yields an item repeatedly at wall clock intervals.

If used as a middle section, the input can be used to set the value that is sent. When an input is received, it is stored and send at the next tick of the clock. If multiple inputs are received during a tick, only the latest is sent. The preceding inputs are dropped.

When an input is used, closure of the input stream will cause the metronome to close as well.

Parameters:
  • interval (float) – Wall clock delay between each transmission.

  • default (Any) – Default item to send.

Raises:

RuntimeError – If used as a first section and no default is provided.

class slurry.sections.InsertValue(value: Any)

Inserts a single user supplied value into the pipeline on startup and then passes through any further received items unmodified.

If no input is used, the single value will be sent, and InsertValue will close.

Parameters:

value (Any) – Item to send on startup.

Combining multiple inputs

Pipeline sections for combining multiple inputs into a single output.

class slurry.sections.Chain(*sources: Section | Tuple[PipelineSection, ...], place_input: str = 'first')

Chains input from one or more sources. Any valid PipelineSection is an allowed source.

Outputs items from each source in turn, until it is exhausted. If a source never reaches the end, remaining sources will not be iterated.

Chain can be placed as a middle section and will chain the input of the previous section.

Note

By default, the input is added as the first source. If the input is added last instead of first, it will cause backpressure to be applied upstream.

Parameters:
  • *sources (PipelineSection) – One or more PipelineSection that will be chained together.

  • place_input (string) – Iteration priority of the pipeline input source. Options: 'first' (default) | 'last'.

class slurry.sections.Merge(*sources: Section | Tuple[PipelineSection, ...])

Merges input from multiple sources. Any valid PipelineSection is an allowed source.

Sources are iterated in parallel and items are send from each source, as soon as they become available.

If Merge is used as a middle section, the input will be added to the sources.

Sources can be pipeline sections, which will be treated as first sections, with no input. Merge will take care of running the pump task for these sections.

Parameters:

*sources (PipelineSection) – One or more async iterables or sections whose contents will be merged.

class slurry.sections.Zip(*sources: Section | Tuple[PipelineSection, ...], place_input: str = 'first')

Zips the input from multiple sources. Any valid PipelineSection is an allowed source.

Sources are iterated in parallel and as soon as all sources have an item available, those items are output as a tuple.

Zip can be used as a middle section, and the pipeline input will be added to the sources.

Note

If sources are out of sync, the fastest source will have to wait for the slowest, which will cause backpressure.

Parameters:
  • *sources (PipelineSection) – One or more PipelineSection, whose contents will be zipped.

  • place_input (string) – Position of the pipeline input source in the output tuple. Options: 'first' (default) | 'last'.

class slurry.sections.ZipLatest(*sources: Section | Tuple[PipelineSection, ...], partial=True, default=None, monitor=(), place_input='first', monitor_input=False)

Zips input from multiple sources and outputs a result on every received item. Any valid PipelineSection is an allowed source.

Sources are iterated in parallel and a tuple is output each time a result is ready on any source. The tuple values will be the last received value from each source.

Using the monitor argument, one or more asynchronous sequences can be added with the property that they will not trigger an output by themselves. Their latest value will be stored and added to the output value, but will only be output if a new item arrives at one of the main sources.

ZipLatest can be used as a middle section, in which case the upstream pipeline is added as an input.

Note

If any single source is exhausted, all remaining sources will be forcibly closed, and the pipeline will stop.

Parameters:
  • *sources (PipelineSection) – One or more PipelineSection that will be zipped together.

  • partial (bool) – If True (default) output will be sent as soon as the first input arrives. Otherwise, all main sources must send at least one item, before an output is generated.

  • default (Any) – If the parameter partial is True, this value is used as the default value to output, until an input has arrived on a source. Defaults to None.

  • monitor (Optional[Union[AsyncIterable[Any], Sequence[AsyncIterable[Any]]]]) – Additional asynchronous sequences to monitor.

  • place_input (string) – Position of the pipeline input source in the output tuple. Options: 'first' (default)|``’last’``

  • monitor_input (bool) – Input is used as a monitored stream instead of a main source. Defaults to False