Developer Guide

Introduction

This part of the documentation describes how to write your own custom section. In most cases, this entails deriving from slurry.environments.TrioSection and implementing the slurry.sections.abc.AsyncSection.refine() method from the AsyncSection abc:

class Squares(TrioSection):
    async def refine(self, input, output):
        async for i in input:
            await output(i*i)

Slurry supports two basic types of sections, defined as abstract base classes, SyncSection and AsyncSection. Both types of sections define a refine method, which, when implemented, does the actual processing for the section.

Abstract Base Classes

AsyncSection

Since Slurry is natively an async framework, we will look at this first.

class slurry.sections.abc.AsyncSection

AsyncSection defines an abc for sections that are designed to run in an async event loop.

abstract async refine(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])

The async section refine method must contain the logic that iterates the input, processes the indidual items, and feeds results to the output.

Parameters:
  • input (Optional[AsyncIterable[Any]]) – The input data feed. Will be None for the first Section, as the first Section is expected to supply it’s own input.

  • output (Callable[[Any], Awaitable[None]]) – An awaitable callable used to send output.

The default TrioSection environment, is an implementation of AsyncSection.

SyncSection

Slurry also supports sections that run synchonously. Synchronous and asynchonous sections can be freely mixed and matched in the pipeline.

class slurry.sections.abc.SyncSection

SyncSection defines an abc for sections that runs synchronous refiners.

abstract refine(input: Iterable[Any] | None, output: Callable[[Any], None])

The SyncSection refine method is intended to run normal synchronous python code, including code that can block for IO for an any amount of time. Implementations of SyncSection should take care to design a pump method in such a way, that blocking happens transparently to the parent async event loop.

The refine method is designed to have an api that is as close to the async api as possible. The input is a synchronous iterable instead of an async iterable, and the output is a synchronous callable, similar to a Queue.put method.

Parameters:
  • input (Optional[Iterable[Any]]) – The input data feed. Like with ordinary sections, this can be None if SyncSection is the first section in the pipeline.

  • output (Callable[[Any], None]) – The callable used to send output.

Slurry includes two implementations of SyncSection. ThreadSection, which runs the refine function in a background thread, and ProcessSection which spawns an independent process that runs the refine method.

Section

Implementations of AsyncSection and SyncSection, are refered to as Environments. Environments implements the pump() method of the Section abstract base class, which acts as a bridge between the native event loop and the AsyncSection or SyncSection refine method.

The pump() abstract method, is scheduled to run as a task in the native Trio event loop by the pipeline. The pump method serves as an underlying machinery for pulling and pushing data through the section.

class slurry.sections.abc.Section

Defines the basic environment api.

abstract async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])

The pump method contains the machinery that takes input from previous sections, or any asynchronous iterable, processes it and pushes it to the output.

Note

If this section is the first section of a pipeline, the input will be None. In this case, the section is expected to produce output independently.

Parameters:
  • input (Optional[AsyncIterable[Any]]) – The input data feed. Will be None for the first Section, as the first Section is expected to supply it’s own input.

  • output (Callable[[Any], Awaitable[None]]) – An awaitable callable used to send output.

By default, the pipeline tries to manage the input and output resource lifetime. Normally you don’t have to worry about closing the input and output after use. The exception is, if your custom section adds additional input sources, or provides it’s own input. In this case the section must take care of closing the input after use.

Note

The receiving end of the output can be closed by the pipeline, or by the downstream section, at any time. If you try to send an item to an output that has a closed receiver, a BrokenResourceError will be raised. The pipeline knows about this and is prepared to handle it for you, but if you need to do some kind of cleanup, like closing network connections for instance, you may want to handle this exception yourself.

Although custom sections should implement either the AsyncSection or SyncSection refine api, this is not strictly a requirement but rather more like a convention. Any class that implements the Section abc is technically a valid pipeline section and can be used in a pipeline.

Environments

Slurry comes with a set of premade environments, suited for both asynchronous and synchronous processing. As stated earlier, sections can be freely mixed and matched in the pipeline. The section implementation should be designed so that bridging between the synchronous and asynchronous world happens transparently to the end user, without blocking the underlying event loop.

When implementing an environment the following rule should be adhered to, if possible. Both the inputs and the output should be treated as being attached to a zero length buffer. What this means is that the output of the previous section, should block, until the input has been received on the next section. This is the default behaviour of zero capacity trio memory channels. The same behaviour can be achieved using regular queues, by joining the queue after putting an item into it, and waiting for the remote end to call task_done. Any environment that deviates from this behaviour should note so, in it’s documentation.

Trio

The Trio environment implements TrioSection, which is a Trio-native AsyncSection.

class slurry.environments.TrioSection

Since Trio is the native Slurry event loop, this environment is simple to implement. The pump method does not need to do anything special to bridge the input and output. It simply delegates directly to the refine method, as the api is identical.

async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])

Calls refine.

Threading

The threading module implements a synchronous section that runs in a background thread.

class slurry.environments.ThreadSection

ThreadSection defines a section interface which uses a synchronous refine method.

async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])

Runs the refine method in a background thread with synchronous input and output wrappers, which transparently bridges the input and outputs between the parent trio event loop and the sync world.

Note

Trio has a limit on how many threads can run simultaneously. See the trio documentation for more information.

Multiprocessing

Implements a section that runs in an independent python proces.

class slurry.environments.ProcessSection

ProcessSection defines a section interface with a synchronous refine method that runs in a separate process. Slurry makes use of the python multiprocessing module to spawn the process.

Note

ProcessSection implementations must be pickleable.

async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])

The ProcessSection pump method works similar to the threaded version, however since communication between processes is not as simple as it is between threads, that are directly able to share memory with each other, there are some restrictions to be aware of.

  • Data that is to be sent to the input or transmitted on the output must be pickleable.

  • Since ProcessSection uses unbounded queues to transfer data behind the scenes, they are unable to provide or receive backpressure.

Welding sections together

The weld module implements the weld function that connects PipelineSection objects together and returns the async iterable output.

The weld function could be considered the secret sauce of Slurry. The main idea behind Slurry is to have a series of asynchronous, independent tasks, communicating via memory channels. Setting up tasks for data processing and supplying them with communication infrastructure can quickly become quite repetitive when using vanilla Trio code. This is where the weld function comes in. It automatically takes care of all the boilerplate of feeding inputs to sections, connecting sections via memory channels and returning the output. This means the programmer can focus on designing the actual dataprocessing steps, and not having to worry about building message passing infrastructure.

The main Pipeline class uses the weld function to compose the sequence of PipelineSection objects and return an output. Similarly, the combiner sections use the weld function to support defining sub-pipelines as input sources, using the tuple notation. User defined custom sections, can also use the weld funcion to add the same functionality.

slurry.sections.weld.weld(nursery, *sections: Section | Tuple[Section | Tuple[PipelineSection, ...], ...]) AsyncIterable[Any]

Connects the individual parts of a sequence of pipeline sections together and starts pumps for individual Sections. It returns an async iterable which yields results of the sequence.

Parameters:
  • nursery (trio.Nursery) – The nursery that runs individual pipeline section pumps.

  • *sections (PipelineSection) – Pipeline sections.