brokkoly 0.3.1
Brokkoly is a framework for enqueuing messages via HTTP request for celery.
CHANGELOG
Example
tasks.py:
import brokkoly
b = brokkoly.Brokkoly('example', 'redis://localhost:6379/0')
celery = b.celery
def two_times(text: str) -> dict:
return {
'text': text * 2
}
@b.task(two_times)
def echo(text: str) -> None:
print(text)
two_times works as pre processor. It works before enqueing. It means it can return BadRequest to your client. Brokkoly validate message with typehint. Also you can have extra validation and any other process here.
Run celery -A tasks worker --loglevel=info
producer.py:
import brokkoly
import tasks # NOQA
application = brokkoly.producer()
producer is WSGI application. You need to import your tasks for put message into queue.
Run with uWSGI uwsgi --http :8080 --wsgi-file producer.py --enable-threads --thunder-lock --master
Send Test Message!
curl -X POST -d '{"message":{"text": "Hello"}}' http://localhost:8080/example/echo
producer receives your request
producer validates your message having text and the type is str or not. text`(str) is from typehint of two_times
producer validates two_times returned value having text and the type is str or not. text`(str) is from typehint of echo
producer put message {"message":{"text":"HelloHello"}} into queue.
curl receives response.
Celery calls echo
Extra
preprocessor is optional. if you don’t need it, you can:
@b.task()
def echo(text: str) -> None:
print(text)
Also you can give multiple preprocessor:
@b.task(two_times, two_times)
def echo(text: str) -> None:
print(text)
For personal and professional use. You cannot resell or redistribute these repositories in their original state.
There are no reviews.