Source code for runnel.app

from typing import Callable, Coroutine, Dict, Set

import anyio
import structlog
from aredis import StrictRedis

from runnel.exceptions import Misconfigured
from runnel.logging import init_logging
from runnel.processor import Processor
from runnel.settings import Settings
from runnel.stream import Stream
from runnel.utils import seconds_until
from runnel.worker import Worker

logger = structlog.get_logger(__name__)


class App:
    """
    This is the main abstraction provided by Runnel. Use the :meth:`~.stream` and
    :meth:`~.processor` methods to define partitioned event streams and processors
    respectively.

    Apps will be run by workers via the CLI.

    Parameters
    ----------
    name : str
        An app identifier. Will be used as a component of the Redis keys for the
        under-the-hood data structures.
    kwargs : Dict
        Any of the settings found in :mod:`runnel.settings` to override any options
        provided by environment variables.

    Examples
    --------
    >>> from runnel import App, Record
    ...
    >>> app = App(
    ...     name="example",
    ...     log_level="info",
    ...     redis_url="127.0.0.1:6379",
    ... )

    Specify your event types using the Record class:

    >>> class Order(Record):
    ...     order_id: int
    ...     created_at: datetime
    ...     amount: int
    ...     item_ids: List[int]

    Streams are configured to be partitioned by a chosen key.

    >>> orders = app.stream("orders", record=Order, partition_by="order_id")

    Processor functions iterate over the event stream and can rely on receiving
    events in the order they were created per key.

    >>> @app.processor(orders)
    ... async def printer(events):
    ...     async for order in events.records():
    ...         print(order.amount)

    Under the hood, Runnel will take care of partitioning the stream to enable
    scalable distributed processing. We also take care of concurrently running the
    async processor functions -- one for every partition in your stream. This
    processing may be distributed across many workers on different machines and
    Runnel will coordinate ownership of partitions dynamically as workers join or
    leave.
    """
    def __init__(self, name: str, **kwargs):
        self.name: str = name
        self.settings: Settings = Settings(**kwargs)
        self.redis = StrictRedis.from_url(self.settings.redis_url)
        self.workers: Set[Worker] = set()
        self.tasks: Set[Coroutine] = set()
        self.processors: Dict[str, Coroutine] = {}
        self.scripts: Dict[str, Callable] = {}

        init_logging(
            level=self.settings.log_level,
            format=self.settings.log_format,
        )
    def stream(
        self,
        name,
        record,
        partition_by,
        serializer=None,
        partition_count=None,
        partition_size=None,
        hasher=None,
    ):
        """
        A set of partitioned Redis streams, containing events as structured Record
        types.

        Kwargs, if provided, will override the default settings configured on the App
        instance or via environment variables (see :mod:`runnel.settings`) for this
        stream.

        Parameters
        ----------
        name : str
            A stream identifier. Will be used as a component of the Redis keys for the
            under-the-hood data structures.
        record : Type[Record]
            A class that inherits from Record, which specifies the structure of the
            event data this stream expects. See :class:`runnel.Record`.
        partition_by : Union[str, Callable[Record, Any]]
            A str representing an attribute of the Record type (or a callable to
            compute a value) which should be used to partition events. For example, if
            your events concern user activity and you want to process events in-order
            per user, you might choose the "user_id" attribute to partition by.
        serializer : Serializer
            An object implementing the :class:`runnel.interfaces.Serializer` interface
            which controls how records are stored in the Redis streams.
        partition_count : int
            How many partitions to create.
        partition_size : int
            The max length of each partition. (Implemented approximately via Redis'
            MAXLEN option to XADD.) Represents the size of the buffer in case
            processors are offline or cannot keep up with the event rate.
        hasher : Callable[Any, int]
            A function used to hash the partition key, which decides the partition a
            record is sent to.

        Examples
        --------
        >>> from runnel import App, Record, JSONSerializer
        ...
        >>> app = App(name="example")
        ...
        >>> class Order(Record):
        ...     order_id: int
        ...     amount: int

        Streams are configured to be partitioned by a chosen key.

        >>> orders = app.stream(
        ...     name="orders",
        ...     record=Order,
        ...     partition_by="order_id",
        ...     partition_count=16,
        ...     serializer=JSONSerializer(),
        ... )
        """
        if serializer is None and not record._primitive:
            serializer = self.settings.default_serializer

        return Stream(
            app=self,
            name=name,
            record=record,
            partition_by=partition_by,
            serializer=serializer,
            partition_count=partition_count or self.settings.default_partition_count,
            partition_size=partition_size or self.settings.default_partition_size,
            hasher=hasher or self.settings.default_hasher,
        )
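
    # A hedged sketch (not part of the original source) of the callable forms the
    # docstring above allows: ``partition_by`` may be a function of the record rather
    # than an attribute name, and ``hasher`` any function mapping the partition key to
    # an int. ``Order`` and ``stable_hash`` are illustrative assumptions.
    #
    #   >>> import hashlib
    #   ...
    #   >>> def stable_hash(key) -> int:
    #   ...     return int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    #   ...
    #   >>> orders = app.stream(
    #   ...     name="orders",
    #   ...     record=Order,
    #   ...     partition_by=lambda order: order.order_id,
    #   ...     hasher=stable_hash,
    #   ... )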

    def processor(
        self,
        stream,
        *,
        name=None,
        exception_policy=None,
        middleware=None,
        lock_expiry=None,
        read_timeout=None,
        prefetch_count=None,
        assignment_attempts=None,
        assignment_sleep=None,
        grace_period=None,
        pool_size=None,
        join_delay=None,
    ):
        """
        A wrapper around an async Python function which iterates over a continuous
        event stream.

        Kwargs, if provided, will override the default settings configured on the App
        instance or via environment variables (see :mod:`runnel.settings`) for this
        processor.

        Notes
        -----
        Events are acknowledged at the end of every processing loop. This means that if
        your processor crashes before completion, that section of work will be repeated
        when the processor is restarted. Therefore Runnel provides 'at least once'
        semantics.

        Parameters
        ----------
        stream : runnel.App.stream
            The stream this processor will iterate over.
        name : str
            Used in the Redis keys relating to this processor. Must be unique together
            with the App and Stream. Default: your function's ``__name__``.
        exception_policy : ExceptionPolicy
            How to handle exceptions raised in the user-provided processor coroutine.

            * ``HALT``: Raise the exception, halting execution of the affected
              partition.
            * ``QUARANTINE``: Mark the affected partition as poisoned, and continue
              with others.
            * ``IGNORE``: Suppress the exception and continue processing regardless.

            Default: ``HALT``.
        middleware : List[Middleware]
            A list of Middleware objects for managing the data pipeline. Can be used to
            implement custom exception handling (e.g. dead letter queues).
        lock_expiry : int (seconds)
            The duration of the lock on stream partitions owned by executors of this
            processor. This controls the worst case lag a partition's events may
            experience, since other executors will have to wait to acquire the lock in
            case the owner has died.
        read_timeout : int (milliseconds)
            How long to stay blocked reading from Redis via XREADGROUP. Nothing depends
            on this.
        prefetch_count : int
            The maximum number of events to read from Redis per partition owned by an
            executor. (If a single executor owns all 16 partitions in a stream and
            prefetch_count is 10, then 160 events may be read at once.) Purely an
            optimisation.
        assignment_attempts : int
            How many times to try to complete a rebalance operation (i.e. acquire our
            declared partitions) before giving up.
        assignment_sleep : float (seconds)
            How long to wait between attempts to complete a rebalance operation.
        grace_period : float (seconds)
            How long to wait for execution to complete gracefully before cancelling it.
        pool_size : int
            How many concurrent connections to make to Redis to read events.
        join_delay : int (seconds)
            How long to wait after joining before attempting to acquire partitions.
            Intended to mitigate a thundering herd problem of multiple workers joining
            simultaneously and needing to rebalance multiple times.

        Examples
        --------
        >>> from runnel import App, Record
        ...
        >>> app = App(name="example")
        ...
        >>> class Order(Record):
        ...     order_id: int
        ...     amount: int
        ...
        >>> orders = app.stream(name="orders", record=Order, partition_by="order_id")
        ...
        >>> @app.processor(orders)
        ... async def printer(events):
        ...     async for order in events.records():
        ...         print(order.amount)
        """
        if not isinstance(stream, Stream):
            raise Misconfigured("You must pass a stream to the app.processor decorator")

        kwargs = {
            "exception_policy": exception_policy or self.settings.default_exception_policy,
            "middleware": middleware or [],
            "lock_expiry": lock_expiry or self.settings.default_lock_expiry,
            "read_timeout": read_timeout or self.settings.default_read_timeout,
            "prefetch_count": prefetch_count or self.settings.default_prefetch_count,
            "assignment_attempts": assignment_attempts or self.settings.default_assignment_attempts,
            "assignment_sleep": assignment_sleep or self.settings.default_assignment_sleep,
            "grace_period": grace_period or self.settings.default_grace_period,
            "pool_size": pool_size or self.settings.default_pool_size,
            "join_delay": join_delay or self.settings.default_join_delay,
        }

        def decorator(f):
            nonlocal name
            proc = Processor(stream=stream, f=f, name=name or f.__name__, **kwargs)
            logger.debug("found-processor", name=proc.name)

            if proc.id in self.processors:
                raise Misconfigured("Processor name must be unique within an App and Stream")

            self.processors[proc.id] = proc
            return proc

        return decorator
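
    # A hedged sketch (assumed, not in the original source) of overriding the defaults
    # the docstring above lists. The import path of ``ExceptionPolicy`` and the
    # ``audit`` body are assumptions; check the installed runnel version.
    #
    #   >>> from runnel import ExceptionPolicy  # import path assumed
    #   ...
    #   >>> @app.processor(orders, exception_policy=ExceptionPolicy.QUARANTINE, lock_expiry=60)
    #   ... async def audit(events):
    #   ...     async for order in events.records():
    #   ...         print("audited", order.order_id)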

    def _task(self, func):
        # Register a worker-startup coroutine (a task, timer or crontab wrapper) to be
        # run by each worker.
        def decorator(f):
            self.tasks.add(f)
            return func

        return decorator(func)

    def task(self, func=None, *, on_leader=False):
        """
        Define an async function to run at worker startup.

        Parameters
        ----------
        on_leader : bool
            Whether to run the function only on one worker: the elected leader.

        Examples
        --------
        >>> @app.task
        ... async def on_startup():
        ...     print("starting")

        If you want the task to run on only one worker:

        >>> @app.task(on_leader=True)
        ... async def on_startup():
        ...     print("running once")
        """
        def decorator(f):
            async def wrapper(worker):
                if not on_leader or worker.is_leader:
                    logger.info("running-task", name=f.__name__)
                    await f()

            logger.debug("found-task", name=f.__name__)
            return self._task(wrapper)

        return decorator(func) if func is not None else decorator

    def timer(self, *, interval: int, on_leader=False):
        """
        Define an async function to be run at periodic intervals.

        Parameters
        ----------
        interval : float (seconds)
            How often the function executes in seconds.
        on_leader : bool
            Whether to run the function only on one worker: the elected leader.

        Examples
        --------
        >>> @app.timer(interval=10)
        ... async def every_10_seconds():
        ...     print("10 seconds passed")

        If you want the task to run on only one worker:

        >>> @app.timer(interval=5, on_leader=True)
        ... async def every_5_seconds():
        ...     print("5 seconds passed on the leader")
        """
        def decorator(f):
            async def timer_spawner(worker) -> None:
                async with anyio.create_task_group() as tg:
                    logger.debug("background-timer-task", name=f.__name__)
                    while True:
                        await anyio.sleep(interval)
                        if not on_leader or worker.is_leader:
                            logger.debug("spawning-timer-task", name=f.__name__)
                            await tg.spawn(f)

            logger.debug("found-timer", name=f.__name__)
            return self._task(timer_spawner)

        return decorator

    def crontab(self, spec: str, *, timezone=None, on_leader=False):
        """
        Define an async function to be run at fixed times, specified by the Cron format
        (see `<https://crontab.guru/>`_ for examples).

        Parameters
        ----------
        spec : str
            The Cron spec defining fixed times to run the decorated function.
        timezone : tzinfo
            The timezone to be taken into account for the Cron jobs. If not set, the
            value from :attr:`runnel.settings.Settings.timezone` will be used.
        on_leader : bool
            Whether to run the function only on one worker: the elected leader.

        Examples
        --------
        >>> @app.crontab("45 17 * * *")
        ... async def every_5_45_pm():
        ...     print("It is 5:45pm UTC")

        If you want the task to run on only one worker:

        >>> @app.crontab("45 17 * * *", on_leader=True)
        ... async def every_5_45_pm():
        ...     print("It is 5:45pm UTC on the leader")

        With a timezone specification:

        >>> @app.crontab("45 17 * * *", timezone=pytz.timezone('GMT'))
        ... async def every_5_45_pm():
        ...     print("It is 5:45pm in London")
        """
        def decorator(f):
            async def cron_spawner(worker) -> None:
                _tz = self.settings.timezone if timezone is None else timezone

                async with anyio.create_task_group() as tg:
                    logger.debug("background-cron-task", name=f.__name__)
                    while True:
                        await anyio.sleep(seconds_until(spec, _tz))
                        if not on_leader or worker.is_leader:
                            logger.debug("spawning-cron-task", name=f.__name__)
                            await tg.spawn(f)

            logger.debug("found-cron", name=f.__name__)
            return self._task(cron_spawner)

        return decorator
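
# A hedged usage note (not part of this module): the App docstring says apps are run by
# workers via the CLI. Assuming the app is defined as ``app`` in ``example.py``, the
# expected invocation is along these lines, though the exact command name and flags
# depend on the installed runnel version:
#
#   $ runnel worker example:app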