runnel.cli.worker(app: str, processors: str = 'all')[source]

Run a worker for all processors of the given app, or just the processors given in a comma-separated string.


Assuming ‘myapp/’ contains your Runnel app object:

$ runnel worker myapp.example:myapp

Or for specific processors:

$ runnel worker myapp.example:myapp --processors=myproc1,myproc2
runnel.cli.send(stream: str, value: str)[source]

Send a given JSON-encoded value to a stream.


Assuming ‘myapp/’ contains a stream called ‘actions’:

$ runnel send myapp.example:actions "{\"user_id\": 1, \"type\": \"signup\"}"
runnel.cli.sendmany(stream: str, file: typer.models.FileText)[source]

Send multiple JSON-encoded values to a stream in a pipelined transaction.


$ echo "{\"user_id\": 1, \"type\": \"signup\"}" >> data.jsonl
$ echo "{\"user_id\": 2, \"type\": \"signup\"}" >> data.jsonl

Assuming ‘myapp/’ contains a stream called ‘actions’:

$ runnel sendmany myapp.example:actions data.jsonl
runnel.cli.reset(name: str, start: str = '0')[source]

Reset the processor’s starting ID and event backlog.


This will destroy and recreate the Redis consumer group(s) associated with the processor and should be run once all workers have been shut down.


Assuming ‘myapp/’ contains a processor called ‘printer’:

runnel processor reset myapp.example:printer --start="0"