from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
AsyncIterator,
List,
Optional,
Set,
Tuple,
Union,
)
import anyio
import structlog
from aiostream.aiter_utils import aitercontext
from runnel.exceptions import Misconfigured
from runnel.middleware import Ack, Deserialize, Take
if TYPE_CHECKING:
    # Imported for static type checking only; these names are referenced solely
    # through string annotations in this module.
    from runnel.runner import Runner
    from runnel.types import Partition, Event

# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
@dataclass
class Events:
    """
    An async generator which yields Events (or batches of them). This is the object
    passed to user-defined processor functions.

    Examples
    --------
    >>> from runnel import App, Record
    ...
    >>> app = App(name="example")
    ...
    >>> class Order(Record):
    ...     order_id: int
    ...     amount: int
    ...
    >>> orders = app.stream(name="orders", record=Order, partition_by="order_id")
    ...
    >>> @app.processor(orders)
    ... async def printer(events):
    ...     async for order in events.records():
    ...         print(order.amount)
    """

    # The runner this generator belongs to; supplies `executor` and the
    # per-partition queues (`runner.partitions`).
    runner: "Runner"
    # The partition whose queue is consumed; iteration stops once it is no longer
    # in `executor.safe_partitions`.
    partition: "Partition"
    # What to yield: "events" (default) or "records" (set by `records()`).
    want: str = "events"
    # `(n, within)` as configured by `take()`, or None when batching is disabled.
    batch_args: Optional[Tuple] = None
    # Not written by this class itself.
    # NOTE(review): presumably populated by middleware with events that failed
    # processing -- confirm against runnel.middleware.
    failed: Set["Event"] = field(default_factory=set)
    # Set when the generator started by `iter()` has finished (see `running()`).
    finalized: anyio.abc.Event = field(default_factory=anyio.create_event)
    # The generator created by the most recent `__aiter__` call; None until
    # iteration starts.
    agen: Optional[AsyncIterator[Union["Event", List["Event"]]]] = None

    @property
    def executor(self):
        """The executor of the owning runner."""
        return self.runner.executor

    @property
    def stream(self):
        """The stream of the owning runner's executor."""
        return self.executor.stream

    def __call__(self) -> "Events":
        """Return self, so that ``events()`` and ``events`` are interchangeable."""
        return self

    def take(self, n, within) -> "Events":
        """
        Configure the events generator to yield batches of `n` events (unless `within`
        seconds pass before `n` are ready, in which case yield all pending events).

        Parameters
        ----------
        n : int
            The desired batch size.
        within : int (seconds)
            The duration to wait for the batch size to be reached before yielding.

        Returns
        -------
        Events
            This same object, to allow chaining (e.g. ``.take(...).records()``).

        Raises
        ------
        Misconfigured
            If `within` exceeds the processor's ``grace_period``.

        Examples
        --------
        >>> @app.processor(orders)
        ... async def printer(events):
        ...     async for orders in events.take(10, within=1).records():
        ...         # Handle orders as an atomic batch!
        ...         assert 1 <= len(orders) <= 10
        ...         print(orders)

        Notes
        -----
        This method is provided for efficiency. It is intended to be used where batch
        processing of events greatly increases your processing speed. For example, if
        you are loading records into a database, you may want to use its bulk import API
        to ingest a batch of records at a time.

        Warning
        -------
        Runnel acks events after every iteration through the event generator loop. When
        using `take`, this means the entire batch will be acked at once. As a result,
        you must process the batch as a single unit atomically. If you iterate over the
        events in a batch one-at-a-time and you fail half-way through, then the entire
        batch will be considered failed (and handled according to your
        :attr:`runnel.constants.ExceptionPolicy`). This will lead to duplicate
        processing if the batch is retried, or dropped events if the batch is ignored.
        """
        if within > self.executor.processor.grace_period:
            raise Misconfigured("Cannot wait longer than grace_period for a batch")
        self.batch_args = (n, within)
        return self

    def records(self) -> "Events":
        """
        Configure the events generator to deserialize events into Record objects.

        Returns
        -------
        Events
            This same object, to allow chaining.

        Examples
        --------
        >>> from runnel import Record
        ...
        >>> @app.processor(orders)
        ... async def printer(events):
        ...     async for order in events.records():
        ...         assert isinstance(order, Record)
        ...         print(order.amount)

        If this method is omitted, you will iterate over the low-level :class:`runnel.Event`
        objects, which gives you access to the raw data as ``Dict[bytes, bytes]``.

        >>> from runnel import Event
        ...
        >>> @app.processor(orders)
        ... async def printer(events):
        ...     async for event in events:
        ...         assert isinstance(event, Event)
        ...         print(event.data)
        """
        self.want = "records"
        return self

    def __aiter__(self):
        """Start a new pipeline generator, remember it in `agen` and return it."""
        self.agen = self.iter()
        return self.agen

    async def aclose(self):
        """Close the running generator; iteration must already have been started."""
        assert self.agen is not None, "Cannot close an event generator that is not running"
        await self.agen.aclose()

    async def iter(self):
        """
        Build the middleware pipeline and yield whatever its final stage produces.

        The stages are chained in this order: `root()` -> `Take` (only when
        `take()` was called) -> the processor's user middleware -> `Ack` ->
        `Deserialize` (only when `records()` was called). Every stage is entered
        on a shared `AsyncExitStack`, so all of them are closed when iteration
        ends, whether normally, by error, or by `aclose()`.
        """
        async with self.running():
            # Common kwargs to all middleware handlers.
            kwargs = {"events": self}

            async with AsyncExitStack() as stack:
                enter = stack.enter_async_context
                agen = await enter(aitercontext(self.root()))

                # Construct the middleware pipeline.
                if self.batch_args:
                    batched = Take(*self.batch_args).handler(agen, **kwargs)
                    agen = await enter(aitercontext(batched))

                # User-provided middleware, which must handle and yield either single
                # events or a batch.
                for m in self.executor.processor.middleware:
                    agen = await enter(aitercontext(m.handler(agen, **kwargs)))

                # Acknowledgement handling.
                agen = await enter(aitercontext(Ack().handler(agen, **kwargs)))

                # Automatic deserialisation.
                if self.want == "records":
                    deserialized = Deserialize().handler(agen, **kwargs)
                    agen = await enter(aitercontext(deserialized))

                # It begins.
                async for x in agen:
                    try:
                        yield x
                    except GeneratorExit:
                        # The consumer closed this generator while suspended at the
                        # yield: log it and return so the contexts above unwind.
                        logger.warning("events-iter-exit")
                        return

    async def root(self):
        """
        Yield events from this partition's queue while the partition is safe.

        Each wait on the queue is bounded by a timeout of a quarter of the
        processor's ``grace_period``, clamped to the range [0.05, 1] seconds, so
        the `safe_partitions` membership check is re-evaluated regularly even
        when no events arrive.
        """
        queue = self.runner.partitions[self.partition]
        timeout = max(0.05, min(1, self.executor.processor.grace_period / 4))

        while self.partition in self.executor.safe_partitions:
            event = None
            async with anyio.move_on_after(timeout):
                event = await queue.get()
            # `event` is still None when the wait timed out before anything arrived.
            if event is not None:
                yield event

    @asynccontextmanager
    async def running(self):
        """
        Mark the lifetime of one iteration; `finalized` is set on exit.

        The assertion rejects starting iteration again on an object whose
        `finalized` event has already been set.
        """
        logger.debug("events-started")
        assert not self.finalized.is_set()
        try:
            yield
        finally:
            # NOTE(review): `set()` is awaited here, matching the coroutine-based
            # Event API that `anyio.create_event` belongs to -- confirm before
            # upgrading anyio.
            await self.finalized.set()
            logger.debug("events-finally-ended")