pgqueuer 0.8.5

Creator: railscoderz

Last updated:

Add to Cart

Description:

pgqueuer 0.8.5

Readme
🚀 pgqueuer - Building Smoother Workflows One Queue at a Time 🚀





📚 Documentation: Explore the Docs 📖
🔍 Source Code: View on GitHub 💾
💬 Join the Discussion: Discord Community

pgqueuer
pgqueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, pgqueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.
Features

Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable and concurrent job processing.
Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.

Installation
To install pgqueuer, simply install with pip the following command:
pip install pgqueuer

Example Usage
Here's how you can use pgqueuer in a typical scenario processing incoming data messages:
Write and run a consumer
Start a long-lived consumer that will begin processing jobs as soon as they are enqueued by another process. In this case we want to be a bit more carefull as we want gracefull shutdowns, pgqueuer run will setup signals to
ensure this.
from __future__ import annotations

import asyncpg
from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


async def main() -> QueueManager:
connection = await asyncpg.connect(dsn())
driver = AsyncpgDriver(connection)
qm = QueueManager(driver)

# Setup the 'fetch' entrypoint
@qm.entrypoint("fetch")
async def process_message(job: Job) -> None:
print(f"Processed message: {job}")

return qm

python3 -m pgqueuer run tools.consumer.main

Write and run a producer
Start a short-lived producer that will enqueue 10,000 jobs.
from __future__ import annotations

import asyncio
import sys

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main(N: int) -> None:
connection = await asyncpg.connect()
driver = AsyncpgDriver(connection)
queries = Queries(driver)
await queries.enqueue(
["fetch"] * N,
[f"this is from me: {n}".encode() for n in range(1, N+1)],
[0] * N,
)


if __name__ == "__main__":
print(sys.argv)
N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
asyncio.run(main(N))

python3 tools/producer.py 10000

License

For personal and professional use. You cannot resell or redistribute these repositories in their original state.

Customer Reviews

There are no reviews.