Developer Guide
Introduction
This part of the documentation describes how to write your own custom section. In most cases, this entails deriving from slurry.environments.TrioSection and implementing the slurry.sections.abc.AsyncSection.refine() method from the AsyncSection abc:
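    from slurry.environments import TrioSection

    class Squares(TrioSection):
        async def refine(self, input, output):
            # Square each incoming item and send the result downstream.
            async for i in input:
                await output(i * i)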
Slurry supports two basic types of sections, defined as abstract base classes: SyncSection and AsyncSection. Both types of sections define a refine method which, when implemented, does the actual processing for the section.
Abstract Base Classes
AsyncSection
Since Slurry is natively an async framework, we will look at this first.
- class slurry.sections.abc.AsyncSection
AsyncSection defines an abc for sections that are designed to run in an async event loop.
- abstract async refine(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])
The async section refine method must contain the logic that iterates the input, processes the individual items, and feeds the results to the output.
- Parameters:
input (Optional[AsyncIterable[Any]]) – The input data feed. Will be None for the first section, as the first section is expected to supply its own input.
output (Callable[[Any], Awaitable[None]]) – An awaitable callable used to send output.
The default TrioSection environment is an implementation of AsyncSection.
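To make the refine contract concrete, here is a minimal sketch that drives an AsyncSection-style refine method by hand, using the standard-library asyncio instead of a Slurry pipeline running on Trio. The driver `run_refine` and the `Squares` and `Counter` classes are hypothetical; they only mirror the documented signature, where input is an async iterable (or None for the first section) and output is an awaitable callable.

```python
import asyncio
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

Output = Callable[[Any], Awaitable[None]]

class Squares:
    """Middle section: consumes its input and sends squared items downstream."""
    async def refine(self, input: Optional[AsyncIterable[Any]], output: Output) -> None:
        async for i in input:
            await output(i * i)

class Counter:
    """First section: input is None, so it produces its own items."""
    def __init__(self, n: int) -> None:
        self.n = n

    async def refine(self, input: Optional[AsyncIterable[Any]], output: Output) -> None:
        # A first section has no upstream, so input is ignored here.
        for i in range(self.n):
            await output(i)

async def run_refine(section, input: Optional[AsyncIterable[Any]]) -> list:
    """Hypothetical driver: collects everything the section sends to output."""
    results = []

    async def output(item: Any) -> None:
        results.append(item)

    await section.refine(input, output)
    return results

async def numbers():
    for i in range(4):
        yield i

print(asyncio.run(run_refine(Squares(), numbers())))  # [0, 1, 4, 9]
print(asyncio.run(run_refine(Counter(3), None)))      # [0, 1, 2]
```

In a real pipeline the environment, not your code, supplies input and output; the sketch only shows which side of the contract refine is responsible for.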
SyncSection
Slurry also supports sections that run synchronously. Synchronous and asynchronous sections can be freely mixed and matched in the pipeline.
- class slurry.sections.abc.SyncSection
SyncSection defines an abc for sections that run synchronous refiners.
- abstract refine(input: Iterable[Any] | None, output: Callable[[Any], None])
The SyncSection refine method is intended to run normal synchronous Python code, including code that can block on IO for any amount of time. Implementations of SyncSection should take care to design the pump method in such a way that blocking happens transparently to the parent async event loop.
The refine method is designed to have an api that is as close to the async api as possible. The input is a synchronous iterable instead of an async iterable, and the output is a synchronous callable, similar to a Queue.put method.
- Parameters:
input (Optional[Iterable[Any]]) – The input data feed. As with other sections, this can be None if the SyncSection is the first section in the pipeline.
output (Callable[[Any], None]) – The callable used to send output.
Slurry includes two implementations of SyncSection: ThreadSection, which runs the refine method in a background thread, and ProcessSection, which spawns an independent process that runs the refine method.
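The synchronous api mirrors the async one. The sketch below uses only the standard library and calls a SyncSection-style refine method directly, with a plain list as input and `queue.Queue.put` as output, matching the Queue.put analogy above. The class name `UpperCase` is hypothetical, and nothing here runs in a thread or process; that bridging is the job of the ThreadSection and ProcessSection environments.

```python
import queue
from typing import Any, Callable, Iterable, Optional

class UpperCase:
    """Hypothetical SyncSection-style refiner; ordinary blocking code is allowed here."""
    def refine(self, input: Optional[Iterable[Any]], output: Callable[[Any], None]) -> None:
        for word in input:          # plain iteration, no async for
            output(word.upper())    # plain call, no await

out_queue: "queue.Queue[str]" = queue.Queue()
UpperCase().refine(["slurry", "trio"], out_queue.put)

results = []
while not out_queue.empty():
    results.append(out_queue.get())
print(results)  # ['SLURRY', 'TRIO']
```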
Section
Implementations of AsyncSection and SyncSection are referred to as Environments.
Environments implement the pump() method of the Section abstract base class, which acts as a bridge between the native event loop and the AsyncSection or SyncSection refine method.
The pump() abstract method is scheduled to run as a task in the native Trio event loop by the pipeline. The pump method serves as the underlying machinery for pulling and pushing data through the section.
- class slurry.sections.abc.Section
Defines the basic environment api.
- abstract async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])
The pump method contains the machinery that takes input from previous sections, or any asynchronous iterable, processes it and pushes it to the output.
Note
If this section is the first section of a pipeline, the input will be None. In this case, the section is expected to produce output independently.
- Parameters:
input (Optional[AsyncIterable[Any]]) – The input data feed. Will be None for the first section, as the first section is expected to supply its own input.
output (Callable[[Any], Awaitable[None]]) – An awaitable callable used to send output.
By default, the pipeline tries to manage the lifetime of the input and output resources, so normally you don't have to worry about closing the input and output after use. The exception is when your custom section adds additional input sources or provides its own input. In this case, the section must take care of closing the input after use.
Note
The receiving end of the output can be closed by the pipeline, or by the downstream section, at any time. If you try to send an item to an output that has a closed receiver, a BrokenResourceError will be raised. The pipeline knows about this and is prepared to handle it for you, but if you need to do some kind of cleanup, like closing network connections, you may want to handle this exception yourself.
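A minimal sketch of handling the exception yourself: wrap the sending loop so that cleanup runs when the downstream receiver goes away. To keep the example runnable without Trio, a locally defined `BrokenResourceError` stands in for `trio.BrokenResourceError`; `Connection`, `Forwarder` and `flaky_output` are hypothetical, and the output's receiver "closes" after two items. In a real Trio-based section you would catch `trio.BrokenResourceError` instead.

```python
import asyncio

class BrokenResourceError(Exception):
    """Stand-in for trio.BrokenResourceError so the sketch runs with asyncio only."""

class Connection:
    """Hypothetical network connection that must be closed after use."""
    def __init__(self):
        self.open = True

    def close(self):
        self.open = False

class Forwarder:
    def __init__(self, conn):
        self.conn = conn

    async def refine(self, input, output):
        try:
            for i in range(10):  # pretend each item was read from self.conn
                await output(i)
        except BrokenResourceError:
            return  # nothing downstream is listening any more; stop sending
        finally:
            self.conn.close()  # cleanup runs however the loop ends

async def main():
    sent = []

    async def flaky_output(item):
        if len(sent) == 2:
            raise BrokenResourceError  # the receiver has been closed
        sent.append(item)

    conn = Connection()
    await Forwarder(conn).refine(None, flaky_output)
    return sent, conn.open

print(asyncio.run(main()))  # ([0, 1], False)
```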
Although custom sections should implement either the AsyncSection or SyncSection refine api, this is a convention rather than a strict requirement. Any class that implements the Section abc is technically a valid pipeline section and can be used in a pipeline.
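To illustrate that convention, the sketch below defines a local replica of the Section abc with the documented pump signature (it is not imported from slurry) and a hypothetical `Doubler` that implements pump directly, without any refine method. Because the replica uses Python's abc module, a subclass that forgets pump cannot be instantiated.

```python
import abc
import asyncio
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

class Section(abc.ABC):
    """Local replica of the documented Section abc, for illustration only."""
    @abc.abstractmethod
    async def pump(self, input: Optional[AsyncIterable[Any]],
                   output: Callable[[Any], Awaitable[None]]) -> None:
        ...

class Doubler(Section):
    """Implements pump directly; a valid section even without refine."""
    async def pump(self, input, output):
        async for item in input:
            await output(item * 2)

class Incomplete(Section):
    pass  # no pump, so the abc refuses to instantiate it

async def numbers():
    for i in range(3):
        yield i

async def main():
    results = []

    async def output(item):
        results.append(item)

    await Doubler().pump(numbers(), output)
    return results

print(asyncio.run(main()))  # [0, 2, 4]

try:
    Incomplete()
except TypeError as exc:
    print("TypeError:", exc)
```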
Environments
Slurry comes with a set of premade environments, suited for both asynchronous and synchronous processing. As stated earlier, sections can be freely mixed and matched in the pipeline. The section implementation should be designed so that bridging between the synchronous and asynchronous world happens transparently to the end user, without blocking the underlying event loop.
When implementing an environment, the following rule should be adhered to if possible: both the input and the output should be treated as being attached to a zero-length buffer. This means that sending on the output of the previous section should block until the item has been received by the next section. This is the default behaviour of zero-capacity Trio memory channels. The same behaviour can be achieved using regular queues, by joining the queue after putting an item into it, and waiting for the remote end to call task_done. Any environment that deviates from this behaviour should say so in its documentation.
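The queue-based variant of this rule can be sketched with the standard library's `queue.Queue` and two threads: the producer calls `join()` after every `put()`, so it cannot get ahead of the consumer until the consumer has called `task_done()` for that item. (In Trio itself, the same behaviour comes from `trio.open_memory_channel(0)`.) The `producer` and `consumer` functions and the `log` list are hypothetical; the log only exists to show the interleaving.

```python
import queue
import threading

q: "queue.Queue[object]" = queue.Queue()
log = []
DONE = object()  # end-of-stream marker

def producer():
    for i in range(3):
        q.put(i)
        q.join()  # block until the consumer has taken and processed i
        log.append(f"sent {i}")
    q.put(DONE)

def consumer():
    while True:
        item = q.get()
        if item is DONE:
            q.task_done()
            break
        log.append(f"received {item}")
        q.task_done()  # releases the producer's join()

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(log)
# ['received 0', 'sent 0', 'received 1', 'sent 1', 'received 2', 'sent 2']
```

Every "received" entry precedes the matching "sent" entry, which is exactly the zero-length buffer behaviour described above.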
Trio
The Trio environment implements TrioSection, which is a Trio-native AsyncSection.
- class slurry.environments.TrioSection
Since Trio is the native Slurry event loop, this environment is simple to implement. The pump method does not need to do anything special to bridge the input and output. It simply delegates directly to the refine method, as the api is identical.
- async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])
Calls refine.
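Because the two signatures match, an environment in the style of TrioSection can be little more than the sketch below. `DelegatingSection` is a hypothetical name, and this illustrates the delegation idea rather than reproducing Slurry's actual source; asyncio drives it here only so the example runs without Trio.

```python
import abc
import asyncio
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

Output = Callable[[Any], Awaitable[None]]

class DelegatingSection(abc.ABC):
    """Hypothetical TrioSection-like environment: pump simply delegates to refine."""
    async def pump(self, input: Optional[AsyncIterable[Any]], output: Output) -> None:
        await self.refine(input, output)  # identical api, so no bridging is needed

    @abc.abstractmethod
    async def refine(self, input: Optional[AsyncIterable[Any]], output: Output) -> None:
        ...

class Squares(DelegatingSection):
    async def refine(self, input, output):
        async for i in input:
            await output(i * i)

async def main():
    async def numbers():
        for i in range(1, 4):
            yield i

    results = []

    async def output(item):
        results.append(item)

    await Squares().pump(numbers(), output)  # a pipeline calls pump, not refine
    return results

print(asyncio.run(main()))  # [1, 4, 9]
```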
Threading
The threading module implements a synchronous section that runs in a background thread.
- class slurry.environments.ThreadSection
ThreadSection defines a section interface which uses a synchronous refine method.
- async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])
Runs the refine method in a background thread with synchronous input and output wrappers, which transparently bridge the input and output between the parent Trio event loop and the synchronous world.
Note
Trio has a limit on how many threads can run simultaneously. See the trio documentation for more information.
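The bridging idea can be sketched with asyncio as a stand-in for Trio: `asyncio.to_thread` plays the role of Trio's thread runner, and `asyncio.run_coroutine_threadsafe` lets the worker thread pull from the async input and await the async output while blocking only itself. `ThreadLikeSection`, `Slugify` and the helper names are hypothetical; Slurry's ThreadSection is Trio-based and its internals may differ.

```python
import asyncio
from typing import Any, AsyncIterable, Awaitable, Callable, Iterator, Optional

_DONE = object()  # marks the end of the async input

class ThreadLikeSection:
    """Hypothetical environment: runs a synchronous refine in a worker thread."""

    def refine(self, input, output):
        raise NotImplementedError

    async def pump(self, input: Optional[AsyncIterable[Any]],
                   output: Callable[[Any], Awaitable[None]]) -> None:
        loop = asyncio.get_running_loop()
        ait = input.__aiter__() if input is not None else None

        async def _next():
            try:
                return await ait.__anext__()
            except StopAsyncIteration:
                return _DONE

        def sync_input() -> Iterator[Any]:
            # Runs in the worker thread; each step blocks the thread, not the loop.
            while True:
                item = asyncio.run_coroutine_threadsafe(_next(), loop).result()
                if item is _DONE:
                    return
                yield item

        def sync_output(item: Any) -> None:
            # Blocks the worker thread until the async output has accepted the item.
            asyncio.run_coroutine_threadsafe(output(item), loop).result()

        await asyncio.to_thread(
            self.refine, sync_input() if ait is not None else None, sync_output)

class Slugify(ThreadLikeSection):
    def refine(self, input, output):
        for text in input:  # ordinary blocking code is fine here
            output(text.lower().replace(" ", "-"))

async def main():
    async def titles():
        for t in ["Hello World", "Slurry Docs"]:
            yield t

    results = []

    async def output(item):
        results.append(item)

    await Slugify().pump(titles(), output)
    return results

print(asyncio.run(main()))  # ['hello-world', 'slurry-docs']
```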
Multiprocessing
Implements a section that runs in an independent Python process.
- class slurry.environments.ProcessSection
ProcessSection defines a section interface with a synchronous refine method that runs in a separate process. Slurry makes use of the Python multiprocessing module to spawn the process.
Note
ProcessSection implementations must be pickleable.
- async pump(input: AsyncIterable[Any] | None, output: Callable[[Any], Awaitable[None]])
The ProcessSection pump method works similarly to the threaded version. However, since communication between processes is not as simple as communication between threads, which can share memory directly, there are some restrictions to be aware of. Data sent to the input or transmitted on the output must be pickleable.
Since ProcessSection uses unbounded queues to transfer data behind the scenes, it is unable to provide or receive backpressure.
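Because everything crossing the process boundary is pickled, it can help to check items before sending them to a ProcessSection. The helper below is a hypothetical sketch that uses only the standard pickle module; it says nothing about how Slurry itself serialises data.

```python
import pickle
import threading

def is_pickleable(obj) -> bool:
    """Return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False

print(is_pickleable({"id": 1, "tags": ("a", "b")}))  # True: plain data
print(is_pickleable(threading.Lock()))               # False: locks cannot be pickled
print(is_pickleable(x for x in range(3)))            # False: nor can generators
```

The same check applies to the section object itself, since ProcessSection implementations must be pickleable too.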
Welding sections together
The weld module implements the weld function, which connects PipelineSection objects together and returns the async iterable output.
The weld function could be considered the secret sauce of Slurry. The main idea behind Slurry is to have a series of asynchronous, independent tasks, communicating via memory channels. Setting up tasks for data processing and supplying them with communication infrastructure can quickly become quite repetitive when using vanilla Trio code. This is where the weld function comes in.
It automatically takes care of all the boilerplate of feeding inputs to sections, connecting sections via memory channels, and returning the output. This means the programmer can focus on designing the actual data processing steps instead of building message passing infrastructure.
The main Pipeline class uses the weld function to compose the sequence of PipelineSection objects and return an output. Similarly, the combiner sections use the weld function to support defining sub-pipelines as input sources, using the tuple notation. User-defined custom sections can also use the weld function to add the same functionality.
- slurry.sections.weld.weld(nursery, *sections: Section | Tuple[Section | Tuple[PipelineSection, ...], ...]) → AsyncIterable[Any]
Connects the individual parts of a sequence of pipeline sections together and starts pumps for individual Sections. It returns an async iterable which yields results of the sequence.
- Parameters:
nursery (trio.Nursery) – The nursery that runs individual pipeline section pumps.
*sections (PipelineSection) – Pipeline sections.