Postage - a Python library for AMQP-based network components
============================================================

|Build Status| |Version| |PyPi Downloads|

Postage is a Python library which leverages
`pika <https://github.com/pika/pika>`__ and AMQP (through a broker like
`RabbitMQ <http://www.rabbitmq.com/>`__) to build network-aware software
components.

Through **pika** you can add to any Python program the capability of
sending and receiving messages using AMQP. For example you can listen to or
communicate with other programs through a RabbitMQ cluster (the
reference AMQP broker in this documentation is RabbitMQ).

Postage is a layer built on pika that aims to simplify the
implementation of the messaging part of your Python programs, hiding (as
much as possible) the AMQP details. It provides the following structures
and concepts:

- **Fingerprint**: an automatic network fingerprint for an application,
  which contains useful data to uniquely identify your program on the
  cluster.
- **Message encoding** implemented in a stand-alone class which can
  easily be replaced by one of your choice. The default encoding is JSON.
- A **message** implementation based on a plain Python dictionary (thus
  usable even without Postage). Messages can be of three types:
  **command**, **status** or **result**, representing the actions of
  asking something (command), communicating something (status) or
  answering a request (result). Command messages can be fire-and-forget
  or RPC. Result messages can further transport a success, an error, or
  a Python exception.
- **Exchanges** can be declared and customized by inheriting a dedicated
  class.
- A generic message **producer** class: it simplifies the definition of
  the set of messages an exchange accepts, which helps in defining a
  network API for your component.
- A generic message consumer, or **processor**, that implements a
  powerful handler mechanism to define which incoming messages a
  component is interested in and how it shall answer them.

About microthreads
==================

Postage leverages a microthread library to run network components. The
current implementation is very simple and largely underused, due to the
blocking nature of the pika adapter being used. Future plans include a
replacement with a more powerful library. This implementation is a good
starting point if you want to understand generator-based microthreads,
but do not expect more. You can read this series of articles
`here <http://lgiordani.github.io/blog/2013/03/25/python-generators-from-iterators-to-cooperative-multitasking/>`__
to begin digging into the matter.

About versioning
================

This is Postage version 1.2.1.

This library is versioned with an A.B.C schema (**A**\ PI, **B**\ OOST,
**C**\ OMPLAINT).

- Any change in the COMPLAINT number is a bugfix or even a typo
  correction in the documentation; it is transparent to running systems
  (except that hopefully *that nasty bug* is no longer there).
- Any change in the BOOST number is an API addition. It is transparent
  to running systems, but you should check the changelog to see what's
  new; perhaps *that impossible thing* is now easy as pie.
- Any change in the API number has to be taken very seriously. Sorry,
  but for some reason the API changed, so your running code will no
  longer work.

So update to 1.0.x without hesitation, await the full-of-features 1.1.0
release and beware of the frightening version 2.0.0 that will crash your
systems!
=)

[The code contained in the *master* branch on GitHub before the PyPI
release was marked with version 3.0.x. Indeed that is the real version
of the package, but since previous versions were not released I wanted
to be a good releaser and start from version 1.]

License
=======

This package, Postage, a Python library for AMQP-based network
components, is licensed under the terms of the GNU General Public
License Version 2 or later (the "GPL"). For the GPL 2 please see
LICENSE-GPL-2.0.

Contributing
============

Any form of contribution is highly welcome, from typo corrections to
code patches. Feel free to clone the project and send pull requests.

Quick start
===========

You can find the source code for the following examples in the
``demos/`` directory.

A basic echo server
-------------------

Let's implement a basic echo server made of two programs. The first sits
down and waits for incoming messages with the ``'echo'`` key, the second
sends one message each time it is run.

Be sure to have a running RabbitMQ system configured with a ``/``
virtual host and a ``guest:guest`` user/password.

The file ``echo_shared.py`` contains the definition of the exchange in
use:

.. code:: python

    from postage import messaging

    class EchoExchange(messaging.Exchange):
        name = "echo-exchange"
        exchange_type = "direct"
        passive = False
        durable = True
        auto_delete = False

The class attributes are the standard parameters of AMQP exchanges; see
``exchange_declare()`` in the pika
`documentation <https://pika.readthedocs.org/en/0.9.13/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.exchange_declare>`__.

The file ``echo_send.py`` defines a message producer and uses it to
send a message:

.. code:: python

    from postage import messaging
    import echo_shared

    class EchoProducer(messaging.GenericProducer):
        eks = [(echo_shared.EchoExchange, 'echo-rk')]

    producer = EchoProducer()
    producer.message_echo("A test message")

The producer has two goals: the first is to **define the standard
exchange and routing key used to send the messages**, which prevents you
from specifying both each time you send a message. The second goal is to
**host functions that build messages**; this is an advanced topic, so it
is discussed later.

In this simple case the producer does all the work behind the curtain
and you just need to call ``message_echo()``, providing it as many
parameters as you want. The producer creates a command message named
``'echo'``, packs all ``*args`` and ``**kwds`` you pass to the
``message_echo()`` method inside it, and sends it through the AMQP
network.

The file ``echo_receive.py`` defines a message processor that catches
incoming command messages named ``'echo'`` and prints their payload:

.. code:: python

    from postage import microthreads
    from postage import messaging
    import echo_shared

    class EchoReceiveProcessor(messaging.MessageProcessor):
        @messaging.MessageHandler('command', 'echo')
        def msg_echo(self, content):
            print content['parameters']

    eqk = [(echo_shared.EchoExchange, [('echo-queue', 'echo-rk'), ])]

    scheduler = microthreads.MicroScheduler()
    scheduler.add_microthread(EchoReceiveProcessor({}, eqk, None, None))

    for i in scheduler.main():
        pass

The catching method is arbitrarily called ``msg_echo()`` and decorated
with ``MessageHandler``, whose parameters are the type of the message
(``command``, meaning that we are instructing a component to do something
for us) and its name (``echo``, automatically set by calling the
``message_echo()`` method). The ``msg_echo()`` method must accept one
parameter besides ``self``, that is, the content of the message.
The content is not the entire message, but a dictionary containing only
the payload; in this case, for a generic ``command`` message, the
payload is a dictionary containing only the ``parameters`` key, that is,
the arguments you passed to ``message_echo()``.

Does this seem overkill? Indeed, for such a simple application, it is. The
following examples will hopefully show how those structures heavily
simplify complex tasks.

To run the example just open two shells, execute
``python echo_receive.py`` in the first one and ``python echo_send.py``
in the second. If you get a
``pika.exceptions.ProbableAuthenticationError`` exception please check
the configuration of the RabbitMQ server; you need to have a ``/``
virtual host and the ``guest`` user shall be active with password
``guest``.

An advanced echo server
-----------------------

Let's add a couple of features to our basic echo server example. First
of all we want to get information about who is sending the message. This
is an easy task for Fingerprint objects:

.. code:: python

    from postage import messaging
    import echo_shared

    class EchoProducer(messaging.GenericProducer):
        eks = [(echo_shared.EchoExchange, 'echo-rk')]

    fingerprint = messaging.Fingerprint('echo_send', 'application').as_dict()
    producer = EchoProducer(fingerprint)
    producer.message_echo("A single test message")
    producer.message_echo("A fanout test message", _key='echo-fanout-rk')

As you can see, a Fingerprint just needs the name of the application
(``echo_send``) and a categorization (``application``), and
automatically collects data such as the PID and the host. On receiving
the message you can decorate the receiving function with
``MessageHandlerFullBody`` to access the fingerprint:

.. code:: python

    @messaging.MessageHandlerFullBody('command', 'echo')
    def msg_echo_fingerprint(self, body):
        print "Message fingerprint: %s" % body['fingerprint']

The second thing we are going to add is the ability to send fanout
messages. When you connect to an exchange you can do it with a shared
queue, i.e. a queue declared with the same name by all the receivers, or
with a private queue, that is, a unique queue for each receiver. The
first setup leads to a round-robin consumer scenario, with the different
receivers picking messages from the same queue in turn. The second
setup, on the other hand, makes all the receivers get the same message
simultaneously, acting like a fanout delivery.

The file ``echo_shared.py`` does not change, since the exchange has the
same definition. In ``echo_receive.py`` we make the greatest number of
changes:

.. code:: python

    from postage import microthreads
    from postage import messaging
    import echo_shared

    class EchoReceiveProcessor(messaging.MessageProcessor):
        def __init__(self, fingerprint):
            shared_queue = 'echo-queue'
            private_queue = 'echo-queue-{0}{1}'.format(fingerprint['pid'],
                                                       fingerprint['host'])

            eqk = [
                (echo_shared.EchoExchange, [
                    (shared_queue, 'echo-rk'),
                    (private_queue, 'echo-fanout-rk')
                ]),
            ]
            super(EchoReceiveProcessor, self).__init__(fingerprint,
                                                       eqk, None, None)

        @messaging.MessageHandler('command', 'echo')
        def msg_echo(self, content):
            print content['parameters']

        @messaging.MessageHandlerFullBody('command', 'echo')
        def msg_echo_fingerprint(self, body):
            print "Message fingerprint: %s" % body['fingerprint']

    fingerprint = messaging.Fingerprint('echo_receive', 'controller').as_dict()

    scheduler = microthreads.MicroScheduler()
    scheduler.add_microthread(EchoReceiveProcessor(fingerprint))

    for i in scheduler.main():
        pass

As you can see, ``EchoReceiveProcessor`` redefines the ``__init__()``
method to allow passing just a Fingerprint; as a side effect, ``eqk`` is
now defined inside the method, but its nature does not change.
The ``eqk`` now encompasses two queues for the same exchange; the first
queue is shared, given that every instance of the receiver just names it
``echo-queue``, while the second is private because the name changes
with the PID and the host of the current receiver, and those values
together are unique in the cluster.

So we expect that sending messages with the ``echo-rk`` key will result
in hitting just one of the receivers at a time, in a round-robin fashion,
while sending messages with the ``echo-fanout-rk`` key will reach every
receiver.

We defined two different functions to process the incoming ``echo``
message, ``msg_echo()`` and ``msg_echo_fingerprint()``; this shows that
multiple functions can be set as handlers for the same messages. In this
simple case the two functions could also be merged into a single one, but
sometimes it is better to separate the code of different
functionalities, not to mention that the code could also be loaded at
run time, through a plugin system or a live definition.

An RPC echo server
------------------

The third version of the echo server shows how to implement RPC
messaging. As before, the exchange does not change its signature, so
``echo_shared.py`` remains the same. When sending the message we must
specify that we want the RPC form, using ``rpc_echo()`` instead of
``message_echo()``:

.. code:: python

    from postage import messaging
    import echo_shared

    class EchoProducer(messaging.GenericProducer):
        eks = [(echo_shared.EchoExchange, 'echo-rk')]

    fingerprint = messaging.Fingerprint('echo_send', 'application').as_dict()
    producer = EchoProducer(fingerprint)
    reply = producer.rpc_echo("RPC test message")
    if reply:
        print reply.body['content']['value']
    else:
        print "RPC failed"

Remember that RPC calls are blocking, so your program will hang at the
line ``reply = producer.rpc_echo("RPC test message")``, waiting for the
server to answer.
Once the reply has been received, it can be tested and
used as any other message; Postage RPC can return success, error or
exception replies, and their content changes accordingly:

.. code:: python

    from postage import microthreads
    from postage import messaging
    import echo_shared

    class EchoReceiveProcessor(messaging.MessageProcessor):
        def __init__(self, fingerprint):
            eqk = [
                (echo_shared.EchoExchange, [
                    ('echo-queue', 'echo-rk'),
                ]),
            ]
            super(EchoReceiveProcessor, self).__init__(fingerprint,
                                                       eqk, None, None)

        @messaging.RpcHandler('command', 'echo')
        def msg_echo(self, content, reply_func):
            print content['parameters']
            reply_func(messaging.MessageResult("RPC message received"))

    fingerprint = messaging.Fingerprint('echo_receive', 'controller').as_dict()

    scheduler = microthreads.MicroScheduler()
    scheduler.add_microthread(EchoReceiveProcessor(fingerprint))

    for i in scheduler.main():
        pass

The receiver does not change much; you just need to change the
handler dedicated to the incoming ``echo`` message. The decorator is now
``RpcHandler`` and the method must accept a third argument, that is, the
function that must be called to answer the incoming message. You have to
pass this function a suitable message, i.e. a ``MessageResult`` if
successful, or other messages to signal an error or an exception. Please
note that after you have called the reply function you can continue
executing code.

API Documentation
=================

Here you find a description of the messaging part of Postage. Since
Postage is based on AMQP, this guide presumes you are familiar with the
structures defined by the latter (exchanges, queues, bindings, virtual
hosts, ...) and that you already have a working messaging system (for
example a RabbitMQ cluster).

In the code and in the following text you will find the two terms
"application" and "component" used with the same meaning: a Python
executable which communicates with others using AMQP messages through
Postage.
Due to the nature of AMQP you can have components written in
several languages working together: here we assume both producers and
consumers are written using Postage, but remember that you can make
Postage components work with any other, as long as you stick to its
representation of messages (more on that later).

Environment variables
---------------------

Postage reads three environment variables, ``POSTAGE_VHOST``,
``POSTAGE_USER``, and ``POSTAGE_PASSWORD``, which contain the RabbitMQ
virtual host in use, the user name and the password. The default values
for them are ``/``, ``guest``, and ``guest``, i.e. the default values you
can find in a bare RabbitMQ installation. Previous versions used
``POSTAGE_RMQ_USER`` and ``POSTAGE_RMQ_PASSWORD``, which are still
supported but deprecated.

Using the environment variables, especially ``POSTAGE_VHOST``, you can
easily set up production and development environments; to switch, you
just need to set the variable before executing your Python components:

.. code:: sh

    POSTAGE_VHOST=development mycomponent.py

You obviously need to configure RabbitMQ according to your needs,
declaring the virtual hosts you want.

Setting up separate environments enables your components to exchange
messages without interfering with the production systems, thus sparing
you the need to install a separate cluster to test software. The HUP
acronym is used in places in the code to mean Host, User, Password, that
is, the tuple needed to connect to RabbitMQ, plus the virtual host.

A last environment variable, ``POSTAGE_DEBUG_MODE``, drives the debug
output if set to ``true``.
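Putting the connection variables together, the lookup described above can be sketched as follows (an illustrative sketch, not Postage's actual internals; the function name ``read_hup`` is invented here):

```python
import os

def read_hup():
    """Read the RabbitMQ connection settings from the environment,
    falling back to the defaults of a bare RabbitMQ installation."""
    vhost = os.environ.get('POSTAGE_VHOST', '/')
    # The deprecated POSTAGE_RMQ_* variables are honoured only when the
    # new ones are missing.
    user = os.environ.get('POSTAGE_USER',
                          os.environ.get('POSTAGE_RMQ_USER', 'guest'))
    password = os.environ.get('POSTAGE_PASSWORD',
                              os.environ.get('POSTAGE_RMQ_PASSWORD', 'guest'))
    return vhost, user, password
```

With no variables set this returns ``('/', 'guest', 'guest')``, matching a default RabbitMQ installation.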
``POSTAGE_DEBUG_MODE`` is intended for Postage debugging use
only, since its output is pretty verbose.

Fingerprint
-----------

When componentized systems become large you need a good way to identify
your components, so a simple ``Fingerprint`` object is provided to
encompass useful values, which are:

- ``name``: the name of the component or executable
- ``type``: a rough, plain categorization of the component
- ``pid``: the OS pid of the component executable
- ``host``: the host the component is running on
- ``user``: the OS user running the component executable
- ``vhost``: the RabbitMQ virtual host the component is running on

This object is mainly used to simplify the management of all those
values, and to allow writing compact code. Since Postage messages are
dictionaries (see below) the object provides an ``as_dict()`` method to
return its dictionary form, along with an ``as_tuple()`` method to
provide the tuple form.

You can use any class to encompass the values you need to identify your
components: Postage ALWAYS uses the dictionary form of fingerprints, so
you need a way to give a meaningful dictionary representation of your
class of choice.

Obviously, to uniquely identify a component on a network you need just
the host and pid values, but a more complete set of values can greatly
simplify management.

Fingerprint objects can automatically retrieve all values from the OS,
needing only the name and type values; if not passed, those are ``None``:

.. code:: python

    fingerprint = Fingerprint(name="mycomponent")
    print fingerprint.as_dict()

Encoder
-------

Postage messages are Python dictionaries serialized in JSON. The
``JsonEncoder`` object provides the ``encode()`` and ``decode()``
methods and the correct type ``application/json``. The encoder class can
be easily replaced in your components, provided that it sticks to this
interface.

Messages
--------

To manage the different types of messages, appropriate objects have been
defined. The base object is ``Message``: it has a **type**, a **name**
and a **category**.
A ``Message`` can also encompass a **fingerprint** and a
**content**, which are both dictionaries.

The type of the message is free, even if some have already been defined
in Postage: **command**, **status**, and **result**. This categorization
allows the consumers to filter incoming messages according to the action
they require.

The category of the message is not free, and must be one of **message**
and **rpc** (this nomenclature is somewhat misleading, since RPCs are
messages just like the standard ones; future plans include a review of
it). The first category marks fire-and-forget messages, while the second
signals RPC ones.

The dictionary form of the message is the following:

.. code:: python

    message = {
        'type': message_type,
        'name': message_name,
        'category': message_category,
        'version': '2',
        'fingerprint': {...},
        'content': {...},
        '_reserved': {...}
    }

The ``content`` key contains the actual data you put in your message,
and its structure is free.

**Command** messages send a command to another component. The command
can be a fire-and-forget one or an RPC call, according to the message
category; the former is implemented by the ``MessageCommand`` class,
while the latter is implemented by ``RpcCommand``. Both classes need the
name of the command and an optional dictionary of parameters, which are
imposed by the actual command. The message fingerprint can be set with
its ``fingerprint(**kwds)`` method:

.. code:: python

    m = messaging.MessageCommand('sum', parameters={'a': 5, 'b': 6})
    f = Fingerprint(name='mycomponent')
    m.fingerprint(f.as_dict())

**Status** messages bear the status of an application, along with the
application fingerprint. The class which implements this type is
``MessageStatus``. This object needs only a single parameter, which is
the status itself. Note that as long as the status is serializable, it
can be of any nature:

.. code:: python

    m = messaging.MessageStatus('online')

**Result** messages contain the result of an RPC call: three classes
have this type, ``MessageResult``, ``MessageResultError``, and
``MessageResultException``. The first is the result of a successful
call, the second is the result of an error in a call, while the third
signals that an exception was raised by the remote component. This error
classification has been inspired by Erlang error management, which I
find a good solution. All three classes contain a **value** and a
**message**, but for errors the value is ``None`` and for exceptions it
is the name of the Python exception:

.. code:: python

    try:
        result = some_operation()
        m = messaging.MessageResult(result)
    except Exception as exc:
        m = messaging.MessageResultException(exc.__class__.__name__,
                                             exc.__str__())

Exchange
--------

The ``Exchange`` class allows you to declare exchanges just by
customizing the class parameters. It provides a ``parameters`` class
property that gives a dictionary representation of the exchange itself,
as required by the ``exchange_declare()`` method of the AMQP channel.

To declare your own exchange you just need to inherit ``Exchange``:

.. code:: python

    from postage import messaging

    class MyExchange(messaging.Exchange):
        name = "my-custom-exchange"
        exchange_type = "topic"
        passive = False
        durable = True
        auto_delete = False

GenericProducer
---------------

When you use AMQP you are free to use any format for your messages and
any protocol for sending and receiving data. Postage gives you a
predefined, though extensible, message format, the ``Message`` object.
Moreover, through ``GenericProducer``, it gives you a way to easily
define an API, i.e. a set of shortcut functions that create and send
messages, through which you can interact with your system.

To better introduce the simplification implemented by
``GenericProducer``, let us recap what a component shall do to send a
message using pika and the ``Message`` object.
1. A ``Message`` object has to be declared and filled with the
   information we want to send, according to a given predefined format
   (the message API of our system). The message must contain the correct
   fingerprint and be encoded using the encoder of your choice (a choice
   that must be shared by all other components in the system).
2. A connection to the AMQP broker must be established, then all the
   target exchanges must be declared.
3. For each exchange you want to receive the message you shall publish
   it giving the correct routing key for that exchange: the keys you can
   use are part of your messaging API, so you have to "document" them
   when you publish the specification for your exchanges.

As you can see, this can quickly lead to a bunch of repeated code, as the
set of operations you need is often the same or very similar; moreover,
it needs a source of documentation outside the code, that is, the API
does not document itself (here I mean: there is no way to get a grasp
on the set of messages you are defining in your API).

Let us see how ``GenericProducer`` solves these issues. First of all you
need to define an exchange:

.. code:: python

    class LoggingExchange(messaging.Exchange):
        name = "logging-exchange"
        exchange_type = "direct"
        passive = False
        durable = True
        auto_delete = False

Then you need to define a producer, i.e. an object that inherits from
``GenericProducer``:

.. code:: python

    class LoggingProducer(messaging.GenericProducer):
        pass

Since the aim of the producer is to simplify sending messages to an
exchange, you can specify here a set of exchange/key couples (EKs) which
will be used by default (more on this later):

.. code:: python

    class LoggingProducer(messaging.GenericProducer):
        eks = [(LoggingExchange, 'log')]

Now you have to define a function that builds a ``Message`` containing
the data you want to send:

.. code:: python

    class LoggingProducer(messaging.GenericProducer):
        eks = [(LoggingExchange, "log")]

        def build_message_status_online(self):
            return messaging.MessageStatus('online')

This allows you to write the following code:

.. code:: python

    producer = LoggingProducer()
    producer.message_status_online()

which will build a ``MessageStatus`` containing the ``'online'`` status
string and will send it to the exchange named ``logging-exchange`` with
``'log'`` as the routing key.

Magic methods
~~~~~~~~~~~~~

As you can see, ``GenericProducer`` automatically defines a
``message_name()`` method that wraps each of the
``build_message_name()`` methods you define. The same happens with RPC
messages, where the ``rpc_name()`` method is automatically created to
wrap ``build_rpc_name()``.

``message_*()`` methods accept two special keyword arguments, namely
``_key`` and ``_eks``, that change the way the message is sent. The
behaviour of the two keywords follows this algorithm:

1. Calling ``message_name()`` sends the message with the predefined
   ``eks``, i.e. those defined in the producer class. This means that
   the message is sent to each exchange listed in the ``eks`` list of
   the class, with the associated key.
2. Calling ``message_name(_key='rk')`` sends the message to the first
   exchange in ``eks`` with the key ``rk``.
3. Calling ``message_name(_eks=[(exchange1, rk1), (exchange2, rk2)])``
   uses the specified EKs instead of the content of the default ``eks``
   variable; in this case it sends the message to ``exchange1`` with
   routing key ``rk1`` and to ``exchange2`` with routing key ``rk2``.

If you specify both ``_eks`` and ``_key``, the latter will be ignored.
This system allows you to specify a default behaviour when writing the
producer and to customize the routing key or even the exchange on the
fly.

RPC messages also accept ``_timeout`` (seconds), ``_max_retry`` and
``_queue_only`` to customize the behaviour of the producer when waiting
for RPC answers (more on that later).

Fingerprint
~~~~~~~~~~~

When a ``GenericProducer`` is instantiated, a ``Fingerprint`` in its
dictionary form can be passed as an argument, and this is included in
each message object the producer sends. If not given, a bare fingerprint
is created inside the object:

.. code:: python

    f = Fingerprint(name='mycomponent')
    producer = LoggingProducer(fingerprint=f.as_dict())
    producer.message_status_online()

Generic messages
~~~~~~~~~~~~~~~~

You can use a producer to send generic messages using the ``message()``
method:

.. code:: python

    p = messaging.GenericProducer()
    p.message(1, "str", values=[1, 2, 3, "numbers"],
              _eks=[(MyExchangeCls, "a_routing_key")])

RPC calls
~~~~~~~~~

RPC calls are blocking calls that leverage a very simple mechanism: the
low-level AMQP message is given a (usually temporary and private) queue
through its ``reply_to`` property, and this is explicitly used by the
receiver to send an answer.

In Postage an RPC message is defined by a ``build_rpc_name()`` method in
a ``GenericProducer`` and called with ``rpc_name()``; it returns a
result message as sent by the component that answered the call, and thus
its type should be one of ``MessageResult``, ``MessageResultError`` or
``MessageResultException`` for plain Postage.

RPC messages accept the following parameters: ``_timeout`` (the message
timeout, defaults to 30 seconds), ``_max_retry`` (the maximum number of
times the message shall be sent again when timing out, defaults to 4),
and ``_queue_only`` (the call returns the temporary queue on which the
answer message will appear, instead of the message itself).

When the maximum number of tries has been reached, the call returns a
``MessageResultException`` with the ``TimeoutError`` exception.

GenericConsumer
---------------

The ``GenericConsumer`` class implements a standard AMQP consumer, i.e.
an object that can connect to exchanges through queues and fetch
messages.

A class that inherits from ``GenericConsumer`` shall define an ``eqk``
class attribute, which is a list of tuples in the form
``(Exchange, [(Queue, Key), (Queue, Key), ...])``; each tuple means that
the given exchange will be subscribed to by the listed queues, each of
them with the relative routing key. The Queue may be defined as a plain
string (the name of the queue) or as a dictionary with the ``name`` and
``flags`` keys; the second key shall contain a dictionary of flags, such
as ``{'auto_delete': True}``:
.. code:: python

    class MyConsumer(GenericConsumer):
        eqk = [
            (PingExchange, [('ping_queue', 'ping_rk')]),
            (LogExchange, [('log_queue', 'log')])
        ]

Apart from declaring bindings in the class, you can use the
``queue_bind()`` method, which accepts an exchange, a queue and a key.
This can be useful if you have to declare queues at runtime or if
parameters such as the routing key depend on some value you cannot
access at instantiation time.

MessageProcessor
----------------

``MessageProcessor`` objects boost ``GenericConsumer`` to full power =)

A ``MessageProcessor`` is a ``MicroThread`` with two main attributes:
``self.consumer`` (a ``GenericConsumer`` or derived class) and
``self.fingerprint`` (a ``Fingerprint`` in its dictionary form).

Inside a ``MessageProcessor`` you can define a set of methods called
"message handlers" that process incoming messages. The methods can be
named freely, and have to be decorated with the ``@MessageHandler``
decorator; this needs two parameters: the type of the message and its
name. So defining

.. code:: python

    @MessageHandler('command', 'quit')
    def msg_quit(self, content):
        [...]

you make the method ``msg_quit()`` process each incoming message whose
type is ``command`` and whose name is ``quit``. You can define as many
message handlers as you want for the same message type/name, but beware
that they are all executed, in no guaranteed order. As you can see from
the example, a message handler method must accept a parameter which
receives the content of the processed message.

You can also decorate a method with the ``@RpcHandler`` decorator; in
that case the method must accept two parameters, the first being the
content of the received message, the second a reply function. The method
has the responsibility of calling it, passing a ``MessageResult`` or
derived object.
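The handler mechanism can be pictured with a small self-contained sketch (illustrative only, not Postage's actual implementation): the decorator tags each method with the ``(type, name)`` pair it handles, and the processor dispatches every incoming message to all matching methods.

```python
class MessageHandler(object):
    """Illustrative decorator: tags a method with the message
    type/name it handles, so a processor can build a dispatch table."""
    def __init__(self, msg_type, msg_name):
        self.key = (msg_type, msg_name)

    def __call__(self, func):
        func._handles = self.key
        return func

class Processor(object):
    """Illustrative processor: finds and calls the decorated methods."""
    def handlers_for(self, msg_type, msg_name):
        # Collect every decorated method matching the incoming message.
        return [getattr(self, attr) for attr in dir(self)
                if getattr(getattr(self, attr), '_handles', None)
                == (msg_type, msg_name)]

    def process(self, message):
        # Handlers receive only the message content, as in Postage.
        for handler in self.handlers_for(message['type'], message['name']):
            handler(message['content'])

class EchoProcessor(Processor):
    def __init__(self):
        self.seen = []

    @MessageHandler('command', 'echo')
    def msg_echo(self, content):
        self.seen.append(content['parameters'])
```

A message whose ``(type, name)`` pair matches no decorated method is simply ignored, which is the filtering behaviour described above.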
Since the handler calls the reply function explicitly, it can do some
cleanup after sending the reply.

Message handlers can also be defined as classes inside a
``MessageProcessor``; they have to inherit from ``Handler`` and define a
``call()`` method which accepts only ``self``; it can then access the
``self.data`` and ``self.reply_func`` attributes that contain the
incoming message and the return function. The difference between the
method and class versions of the message handlers is that the class
version can access the underlying ``MessageProcessor`` through its
``self.processor`` attribute. This is useful to access the fingerprint
of the message or any other attribute that is included in the processor.
A class is thus in general richer than a simple method, giving more
freedom to the programmer.

The last available decorator is ``MessageHandlerFullBody``, which passes
to the decorated method or class the full body of the incoming message
instead of only the value of the ``content`` key, as ``MessageHandler``
and ``RpcHandler`` do.

Default handlers
~~~~~~~~~~~~~~~~

``MessageProcessor`` objects define two default message handlers to
process the incoming ``quit`` and ``restart`` commands. The first, as
you can easily guess from the name, makes the component quit; actually,
it makes the consumer stop consuming messages and the microthread quit,
so the program executes the code you put after the scheduler loop. If
you put no code there, the program just exits. The second command makes
the component restart, i.e. it replaces itself with a new execution of
the same program. This makes it very easy to update running systems;
just replace the code and send a ``restart`` to your components.

Message filters
~~~~~~~~~~~~~~~

**New in version 1.1.0** The ``MessageFilter`` class may be used to
decorate a message handler, and accepts a callable as a parameter. The
provided callable is called on a copy of each incoming message that
would be processed by that handler.
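As an illustration, a filter callable that upgrades a hypothetical old message layout might look like this (the old ``payload`` key is invented for the example; the exact shape of real Postage messages is described above):

```python
def upgrade_old_messages(message):
    """Illustrative filter callable: rewrite an old-style message into
    the current layout. Any exception it raises makes the framework
    discard the incoming message before it reaches the handler."""
    if 'content' not in message:
        # Hypothetical old layout: the payload lived under 'payload'.
        # A message with neither key raises KeyError and is discarded.
        message['content'] = {'parameters': message['payload']}
    return message
```

Passed to ``MessageFilter``, such a callable lets old producers keep working while the handler only ever sees the new format.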
Any exception raised by the callableresults in the message being discarded without passing through thehandler.You may use this feature to manage changes in the format of a message,and providing a filter that transforms old-style messages into new-styleones.GenericApplication~~~~~~~~~~~~~~~~~~**New in version 1.2.0** The ``generic_application.py`` module containsthe ``GenericApplication`` class which is a basic unspecializedcomponent based on ``messaging.messageProcessor``.``GenericApplication`` may be used to build message-driven programs inPython that interact through the RabbitMQ system.``GenericApplication`` is a microthread that may use ``MessageHandler``and derived classes to get messages from the RabbitMQ exchanges itconnects to. The standard exchange used by this class is``generic_application.GenericApplicationExchange``. In the followingparagraphs the names "system" and "network" both mean a givenvirtualhost on a set of clustered RabbitMQ nodes.A ``GenericApplication`` is identified by a ``name``, an operatingsystem ``pid`` and a running ``host``. From those values three queuesare defined inside each instance: ``self.sid``, ``self.hid`` and``self.uid``.- ``self.sid`` is the system-wide queue, which is shared among all microthreads with the same ``name``.- ``self.hid`` is the host-wide queue, which is shared by all microthreads with the same ``name`` and the same ``host``.- ``self.uid`` is an unique queue on the whole system. Being linked to the OS PID and the running host this queue is owned by a single application instance.The ``GenericApplication`` class defines several routing keys throughwhich the above queues are connected to the exchange, namely:- ``{name}`` is a fanout that delivers messages to every application with the given name. For example sending a message with the ``monitor`` key will reach all microthreads running with the ``monitor`` name.- ``{name}/rr`` delivers messages in round robin to every application with the given name. 
  Round robin keys leverage the basic AMQP load balancing mechanism:
  the queue is shared among consumers and messages are fairly divided
  among them.
- ``@{host}`` is a fanout to every application running on the same
  host.
- ``{name}@{host}`` is a fanout to every application running on the
  same host and with the same name.
- ``{name}@{host}/rr`` is the round robin version of the previous key.
  It balances message delivery among applications that share name and
  host.
- ``{pid}@{host}`` delivers a message only to the unique application
  that has the given pid on the given host.

A ``GenericApplication`` may join one or more groups. The list of
groups can be specified when instantiating the class or dynamically
through a message. In the first case two keys are available to send
messages:

- ``{name}#{group}``, which is a fanout to every application with the
  same name in the same group.
- ``{name}#{group}/rr``, which is a round robin to the same set of
  applications.

If the application joins a group later in its lifecycle, through a
``join_group`` message, only the fanout key is available. The technical
reason for this limitation is described in the source code of the
``msg_join_group()`` message handler.

Credits
~~~~~~~

First of all I want to mention and thank the `Erlang
<http://www.erlang.org>`__ and `RabbitMQ <http://www.rabbitmq.com>`__
teams and the maintainer of `pika <https://github.com/pika/pika>`__,
Gavin M. Roy, for their hard work, and for releasing such amazing
pieces of software as open source.

Many thanks to `Jeff Knupp <http://www.jeffknupp.com/about-me/>`__ for
his post `Open Sourcing a Python Project the Right Way
<http://www.jeffknupp.com/blog/2013/08/16/open-sourcing-a-python-project-the-right-way/>`__
and to `Audrey M. Roy <http://www.audreymroy.com/>`__ for her
`cookiecutter <https://github.com/audreyr/cookiecutter>`__ and
`cookiecutter-pypackage
<https://github.com/audreyr/cookiecutter-pypackage>`__ tools. All those
things make Python packaging a breeze.
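As a quick reference, the routing key formats described in the
``GenericApplication`` section above can be sketched as plain string
builders (the key *formats* come from this documentation, but the
function names are illustrative assumptions, not part of Postage's
API):

```python
# Illustrative builders for the GenericApplication routing key formats.

def name_key(name):
    """Fanout to every application with the given name."""
    return name

def name_rr_key(name):
    """Round robin among applications with the given name."""
    return name + "/rr"

def host_key(host):
    """Fanout to every application on the given host."""
    return "@" + host

def name_host_key(name, host):
    """Fanout to applications sharing name and host."""
    return "{0}@{1}".format(name, host)

def name_host_rr_key(name, host):
    """Round robin among applications sharing name and host."""
    return name_host_key(name, host) + "/rr"

def pid_host_key(pid, host):
    """Unicast to the single instance with the given pid and host."""
    return "{0}@{1}".format(pid, host)

def group_key(name, group):
    """Fanout to applications with the given name in the given group."""
    return "{0}#{1}".format(name, group)

print(name_rr_key("monitor"))          # monitor/rr
print(name_host_key("monitor", "h1"))  # monitor@h1
print(group_key("monitor", "stats"))   # monitor#stats
```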
.. |Build Status| image:: https://travis-ci.org/lgiordani/postage.png?branch=master
   :target: https://travis-ci.org/lgiordani/postage
.. |Version| image:: https://badge.fury.io/py/postage.png
   :target: http://badge.fury.io/py/postage
.. |PyPi Downloads| image:: https://pypip.in/d/postage/badge.png
   :target: https://crate.io/packages/postage?version=latest

History
-------

1.0.0 (2013-12-03)
++++++++++++++++++

* First release on PyPI.

1.0.1 (2014-06-05)
++++++++++++++++++

* Queues created through ``queue_bind()`` were declared with
  ``auto_delete=True``, which made the queue disappear as soon as no
  more consumers were reading from it. This made the consumer lose all
  messages waiting in the queue. Fixed by removing the
  ``auto_delete=True`` parameter.

1.0.2 (2014-08-05)
++++++++++++++++++

* Now EQKs may contain queue flags to toggle AMQP parameters such as
  ``auto_delete`` on specific queues.

1.1.0 (2014-12-10)
++++++++++++++++++

* Added filters to alter/manage incoming messages before processing
  them.

1.2.0 (2015-02-27)
++++++++++++++++++

* Added ``generic_application.py`` with the ``GenericApplication``
  class.

1.2.1 (2015-02-27)
++++++++++++++++++

* Fixed wrong indentation in ``generic_application.py``.