# Source code for runnel.interfaces

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, AsyncIterator, List, Union

if TYPE_CHECKING:
    from runnel.types import Event


class Compressor:
    """
    Interface for pluggable compression of event data.

    Subclass it and override both :meth:`compress` and :meth:`decompress`;
    the base implementations only raise ``NotImplementedError``.

    Examples
    --------
    >>> import gzip
    ...
    >>> class Gzip(Compressor):
    ...     def compress(self, value):
    ...         return gzip.compress(value)
    ...
    ...     def decompress(self, value):
    ...         return gzip.decompress(value)

    Then configure your serializer to use it:

    >>> from runnel import App, Record, JSONSerializer
    ...
    >>> app = App(name="example")
    ...
    >>> class Order(Record):
    ...     order_id: int
    ...     amount: int
    ...
    >>> orders = app.stream(
    ...     name="orders",
    ...     record=Order,
    ...     partition_by="order_id",
    ...     serializer=JSONSerializer(compressor=Gzip()),
    ... )
    """

    def compress(self, value: bytes) -> bytes:
        """Take raw bytes and return their compressed form.

        Raises
        ------
        NotImplementedError
            Always, on the base class; subclasses must override.
        """
        raise NotImplementedError()

    def decompress(self, value: bytes) -> bytes:
        """Take compressed bytes and return the original, uncompressed bytes.

        Raises
        ------
        NotImplementedError
            Always, on the base class; subclasses must override.
        """
        raise NotImplementedError()
@dataclass(frozen=True)
class Serializer:
    """
    Implement this interface to customise event data serialization.

    The class is a frozen dataclass, so instances are immutable once
    created. Subclasses must override :meth:`dumps` and :meth:`loads`; the
    base implementations only raise ``NotImplementedError``.

    Examples
    --------
    >>> from dataclasses import dataclass
    >>> import orjson  # A fast JSON library written in Rust.
    >>> from runnel.interfaces import Serializer
    ...
    >>> @dataclass(frozen=True)
    ... class FastJSONSerializer(Serializer):
    ...     compressor = None
    ...
    ...     def dumps(self, value):
    ...         return orjson.dumps(value)
    ...
    ...     def loads(self, value):
    ...         return orjson.loads(value)

    Then pass it to your stream:

    >>> from runnel import App, Record
    ...
    >>> app = App(name="example")
    ...
    >>> class Order(Record):
    ...     order_id: int
    ...     amount: int
    ...
    >>> orders = app.stream(
    ...     name="orders",
    ...     record=Order,
    ...     partition_by="order_id",
    ...     serializer=FastJSONSerializer(),
    ... )
    """

    # Optional Compressor instance attached to this serializer; None (the
    # default) means no compressor is configured. The annotation is spelled
    # as an explicit Union with None because None is a legal value here.
    compressor: Union[Compressor, None] = None

    def dumps(self, value: Any) -> bytes:
        """Return serialized bytes.

        Raises
        ------
        NotImplementedError
            Always, on the base class; subclasses must override.
        """
        raise NotImplementedError

    def loads(self, value: bytes) -> Any:
        """Return deserialized Python object.

        Raises
        ------
        NotImplementedError
            Always, on the base class; subclasses must override.
        """
        raise NotImplementedError
class Middleware:
    """
    Middleware are objects with a `handler` method (described below).

    Processors accept a list of user-provided middleware. The handler methods
    form a processing pipeline which runs over events before they are yielded
    to the final processor function. Each handler is passed the previous
    handler in the pipeline (called 'parent').

    Examples
    --------
    >>> from runnel.interfaces import Middleware
    ...
    >>> class Printer(Middleware):
    ...     async def handler(self, parent, **kwargs):
    ...         async for x in parent:
    ...             print(x)
    ...             yield x

    It can then be passed to the processor.

    >>> @app.processor(mystream, middleware=[Printer()])
    ... async def proc(events):
    ...     async for record in events.records():
    ...         pass

    Notes
    -----
    Some of Runnel's internal functionality is implemented using middleware.
    They can be found
    `here <https://github.com/mjwestcott/runnel/tree/master/runnel/middleware>`_.
    """

    async def handler(self, parent, **kwargs) -> AsyncIterator[Union["Event", List["Event"]]]:
        """
        A middleware handler is an async generator. Given a parent generator
        that yields Events or batches of Events, it must yield the same.

        The base implementation raises ``NotImplementedError`` as soon as it
        is iterated; subclasses must override it.

        For example:

        >>> async for x in parent:
        ...     assert isinstance(x, (Event, list))
        ...     yield x

        Post-processing logic can be added below, in a finally clause. This
        allows you to respond to errors further up the chain (e.g. in the
        final processor code):

        >>> async for x in parent:
        ...     try:
        ...         yield x
        ...     finally:
        ...         await logic()

        This is needed because a GeneratorExit exception will be thrown into
        the yield point if an exception is raised in the calling code. (Also
        note that async generator finalization is scheduled by the Python
        runtime asynchronously, but the Runnel framework ensures it has
        finished before restarting a processor function.)

        If you only want to know if an exception was raised further up the
        chain, you can use the following:

        >>> async for x in parent:
        ...     try:
        ...         yield x
        ...     except GeneratorExit:
        ...         await cleanup()
        ...         raise

        (Note: Python requires that GeneratorExit is not ignored, so it must
        be reraised here to avoid a RuntimeError)
        """
        raise NotImplementedError
        # Deliberately unreachable: the mere presence of ``yield`` makes this
        # an async generator function, matching the documented contract and
        # the AsyncIterator return annotation. Without it the method would be
        # a plain coroutine function, and ``async for`` over it would fail
        # with TypeError instead of NotImplementedError.
        yield  # pragma: no cover