hurry.workflow 3.0.2

Creator: bradpython12


hurry.workflow
A simple but quite nifty workflow system for Zope 3.


CHANGES

3.0.2 (2018-01-16)

Update rendering of pypi description.



3.0.1 (2018-01-16)

Fix up a brown paper bag release.



3.0.0 (2018-01-16)

Update dependencies not to rely on ZODB 3 anymore.
Support python3.4, python3.5, python3.6 in addition to python2.7.



0.13.1 (2013-01-17)

Make the exceptions also display more informative messages.



0.13 (2013-01-17)

NoTransitionAvailableError gained a source and destination
attribute indicating what transition wasn’t available.
AmbiguousTransitionError also gained a source and destination
attribute indicating what transition was ambiguous.
InvalidTransitionError gained a source attribute indicating
the source state of the attempted invalid transition.
Newer bootstrap.py



0.12 (2012-02-10)

Make the info() and state() functions on the WorkflowInfo class into
classmethods as they are not of much use otherwise.
fireTransitionToward already accepted a check_security=False
argument, but it would not allow a transition that a user didn’t
have the permission for to be fired after all, because the
transition wouldn’t even be found in the first place. Now it works.



0.11 (2010-04-16)

Do IAnnotations(self.context) only once in WorkflowState.
An IWorkflowVersions implementation is now optional.
Added multiple workflows support.



0.10 (2009-11-19)

Moved to svn.zope.org for development.
Added a buildout.cfg, bootstrap.py
Minimized dependencies. Note that Workflow does not inherit from
Persistent and zope.container.contained.Contained
anymore. If you need persistent workflow, you need to subclass this
in your own code. This breaks backwards compatibility, as persistent
workflows would need to be re-initialized.



0.9.2.1 (2007-08-15)

Oops, the patches in 0.9.2 were not actually applied. Fixed them
now.



0.9.2 (2007-08-15)

zope.security changes broke imports in hurry.workflow.
localUtility directive is now deprecated, so don’t use it anymore.



0.9.1 (2006-09-22)

First cheeseshop release.



0.9 (2006-06-15)

separate out from hurry package into hurry.workflow
eggification work
Zope 3.3 compatibility work



0.8 (2006-05-01)
Initial public release.



Detailed Documentation


Hurry Workflow
The hurry workflow system is a “roll my own because I’m in a hurry”
framework.

Basic workflow
Let’s first make a content object that can go into a workflow:
>>> from zope.interface import implementer, Attribute

>>> from zope.annotation.interfaces import IAttributeAnnotatable
>>> class IDocument(IAttributeAnnotatable):
...     title = Attribute('Title')
>>> @implementer(IDocument)
... class Document(object):
...     def __init__(self, title):
...         self.title = title
As you can see, such a content object must provide IAnnotatable, as
this is used to store the workflow state. The system uses the
IWorkflowState adapter to get and set an object’s workflow state:
>>> from hurry.workflow import interfaces
>>> document = Document('Foo')
>>> state = interfaces.IWorkflowState(document)
>>> print(state.getState())
None
The state can be set directly for an object using the IWorkflowState
adapter as well:
>>> state.setState('foo')
>>> state.getState()
'foo'
But let’s set it back to None again, so we can start again in a
pristine state for this document:
>>> state.setState(None)
It’s not recommended to call setState() ourselves like this, though:
usually we’ll let the workflow system take care of state transitions and
the setting of the initial state.
Now let’s define a simple workflow transition from ‘a’ to ‘b’. It
needs a condition which must return True before the transition is
allowed to occur:
>>> def NullCondition(wf, context):
...     return True
and an action that takes place when the transition is taken:
>>> def NullAction(wf, context):
...     pass
Now let’s construct a transition:
>>> from hurry.workflow import workflow
>>> transition = workflow.Transition(
... transition_id='a_to_b',
... title='A to B',
... source='a',
... destination='b',
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)
The transition trigger is either MANUAL, AUTOMATIC or SYSTEM. MANUAL
indicates user action is needed to fire the transition. AUTOMATIC
transitions fire automatically. SYSTEM is a workflow transition
directly fired by the system, and not directly by the user.
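To picture how the three trigger kinds partition a workflow’s transitions, here is a minimal plain-Python sketch. It is a toy model and not hurry.workflow’s implementation: the constants, the ToyTransition tuple and the ids_by_trigger helper are hypothetical names used only for illustration.

```python
from collections import namedtuple

# Hypothetical stand-ins for the trigger constants; the real ones live in
# hurry.workflow.interfaces and their values are not assumed here.
MANUAL, AUTOMATIC, SYSTEM = 'manual', 'automatic', 'system'

ToyTransition = namedtuple(
    'ToyTransition', 'transition_id source destination trigger')


def ids_by_trigger(transitions, source, trigger):
    """Return the ids of transitions leaving `source` with the given trigger."""
    return [t.transition_id for t in transitions
            if t.source == source and t.trigger == trigger]


transitions = [
    ToyTransition('a_to_b', 'a', 'b', MANUAL),
    ToyTransition('expire', 'a', 'c', AUTOMATIC),
    ToyTransition('reindex', 'a', 'a', SYSTEM),
]

manual_ids = ids_by_trigger(transitions, 'a', MANUAL)
automatic_ids = ids_by_trigger(transitions, 'a', AUTOMATIC)
system_ids = ids_by_trigger(transitions, 'a', SYSTEM)
```

Each transition carries exactly one trigger, so a given transition id shows up in only one of the three lists.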
We also will introduce an initial transition, that moves an object
into the workflow (for instance just after it is created):
>>> init_transition = workflow.Transition(
... transition_id='to_a',
... title='Create A',
... source=None,
... destination='a')
And a final transition, when the object moves out of the workflow again
(for instance just before it is deleted):
>>> final_transition = workflow.Transition(
... transition_id='finalize',
... title='Delete',
... source='b',
... destination=None)
Now let’s put the transitions in a workflow utility:
>>> wf = workflow.Workflow([transition, init_transition, final_transition])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)
We can get the transition from the workflow using getTransition,
passing the source state and the transition id, should we need it:
>>> wf.getTransition('a', 'a_to_b') is transition
True
If we try to get a transition that doesn’t exist, we get an error:
>>> wf.getTransition('b', 'a_to_b')
Traceback (most recent call last):
...
hurry.workflow.interfaces.InvalidTransitionError: source: "b"

>>> from hurry.workflow.interfaces import InvalidTransitionError
>>> try:
...     wf.getTransition('b', 'a_to_b')
... except InvalidTransitionError as e_:
...     e = e_
>>> e.source
'b'
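The lookup above can be pictured as a two-level index keyed first by source state and then by transition id. The following is a sketch under that assumption only; ToyWorkflow and ToyInvalidTransitionError are hypothetical names, not the library’s classes.

```python
class ToyInvalidTransitionError(Exception):
    """Hypothetical error that remembers the source state of a failed lookup."""

    def __init__(self, source):
        super().__init__('source: "%s"' % source)
        self.source = source


class ToyWorkflow:
    def __init__(self, transitions):
        # Index: source state -> {transition_id: transition}
        self._by_source = {}
        for t in transitions:
            self._by_source.setdefault(t['source'], {})[t['transition_id']] = t

    def get_transition(self, source, transition_id):
        try:
            return self._by_source[source][transition_id]
        except KeyError:
            # Either the source state or the id is unknown from that state.
            raise ToyInvalidTransitionError(source)


a_to_b = {'transition_id': 'a_to_b', 'source': 'a', 'destination': 'b'}
toy_wf = ToyWorkflow([a_to_b])

found = toy_wf.get_transition('a', 'a_to_b')
try:
    toy_wf.get_transition('b', 'a_to_b')
except ToyInvalidTransitionError as exc:
    failed_source = exc.source
```

Because the index is keyed by source first, asking for 'a_to_b' from state 'b' fails even though the id itself exists.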
Workflow transitions cause events to be fired; we will put in a simple
handler so we can check whether things were successfully fired:
>>> events = []
>>> def transition_handler(event):
...     events.append(event)
>>> component.provideHandler(
... transition_handler,
... [interfaces.IWorkflowTransitionEvent])
To get what transitions to other states are possible from an object,
as well as to fire transitions and set initial state, we use the
IWorkflowInfo adapter:
>>> info = interfaces.IWorkflowInfo(document)
We’ll initialize the workflow by firing the ‘to_a’ transition:
>>> info.fireTransition('to_a')
This should’ve fired an event:
>>> events[-1].transition.transition_id
'to_a'
>>> events[-1].source is None
True
>>> events[-1].destination
'a'
There’s only a single transition defined to workflow state ‘b’:
>>> info.getManualTransitionIds()
['a_to_b']
We can also get this by asking which manual (or system) transition
exists that brings us to the desired workflow state:
>>> info.getFireableTransitionIdsToward('b')
['a_to_b']
Since this is a manually triggered transition, we can fire this
transition:
>>> info.fireTransition('a_to_b')
The workflow state should now be ‘b’:
>>> state.getState()
'b'
We check that the event indeed got fired:
>>> events[-1].transition.transition_id
'a_to_b'
>>> events[-1].source
'a'
>>> events[-1].destination
'b'
We will also try fireTransitionToward here, so we sneak the workflow
back to state ‘a’ and try that:
>>> state.setState('a')
Try going through a transition we cannot reach first:
>>> info.fireTransitionToward('c')
Traceback (most recent call last):
...
hurry.workflow.interfaces.NoTransitionAvailableError: source: "a" destination: "c"
This error carries information about which transition was attempted:
>>> from hurry.workflow.interfaces import NoTransitionAvailableError
>>> try:
...     info.fireTransitionToward('c')
... except NoTransitionAvailableError as e_:
...     e = e_
>>> e.source
'a'
>>> e.destination
'c'
Now go to ‘b’ again:
>>> info.fireTransitionToward('b')
>>> state.getState()
'b'
Finally, before forgetting about our document, we finalize the workflow:
>>> info.fireTransition('finalize')
>>> state.getState() is None
True
And we have another event that was fired:
>>> events[-1].transition.transition_id
'finalize'
>>> events[-1].source
'b'
>>> events[-1].destination is None
True


Multiple workflows
We have previously registered a workflow as an unnamed utility.
You can also register a workflow as a named utility to provide
several workflows for a site.
Let’s create an invoice document:
>>> class IInvoiceDocument(IDocument):
...     title = Attribute('Title')

>>> @implementer(IInvoiceDocument)
... class InvoiceDocument(object):
...     def __init__(self, title, amount):
...         self.title = title
...         self.amount = amount
We define our workflow:
>>> invoice_init = workflow.Transition(
... transition_id='init_invoice',
... title='Invoice Received',
... source=None,
... destination='received')
>>>
>>> invoice_paid = workflow.Transition(
... transition_id='invoice_paid',
... title='Invoice Paid',
... source='received',
... destination='paid')

>>> invoice_wf = workflow.Workflow( [ invoice_init, invoice_paid ] )
We register a new workflow utility, WorkflowState and WorkflowInfo adapters, all
named “invoice”:
>>> from hurry.workflow import workflow
>>> from zope.annotation import interfaces as annotation_interfaces
>>> component.provideUtility(invoice_wf, interfaces.IWorkflow, name="invoice")
>>> class InvoiceWorkflowInfo(workflow.WorkflowInfo):
...     name = "invoice"
>>> component.provideAdapter(
... InvoiceWorkflowInfo,
... (annotation_interfaces.IAnnotatable,),
... interfaces.IWorkflowInfo,
... name="invoice")
>>> class InvoiceWorkflowState(workflow.WorkflowState):
...     state_key = "invoice.state"
...     id_key = "invoice.id"
>>> component.provideAdapter(
... InvoiceWorkflowState,
... (annotation_interfaces.IAnnotatable,),
... interfaces.IWorkflowState,
... name="invoice")
Now we can utilize the workflow:
>>> invoice = InvoiceDocument('abc', 22)

>>> info = component.getAdapter(invoice, interfaces.IWorkflowInfo, name="invoice")
>>> info.fireTransition('init_invoice')
>>> state = component.getAdapter(invoice, interfaces.IWorkflowState, name="invoice")
>>> state.getState()
'received'
>>> info.fireTransition('invoice_paid')
>>> state.getState()
'paid'
To make it easier to get the state and info adapters for a particular context
object, there are two convenience functions on the workflow info object. The
info object “knows” what workflow utility to look for, as they are associated
by name:
>>> info_ = InvoiceWorkflowInfo.info(invoice)
>>> interfaces.IWorkflowInfo.providedBy(info_)
True

>>> state_ = InvoiceWorkflowInfo.state(invoice)
>>> interfaces.IWorkflowState.providedBy(state_)
True
>>> state.getState() is state_.getState()
True
Of course, this document still has the default unnamed workflow as well:
>>> info = interfaces.IWorkflowInfo(invoice)
>>> info.fireTransition('to_a')
>>> state = interfaces.IWorkflowState(invoice)
>>> state.getState()
'a'
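One way to picture why the named and unnamed workflows do not interfere: each state adapter keeps its value under its own key in the object’s annotations. The sketch below assumes exactly that and nothing more; ToyState, ToyInvoiceState and the 'default.state' key are illustrative names, and a plain dict stands in for the annotations.

```python
class ToyState:
    """Stores one workflow's state under its own key in a shared dict."""

    state_key = 'default.state'  # illustrative key, not the library's default

    def __init__(self, annotations):
        self.annotations = annotations

    def set_state(self, state):
        self.annotations[self.state_key] = state

    def get_state(self):
        return self.annotations.get(self.state_key)


class ToyInvoiceState(ToyState):
    # Overriding the key gives the invoice workflow its own slot.
    state_key = 'invoice.state'


annotations = {}  # stand-in for the content object's annotations
ToyInvoiceState(annotations).set_state('paid')
ToyState(annotations).set_state('a')

invoice_state = ToyInvoiceState(annotations).get_state()
default_state = ToyState(annotations).get_state()
```

Both states live side by side on the same object, which is why the invoice can be ‘paid’ in one workflow and in state ‘a’ in the other.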


Multi-version workflow
Now let’s go for a more complicated scenario where we have multiple
versions of a document. At any one time a document can have an
UNPUBLISHED version and a PUBLISHED version. There can also be a
CLOSED version and any number of ARCHIVED versions:
>>> UNPUBLISHED = 'unpublished'
>>> PUBLISHED = 'published'
>>> CLOSED = 'closed'
>>> ARCHIVED = 'archived'
Let’s start with a simple initial transition:
>>> init_transition = workflow.Transition(
... transition_id='init',
... title='Initialize',
... source=None,
... destination=UNPUBLISHED)
When the unpublished version is published, any previously published
version is made to be the CLOSED version. To accomplish this secondary
state transition, we’ll use the system’s built-in versioning ability
with the ‘fireTransitionForVersions’ method, which can be used to
fire transitions of other versions of the document:
>>> def PublishAction(wf, context):
...     wf.fireTransitionForVersions(PUBLISHED, 'close')
Now let’s build the transition:
>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=PublishAction,
... trigger=interfaces.MANUAL,
... order=1)
Next, we’ll define a transition from PUBLISHED to CLOSED, which means
we want to archive whatever was closed before:
>>> def CloseAction(wf, context):
...     wf.fireTransitionForVersions(CLOSED, 'archive')
>>> close_transition = workflow.Transition(
... transition_id='close',
... title='Close',
... source=PUBLISHED,
... destination=CLOSED,
... condition=NullCondition,
... action=CloseAction,
... trigger=interfaces.MANUAL,
... order=2)
Note that CloseAction will also be executed automatically whenever
state is transitioned from PUBLISHED to CLOSED using
fireTransitionForVersions. This means that publishing a document
results in the previously closed document being archived.
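The cascade described above (publish closes the old published version, which in turn archives the old closed version) can be sketched in plain Python. This toy model assumes that a transition’s action runs before the state of the object itself changes; ToyVersion and the helper functions are hypothetical and do not use the library.

```python
UNPUBLISHED, PUBLISHED, CLOSED, ARCHIVED = (
    'unpublished', 'published', 'closed', 'archived')


class ToyVersion:
    def __init__(self, name, state):
        self.name = name
        self.state = state


def fire_for_versions(versions, state, fire):
    """Apply `fire` to every version currently in `state`."""
    for version in [v for v in versions if v.state == state]:
        fire(versions, version)


def archive(versions, version):
    version.state = ARCHIVED


def close(versions, version):
    # Action first: archive whatever was closed before ...
    fire_for_versions(versions, CLOSED, archive)
    # ... then move this version to CLOSED.
    version.state = CLOSED


def publish(versions, version):
    # Action first: close whatever was published before ...
    fire_for_versions(versions, PUBLISHED, close)
    # ... then move this version to PUBLISHED.
    version.state = PUBLISHED


versions = [
    ToyVersion('v1', CLOSED),
    ToyVersion('v2', PUBLISHED),
    ToyVersion('v3', UNPUBLISHED),
]
publish(versions, versions[2])
states = [(v.name, v.state) for v in versions]
```

After publishing v3, v2 has moved to closed and v1 to archived, so at most one version sits in each of the PUBLISHED and CLOSED states.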
If there is a PUBLISHED but no UNPUBLISHED version, we can make a new
copy of the PUBLISHED version and make that the UNPUBLISHED version:
>>> def CanCopyCondition(wf, context):
...     return not wf.hasVersion(UNPUBLISHED)
Since we are actually creating a new content object, the action should
return the newly created object with the new state:
>>> def CopyAction(wf, context):
...     return Document('copy of %s' % context.title)

>>> copy_transition = workflow.Transition(
... transition_id='copy',
... title='Copy',
... source=PUBLISHED,
... destination=UNPUBLISHED,
... condition=CanCopyCondition,
... action=CopyAction,
... trigger=interfaces.MANUAL,
... order=3)
A very similar transition applies to the closed version. If we have
no UNPUBLISHED version and no PUBLISHED version, we can make a new copy
from the CLOSED version:
>>> def CanCopyCondition(wf, context):
...     return (not wf.hasVersion(UNPUBLISHED) and
...             not wf.hasVersion(PUBLISHED))

>>> copy_closed_transition = workflow.Transition(
... transition_id='copy_closed',
... title='Copy',
... source=CLOSED,
... destination=UNPUBLISHED,
... condition=CanCopyCondition,
... action=CopyAction,
... trigger=interfaces.MANUAL,
... order=4)
Finally let’s build the archiving transition:
>>> archive_transition = workflow.Transition(
... transition_id='archive',
... title='Archive',
... source=CLOSED,
... destination=ARCHIVED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL,
... order=5)
Now let’s build and provide the workflow utility:
>>> wf = workflow.Workflow([init_transition,
... publish_transition, close_transition,
... copy_transition, copy_closed_transition,
... archive_transition])

>>> component.provideUtility(wf, interfaces.IWorkflow)
Let’s get the workflow_versions utility which we can use to track
versions and come up with a new unique id:
>>> workflow_versions = component.getUtility(interfaces.IWorkflowVersions)
And let’s start with a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
We need the document id to compare later when we create a new version:
>>> state = interfaces.IWorkflowState(document)
>>> document_id = state.getId()
Let’s add it to the workflow versions container so we can find it. Note
that we’re using a private API here; this could be implemented as adding
it to a folder or any other way, as long as getVersions() works later:
>>> workflow_versions.addVersion(document) # private API
Also clear out previously recorded events:
>>> del events[:]
We can publish it:
>>> info.getManualTransitionIds()
['publish']
So let’s do that:
>>> info.fireTransition('publish')
>>> state.getState()
'published'
The last event should be the ‘publish’ transition:
>>> events[-1].transition.transition_id
'publish'
And now we can either close or create a new copy of it. Note that the
names are sorted using the order of the transitions:
>>> info.getManualTransitionIds()
['close', 'copy']
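The ordering seen here can be pictured as a sort on each transition’s order value. A small sketch, assuming nothing about the library’s internals beyond that; ToyTransition and manual_ids are hypothetical names:

```python
from collections import namedtuple

ToyTransition = namedtuple('ToyTransition', 'transition_id source order')

# Deliberately listed out of order to show that `order` decides the result.
transitions = [
    ToyTransition('copy', 'published', 3),
    ToyTransition('close', 'published', 2),
    ToyTransition('publish', 'unpublished', 1),
]


def manual_ids(transitions, source):
    """Ids of transitions leaving `source`, lowest `order` first."""
    candidates = [t for t in transitions if t.source == source]
    return [t.transition_id for t in sorted(candidates, key=lambda t: t.order)]


ids = manual_ids(transitions, 'published')
```

With order values 2 and 3, ‘close’ comes before ‘copy’ regardless of the order in which the transitions were passed in.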
Let’s close it:
>>> info.fireTransition('close')
>>> state.getState()
'closed'
We’re going to create a new copy for editing now:
>>> info.getManualTransitionIds()
['copy_closed', 'archive']
>>> document2 = info.fireTransition('copy_closed')
>>> workflow_versions.addVersion(document2) # private API to track it
>>> document2.title
'copy of bar'
>>> state = interfaces.IWorkflowState(document2)
>>> state.getState()
'unpublished'
>>> state.getId() == document_id
True
The original version is still there in its original state:
>>> interfaces.IWorkflowState(document).getState()
'closed'
Let’s also check the last event in some detail:
>>> event = events[-1]
>>> event.transition.transition_id
'copy_closed'
>>> event.old_object == document
True
>>> event.object == document2
True
Now we are going to publish the new version:
>>> info = interfaces.IWorkflowInfo(document2)
>>> info.getManualTransitionIds()
['publish']
>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document2).getState()
'published'
The original is still closed:
>>> interfaces.IWorkflowState(document).getState()
'closed'
Now let’s publish another copy after this:
>>> document3 = info.fireTransition('copy')
>>> workflow_versions.addVersion(document3)
>>> interfaces.IWorkflowInfo(document3).fireTransition('publish')
This copy is now published:
>>> interfaces.IWorkflowState(document3).getState()
'published'
And the previously published version is now closed:
>>> interfaces.IWorkflowState(document2).getState()
'closed'
Note that due to the condition, it’s not possible to copy from the
closed version, as there is a published version still remaining:
>>> interfaces.IWorkflowInfo(document2).getManualTransitionIds()
['archive']
Meanwhile, the original version, previously closed, is now archived:
>>> interfaces.IWorkflowState(document).getState()
'archived'


Automatic transitions
Now let’s try a workflow transition that is automatic and time-based.
We’ll set up a very simple workflow between ‘unpublished’ and
‘published’, and have the ‘publish’ transition be time-based.
To simulate time, we have moments:
>>> time_moment = 0
We will only publish if time_moment is greater than 3:
>>> def TimeCondition(wf, context):
...     return time_moment > 3
Set up the transition using this condition; note that this one is
automatic, i.e. it doesn’t have to be triggered by humans:
>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=TimeCondition,
... action=NullAction,
... trigger=interfaces.AUTOMATIC)
Set up the workflow using this transition, and reusing the
init transition we defined before:
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
Clear out all versions; this is a private API we just use for
demonstration purposes:
>>> workflow_versions.clear()
Now create a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Private again; do this with the catalog or any way you prefer in your
own code:
>>> workflow_versions.addVersion(document)
Since this transition is automatic, we should see it like this:
>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish']
Now let’s fire any automatic transitions:
>>> workflow_versions.fireAutomatic()
Nothing should have happened as we are still at time moment 0:
>>> state = interfaces.IWorkflowState(document)
>>> state.getState()
'unpublished'
We change the time moment past 3:
>>> time_moment = 4
Now fire any automatic transitions again:
>>> workflow_versions.fireAutomatic()
The transition has fired, so the state will be ‘published’:
>>> state.getState()
'published'
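The polling behaviour shown above can be sketched without the library: something periodically walks over the tracked documents and fires an automatic transition once its condition holds. The names below (clock, ToyDoc, fire_automatic) are hypothetical, and the loop is only an assumption about how such a sweep could look.

```python
clock = {'moment': 0}  # mutable stand-in for the time_moment variable


def time_condition(doc):
    return clock['moment'] > 3


class ToyDoc:
    def __init__(self):
        self.state = 'unpublished'


# (source, destination, condition) triples for automatic transitions only.
AUTOMATIC_TRANSITIONS = [('unpublished', 'published', time_condition)]


def fire_automatic(docs):
    """Fire the first automatic transition whose condition holds, per document."""
    for doc in docs:
        for source, destination, condition in AUTOMATIC_TRANSITIONS:
            if doc.state == source and condition(doc):
                doc.state = destination
                break


doc = ToyDoc()
fire_automatic([doc])       # moment is 0, so nothing happens
state_before = doc.state

clock['moment'] = 4
fire_automatic([doc])       # the condition now holds
state_after = doc.state
```

Nothing fires by itself: the transition only happens when the sweep runs, which is why the doctest calls fireAutomatic() explicitly after changing the time.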


System transitions
Let’s try system transitions now. This transition shouldn’t show up
as manual nor as automatic:
>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... trigger=interfaces.SYSTEM)
Set up the workflow using this transition, and reusing the
init transition we defined before:
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
Clear out all versions; this is a private API we just use for
demonstration purposes:
>>> workflow_versions.clear()
Now create a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Private again; do this with the catalog or any way you prefer in your
own code:
>>> workflow_versions.addVersion(document)
We should see it as a system transition:
>>> info.getSystemTransitionIds()
['publish']
but not as automatic nor manual:
>>> info.getAutomaticTransitionIds()
[]
>>> info.getManualTransitionIds()
[]
This transition can be fired:
>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document).getState()
'published'


Multiple transitions
It’s possible to have multiple transitions from the source state to
the target state, for instance an automatic and a manual one.
Let’s set up a workflow with two manual transitions and a single
automatic transition between two states:
>>> publish_1_transition = workflow.Transition(
... transition_id='publish 1',
... title='Publish 1',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)

>>> publish_2_transition = workflow.Transition(
... transition_id='publish 2',
... title='Publish 2',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)

>>> publish_auto_transition = workflow.Transition(
... transition_id='publish auto',
... title='Publish Auto',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=TimeCondition,
... action=NullAction,
... trigger=interfaces.AUTOMATIC)
Clear out all versions; this is a private API we just use for
demonstration purposes:
>>> workflow_versions.clear()
Since we’re using the time condition again, let’s make sure
time is at 0 again so that the publish_auto_transition doesn’t fire:
>>> time_moment = 0
Now set up the workflow using these transitions, plus our
init_transition:
>>> wf = workflow.Workflow([init_transition,
... publish_1_transition, publish_2_transition,
... publish_auto_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
Now create a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
We should have two manual transitions:
>>> sorted(interfaces.IWorkflowInfo(document).getManualTransitionIds())
['publish 1', 'publish 2']
And a single automatic transition:
>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish auto']


Protecting transitions with permissions
Transitions can be (and should be) protected with a permission, so
that not everybody can execute them.
Let’s set up a workflow with a transition that has a permission:
>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL,
... permission="zope.ManageContent")
Quickly set up the workflow state again for a document:
>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Let’s set up the security context:
>>> from zope.security.interfaces import Unauthorized
>>> from zope.security.management import newInteraction, endInteraction
>>> class Principal:
...     def __init__(self, id):
...         self.id = id
...         self.groups = []
>>> class Participation:
...     interaction = None
...     def __init__(self, principal):
...         self.principal = principal
>>> endInteraction() # XXX argh, apparently one is already active?
>>> newInteraction(Participation(Principal('bob')))
We shouldn’t see this transition appear in our list of possible
transitions, as we do not have the required permission:
>>> info.getManualTransitionIds()
[]
Now let’s try firing the transition. It should fail with Unauthorized:
>>> try:
...     info.fireTransition('publish')
... except Unauthorized:
...     print("Got unauthorized")
Got unauthorized
It’s also not allowed for fireTransitionToward:
>>> info.fireTransitionToward(PUBLISHED)
Traceback (most recent call last):
...
hurry.workflow.interfaces.NoTransitionAvailableError: source: "unpublished" destination: "published"
In this case, the transition isn’t even available because the user
doesn’t have the right permission.
The system user is however allowed to do it:
>>> from zope.security.management import system_user
>>> endInteraction()
>>> newInteraction(Participation(system_user))
>>> info.fireTransition('publish')
And this goes off without a problem.
There is also a special way to make it happen: pass
check_security=False to fireTransition:
>>> endInteraction()
>>> newInteraction(Participation(Principal('bob')))
>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransition('publish', check_security=False)
This also works with fireTransitionToward:
>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransitionToward(PUBLISHED, check_security=False)
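The effect of check_security can be pictured with a small stand-alone sketch. It does not use zope.security; ToyUnauthorized, the fire helper and the granted_permissions set are hypothetical and only model the idea that the permission check is skipped on request.

```python
class ToyUnauthorized(Exception):
    """Hypothetical stand-in for zope.security's Unauthorized."""


def fire(transition, granted_permissions, check_security=True):
    """Return the destination state, enforcing the permission unless told not to."""
    required = transition.get('permission')
    if (check_security and required is not None
            and required not in granted_permissions):
        raise ToyUnauthorized(required)
    return transition['destination']


publish = {'destination': 'published', 'permission': 'zope.ManageContent'}

try:
    fire(publish, granted_permissions=set())
    outcome = 'fired'
except ToyUnauthorized:
    outcome = 'unauthorized'

# Skipping the check lets the same caller through.
bypassed = fire(publish, granted_permissions=set(), check_security=False)
# A caller holding the permission needs no bypass.
allowed = fire(publish, granted_permissions={'zope.ManageContent'})
```

Bypassing the check is best reserved for code paths the application itself controls, since it lets any caller fire the transition.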


Side effects during transitions
Sometimes we would like something to get executed before the
WorkflowTransitionEvent is fired, but after a (potential) new version
of the object has been created. If an object is edited during the
same request as a workflow transition, the editing should take place
after a potential new version has been created, otherwise the old, not
the new, version will be edited.
If something like a history logger hooks into IWorkflowTransitionEvent
however, it would get information about the new copy before the
editing took place. To allow an editing to take place between the
creation of the new copy and the firing of the event, a side effect
function can be passed along when a transition is fired.
The sequence of execution then is:

1. firing of the transition itself, creating a new version
2. executing the side effect function on the new version
3. firing the IWorkflowTransitionEvent
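The sequence above can be sketched in plain Python to show why the ordering matters to event handlers. This is a toy model built on the three steps listed, not the library’s code; ToyDoc and fire_with_side_effect are hypothetical names.

```python
class ToyDoc:
    def __init__(self, title):
        self.title = title


def fire_with_side_effect(doc, action, side_effect, handlers):
    # 1. The transition's action may return a new version of the object.
    new_version = action(doc) or doc
    # 2. The side effect edits that new version ...
    if side_effect is not None:
        side_effect(new_version)
    # 3. ... before any event handler gets to see it.
    for handler in handlers:
        handler(new_version)
    return new_version


seen_titles = []
original = ToyDoc('bar')
result = fire_with_side_effect(
    original,
    action=lambda doc: ToyDoc('copy of %s' % doc.title),
    side_effect=lambda doc: setattr(doc, 'title', doc.title + '!'),
    handlers=[lambda doc: seen_titles.append(doc.title)],
)
```

A history logger registered as a handler would record the edited title, while the original object stays untouched.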

Let’s set up a very simple workflow:

>>> foo_transition = workflow.Transition(
... transition_id='foo',
... title='Foo',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=CopyAction,
... trigger=interfaces.MANUAL)


Quickly set up the workflow state again for a document:
>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, foo_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> events = []
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Now let’s set up a side effect:
>>> def side_effect(context):
...     context.title = context.title + '!'
Now fire the transition, with a side effect:
>>> new_version = info.fireTransition('foo', side_effect=side_effect)
The title of the new version should now have a ! at the end:
>>> new_version.title[-1] == '!'
True
But the old version doesn’t:
>>> document.title[-1] == '!'
False
The events list we set up before should contain two events:
>>> len(events)
2
>>> events[1].object.title[-1] == '!'
True



Ambiguous transitions
Let’s set up a situation where there are two equivalent transitions from
a to b:
>>> transition1 = workflow.Transition(
... transition_id='a_to_b1',
... title='A to B',
... source='a',
... destination='b',
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)

>>> transition2 = workflow.Transition(
... transition_id='a_to_b2',
... title='A to B',
... source='a',
... destination='b',
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)


>>> wf = workflow.Workflow([transition1, transition2])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> info = interfaces.IWorkflowInfo(document)
>>> state = interfaces.IWorkflowState(document)
>>> state.setState('a')
fireTransitionToward is ambiguous as two transitions are possible:
>>> info.fireTransitionToward('b')
Traceback (most recent call last):
...
hurry.workflow.interfaces.AmbiguousTransitionError: source: "a" destination: "b"

>>> from hurry.workflow.interfaces import AmbiguousTransitionError
>>> try:
...     info.fireTransitionToward('b')
... except AmbiguousTransitionError as e_:
...     e = e_
>>> e.source
'a'
>>> e.destination
'b'
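Resolving a transition by destination has three possible outcomes: no match, exactly one match, or several. A minimal sketch of that decision, with hypothetical exception and function names that only mirror the behaviour shown in this document:

```python
class ToyNoTransitionAvailable(Exception):
    pass


class ToyAmbiguousTransition(Exception):
    pass


def transition_toward(transitions, source, destination):
    """Pick the single transition from `source` to `destination`, or raise."""
    matches = [t for t in transitions
               if t[1] == source and t[2] == destination]
    if not matches:
        raise ToyNoTransitionAvailable(source, destination)
    if len(matches) > 1:
        raise ToyAmbiguousTransition(source, destination)
    return matches[0][0]


# (transition_id, source, destination) triples
transitions = [
    ('a_to_b1', 'a', 'b'),
    ('a_to_b2', 'a', 'b'),
    ('b_to_c', 'b', 'c'),
]

unique = transition_toward(transitions, 'b', 'c')


def outcome(source, destination):
    try:
        return transition_toward(transitions, source, destination)
    except ToyNoTransitionAvailable:
        return 'none available'
    except ToyAmbiguousTransition:
        return 'ambiguous'
```

When the result is ambiguous, the caller can still pick one of the candidates explicitly by its transition id.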



License

For personal and professional use. You cannot resell or redistribute these repositories in their original state.
