PikaBus 1.3.7
PikaBus is a wrapper around pika that makes it easy to implement
the message, event and command patterns, as described in detail here:
https://pikabus.readthedocs.io/en/latest/guidelines_amqp.html
Features
Secure AMQP messaging enabled by default, which includes:
Durable and mirrored queues on all nodes.
Persistent messages, meaning no messages are lost after a node restart.
Delivery confirmation using RabbitMQ publisher confirms.
Mandatory delivery turned on by default to guarantee at-least-once delivery.
Object-oriented API with a short, easy-to-use interface.
Fault-tolerant with auto-reconnect retry logic and state recovery.
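To give a feel for what auto-reconnect retry logic involves, here is a minimal sketch of retrying with exponential backoff. It is a generic illustration, not PikaBus's actual implementation: the function name `retry_with_backoff` and all of its parameters are hypothetical, and it uses only the Python standard library.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(action: Callable[[], T],
                       max_attempts: int = 5,
                       base_delay: float = 0.5,
                       max_delay: float = 10.0,
                       retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Hypothetical helper: call action() until it succeeds or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Double the wait after each failure, capped at max_delay.
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            sleep(delay)
    raise ValueError("max_attempts must be at least 1")
```

With pika, one would probably pass `retry_on=(pika.exceptions.AMQPConnectionError,)` and an `action` that opens the connection, since pika's connection errors do not derive from the built-in `ConnectionError`. PikaBus already handles reconnects internally, so this is only relevant when working with pika directly.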
Installation
pip install PikaBus
Example
import pika
import datetime
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus
from PikaBus.PikaBusSetup import PikaBusSetup
def MessageHandlerMethod(**kwargs):
    """
    A message handler method may simply be a method with some **kwargs.
    The **kwargs will be given all incoming pipeline data, the bus and the incoming payload.
    """
    data: dict = kwargs['data']
    bus: AbstractPikaBus = kwargs['bus']
    payload: dict = kwargs['payload']
    print(payload)
    if payload['reply']:
        payload['reply'] = False
        bus.Reply(payload=payload)
# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)
# Create a PikaBusSetup instance with a listener queue, and add the message handler method.
pikaBusSetup = PikaBusSetup(connParams,
                            defaultListenerQueue='myQueue',
                            defaultSubscriptions='myTopic')
pikaBusSetup.AddMessageHandler(MessageHandlerMethod)
# Start consuming messages from the queue.
pikaBusSetup.StartConsumers()
# Create a temporary bus to subscribe on topics and send, defer or publish messages.
bus = pikaBusSetup.CreateBus()
bus.Subscribe('myTopic')
payload = {'hello': 'world!', 'reply': True}
# To send a message means sending a message explicitly to one receiver.
bus.Send(payload=payload, queue='myQueue')
# To defer a message means sending a message explicitly to one receiver with some delay before it is processed.
bus.Defer(payload=payload, delay=datetime.timedelta(seconds=1), queue='myQueue')
# To publish a message means publishing a message on a topic received by any subscribers of the topic.
bus.Publish(payload=payload, topic='myTopic')
input('Hit enter to stop all consuming channels \n\n')
pikaBusSetup.StopConsumers()
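The difference between sending and publishing in the example above can be illustrated without a broker. The sketch below is a toy in-memory model, not part of PikaBus: the `ToyBus` class and its methods are hypothetical names chosen for illustration, and real delivery goes through RabbitMQ exchanges and queues.

```python
from collections import defaultdict
from typing import Dict, List, Set


class ToyBus:
    """Hypothetical in-memory stand-in for a broker; not part of PikaBus."""

    def __init__(self) -> None:
        self.queues: Dict[str, List[dict]] = defaultdict(list)
        self.subscriptions: Dict[str, Set[str]] = defaultdict(set)

    def subscribe(self, queue: str, topic: str) -> None:
        # Register the queue as a receiver of everything published on the topic.
        self.subscriptions[topic].add(queue)

    def send(self, payload: dict, queue: str) -> None:
        # Send: exactly one receiver, the explicitly named queue.
        self.queues[queue].append(payload)

    def publish(self, payload: dict, topic: str) -> None:
        # Publish: every queue subscribed to the topic gets its own copy.
        for queue in self.subscriptions.get(topic, ()):
            self.queues[queue].append(dict(payload))


bus = ToyBus()
bus.subscribe('myQueue', 'myTopic')
bus.subscribe('otherQueue', 'myTopic')
bus.send({'hello': 'world!'}, queue='myQueue')
bus.publish({'hello': 'everyone'}, topic='myTopic')
```

After these calls, `myQueue` holds both messages while `otherQueue` holds only the published one, which mirrors the send-versus-publish distinction described in the example comments.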
Quick Start
Clone the PikaBus repo:
git clone https://github.com/hansehe/PikaBus.git
Start a local RabbitMQ instance with Docker:
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=amqp -e RABBITMQ_DEFAULT_PASS=amqp -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Open the RabbitMQ admin UI (user=amqp, password=amqp) at:
http://localhost:15672/
Then, run the example:
pip install PikaBus
python ./Examples/basic_example.py
Try restarting RabbitMQ to see how PikaBus tolerates downtime:
docker stop rabbit
docker start rabbit
Send or publish more messages to the running PikaBus consumer with:
python ./Examples/send_example.py
python ./Examples/publish_example.py
Contribute
Issue Tracker: https://github.com/hansehe/PikaBus/issues
Source Code: https://github.com/hansehe/PikaBus
License
The project is licensed under the MIT license.
Versioning
This software follows Semantic Versioning.