API Reference

runnel

class runnel.App(name: str, **kwargs)[source]

This is the main abstraction provided by Runnel. Use the stream() and processor() methods to define partitioned event streams and processors respectively. Apps will be run by workers via the CLI.

Parameters
  • name (str) – An app identifier. Will be used as a component of the Redis keys for the under-the-hood data structures.

  • kwargs (Dict) – Any of the settings found in runnel.settings to override any options provided by environment variables.

Examples

>>> from runnel import App, Record
...
>>> app = App(
...     name="example",
...     log_level="info",
...     redis_url="redis://127.0.0.1:6379",
... )

Specify your event types using the Record class:

>>> from datetime import datetime
>>> from typing import List
...
>>> class Order(Record):
...     order_id: int
...     created_at: datetime
...     amount: int
...     item_ids: List[int]

Streams are configured to be partitioned by a chosen key.

>>> orders = app.stream("orders", record=Order, partition_by="order_id")

Processor functions iterate over the event stream and can rely on receiving events in the order they were created per key.

>>> @app.processor(orders)
... async def printer(events):
...     async for order in events.records():
...         print(order.amount)

Under the hood, Runnel will take care of partitioning the stream to enable scalable distributed processing. We also take care of concurrently running the async processor functions – one for every partition in your stream. This processing may be distributed across many workers on different machines and Runnel will coordinate ownership of partitions dynamically as workers join or leave.
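
Workers are started via the Runnel CLI. A sketch, assuming the app above is importable from a module named example; the exact worker invocation syntax is an assumption here, so check runnel --help:

$ runnel worker example:app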

stream(name, record, partition_by, serializer=None, partition_count=None, partition_size=None, hasher=None)[source]

A set of partitioned Redis streams, containing events as structured Record types.

Kwargs, if provided, will override the default settings configured on the App instance or via environment variables (see runnel.settings) for this stream.

Parameters
  • name (str) – A stream identifier. Will be used as a component of the Redis keys for the under-the-hood data structures.

  • record (Type[Record]) – A class that inherits from Record, which specifies the structure of the event data this stream expects. See runnel.Record.

  • partition_by (Union[str, Callable[[Record], Any]]) – A str representing an attribute of the Record type (or a callable to compute a value) which should be used to partition events. For example, if your events concern user activity and you want to process events in-order per user, you might choose the “user_id” attribute to partition by.

  • serializer (Serializer) – An object implementing the runnel.interfaces.Serializer interface which controls how records are stored in the Redis streams.

  • partition_count (int) – How many partitions to create.

  • partition_size (int) – The max length of each partition. (Implemented approximately via Redis’ MAXLEN option to XADD.) Represents the size of the buffer in case processors are offline or cannot keep up with the event rate.

  • hasher (Callable[[Any], int]) – A function used to hash the partition key to decide to which partition to send a record.

Examples

>>> from runnel import App, Record, JSONSerializer
...
>>> app = App(name="example")
...
>>> class Order(Record):
...     order_id: int
...     amount: int

Streams are configured to be partitioned by a chosen key.

>>> orders = app.stream(
...     name="orders",
...     record=Order,
...     partition_by="order_id",
...     partition_count=16,
...     serializer=JSONSerializer(),
... )
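
Events can then be produced with runnel.Stream.send, documented below. A minimal sketch, assuming a Redis server is reachable at the configured URL (anyio.run is used here only to drive the coroutine; asyncio.run would work equally well):

>>> import anyio
...
>>> async def produce():
...     await orders.send(Order(order_id=1, amount=99))
...
>>> anyio.run(produce)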
processor(stream, *, name=None, exception_policy=None, middleware=None, lock_expiry=None, read_timeout=None, prefetch_count=None, assignment_attempts=None, assignment_sleep=None, grace_period=None, pool_size=None, join_delay=None)[source]

A wrapper around an async Python function which iterates over a continuous event stream.

Kwargs, if provided, will override the default settings configured on the App instance or via environment variables (see runnel.settings) for this processor.

Notes

Events are acknowledged at the end of every processing loop. This means that if your processor crashes before completion, that section of work will be repeated when the processor is restarted. Runnel therefore provides ‘at least once’ semantics.
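
Given at-least-once delivery, handlers should be idempotent where possible. A hypothetical sketch that skips already-seen events (the processed set and handle function are illustrative assumptions, and orders is the stream from the Examples below; real deduplication would need shared storage that survives restarts):

>>> processed = set()  # in-memory for illustration only
...
>>> @app.processor(orders)
... async def careful(events):
...     async for event in events:
...         if event.xid not in processed:  # xid is the Redis stream ID (see runnel.Event)
...             handle(event.data)  # hypothetical business logic
...             processed.add(event.xid)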

Parameters
  • stream (Stream) – The stream this processor will iterate over, as returned by runnel.App.stream.

  • name (str) – Used in the Redis keys relating to this processor. Must be unique together with the App and Stream. Default: your function’s __name__.

  • exception_policy (ExceptionPolicy) –

    How to handle exceptions raised in the user-provided processor coroutine.

    • HALT: Raise the exception, halting execution of the affected partition.

    • QUARANTINE: Mark the affected partition as poisoned, and continue with others.

    • IGNORE: Suppress the exception and continue processing regardless.

    Default: HALT.

  • middleware (List[Middleware]) – A list of Middleware objects for managing the data pipeline. Can be used to implement custom exception handling (e.g. dead letter queues).

  • lock_expiry (int (seconds)) – The duration of the lock on stream partitions owned by executors of this processor. This controls the worst case lag a partition’s events may experience, since other executors will have to wait to acquire the lock in case the owner has died.

  • read_timeout (int (milliseconds)) – How long to stay blocked reading from Redis via XREADGROUP. Nothing else depends on this value; it only bounds how long a single blocking read waits before the read loop retries.

  • prefetch_count (int) – The maximum number of events to read from Redis per partition owned by an executor. (If a single executor owns all 16 partitions in a stream and prefetch_count is 10, then 160 events may be read at once.) Purely an optimisation.

  • assignment_attempts (int) – How many times to try to complete a rebalance operation (i.e. acquire our declared partitions) before giving up.

  • assignment_sleep (float (seconds)) – How long to wait between attempts to complete a rebalance operation.

  • grace_period (float (seconds)) – How long to wait for execution to complete gracefully before cancelling it.

  • pool_size (int) – How many concurrent connections to make to Redis to read events.

  • join_delay (int (seconds)) – How long to wait after joining before attempting to acquire partitions. Intended to mitigate a thundering herd problem of multiple workers joining simultaneously and needing to rebalance multiple times.

Examples

>>> from runnel import App, Record
...
>>> app = App(name="example")
...
>>> class Order(Record):
...     order_id: int
...     amount: int
...
>>> orders = app.stream(name="orders", record=Order, partition_by="order_id")
...
>>> @app.processor(orders)
... async def printer(events):
...     async for order in events.records():
...         print(order.amount)
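
Any of the kwargs above can be overridden per processor; a sketch with illustrative values:

>>> from runnel.constants import ExceptionPolicy
...
>>> @app.processor(orders, exception_policy=ExceptionPolicy.QUARANTINE, lock_expiry=60)
... async def audit(events):
...     async for order in events.records():
...         print(order.order_id)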
task(func=None, *, on_leader=False)[source]

Define an async function to run at worker startup.

Parameters

  • on_leader (bool) – Whether to run the function only on one worker: the elected leader.

Examples

>>> @app.task
... async def on_startup():
...     print("starting")

If you want the task to run on only one worker:

>>> @app.task(on_leader=True)
... async def on_startup():
...     print("running once")
timer(*, interval: int, on_leader=False)[source]

Define an async function to be run at periodic intervals.

Parameters
  • interval (float (seconds)) – How often the function executes in seconds.

  • on_leader (bool) – Whether to run the function only on one worker: the elected leader.

Examples

>>> @app.timer(interval=10)
... async def every_10_seconds():
...     print("10 seconds passed")

If you want the task to run on only one worker:

>>> @app.timer(interval=5, on_leader=True)
... async def every_5_seconds():
...     print("5 seconds passed on the leader")
crontab(spec: str, *, timezone=None, on_leader=False)[source]

Define an async function to be run at fixed times, defined by the Cron format (see https://crontab.guru/ for examples).

Parameters
  • spec (str) – The Cron spec defining fixed times to run the decorated function.

  • timezone (tzinfo) – The timezone to be taken into account for the Cron jobs. If not set, the value from runnel.settings.Settings.timezone will be used.

  • on_leader (bool) – Whether to run the function only on one worker: the elected leader.

Examples

>>> app.crontab("45 17 * * *")
>>> async def every_5_45_pm():
...     print("It is 5:45pm UTC")

If you want the task to run on only one worker:

>>> app.crontab("45 17 * * *", on_leader=True)
>>> async def every_5_45_pm():
...     print("It is 5:45pm UTC on the leader")

With a timezone specification:

>>> @app.crontab("45 17 * * *", timezone=pytz.timezone('GMT'))
>>> async def every_5_45_pm():
...     print("It is 5:45pm in London")
class runnel.Record(*args, **kwargs)[source]

This class is used to specify structured data types to store in streams.

It is a specialized version of pydantic.BaseModel, see https://pydantic-docs.helpmanual.io/ for more information.

Examples

>>> from datetime import datetime
>>> from typing import List
...
>>> class Order(Record):
...     order_id: int
...     created_at: datetime
...     amount: int
...     item_ids: List[int]

Records can be nested arbitrarily, e.g. items: List[Item] where Item is another record. They will be serialized (according to the serializer/compressor settings) and stored as arbitrary bytes as a single value in a Redis stream entry.
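
For example (Item and Basket are hypothetical record types):

>>> class Item(Record):
...     item_id: int
...     name: str
...
>>> class Basket(Record):
...     user_id: int
...     items: List[Item]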

Alternatively, you can use the native Redis stream key/value pairs by setting primitive=True, e.g.:

>>> class UserAction(Record, primitive=True):
...     user_id: int
...     type: str

This allows you to benefit from optimisations such as delta compression (see http://antirez.com/news/128), at the cost of not supporting nested values. Only int, float, bool, str, and bytes are allowed.

class runnel.Events(runner: Runner, partition: Partition, want: str = 'events', batch_args: Optional[Tuple] = None, failed: Set[Event] = <factory>, finalized: anyio.abc.Event = <factory>, agen: AsyncIterator[Union[Event, List[Event]]] = None)[source]

An async generator which yields Events (or batches of them). This is the object passed to user-defined processor functions.

Examples

>>> from runnel import App, Record
...
>>> app = App(name="example")
...
>>> class Order(Record):
...     order_id: int
...     amount: int
...
>>> orders = app.stream(name="orders", record=Order, partition_by="order_id")
...
>>> @app.processor(orders)
... async def printer(events):
...     async for order in events.records():
...         print(order.amount)
take(n, within)[source]

Configure the events generator to yield batches of n events (unless within seconds pass before n are ready, in which case yield all pending events).

Parameters
  • n (int) – The desired batch size.

  • within (int (seconds)) – The duration to wait for the batch size to be reached before yielding.

Examples

>>> @app.processor(orders)
... async def printer(events):
...     async for orders in events.take(10, within=1).records():
...         # Handle orders as an atomic batch!
...         assert 1 <= len(orders) <= 10
...         print(orders)

Notes

This method is provided for efficiency. It is intended to be used where batch processing of events greatly increases your processing speed. For example, if you are loading records into a database, you may want to use its bulk import API to ingest a batch of records at a time.
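
For example, a hypothetical database loader (db and its bulk_insert coroutine are assumptions, not part of Runnel):

>>> @app.processor(orders)
... async def loader(events):
...     async for batch in events.take(100, within=5).records():
...         # Records are pydantic models, so .dict() yields plain dicts.
...         await db.bulk_insert([order.dict() for order in batch])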

Warning

Runnel acks events after every iteration through the event generator loop. When using take, this means the entire batch will be acked at once. As a result, you must process the batch as a single unit atomically. If you iterate over the events in a batch one-at-a-time and you fail half-way through, then the entire batch will be considered failed (and handled according to your runnel.constants.ExceptionPolicy). This will lead to duplicate processing if the batch is retried, or dropped events if the batch is ignored.

records()[source]

Configure the events generator to deserialize events into Record objects.

Examples

>>> from runnel import Record
...
>>> @app.processor(orders)
... async def printer(events):
...     async for order in events.records():
...         assert isinstance(order, Record)
...         print(order.amount)

If this method is omitted, you will iterate over the low-level runnel.Event objects, which gives you access to the raw data as Dict[bytes, bytes].

>>> from runnel import Event
...
>>> @app.processor(orders)
... async def printer(events):
...     async for event in events:
...         assert isinstance(event, Event)
...         print(event.data)
class runnel.Event(runner: Runner, partition: Partition, group: bytes, xid: bytes, data: Dict[bytes, bytes], recovering: bool)[source]

The internal representation of an event in a stream. It will ordinarily be deserialized into a Record type before it is acted upon, e.g. via async for record in events.records():. This low-level representation is also available if necessary.

Variables
  • partition (Partition) – The partition this event came from.

  • xid (bytes) – The Redis stream ID.

  • data (Dict[bytes, bytes]) – The keys and values retrieved from the Redis stream.

Examples

>>> @app.processor(orders)
... async def myproc(events):
...     async for event in events:
...         print(event.xid)
class runnel.Partition(stream: Stream, number: int, pointer: bytes = b'0-0', lock: anyio.abc.Lock = <factory>)[source]

The internal representation of a partition of the stream.

Variables
  • number (int) – Which numerical partition this is.

  • key (str) – The Redis key under which the stream data structure is stored.

class runnel.Stream(app: App, name: str, record: Type[runnel.record.Record], partition_by: Union[str, Callable], serializer: runnel.interfaces.Serializer, hasher: Callable[[Any], int], partition_count: int, partition_size: int)[source]

A set of partitioned Redis streams, together representing a single logical event stream.

Not intended to be used directly. Use runnel.App.stream instead.

async send(*records: Iterable[runnel.record.Record], stream_ids=None)[source]

Send records to partitions of the stream, according to their partition keys.

Parameters
  • records (Iterable[Record]) – The records to send.

  • stream_ids (Optional[Iterable[str]]) – A list of stream_ids corresponding to the records. Must be the same length as records. If None, then "*" will be used for all records. See https://redis.io/commands/xadd for more details.
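
Examples

A sketch, assuming the orders stream and Order record defined in the runnel.App.stream examples above; send must be awaited from within a coroutine:

>>> await orders.send(
...     Order(order_id=1, amount=99),
...     Order(order_id=2, amount=50),
... )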

runnel.settings

class runnel.settings.Settings(_env_file: Optional[Union[pathlib.Path, str]] = '<object object>', _env_file_encoding: Optional[str] = None, *, redis_url: str = 'redis://127.0.0.1:6379', log_format: str = 'console', log_level: str = 'info', autodiscover: str = None, timezone: datetime.tzinfo = <UTC>, leadership_poll_interval: int = 4000, testing: bool = False, default_partition_count: int = 16, default_serializer: Union[pydantic.types.PyObject, str] = 'runnel.serialization.default', default_hasher: Union[pydantic.types.PyObject, str] = 'runnel.hashing.default', default_exception_policy: runnel.constants.ExceptionPolicy = <ExceptionPolicy.HALT: 'halt'>, default_lock_expiry: int = 120, default_read_timeout: int = 2000, default_prefetch_count: int = 8, default_assignment_attempts: int = 32, default_assignment_sleep: float = 2, default_grace_period: float = 8, default_pool_size: int = 16, default_join_delay: int = 2, default_partition_size: int = 50000)[source]

This class should not be used directly, but acts as a container for application settings, configured via environment variables (e.g. RUNNEL_LOG_LEVEL=debug) or as kwargs to the App object.

Many of the settings represent defaults for processors or streams and can be overridden at object initialisation time.

Parameters
  • redis_url (str) – The URL to pass to aredis.StrictRedis.from_url. Default: "redis://127.0.0.1:6379".

  • log_format (str) – Either "json" or "console" to specify the log output format.

  • log_level (str) – The minimum log level to display: one of "debug", "info", or "warning" is recommended.

  • autodiscover (str) – The pattern for pathlib.Path.glob() to find modules containing Runnel app-decorated functions (e.g. processors, tasks), which the worker must import on startup. Will be called relative to current working directory. For example, use 'myproj/**/streams.py' to find all modules called ‘streams’ inside the ‘myproj’ folder. Default None.

  • timezone (tzinfo) – The timezone to use by default for app.crontab schedules. Default: pytz.UTC.

  • leadership_poll_interval (int (milliseconds)) – How long to sleep between worker leadership election attempts. Default: 4000

  • testing (bool) – Whether we are currently running the test suite.

  • default_serializer (Serializer) – An object implementing the Serializer interface which controls how records are stored in the Redis streams. Default: JSONSerializer unless orjson is installed (use the runnel[fast] bundle to install automatically), in which case FastJSONSerializer.

  • default_partition_count (int) – How many partitions to create. Default: 16.

  • default_partition_size (int) – The max length of each partition. (Implemented approximately via Redis’ MAXLEN option to XADD.) Represents the size of the buffer in case processors are offline or cannot keep up with the event rate. Default: 50_000.

  • default_hasher (Callable[[Any], int]) – A function used to hash a record’s partition key to decide which partition to send it to. Default: md5, unless xxhash is installed (use the runnel[fast] bundle to install automatically), in which case xxhash is used.

  • default_exception_policy (ExceptionPolicy) –

    How to handle exceptions raised in the user-provided processor coroutine.

    • HALT: Raise the exception, halting execution of the affected partition.

    • QUARANTINE: Mark the affected partition as poisoned, and continue with others.

    • IGNORE: Suppress the exception and continue processing regardless.

    Default: HALT.

  • default_lock_expiry (int (seconds)) – The duration of the lock on stream partitions owned by executors of this processor. This controls the worst case lag a partition’s events may experience, since other executors will have to wait to acquire the lock in case the owner has died. Default: 120.

  • default_read_timeout (int (milliseconds)) – How long to stay blocked reading from Redis via XREADGROUP. Should be smaller than the grace_period given to processors. Default: 2000.

  • default_prefetch_count (int) – The maximum number of events to read from Redis per partition owned by an executor. (If a single executor owns all 16 partitions in a stream and prefetch_count is 10, then 160 events may be read at once.) Purely an optimisation. Default: 8.

  • default_assignment_attempts (int) – How many times to try to complete a rebalance operation (i.e. acquire our declared partitions) before giving up. Default: 32.

  • default_assignment_sleep (float (seconds)) – How long to wait between attempts to complete a rebalance operation. Default: 2.

  • default_grace_period (float (seconds)) – How long to wait for execution to complete gracefully before cancelling it. Default: 8.

  • default_pool_size (int) – How many concurrent connections to make to Redis to read events. Default: 16.

  • default_join_delay (int (seconds)) – How long to wait after joining before attempting to acquire partitions. Intended to mitigate a thundering herd problem of multiple workers joining simultaneously and needing to rebalance multiple times. Default: 2.
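
For example, the same default can be set via the environment or overridden in code; a sketch (the environment variable name assumes the RUNNEL_ prefix shown above):

>>> import os
>>> os.environ["RUNNEL_DEFAULT_PARTITION_COUNT"] = "32"  # must be set before the App is created
...
>>> app = App(name="example", default_partition_count=32)  # equivalent kwarg override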

runnel.interfaces

class runnel.interfaces.Compressor[source]

Implement this interface to customise event data compression.

Examples

>>> import gzip
>>> from runnel.interfaces import Compressor
...
>>> class Gzip(Compressor):
...     def compress(self, value):
...         return gzip.compress(value)
...
...     def decompress(self, value):
...         return gzip.decompress(value)

Then configure your serializer to use it:

>>> from runnel import App, Record, JSONSerializer
...
>>> app = App(name="example")
...
>>> class Order(Record):
...     order_id: int
...     amount: int
...
>>> orders = app.stream(
...     name="orders",
...     record=Order,
...     partition_by="order_id",
...     serializer=JSONSerializer(compressor=Gzip()),
... )
compress(value: bytes) → bytes[source]

Return compressed bytes.

decompress(value: bytes) → bytes[source]

Return decompressed bytes.

class runnel.interfaces.Serializer(compressor: runnel.interfaces.Compressor = None)[source]

Implement this interface to customise event data serialization.

Examples

>>> import orjson  # A fast JSON library written in Rust.
>>> from dataclasses import dataclass
>>> from runnel.interfaces import Serializer
...
>>> @dataclass(frozen=True)
... class FastJSONSerializer(Serializer):
...     compressor = None
...
...     def dumps(self, value):
...         return orjson.dumps(value)
...
...     def loads(self, value):
...         return orjson.loads(value)

Then pass it to your stream:

>>> from runnel import App, Record
...
>>> app = App(name="example")
...
>>> class Order(Record):
...     order_id: int
...     amount: int
...
>>> orders = app.stream(
...     name="orders",
...     record=Order,
...     partition_by="order_id",
...     serializer=FastJSONSerializer(),
... )
dumps(value: Any) → bytes[source]

Return serialized bytes.

loads(value: bytes) → Any[source]

Return deserialized Python object.

class runnel.interfaces.Middleware[source]

Middleware are objects with a handler method (described below). Processors accept a list of user-provided middleware.

The handler methods form a processing pipeline which runs over events before they are yielded to the final processor function. Each handler is passed the previous handler in the pipeline (called ‘parent’).

Examples

>>> from runnel.interfaces import Middleware
...
>>> class Printer(Middleware):
...     async def handler(self, parent, **kwargs):
...         async for x in parent:
...             print(x)
...             yield x

It can then be passed to the processor.

>>> @app.processor(mystream, middleware=[Printer()])
... async def proc(events):
...     async for record in events.records():
...         pass

Notes

Some of Runnel’s internal functionality is implemented using middleware; see the Runnel source code for examples.

async handler(parent, **kwargs) → AsyncIterator[Union[Event, List[Event]]][source]

A middleware handler is an async generator. Given a parent generator that yields Events or batches of Events, it must yield the same.

For example:

>>> async for x in parent:
...     assert isinstance(x, (Event, list))
...     yield x

Post-processing logic can be added below, in a finally clause. This allows you to respond to errors further up the chain (e.g. in the final processor code):

>>> async for x in parent:
...     try:
...         yield x
...     finally:
...         await logic()

This is needed because a GeneratorExit exception will be thrown into the yield point if an exception is raised in the calling code. (Also note that async generator finalization is scheduled by the Python runtime asynchronously, but the Runnel framework ensures it has finished before restarting a processor function.)

If you only want to know if an exception was raised further up the chain, you can use the following:

>>> async for x in parent:
...     try:
...         yield x
...     except GeneratorExit:
...         await cleanup()
...         raise

(Note: Python requires that GeneratorExit not be ignored, so it must be re-raised here to avoid a RuntimeError.)
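
Combining these patterns, a minimal dead-letter sketch (the failed list stands in for a real sink, such as another stream; this is illustrative, not a built-in):

>>> from runnel.interfaces import Middleware
...
>>> failed = []
...
>>> class DeadLetter(Middleware):
...     async def handler(self, parent, **kwargs):
...         async for x in parent:
...             try:
...                 yield x
...             except GeneratorExit:
...                 failed.append(x)  # capture the event being processed when the error occurred
...                 raise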

runnel.constants

class runnel.constants.ExceptionPolicy(value)[source]

An enum specifying how to handle exceptions raised in the user-provided processor coroutine.

  • HALT: Raise the exception, halting execution of the affected partition.

  • QUARANTINE: Mark the affected partition as poisoned, and continue with others.

  • IGNORE: Suppress the exception and continue processing regardless.

Examples

>>> from runnel.constants import ExceptionPolicy
...
>>> @app.processor(orders, exception_policy=ExceptionPolicy.HALT)
... async def printer(events):
...     async for order in events.records():
...         print(order.amount)