amqp-mock 0.6.2

Creator: bradpython12

Description:

AMQP Mock





Installation
Overview
    Test Publishing
    Test Consuming
Mock Server
    Start server
    Publish message
    Get queue message history
    Get exchange messages
    Delete exchange messages
    Reset



Installation
pip3 install amqp-mock

Overview
Test Publishing
from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]

Full code available here: ./examples/publish_example.py
Test Consuming
from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED

Full code available here: ./examples/consume_example.py
Mock Server
Start server
import asyncio

from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock


async def run() -> None:
    storage = Storage()
    http_server = HttpServer(storage, port=8080)
    amqp_server = AmqpServer(storage, port=5672)
    async with create_amqp_mock(http_server, amqp_server):
        await asyncio.Future()

asyncio.run(run())

Or via Docker:
docker run -p 8080:80 -p 5672:5672 tsv1/amqp-mock
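
Whichever way the server is started, a test run may begin before the ports accept connections. The sketch below is one way to wait for them using only the standard library. `wait_for_port` is a hypothetical helper, not part of amqp-mock; the ports in the usage comment are the ones published by the commands above.

```python
import socket
import time


def wait_for_port(host, port, timeout=5.0, interval=0.1):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Usage (assumes the mock was started with the ports shown above):
#   wait_for_port("localhost", 8080) and wait_for_port("localhost", 5672)
```

Polling a plain TCP connect keeps the check independent of any AMQP or HTTP client library.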

Publish message
POST /queues/{queue}/messages
{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}

HTTP

$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json



Python

from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
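
When neither HTTPie nor the Python client is available, the same request can be assembled with the standard library. This is a minimal sketch: `build_publish_request` is a hypothetical helper, the default `base_url` simply mirrors the `localhost` used in the HTTP example, and the payload fields are the ones listed in the request body above.

```python
import json
import uuid
from urllib.parse import quote
from urllib.request import Request


def build_publish_request(queue, value, exchange="", routing_key="",
                          base_url="http://localhost"):
    """Build (but do not send) a POST request for /queues/{queue}/messages."""
    payload = {
        "id": str(uuid.uuid4()),   # fresh message id for every request
        "value": value,
        "exchange": exchange,
        "routing_key": routing_key,
        "properties": None,        # serialized as JSON null
    }
    return Request(
        url=f"{base_url}/queues/{quote(queue, safe='')}/messages",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_publish_request("test_queue", [1, 2, 3], exchange="test_exchange")
# urllib.request.urlopen(request) would send it to a running mock server.
```

Building the request separately from sending it makes the payload easy to inspect in a unit test without a server.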



Get queue message history
GET /queues/{queue}/messages/history
HTTP

$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]



Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
#                  queue='test_queue',
#                  status=MessageStatus.ACKED>
# ]
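
For assertions against the raw HTTP response, the history body can be filtered by status with a few lines of standard-library code. This is only a sketch: `values_with_status` is a hypothetical helper, and the sample body is copied from the HTTP example above.

```python
import json

# Response body copied from the HTTP example above.
SAMPLE_HISTORY = """
[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]
"""


def values_with_status(history_body, status):
    """Return the `value` of every history entry whose status matches."""
    entries = json.loads(history_body)
    return [e["message"]["value"] for e in entries if e["status"] == status]


print(values_with_status(SAMPLE_HISTORY, "ACKED"))  # [[1, 2, 3]]
```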



Get exchange messages
GET /exchanges/{exchange}/messages
HTTP

$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]



Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]
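
If the endpoint is called over plain HTTP rather than through `AmqpMockClient`, the JSON array can be mapped onto a small typed record. This is a sketch under assumptions: `ExchangeMessage` and `parse_exchange_messages` are hypothetical names, not the library's own classes, and the sample body is a trimmed copy of the response shown above.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ExchangeMessage:
    """Hypothetical record mirroring the top-level keys of one response item."""
    id: str
    value: Any
    exchange: str
    routing_key: str
    properties: Optional[Dict[str, Any]]


def parse_exchange_messages(body: str) -> List[ExchangeMessage]:
    """Turn the JSON array returned by the endpoint into typed records."""
    return [
        ExchangeMessage(
            id=item["id"],
            value=item["value"],
            exchange=item["exchange"],
            routing_key=item["routing_key"],
            properties=item.get("properties"),
        )
        for item in json.loads(body)
    ]


# Trimmed copy of the response above (most properties omitted for brevity).
SAMPLE_BODY = """
[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {"delivery_mode": 1, "headers": null},
        "routing_key": "",
        "value": [1, 2, 3]
    }
]
"""

messages = parse_exchange_messages(SAMPLE_BODY)
assert messages[0].value == [1, 2, 3]
```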



Delete exchange messages
DELETE /exchanges/{exchange}/messages
HTTP

$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json



Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")



Reset
DELETE /
HTTP

$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json



Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()
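
When one mock server is shared across several tests, resetting it after each test keeps recorded messages from leaking between them. The sketch below wraps that in an async context manager. `clean_mock` and `FakeClient` are hypothetical names; the helper only assumes the client exposes the `reset()` coroutine shown above, and the fake stands in for a real `AmqpMockClient` so the example runs without a server.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def clean_mock(client):
    """Yield `client`, then reset the mock even if the test body raised."""
    try:
        yield client
    finally:
        await client.reset()


class FakeClient:
    """Stand-in used only to demonstrate the helper without a running server."""

    def __init__(self):
        self.reset_calls = 0

    async def reset(self):
        self.reset_calls += 1


async def demo():
    fake = FakeClient()
    async with clean_mock(fake):
        pass  # the test body would go here
    return fake.reset_calls


print(asyncio.run(demo()))  # 1
```

Putting the reset in a `finally` clause means a failing assertion in one test does not leave stale messages behind for the next.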

License

For personal and professional use. You cannot resell or redistribute these repositories in their original state.
