aiopubsub 3.0.0

Creator: railscoderz

Simple publish-subscribe pattern for asyncio applications.

Why
When building big applications, separation of concerns is a great way to keep things manageable.
In messaging systems, the publish-subscribe pattern is often used to decouple data producers and data
consumers. We went a step further and designed even the internals of our applications around this pattern.
We explain our thinking and the workings of aiopubsub in detail in our article
“Design your app using the pub-sub pattern with aiopubsub”.
We recommend reading it before using aiopubsub in your project.


Installation
aiopubsub is only compatible with Python 3.8 and higher. There are no plans to support older versions.
aiopubsub is available on PyPI and you can install it with:
    pip install aiopubsub
or
    poetry add aiopubsub


How to use it
The following comprehensive example is explained step-by-step in our article
“Design your app using the pub-sub pattern with aiopubsub”:

    import asyncio
    import dataclasses
    import decimal

    import aiopubsub


    @dataclasses.dataclass
    class Trade:
        timestamp: float
        quantity: int
        price: decimal.Decimal


    async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
        print(f'Processing trade = {trade} with key = {key}.')


    async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
        print(f'Processing trade = {trade} with key = {key} that happened in NYSE')


    async def main():
        # create an aiopubsub hub
        hub = aiopubsub.Hub()

        # create a sample of data to send
        trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))

        # subscriber listens on every trade and calls the `on_trade` function
        subscriber = aiopubsub.Subscriber(hub, 'trades')
        subscribe_key = aiopubsub.Key('*', 'trade', '*')
        subscriber.add_async_listener(subscribe_key, on_trade)

        # publisher has a NASDAQ prefix and sends the trade that happened on Google stock
        publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
        publish_key = aiopubsub.Key('trade', 'GOOGL')
        publisher.publish(publish_key, trade)

        # sleep so the event loop can process the action
        await asyncio.sleep(0.001)

        # expected output:
        # Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43')) with key = ('NASDAQ', 'trade', 'GOOGL').

        # sample from another stock exchange
        trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))

        # subscribe only for the NYSE exchange
        subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
        subscriber.add_async_listener(subscribe_key_nyse, on_nyse_trade)

        # publish NYSE trade
        publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
        publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)

        # sleep so the event loop can process the action
        await asyncio.sleep(0.001)

        # expected output:
        # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL').
        # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL') that happened in NYSE

        # clean the subscriber before the end of the program
        await subscriber.remove_all_listeners()

    if __name__ == '__main__':
        asyncio.run(main())

aiopubsub uses logwood if it is installed; otherwise it falls back
to the standard logging module. Note that logwood is required to run the tests.


Architecture
Hub accepts messages from Publishers and routes them to Subscribers. Each message is routed by its
Key - an iterable of strings forming a hierarchic namespace. Subscribers may subscribe to wildcard keys,
where any part of the key may be replaced with a * (star).
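
The routing rule can be illustrated without the library. The sketch below is not aiopubsub's implementation: `full_key` and `matches` are hypothetical helpers, and a plain tuple stands in for aiopubsub.Key. It assumes that a publisher's prefix is prepended to the published key (as the expected output in the example above suggests) and that each * stands for exactly one key part.

```python
from typing import Tuple

Key = Tuple[str, ...]  # stand-in for aiopubsub.Key, for illustration only


def full_key(prefix: Key, key: Key) -> Key:
    """Join a publisher prefix and a published key into the routed key."""
    return prefix + key


def matches(pattern: Key, key: Key) -> bool:
    """Return True if `key` matches `pattern`.

    Assumption: '*' matches any single part and both keys have equal length.
    """
    if len(pattern) != len(key):
        return False
    return all(p == '*' or p == k for p, k in zip(pattern, key))


routed = full_key(('NASDAQ',), ('trade', 'GOOGL'))
print(routed)                                    # ('NASDAQ', 'trade', 'GOOGL')
print(matches(('*', 'trade', '*'), routed))      # True
print(matches(('NYSE', 'trade', '*'), routed))   # False
```

Under these assumptions, a listener on ('*', 'trade', '*') sees trades from every exchange, while a listener on ('NYSE', 'trade', '*') sees only NYSE trades.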

addedSubscriber and removedSubscriber messages
When a new subscriber is added, the Hub sends this message
    {
        "key": ("key", "of", "added", "subscriber"),
        "currentSubscriberCount": 2
    }
under the key ('Hub', 'addedSubscriber', 'key', 'of', 'added', 'subscriber') (the part after addedSubscriber
is made of the subscribed key). Note the currentSubscriberCount field indicating how many subscribers are currently
subscribed.
When a subscriber is removed, a message in the same format is sent, but this time under the key
('Hub', 'removedSubscriber', 'key', 'of', 'added', 'subscriber').
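
As a rough illustration of how these notification keys and messages are composed, the following sketch rebuilds them from a subscribed key. `notification_key` and `notification_message` are hypothetical helpers written for this example, not part of aiopubsub, and a plain tuple again stands in for aiopubsub.Key.

```python
from typing import Any, Dict, Tuple

Key = Tuple[str, ...]  # stand-in for aiopubsub.Key, for illustration only


def notification_key(event: str, subscribed_key: Key) -> Key:
    """Build the key under which the Hub announces `event` for `subscribed_key`."""
    return ('Hub', event) + subscribed_key


def notification_message(subscribed_key: Key, subscriber_count: int) -> Dict[str, Any]:
    """Build a message in the format described above."""
    return {'key': subscribed_key, 'currentSubscriberCount': subscriber_count}


subscribed = ('NYSE', 'trade', '*')
print(notification_key('addedSubscriber', subscribed))
print(notification_key('removedSubscriber', subscribed))
print(notification_message(subscribed, 2))
```

A component that wants to react to subscribers coming and going would subscribe to keys of this shape, just as it would to any other key.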



Contributing
Pull requests are welcome! In particular, we are aware that the documentation could be improved.
If anything about aiopubsub is unclear, please feel free to
simply open an issue and we will do our best
to advise and explain 🙂



aiopubsub was made by Quantlane, a systematic trading firm.
We design, build and run our own stock trading platform.

License

For personal and professional use. You cannot resell or redistribute these repositories in their original state.
