Last updated:
0 purchases
natspyworker 0.0.9
NATS Worker
An opinionated utility for using NATS as a worker queue with NATS.py.
Example
# pip install nats-py-worker
from nats_worker import Worker
from nats.js.api import StreamConfig, KeyValueConfig
worker = Worker(name="demo")
async def init_nats(connection):
connection.jetstream().add_stream(new StreamConfig(name="events", subjects=["inbox.*", "outbox.*"]))
connection.jetstream().create_key_value(config=KeyValueConfig(bucket="state"))
@worker.background_consumer(subject="inbox.*")
async def watch_incoming(msg):
...
@worker.background_state_machine(subject="inbox.*", bucket="state")
async def next_state(kv, msg):
...
await worker.publish_msg(subject=outbox, payload=side_effect)
...
return new_state
if __name__ == '__main__':
worker.start_as_app()
Installing
pip install nats-py-worker
Development
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
The gitlab-ci.yml is currently setup to push to PyPi when merged to main. Make sure you have updated the version other wise it will fail.
License
Unless otherwise noted, the NATS Worker source files are distributed under
the MIT license found in the LICENSE file.
For personal and professional use. You cannot resell or redistribute these repositories in their original state.
There are no reviews.