Guide

Runnel is a distributed event processing framework for Python based on Redis Streams.

It allows you to easily create scalable stream processors, which operate on strictly ordered partitions of event streams. Runnel takes care of assigning partitions to workers, and acknowledging events automatically, so you can focus on your application logic.

Whereas traditional job queues do not provide ordering guarantees, Runnel is designed to process partitions of your event stream strictly in the order events are created.

The App

from runnel import App


app = App(name="myapp", redis_url="redis://127.0.0.1")

The app is the starting point for your Runnel project. You need to provide a name and a Redis URL. You will use the app to define event streams and processors, and workers will run it.

The full set of kwargs can be found at runnel.settings.Settings. Settings can be provided as environment variables, e.g. RUNNEL_LOG_LEVEL=debug, or as kwargs to the app instance. Many of them act as defaults and can be overridden on the stream or processor objects you will create.

Defining your event stream

Structured records

from datetime import datetime

from runnel import Record


class Order(Record):
    order_id: int
    created_at: datetime
    amount: float

The Record class allows you to specify structured event types to be stored in your stream. It is implemented using Pydantic, and records can be nested arbitrarily:

from typing import List

from runnel import Record


class Item(Record):
    id: int
    name: str


class Order(Record):
    id: int
    name: str
    items: List[Item]

Redis streams are key-value data structures, storing both keys and values as bytes. By default, your records will be JSON-serialized to bytes and stored under a single ‘data’ key in the stream. This is a flexible approach, and it also allows you to optionally compress your records.

You can provide your own serialisation and compression implementations (see runnel.interfaces).
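As an illustration, the default behaviour amounts to something like the following sketch (illustrative only; the real implementation lives behind the runnel.interfaces abstractions and may differ in detail):

```python
import json

# Sketch of the default record encoding: the whole record is serialized
# to one JSON blob and stored under a single 'data' key. Nested fields
# survive because JSON handles the nesting.
def encode(record_dict: dict) -> dict:
    return {b"data": json.dumps(record_dict).encode()}

def decode(fields: dict) -> dict:
    return json.loads(fields[b"data"])

order = {"id": 1, "items": [{"id": 7, "name": "widget"}]}
assert decode(encode(order)) == order
```

Because the payload is a single opaque blob, a compressor can simply be applied to the bytes before they are written.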

Primitive records

class Order(Record, primitive=True):
    order_id: int
    complete: bool
    purchased_by: str

You can opt out of complex records and use the native Redis key-value stream type by setting primitive=True. This allows you to benefit from optimisations such as delta compression (see http://antirez.com/news/128), at the cost of not supporting nested values. Primitive records only support fields of int, float, bool, str, and bytes.
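By contrast, a primitive record maps each field onto its own native stream key, roughly as in this sketch (illustrative only; Runnel's actual field encoding may differ):

```python
# Sketch of how a primitive record maps onto native Redis stream fields:
# each field becomes its own key-value pair, both stored as bytes. No
# nesting is possible, which is what enables Redis-side optimisations
# such as delta compression.
def encode_primitive(record: dict) -> dict:
    return {key.encode(): str(value).encode() for key, value in record.items()}

fields = encode_primitive({"order_id": 1, "complete": True, "purchased_by": "jo"})
# e.g. {b"order_id": b"1", b"complete": b"True", b"purchased_by": b"jo"}
```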

The stream

orders = app.stream("orders", record=Order, partition_by="order_id")

An example stream is defined above. You need to give it a name, specify the Record type, and pick a field to partition by. When you create an event, Runnel will use your stream’s hasher to hash the chosen partition key and compute which partition to send it to.

You can specify the stream’s partition_count. This controls the degree of parallelism in your processing. Every partition will be processed by a separate instance of your processor code. If you run multiple workers, the partitions will be distributed evenly between them. To maintain strictly ordered processing, only one processor will process a partition at one time.
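Conceptually, partition selection works like the sketch below. The crc32 hash here is only an assumption for illustration; Runnel's hasher is configurable on the stream and may differ.

```python
from zlib import crc32

def partition_for(key: str, partition_count: int) -> int:
    # Hash the partition key and map it onto one of the partitions.
    # Equal keys always land in the same partition, which is what
    # preserves strict per-key ordering.
    return crc32(key.encode()) % partition_count

# All events for order 42 go to the same partition.
assert partition_for("42", 16) == partition_for("42", 16)
assert 0 <= partition_for("42", 16) < 16
```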

The stream’s partition_size controls the maximum length of each partition’s stream. It is implemented using the MAXLEN option to XADD (see the Redis docs).

How to send events

Events can be sent as follows:

from datetime import datetime

from runnel import App, Record


app = App(name="myapp", redis_url="redis://127.0.0.1")


class Order(Record):
    order_id: int
    created_at: datetime
    amount: float


orders = app.stream("orders", record=Order, partition_by="order_id")

await orders.send(Order(order_id=1, created_at=datetime.utcnow(), amount=9.99))

You can also use the command line interface to send events:

$ runnel send example:orders "{\"order_id\": 1, \"created_at\": \"2020-07-21T22:09:37Z\", \"amount\": 99}"

Or in bulk (assuming ‘myapp/example.py’ includes a stream called ‘actions’):

$ echo "{\"user_id\": 1, \"type\": \"signup\"}" >> data.jsonl
$ echo "{\"user_id\": 2, \"type\": \"signup\"}" >> data.jsonl
$ runnel sendmany myapp.example:actions data.jsonl

Creating a processor

Basic

@app.processor(orders)
async def printer(events):
    async for order in events.records():
        print(order.amount)

This is where your application logic is implemented. By calling events.records(), you request that events are automatically deserialized and yielded as runnel.Record objects.

Raw events

@app.processor(orders)
async def printer(events):
    async for event in events:
        print(event)

If you want access to the low-level runnel.Event object, you can omit the .records() call and iterate over the events directly.

Batching

@app.processor(orders)
async def printer(events):
    async for orders in events.take(10, within=2).records():
        assert len(orders) <= 10
        print(orders)

If you want to maximize your processing throughput and your application logic can support it, you can enable batching of events. events.take(10, within=2) means take 10 events at a time, but give up after 2 seconds and yield however many are available. This is intended to support logic which benefits from processing multiple events at once, e.g. bulk loading records into a database.

Note

Batching of events is separate from prefetching events from Redis. Prefetching of multiple events is enabled by default because it greatly improves efficiency (you can control it via the prefetch_count setting, default=8). Prefetched events are buffered inside Runnel workers before they are yielded to your processor. Batching controls how those events are yielded: either individually (the default) or as a list of n events (using events.take(n, within=1)).
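A toy model of prefetching, independent of Runnel: a fetcher task pulls events ahead of time into a bounded buffer, and the processor drains the buffer at its own pace.

```python
import asyncio

async def demo(prefetch_count: int = 8) -> list:
    # Bounded buffer: the fetcher can run at most prefetch_count events
    # ahead of the consumer.
    buffer: asyncio.Queue = asyncio.Queue(maxsize=prefetch_count)
    source = list(range(20))

    async def fetcher():
        for event in source:
            await buffer.put(event)  # blocks when the buffer is full
        await buffer.put(None)       # sentinel: stream exhausted

    task = asyncio.create_task(fetcher())
    processed = []
    while (event := await buffer.get()) is not None:
        processed.append(event)
    await task
    return processed

results = asyncio.run(demo())
```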

Warning

Runnel acks events after every iteration through the event generator loop. When using batching, this means the entire batch will be acked at once. As a result, you must process the batch as a single unit atomically. If you iterate over the events in a batch one-at-a-time and you fail half-way through, then the entire batch will be considered failed (and handled according to your runnel.constants.ExceptionPolicy). This will lead to duplicate processing if the batch is retried, or dropped events if the batch is ignored.
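In practice this means structuring your handler so the whole batch succeeds or fails as one unit, e.g. a single bulk write rather than many per-event writes. The shape of that pattern, sketched in plain Python with a stand-in database (FakeDatabase and bulk_insert are hypothetical names for illustration):

```python
# Sketch of batch-atomic processing: all side effects for a batch are
# staged first, then committed in one step, so a retried batch never
# leaves partial results behind.
class FakeDatabase:
    def __init__(self):
        self.rows = []

    def bulk_insert(self, rows):
        # Stands in for a single all-or-nothing write for the batch.
        self.rows.extend(rows)

def process_batch(db, batch):
    staged = [{"amount": e["amount"]} for e in batch]  # no side effects yet
    db.bulk_insert(staged)                             # commit once

db = FakeDatabase()
process_batch(db, [{"amount": 9.99}, {"amount": 5.00}])
```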

Background tasks

You can define background tasks to run at startup using runnel.App.task:

@app.task
async def on_startup():
    print("starting")

If you only want to run the task on one worker, set on_leader=True:

@app.task(on_leader=True)
async def on_startup():
    print("running once")

Runnel also supports periodic tasks via runnel.App.timer and runnel.App.crontab:

@app.timer(interval=10)
async def every_10_seconds():
    print("10 seconds passed")


@app.crontab("45 17 * * *")
async def every_5_45_pm():
    print("It is 5:45pm UTC")

They can also be configured to run only on the lead worker.

Running a worker

You can run the worker from your shell as follows (assuming your app is defined in example.py):

$ runnel worker example:app

For the full set of command line options, see runnel.cli.worker.

By default, the worker will spawn concurrent tasks to run every processor that has been defined for your app. You can instead select specific processors:

$ runnel worker myapp.example:app --processors=myproc1,myproc2

Project layout

Runnel uses decorators to register your processors and background tasks against your app object. Those decorators must run when your Worker starts up. To ensure this happens, even if your code is spread across multiple folders, we offer an ‘autodiscover’ feature:

from runnel import App

app = App(
    name="myapp",
    redis_url="redis://127.0.0.1",
    autodiscover="myproj/**/streams.py",  # <-- Specify your glob pattern here.
)

When set, the worker will search the filesystem using Python’s pathlib.Path.glob function and import any modules that match. In the above example, you could place your processors in any file named ‘streams.py’ anywhere in your project.

This feature is disabled by default because specifying the base directory (e.g. myproj) helps make the search efficient. For small projects, you can simply place your processor code in the same file in which you define your app.

Acknowledgement

Events are acknowledged at the end of each iteration through the processing loop.

@app.processor(orders)
async def printer(events):
    async for order in events.records():
        print(order.amount)
        # This event is acked now by the Runnel system before looping
        # around and yielding a new event.

Since events are acked at the end of the processing block (not at the start), Runnel guarantees ‘at least once’ processing: if an event raises an exception midway through processing and is restarted, that section of processing will run twice.
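Because of at-least-once delivery, your handlers may see the same event twice, so side effects should ideally be idempotent. A minimal sketch of deduplicating by event key (the in-memory set here is only for illustration; a real system would persist seen ids, e.g. in Redis):

```python
# Sketch: make a side effect idempotent by remembering which events have
# already been applied, so a redelivered event is a no-op.
seen: set = set()
total = 0.0

def handle(event: dict) -> None:
    global total
    if event["order_id"] in seen:
        return  # already processed: redelivery is harmless
    seen.add(event["order_id"])
    total += event["amount"]

event = {"order_id": 1, "amount": 9.99}
handle(event)
handle(event)  # simulated redelivery after a crash
```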

Exception handling

Generally speaking, exception handling is harder in systems that make guarantees about processing order, like Runnel. In a traditional job queue, you can mark jobs as failed and retry them later, possibly multiple times. If your events must be processed in a strict order, this option is not available.

Runnel supports three exception policies: halt, quarantine, and ignore (see runnel.constants.ExceptionPolicy), which can be configured per processor, e.g.

@app.processor(orders, exception_policy=ExceptionPolicy.HALT)
async def failer(events):
    async for order in events.records():
        raise ValueError

  1. ExceptionPolicy.HALT

Let the exception propagate, halting execution of the affected partition. This is the default, because it guarantees that your events will not be processed out of order. The affected partition will be reassigned to another worker, which will likely experience the same exception and also halt. This will eventually bring down your cluster of workers. Manual intervention is required: you must fix the offending event and restart your workers.

  2. ExceptionPolicy.QUARANTINE

Mark the affected partition as poisoned, and continue processing all others. This is similar to halting, but only affects the partition in which a failing event was found. This option limits the impact of a single bad event, because only one partition will be stalled. Nonetheless, you must manually intervene by fixing the offending event and restarting your workers to ensure it is reassigned.

Since this policy will not kill your worker, you must have a notification system in place to alert you of the need to fix the broken partition. (This is why it’s not the default.)

  3. ExceptionPolicy.IGNORE

Suppress the exception and continue processing regardless. This option ensures that your processors always make forward progress, which is suitable for some use cases (e.g. you are implementing an approximate counter and it’s more important that it’s up-to-date than that it’s accurate).

This option can also be coupled with the builtin DeadLetterMiddleware, which forwards failed events to a separate stream so that no events are lost. See below for an example:

from runnel import App, Record, ExceptionPolicy
from runnel.middleware import DeadLetterMiddleware


app = App(name="example")


class Action(Record):
    key: str
    id: int


actions = app.stream("actions", record=Action, partition_by="key")
dead_letter = DeadLetterMiddleware(source=actions)


@app.processor(actions, exception_policy=ExceptionPolicy.IGNORE, middleware=[dead_letter])
async def proc(events):
    async for event in events.records():
        pass


@app.processor(dead_letter.sink)
async def dead(events):
    async for event in events.records():
        pass

Creating middleware

The dead-letter feature above is an example of custom middleware. Middleware are objects with a handler method, which is an async generator. They are intended to support sharing common reusable logic between many processors.

The handler methods of a middleware chain form a processing pipeline which runs over events before they are yielded to the final processor function. Each handler is passed the previous handler in the pipeline (called ‘parent’).

from runnel.interfaces import Middleware

class Printer(Middleware):
    async def handler(self, parent, **kwargs):
        async for x in parent:
            print(x)
            yield x

@app.processor(mystream, middleware=[Printer()])
async def proc(events):
    async for record in events.records():
        pass

Note

Some of Runnel’s internal functionality is implemented using middleware; see runnel.middleware.

Since processors can elect to receive a batch of events (see Batching), your middleware handlers need to support both individual runnel.Event objects and batches of them:

from runnel import Event


class Noop(Middleware):
    async def handler(self, parent, **kwargs):
        async for x in parent:
            assert isinstance(x, (Event, list))
            yield x

Post-processing logic can be added in a finally clause. This allows you to respond to errors further up the chain (e.g. in the final processor code):

async def handler(self, parent, **kwargs):
    async for x in parent:
        try:
            yield x
        finally:
            await logic()

This is needed because a GeneratorExit exception will be thrown into the yield point if an exception is raised in the calling code. (Also note that async generator finalization is scheduled by the Python runtime asynchronously, but the Runnel framework ensures it has finished before restarting a processor function.)
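The behaviour can be observed with a plain async generator, independent of Runnel: closing the generator while it is suspended at a yield raises GeneratorExit at that point, and the finally clause still runs.

```python
import asyncio

cleaned_up = False

async def handler():
    try:
        yield 1
    finally:
        # Runs even when the consumer abandons the generator mid-yield.
        global cleaned_up
        cleaned_up = True

async def main():
    gen = handler()
    assert await gen.__anext__() == 1
    # Simulate an exception in the consumer: close the generator while
    # it is suspended at the yield. GeneratorExit is thrown in there.
    await gen.aclose()

asyncio.run(main())
assert cleaned_up
```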

If you only want to know if an exception was raised further up the chain, you can use the following:

async def handler(self, parent, **kwargs):
    async for x in parent:
        try:
            yield x
        except GeneratorExit:
            await cleanup()
            raise

(Note: Python requires that GeneratorExit is not ignored, so it must be reraised here to avoid a RuntimeError.)