Architecture
This document describes the under-the-hood architecture in detail. For a guide on how to use this library, see the Guide instead.
Sending Events
Events are stored in a set of Redis keys. Each stream created with runnel.App.stream corresponds to several Redis stream data structures (one per partition); their number is controlled by the partition_count setting. Events can be sent from your Python application code via runnel.Stream.send() or from the shell via the runnel.cli.send or runnel.cli.sendmany CLI commands. For every record to send, the correct partition is computed by hashing the partition_key.
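As a rough illustration of key-based partitioning, the sketch below maps a key to a partition number with CRC32. The choice of CRC32 and the helper names partition_for and stream_key are assumptions for this example, not runnel's API. What it does show is the property that matters: the hash must be stable across processes, so every producer sends a given key to the same partition.

```python
import zlib


def partition_for(key: str, partition_count: int) -> int:
    """Hypothetical helper: map a partition key to a number in [0, partition_count)."""
    # Assumption: CRC32 is used purely for illustration. Python's built-in
    # hash() is salted per process for str, so it would send the same key
    # to different partitions in different producer processes.
    return zlib.crc32(key.encode("utf-8")) % partition_count


def stream_key(app: str, stream: str, partition: int) -> str:
    """Hypothetical helper using the __strm:{app}.{stream}.{n} layout from Redis Keys."""
    return f"__strm:{app}.{stream}.{partition}"


partition_count = 16
for order_id in (1, 2, 42):
    n = partition_for(str(order_id), partition_count)
    print(order_id, "->", stream_key("myapp", "orders", n))
```

Because the mapping depends only on the key, all events for one order_id land in one partition and are therefore processed in order by a single consumer.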
Each partition has a maximum length, controlled by the partition_size setting. This is implemented using the MAXLEN option to XADD (see the Redis docs).
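The effect of MAXLEN can be modelled in memory: once a partition holds partition_size entries, each new XADD evicts the oldest one. The CappedStream class below is a hypothetical toy, not part of runnel, and it models exact trimming. Redis also supports approximate trimming (MAXLEN ~), which is the default in redis-py's xadd(name, fields, maxlen=..., approximate=True); which mode runnel uses is not stated here.

```python
from collections import deque
from itertools import count


class CappedStream:
    """Toy in-memory model of one partition written with XADD ... MAXLEN."""

    def __init__(self, partition_size: int):
        # deque(maxlen=...) drops the oldest entry once full, like exact MAXLEN trimming.
        self.entries = deque(maxlen=partition_size)
        self._seq = count(1)

    def xadd(self, fields: dict) -> str:
        # Real Redis IDs look like "<milliseconds>-<sequence>"; this toy only counts.
        entry_id = f"0-{next(self._seq)}"
        self.entries.append((entry_id, fields))
        return entry_id


s = CappedStream(partition_size=3)
for i in range(5):
    s.xadd({"order_id": str(i)})
print([entry_id for entry_id, _ in s.entries])  # ['0-3', '0-4', '0-5']
```

The practical consequence is that events not consumed before partition_size newer events arrive are discarded, so the setting bounds both memory use and how far a slow consumer can lag.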
Processing Events
Events are then processed by your runnel.App.processor functions, which are run by workers. This is a detailed breakdown of the entities spawned by a single worker (each one is implemented as an asyncio task):
Worker: Created by running $ runnel worker in your shell. By default, it will create an Executor for every processor you have created in your runnel application (but you can specify a subset of processors if you wish; see runnel.cli.worker).
Leadership: A periodic task to choose a worker to become the ‘leader’, which means it is responsible for running background tasks for which on_leader=True.
Executors: Responsible for executing a user-provided runnel.App.processor function over partitions of the stream. Newly created executors will trigger a Rebalance so that partitions are evenly distributed between workers. Each executor acquires a lock, implemented in Redis, for every partition it is assigned.
Heartbeat: A periodic task that emits a heartbeat (stored in Redis) to tell other workers that this executor is alive and well. If an executor’s heartbeat expires, the executor is considered dead and its partitions are reassigned to other workers. Also responsible for extending the executors’ partition locks.
Maintenance: A periodic task to check whether any other workers have died, in which case a Rebalance is triggered.
Control: Listens for messages in a ‘control’ stream in Redis that announce the creation and departure of workers, and notifies the executor that a Rebalance should begin.
Runner: Responsible for concurrently running one Fetcher and multiple Consumers (one for every partition of the stream currently owned by the Executor).
Fetcher: A long-running task to retrieve events from Redis. Spawns multiple Fetch tasks.
Fetch: A task which calls the Redis XREADGROUP command to retrieve events from a set of partitions and store them in internal buffers. It blocks while no new events are available.
Consumers: Each of these tasks calls the user-provided runnel.App.processor function for a single partition of the stream, passing it an Events generator. Responsible for implementing the exception policy (see Exception handling) in case the processor function fails.
Processor: The user-provided processor function, responsible for iterating over the Events generator and performing arbitrary logic. Each task will receive events for a single partition of the stream.
Events: A generator of events, passed to the user-provided runnel.App.processor function. It takes the events that were fetched from Redis out of an internal buffer, passes them through any runnel.interfaces.Middleware defined for the stream, yields them, and then XACKs them so they are processed only once.
Waiter: A task which waits for a signal to shutdown a Consumer. It will be triggered if the consumer’s partition is no longer owned by this Executor due to a Rebalance.
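The data flow between Runner, Fetcher, Consumers and the Events generator can be sketched with plain asyncio. This is a toy model under stated assumptions, not runnel's implementation: an in-memory list stands in for XREADGROUP, asyncio.Queue objects stand in for the internal buffers, appending to a list stands in for XACK, and middleware is modelled as plain functions (runnel's own runnel.interfaces.Middleware interface differs). All function names here are hypothetical.

```python
import asyncio
from collections import defaultdict


async def fetcher(source, buffers):
    """Toy Fetcher: copy fetched events into per-partition buffers."""
    # Stand-in for XREADGROUP: source is a list of (partition, event_id, value).
    for partition, event_id, value in source:
        await buffers[partition].put((event_id, value))
    # Sentinel so the toy terminates; a real fetcher keeps polling Redis.
    for buffer in buffers.values():
        await buffer.put(None)


async def events(buffer, middleware, acked):
    """Toy Events generator: read the buffer, apply middleware, yield, then ack."""
    while (item := await buffer.get()) is not None:
        event_id, value = item
        for mw in middleware:
            value = mw(value)
        yield value
        # Reached only when the processor asks for the next event, i.e. after
        # it has finished handling this one (stand-in for XACK).
        acked.append(event_id)


async def consumer(partition, buffer, processor, middleware, acked):
    """Toy Consumer: run the processor over one partition's Events generator."""
    try:
        await processor(events(buffer, middleware, acked))
    except Exception as exc:
        # runnel applies its configured exception policy here; this toy only reports.
        print(f"partition {partition} failed: {exc!r}")


async def runner(source, partitions, processor, middleware=()):
    """Toy Runner: one fetcher plus one consumer per owned partition, concurrently."""
    buffers = {p: asyncio.Queue() for p in partitions}
    acked = defaultdict(list)
    await asyncio.gather(
        fetcher(source, buffers),
        *(consumer(p, buffers[p], processor, middleware, acked[p]) for p in partitions),
    )
    return dict(acked)


seen = []


async def printer(evs):
    # Plays the role of a user-provided processor function.
    async for amount in evs:
        seen.append(amount)


source = [(0, "0-1", 10), (1, "0-1", 20), (0, "0-2", 30)]
acked = asyncio.run(runner(source, [0, 1], printer, middleware=[lambda v: v * 2]))
print(sorted(seen), acked)  # [20, 40, 60] {0: ['0-1', '0-2'], 1: ['0-1']}
```

Acknowledging after the yield resumes, rather than before yielding, means an event whose processing raises is never acked, which is what allows an exception policy to retry or skip it explicitly.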
Redis Keys
Assume that we are running the following application:
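```python
from runnel import App, Record

app = App(name="myapp", redis_url="redis://127.0.0.1")

# A Record declares the schema of each event in the stream.
class Order(Record):
    order_id: int
    amount: int

# Events are assigned to partitions according to their order_id.
orders = app.stream("orders", record=Order, partition_by="order_id")

@app.processor(orders)
async def printer(events):
    # Each invocation receives the events of a single partition.
    async for order in events.records():
        print(order.amount)
```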
The following Redis keys will be used:
__strm:myapp.orders.{partition_number}
The partitioned stream data structures for events. partition_number is an integer from 0 to partition_count - 1.
__memb:myapp.orders.printer
A string key holding JSON-encoded membership data for existing executors. Contains the mapping from executors to the partitions they have been assigned.
__ctrl:myapp.orders.printer
A stream for communicating control messages between executors. Used to announce joining and leaving workers, which triggers a rebalance.
__lock:myapp.orders.printer.{partition_number}
A lock for every stream partition. Should be owned by the assigned executor. Must be owned before processing a partition.
__lock:myapp.orders.printer.admin
A lock to protect atomic admin operations, such as changing the partition assignments.
__beat:myapp.orders.printer.{executor_id}
An expiring string key to indicate that an executor is still alive. executor_id is a UUID.
__lead:myapp
Holds the name of the current lead worker, which is responsible for running background tasks for which on_leader=True.
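The key names above follow a regular pattern, so they can be derived from the app, stream and processor names. The sketch below does that with a hypothetical redis_keys helper, and also shows one way the membership data could be produced: a round-robin split so that partition counts per executor differ by at most one. The document only says that partitions are distributed evenly and that the mapping is stored as JSON, so assign_partitions, the JSON shape, and the placeholder executor IDs ("exec-a", "exec-b" instead of real UUIDs) are all assumptions.

```python
import json


def redis_keys(app, stream, processor, partition_count, executor_id):
    """Hypothetical helper: build the key names listed above for one processor."""
    base = f"{app}.{stream}"        # e.g. "myapp.orders"
    owner = f"{base}.{processor}"   # e.g. "myapp.orders.printer"
    return {
        # Partition numbers run from 0 to partition_count - 1.
        "streams": [f"__strm:{base}.{n}" for n in range(partition_count)],
        "membership": f"__memb:{owner}",
        "control": f"__ctrl:{owner}",
        "locks": [f"__lock:{owner}.{n}" for n in range(partition_count)],
        "admin_lock": f"__lock:{owner}.admin",
        "heartbeat": f"__beat:{owner}.{executor_id}",
        "leader": f"__lead:{app}",
    }


def assign_partitions(executor_ids, partition_count):
    """Hypothetical round-robin split: partition counts differ by at most one."""
    assignment = {eid: [] for eid in executor_ids}
    for n in range(partition_count):
        assignment[executor_ids[n % len(executor_ids)]].append(n)
    return assignment


keys = redis_keys("myapp", "orders", "printer", partition_count=4, executor_id="exec-a")
print(keys["streams"])
print(keys["heartbeat"])  # __beat:myapp.orders.printer.exec-a

# Hypothetical JSON payload for the membership key.
membership = json.dumps(assign_partitions(["exec-a", "exec-b"], 4))
print(membership)  # {"exec-a": [0, 2], "exec-b": [1, 3]}
```

Note that the stream keys are shared by every processor of the stream, while the membership, control, lock and heartbeat keys include the processor name, so each processor coordinates its own executors independently.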