Source code for runnel.settings

from datetime import tzinfo
from typing import Union

import pytz
from pydantic import BaseSettings, PyObject

from runnel.constants import ExceptionPolicy


[docs]class Settings(BaseSettings): """ This class should not be used directly, but acts as a container for application settings, configured via environment variables (e.g. `RUNNEL_LOG_LEVEL=debug`) or as kwargs to the App object. Many of the settings represent defaults for processors or streams and can be overridden at object initialisation time. Parameters ---------- redis_url : str The URL to pass to aredis.StrictRedis.from_url. Default: ``"127.0.0.1:6379"``. log_format : str Either ``"json"`` or ``"console"`` to specify the log output format. log_level : str The minimum log level to display: one of ``"debug"``, ``"info"``, ``"warning"`` is recommended. autodiscover : str The pattern for :func:`pathlib.Path.glob` to find modules containing Runnel app-decorated functions (e.g. processors, tasks), which the worker must import on startup. Will be called relative to current working directory. For example, use ``'myproj/**/streams.py'`` to find all modules called 'streams' inside the 'myproj' folder. Default ``None``. timezone : tzinfo The timezone to use by default for ``app.crontab`` schedules. Default: ``pytz.UTC``. leadership_poll_interval : int (milliseconds) How long to sleep between worker leadership election attempts. Default: ``4000`` testing : bool Whether we are currently running the test suite. default_serializer : Serializer An object implementing the Serializer interface which controls how records are stored in the Redis streams. Default: ``JSONSerialzer`` unless ``orjson`` is installed (use the ``runnel[fast]`` bundle to install automatically), in which case ``FastJSONSerializer``. default_partition_count : int How many partitions to create. Default: ``16``. default_partition_size : int The max length of each partition. (Implemented approximately via Redis' MAXLEN option to XACK.) Represents the size of the buffer in case processors are offline or cannot keep up with the event rate. Default: ``50_000``. default_hasher: Callable[Any, int] A function used to hash a record's partition key to decide which partition to send it to. Defaults to md5 unless `xxhash` is installed (use the `runnel[fast]` bundle to install automatically) default_exception_policy : ExceptionPolicy How to handle exceptions raised in the user-provided processor coroutine. * ``HALT``: Raise the exception, halting execution of the affected partition. * ``QUARANTINE``: Mark the affected partition as poisoned, and continue with others. * ``IGNORE``: Suppress the exception and continue processing regardless. Default: ``HALT``. default_lock_expiry : int (seconds) The duration of the lock on stream partitions owned by executors of this processor. This controls the worst case lag a partition's events may experience since other executors will have to wait acquire the lock in case the owner has died. Default: ``120``. default_read_timeout : int (milliseconds) How long to stay blocked reading from Redis via XREADGROUP. Should be smaller than the ``grace_period`` given to processors. Default: ``2000``. default_prefetch_count : int The maximum number of events to read from Redis per partition owned by an executor. (If a single executor owns all 16 partitions in a stream and prefetch_count is 10, then 160 events may be read at once.) Purely an optimisation. Default: ``8``. default_assignment_attempts : int How many times to try to complete a rebalance operation (i.e. acquire our declared partitions) before giving up. Default: ``32``. default_assignment_sleep : float (seconds) How long to wait between attempts to complete a rebalance operation. Default: ``2``. default_grace_period : float (seconds) How long to wait for execution to complete gracefully before cancelling it. Default: ``8``. default_pool_size : int How many concurrent connections to make to Redis to read events. Default: ``16``. default_join_delay : int (seconds) How long to wait after joining before attempting to acquire partitions. Intended to mitigate a thundering herd problem of multiple workers joining simultaneously and needing to rebalance multiple times. Default: ``2``. """ class Config: env_prefix = "RUNNEL_" case_insensitive = True redis_url: str = "redis://127.0.0.1:6379" log_format: str = "console" log_level: str = "info" autodiscover: str = None timezone: tzinfo = pytz.UTC leadership_poll_interval: int = 4000 testing: bool = False # Stream defaults default_partition_count: int = 16 default_partition_size = 50_000 default_serializer: Union[PyObject, str] = "runnel.serialization.default" default_hasher: Union[PyObject, str] = "runnel.hashing.default" # Processor defaults default_exception_policy: ExceptionPolicy = ExceptionPolicy.HALT default_lock_expiry: int = 60 * 2 default_read_timeout: int = 2000 default_prefetch_count: int = 8 default_assignment_attempts: int = 32 default_assignment_sleep: float = 2 default_grace_period: float = 8 default_pool_size: int = 16 default_join_delay: int = 2