Merge branch 'main' into feature/receive-message-correlations

Elizabeth Esswein 2024-04-25 14:34:05 -04:00
commit ca2a50cf64
38 changed files with 717 additions and 419 deletions

View File

@ -56,12 +56,12 @@ class NodeParser:
def get_description(self):
return self.process_parser.parser.spec_descriptions.get(self.node.tag)
def xpath(self, xpath, extra_ns=None):
return self._xpath(self.node, xpath, extra_ns)
def xpath(self, xpath):
return self._xpath(self.node, xpath)
def doc_xpath(self, xpath, extra_ns=None):
def doc_xpath(self, xpath):
root = self.node.getroottree().getroot()
return self._xpath(root, xpath, extra_ns)
return self._xpath(root, xpath)
def attribute(self, attribute, namespace=None, node=None):
if node is None:
@ -156,13 +156,8 @@ class NodeParser:
if noderef is not None:
return noderef.getparent().get('name')
def _xpath(self, node, xpath, extra_ns=None):
if extra_ns is not None:
nsmap = self.nsmap.copy()
nsmap.update(extra_ns)
else:
nsmap = self.nsmap
return node.xpath(xpath, namespaces=nsmap)
def _xpath(self, node, xpath):
return node.xpath(xpath, namespaces=self.nsmap)
def raise_validation_exception(self, message):
raise ValidationException(message, self.node, self.filename)

View File

@ -18,7 +18,7 @@
# 02110-1301 USA
from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState, TaskFilter, TaskIterator
from SpiffWorkflow.specs.StartTask import StartTask
from SpiffWorkflow.specs.Join import Join
@ -72,7 +72,7 @@ class BoundaryEventJoin(Join, BpmnTaskSpec):
def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)
def _check_threshold_structured(self, my_task, force=False):
def _check_threshold_structured(self, my_task):
split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
raise WorkflowException(f'Split at {self.split_task} was not reached', task_spec=self)
@ -97,7 +97,7 @@ class BoundaryEventJoin(Join, BpmnTaskSpec):
cancel += [main]
else:
cancel = []
return force or finished, cancel
return finished, cancel
class StartEventJoin(Join, BpmnTaskSpec):
@ -105,7 +105,7 @@ class StartEventJoin(Join, BpmnTaskSpec):
def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)
def _check_threshold_structured(self, my_task, force=False):
def _check_threshold_structured(self, my_task):
split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
@ -118,23 +118,21 @@ class StartEventJoin(Join, BpmnTaskSpec):
else:
waiting.append(task)
return force or may_fire, waiting
return may_fire, waiting
class _EndJoin(UnstructuredJoin, BpmnTaskSpec):
def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all ready and waiting tasks (excluding
# ourself). The EndJoin waits for everyone!
waiting_tasks = []
for task in my_task.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING):
if task.thread_id != my_task.thread_id:
def _check_threshold_unstructured(self, my_task):
# Look at the tree to find all ready and waiting tasks (excluding ourself). The EndJoin waits for everyone!
for task in TaskIterator(my_task.workflow.task_tree, state=TaskState.NOT_FINISHED_MASK, end_at_spec=self.name):
if task == my_task:
continue
if task.task_spec == my_task.task_spec:
continue
waiting_tasks.append(task)
return force or len(waiting_tasks) == 0, waiting_tasks
may_fire = False
break
else:
may_fire = True
return may_fire
def _run_hook(self, my_task):
result = super(_EndJoin, self)._run_hook(my_task)

View File

@ -18,7 +18,7 @@
# 02110-1301 USA
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.specs.MultiChoice import MultiChoice
from .unstructured_join import UnstructuredJoin
@ -68,39 +68,40 @@ class InclusiveGateway(MultiChoice, UnstructuredJoin):
MultiChoice.test(self)
UnstructuredJoin.test(self)
def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all places where this task is used.
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
def _check_threshold_unstructured(self, my_task):
# Look at the tree to find all places where this task is used and unfinished tasks that may be ancestors
# If there are any, we may have to check whether this gateway is reachable from any of them.
tasks, sources = [], []
for task in my_task.workflow.get_tasks(end_at_spec=self.name):
if task.task_spec == self:
tasks.append(task)
elif task.has_state(TaskState.READY|TaskState.WAITING):
sources.append(task.task_spec)
# Look up which tasks have parents completed.
completed_inputs = set([ task.parent.task_spec for task in tasks if task.parent.state == TaskState.COMPLETED ])
# Find waiting tasks
# Exclude tasks whose specs have already been completed
# A spec only has to complete once, even if on multiple paths
waiting_tasks = []
# If any parents of this join have not been finished, this task must wait.
# A parent spec only has to be completed once, even if it is on multiple paths
tasks_waiting = False
for task in tasks:
if task.parent.has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs:
waiting_tasks.append(task.parent)
tasks_waiting = True
break
if force:
# If force is true, complete the task
complete = True
elif len(waiting_tasks) > 0:
# If we have waiting tasks, we're obviously not done
if tasks_waiting:
complete = False
else:
# Handle the case where there are paths from active tasks that must go through waiting inputs
waiting_inputs = [i for i in self.inputs if i not in completed_inputs]
task_filter = TaskFilter(state=TaskState.READY|TaskState.WAITING)
sources = [t.task_spec for t in my_task.workflow.get_tasks(task_filter=task_filter)]
# This will go back through a task spec's ancestors and return the source, if applicable
def check(spec):
for parent in spec.inputs:
return parent if parent in sources else check(parent)
# If we can get to a completed input from this task, we don't have to wait for it
# Start with the completed inputs and recurse back through their ancestors, removing any waiting tasks that
# could reach one of them.
for spec in completed_inputs:
source = check(spec)
if source is not None:
@ -115,7 +116,7 @@ class InclusiveGateway(MultiChoice, UnstructuredJoin):
complete = len(unfinished_paths) == 0
return complete, waiting_tasks
return complete
def _run_hook(self, my_task):
outputs = self._get_matching_outputs(my_task)

View File

@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState
from .unstructured_join import UnstructuredJoin
@ -41,11 +41,9 @@ class ParallelGateway(UnstructuredJoin):
Essentially, this means that we must wait until we have a completed parent
task on each incoming sequence.
"""
def _check_threshold_unstructured(self, my_task, force=False):
def _check_threshold_unstructured(self, my_task):
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
# Look up which tasks have parents completed.
waiting_tasks = []
tasks = my_task.workflow.get_tasks(spec_name=self.name)
waiting_inputs = set(self.inputs)
def remove_ancestor(task):
@ -56,13 +54,15 @@ class ParallelGateway(UnstructuredJoin):
remove_ancestor(task.parent)
for task in tasks:
if task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)
# Do not wait for descendants of this task
elif task.is_descendant_of(my_task):
# Handle the case where the parallel gateway is part of a loop.
if task.is_descendant_of(my_task):
# This is the first iteration; we should not wait on this task, because it will not be reached
# until after this join completes
remove_ancestor(task)
# Ignore predicted tasks; we don't care about anything not definite
elif task.parent.has_state(TaskState.DEFINITE_MASK):
waiting_tasks.append(task.parent)
elif my_task.is_descendant_of(task):
# This is a subsequent iteration; we need to ignore the parents of previous iterations
continue
elif task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)
return force or len(waiting_inputs) == 0, waiting_tasks
return len(waiting_inputs) == 0

View File

@ -41,14 +41,20 @@ class SubWorkflowTask(TaskSpec):
def _on_subworkflow_completed(self, subworkflow, my_task):
self.update_data(my_task, subworkflow)
# I don't like manually moving back to ready, but don't want to run it
# Ideally, update hook would create the subprocess and return True, _run would start the subprocess and
# return None (so that the state would transition to started), and the completed event for this task
# could be used to run post-completed actions automatically.
# However, until I align the events with state transitions, I don't want to encourage external use of
# callback methods (though completed event is not going to change).
my_task._set_state(TaskState.READY)
def _update_hook(self, my_task):
subprocess = my_task.workflow.top_workflow.subprocesses.get(my_task.id)
if subprocess is None:
super()._update_hook(my_task)
self.create_workflow(my_task)
self.start_workflow(my_task)
my_task._set_state(TaskState.WAITING)
my_task._set_state(TaskState.STARTED)
else:
return subprocess.is_completed()
@ -59,21 +65,17 @@ class SubWorkflowTask(TaskSpec):
def copy_data(self, my_task, subworkflow):
start = subworkflow.get_next_task(spec_name='Start')
start.set_data(**my_task.data)
start.set_data(**deepcopy(my_task.data))
def update_data(self, my_task, subworkflow):
my_task.data = deepcopy(subworkflow.last_task.data)
def create_workflow(self, my_task):
def start_workflow(self, my_task):
subworkflow = my_task.workflow.top_workflow.create_subprocess(my_task, self.spec)
subworkflow.completed_event.connect(self._on_subworkflow_completed, my_task)
def start_workflow(self, my_task):
subworkflow = my_task.workflow.top_workflow.get_subprocess(my_task)
self.copy_data(my_task, subworkflow)
start = subworkflow.get_next_task(spec_name='Start')
start.run()
my_task._set_state(TaskState.WAITING)
class CallActivity(SubWorkflowTask):

View File

@ -16,53 +16,47 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
from copy import deepcopy
from SpiffWorkflow.util.task import TaskState, TaskIterator
from SpiffWorkflow.specs.Join import Join
class UnstructuredJoin(Join):
"""
A helper subclass of Join that makes it work in a slightly friendlier way
for the BPMN style threading
"""
def _do_join(self, my_task):
split_task = self._get_split_task(my_task)
def _update_hook(self, my_task):
# Identify all corresponding task instances within the thread.
# Also remember which of those instances was most recently changed,
# because we are making this one the instance that will
# continue the thread of control. In other words, we will continue
# to build the task tree underneath the most recently changed task.
last_changed = None
thread_tasks = []
for task in TaskIterator(split_task, spec_name=self.name):
if task.thread_id != my_task.thread_id:
# Ignore tasks from other threads. (Do we need this condition?)
continue
if not task.parent.has_state(TaskState.FINISHED_MASK):
# For an inclusive join, this can happen - it's a future join
continue
if my_task.is_descendant_of(task):
# Skip ancestors (otherwise the branch this task is on will get dropped)
continue
# We have found a matching instance.
thread_tasks.append(task)
may_fire = self._check_threshold_unstructured(my_task)
other_tasks = [t for t in my_task.workflow.tasks.values()
if t.task_spec == self and t != my_task and t.state is TaskState.WAITING]
for task in other_tasks:
# By cancelling other waiting tasks immediately, we can prevent them from being updated repeatedly and pointlessly
task.cancel()
if not may_fire:
# Only the most recent instance of the spec needs to wait.
my_task._set_state(TaskState.WAITING)
else:
# Only copy the data to the task that will proceed
my_task._inherit_data()
return may_fire
# Check whether the state of the instance was recently changed.
changed = task.parent.last_state_change
if last_changed is None or changed > last_changed.parent.last_state_change:
last_changed = task
def _run_hook(self, my_task):
other_tasks = filter(
lambda t: t.task_spec == self and t.has_state(TaskState.FINISHED_MASK) and not my_task.is_descendant_of(t),
my_task.workflow.tasks.values()
)
for task in sorted(other_tasks, key=lambda t: t.last_state_change):
# By inheriting directly from parent tasks, we can avoid copying previously merged data
# Update data from all the same thread tasks.
thread_tasks.sort(key=lambda t: t.parent.last_state_change)
collected_data = {}
for task in thread_tasks:
collected_data.update(task.data)
my_task.set_data(**deepcopy(task.parent.data))
# This condition only applies when a workflow is reset inside a parallel branch.
# If reset to a branch that was originally cancelled, all the descendants of the previously completed branch will still
# appear in the tree, potentially corrupting the structure and data.
if task.has_state(TaskState.COMPLETED):
task._drop_children(force=True)
for task in thread_tasks:
if task != last_changed:
task._set_state(TaskState.CANCELLED)
task._drop_children()
else:
task.data.update(collected_data)
# My task is not finished, so won't be included above.
my_task._inherit_data()
return True

View File

@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
from SpiffWorkflow.util.task import TaskFilter, TaskIterator
from SpiffWorkflow.util.task import TaskFilter, TaskIterator, TaskState
from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent
class BpmnTaskFilter(TaskFilter):
@ -54,27 +54,28 @@ class BpmnTaskIterator(TaskIterator):
task = self.task_list.pop(0)
subprocess = task.workflow.top_workflow.subprocesses.get(task.id)
if task.task_spec.name == self.end_at_spec:
self.task_list = []
elif all([
if all([
len(task._children) > 0 or subprocess is not None,
task.state >= self.min_state or subprocess is not None,
self.depth < self.max_depth,
task.task_spec.name != self.end_at_spec,
]):
if subprocess is None:
next_tasks = task.children
elif self.depth_first:
next_tasks = [subprocess.task_tree] + task.children
# Do not descend into a completed subprocess to look for unfinished tasks.
if subprocess is None or (task.state >= TaskState.FINISHED_MASK and self.task_filter.state <= TaskState.FINISHED_MASK):
subprocess_tasks = []
else:
next_tasks = task.children + [subprocess.task_tree]
subprocess_tasks = [subprocess.task_tree]
if self.depth_first:
next_tasks = subprocess_tasks + task.children
self.task_list = next_tasks + self.task_list
else:
next_tasks = task.children + subprocess_tasks
self.task_list.extend(next_tasks)
self._update_depth(task)
elif self.depth_first and len(self.task_list) > 0:
self._handle_leaf_depth(task)
return task
return task

View File

@ -131,7 +131,7 @@ class BpmnWorkflow(BpmnBaseWorkflow):
def send_event(self, event):
"""Allows this workflow to catch an externally generated event."""
tasks = self.get_tasks(catches_event=event)
tasks = self.get_tasks(state=TaskState.NOT_FINISHED_MASK, catches_event=event)
if len(tasks) == 0:
raise WorkflowException(f"This process is not waiting for {event.event_definition.name}")
for task in tasks:
@ -217,29 +217,28 @@ class BpmnWorkflow(BpmnBaseWorkflow):
def reset_from_task_id(self, task_id, data=None, remove_subprocess=True):
task = self.get_task_from_id(task_id)
run_task_at_end = False
if isinstance(task.parent.task_spec, BoundaryEventSplit):
task = task.parent
run_task_at_end = True # we jumped up one level, so execute to end on the correct task as requested.
descendants = []
# Since recursive deletion of subprocesses requires access to the tasks, we have to delete any subprocesses first
# We also need different behavior for the case where we explicitly reset to a subprocess (in which case we delete it)
# vs resetting inside (where we leave it and reset the tasks that descend from it)
for item in task:
if item == task and not remove_subprocess:
continue
if item.id in self.subprocesses:
descendants.extend(self.delete_subprocess(item))
descendants.extend(super().reset_from_task_id(task.id, data))
descendants = []
# If we're resetting to a boundary event, we also have to delete subprocesses underneath the attached events
top = task if not isinstance(task.parent.task_spec, BoundaryEventSplit) else task.parent
for desc in filter(lambda t: t.id in self.subprocesses, top):
if desc != task or remove_subprocess:
descendants.extend(self.delete_subprocess(desc))
# This resets the boundary event branches
if isinstance(task.parent.task_spec, BoundaryEventSplit):
for child in task.parent.children:
descendants.extend(super().reset_from_task_id(child.id, data if child == task else None))
else:
descendants.extend(super().reset_from_task_id(task.id, data))
if task.workflow.parent_task_id is not None:
sp_task = self.get_task_from_id(task.workflow.parent_task_id)
descendants.extend(self.reset_from_task_id(sp_task.id, remove_subprocess=False))
sp_task._set_state(TaskState.WAITING)
if run_task_at_end:
task.run()
sp_task._set_state(TaskState.STARTED)
return descendants

View File

@ -26,7 +26,7 @@ from ...bpmn.parser.util import full_tag
from ...bpmn.parser.ValidationException import ValidationException
from ...bpmn.parser.BpmnParser import BpmnParser, BpmnValidator
from ...dmn.parser.DMNParser import DMNParser, get_dmn_ns
from ...dmn.parser.DMNParser import DMNParser
from ..engine.DMNEngine import DMNEngine
XSD_DIR = os.path.join(os.path.dirname(__file__), 'schema')
@ -62,16 +62,19 @@ class BpmnDmnParser(BpmnParser):
"""
Add the given lxml representation of the DMN file to the parser's set.
"""
nsmap = get_dmn_ns(node)
namespaces = self.namespaces.copy()
namespaces.update(node.nsmap)
if None in namespaces:
namespaces['dmn'] = namespaces.pop(None)
# We have to create a dmn validator on the fly, because we support multiple versions
# If we have a bpmn validator, assume DMN validation should be done as well.
# I don't like this, but I don't see a better solution.
schema = self.dmn_schemas.get(nsmap.get('dmn'))
schema = self.dmn_schemas.get(namespaces.get('dmn'))
if self.validator and schema is not None:
validator = BpmnValidator(schema)
validator.validate(node, filename)
dmn_parser = DMNParser(self, node, nsmap, filename=filename)
dmn_parser = DMNParser(self, node, namespaces, filename=filename)
self.dmn_parsers[dmn_parser.bpmn_id] = dmn_parser
self.dmn_parsers_by_name[dmn_parser.get_name()] = dmn_parser

View File

@ -19,7 +19,7 @@
import ast
from SpiffWorkflow.bpmn.parser.node_parser import NodeParser, DEFAULT_NSMAP
from SpiffWorkflow.bpmn.parser.node_parser import NodeParser
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException
from SpiffWorkflow.bpmn.parser.util import xpath_eval
@ -34,20 +34,6 @@ from SpiffWorkflow.dmn.specs.model import (
Rule,
)
def get_dmn_ns(node):
"""
Returns the namespace definition for the current DMN
:param node: the XML node for the DMN document
"""
nsmap = DEFAULT_NSMAP.copy()
if 'http://www.omg.org/spec/DMN/20151101/dmn.xsd' in node.nsmap.values():
nsmap['dmn'] = 'http://www.omg.org/spec/DMN/20151101/dmn.xsd'
elif 'http://www.omg.org/spec/DMN/20180521/DI/' in node.nsmap.values():
nsmap['dmn'] = 'http://www.omg.org/spec/DMN/20180521/DI/'
elif 'https://www.omg.org/spec/DMN/20191111/MODEL/' in node.nsmap.values():
nsmap['dmn'] = 'https://www.omg.org/spec/DMN/20191111/MODEL/'
return nsmap
class DMNParser(NodeParser):
"""
@ -79,20 +65,20 @@ class DMNParser(NodeParser):
self.filename = filename
def parse(self):
self.decision = self._parse_decision(self.node.findall('{*}decision'))
self.decision = self._parse_decision(self.xpath('.//dmn:decision'))
@property
def bpmn_id(self):
"""
Returns the process ID
"""
return self.node.findall('{*}decision[1]')[0].get('id')
return self.xpath('dmn:decision[1]')[0].get('id')
def get_name(self):
"""
Returns the process name (or ID, if no name is included in the file)
"""
return self.node.findall('{*}decision[1]')[0].get('name')
return self.xpath('dmn:decision[1]')[0].get('name')
def _parse_decision(self, root):
decision_elements = list(root)
@ -115,7 +101,7 @@ class DMNParser(NodeParser):
return decision
def _parse_decision_tables(self, decision, decisionElement):
for decision_table_element in decisionElement.findall('{*}decisionTable'):
for decision_table_element in decisionElement.findall('dmn:decisionTable', namespaces=self.nsmap):
name = decision_table_element.attrib.get('name', '')
hitPolicy = decision_table_element.attrib.get('hitPolicy', 'UNIQUE').upper()
decision_table = DecisionTable(decision_table_element.attrib['id'],
@ -146,12 +132,11 @@ class DMNParser(NodeParser):
def _parse_input(self, input_element):
type_ref = None
prefix = self.nsmap['dmn']
xpath = xpath_eval(input_element, {'dmn': prefix})
xpath = xpath_eval(input_element, self.nsmap)
expression = None
for input_expression in xpath('dmn:inputExpression'):
type_ref = input_expression.attrib.get('typeRef', '')
expression_node = input_expression.find('{' + prefix + '}text')
expression_node = input_expression.find('dmn:text', namespaces=self.nsmap)
if expression_node is not None:
expression = expression_node.text

View File

@ -97,22 +97,41 @@ class Join(TaskSpec):
self.threshold = threshold
self.cancel_remaining = cancel
def _branch_is_complete(self, my_task):
# Determine whether that branch is now completed by checking whether
# it has any waiting items other than myself in it.
skip = None
for task in TaskIterator(my_task, state=TaskState.NOT_FINISHED_MASK):
# If the current task is a child of myself, ignore it.
if skip is not None and task.is_descendant_of(skip):
continue
if task.task_spec == self:
skip = task
continue
return False
return True
def _check_threshold_unstructured(self, my_task):
# This method is extremely poorly named. It is called where there is no split task, but whether or not
# there is a known split is actually irrelevant. The distinction that actually needs to be made is
# "Do we have to look at unfinshed tasks to find out if any of the might pass through this task?" vs
# "Can we make a distinction solely by looking at our own completed inputs?"
def _branch_may_merge_at(self, task):
for child in task:
# The default threshold is the number of inputs.
threshold = valueof(my_task, self.threshold)
if threshold is None:
threshold = len(self.inputs)
# Find all places where this task spec is used and check whether enough inputs have completed to meet the threshold
# Omit building the list of waiting tasks unless they need to be cancelled if the threshold is met
waiting_tasks = []
completed = 0
spec_names = [ts.name for ts in self.inputs]
for task in TaskIterator(my_task.workflow.task_tree, end_at_spec=self.name):
if task.task_spec.name not in spec_names:
continue
if task.parent is None or task.has_state(TaskState.COMPLETED):
completed += 1
elif not task.has_state(TaskState.FINISHED_MASK) and self.cancel_remaining:
waiting_tasks.append(task)
if completed >= threshold:
may_fire = True
if not self.cancel_remaining:
break
else:
may_fire = False
# If the threshold was reached, get ready to fire.
return may_fire, waiting_tasks
def _branch_may_merge(self, task):
for child in TaskIterator(task, end_at_spec=self.name):
# Ignore tasks that were created by a trigger.
if child.triggered:
continue
@ -126,54 +145,19 @@ class Join(TaskSpec):
return True
return False
def _get_split_task(self, my_task):
# One Join spec may have multiple corresponding Task objects::
#
# - Due to the MultiInstance pattern.
# - Due to the ThreadSplit pattern.
#
# When using the MultiInstance pattern, we want to join across
# the resulting task instances. When using the ThreadSplit
# pattern, we only join within the same thread. (Both patterns
# may also be mixed.)
#
# We are looking for all task instances that must be joined.
# We limit our search by starting at the split point.
if self.split_task:
split_task = my_task.find_ancestor(self.split_task)
else:
split_task = my_task.workflow.task_tree
return split_task
def _branch_is_complete(self, task):
# Determine whether that branch is now completed by checking whether
# it has any waiting items other than myself in it.
for child in TaskIterator(task, state=TaskState.NOT_FINISHED_MASK, end_at_spec=self.name):
if child.task_spec != self:
return False
return True
def _check_threshold_unstructured(self, my_task, force=False):
# The default threshold is the number of inputs.
threshold = valueof(my_task, self.threshold)
if threshold is None:
threshold = len(self.inputs)
# Look at the tree to find all places where this task is used.
tasks = []
for spec in self.inputs:
tasks.extend([t for t in my_task.workflow.get_tasks(spec_name=spec.name) if t.thread_id == my_task.thread_id])
# Look up which tasks have already completed.
waiting_tasks = []
completed = 0
for task in tasks:
if task.parent is None or task.has_state(TaskState.COMPLETED):
completed += 1
elif not task.has_state(TaskState.FINISHED_MASK):
waiting_tasks.append(task)
# If the threshold was reached, get ready to fire.
return force or completed >= threshold, waiting_tasks
def _check_threshold_structured(self, my_task, force=False):
def _check_threshold_structured(self, my_task):
# Retrieve a list of all activated tasks from the associated task that did the conditional parallel split.
split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
msg = 'Join with %s, which was not reached' % self.split_task
raise WorkflowException(msg, task_spec=self)
raise WorkflowException(f'Split task {self.split_task} was not reached', task_spec=self)
tasks = split_task.task_spec._get_activated_tasks(split_task, my_task)
# The default threshold is the number of branches that were started.
@ -184,38 +168,27 @@ class Join(TaskSpec):
# Look up which tasks have already completed.
waiting_tasks = []
completed = 0
for task in tasks:
if not self._branch_may_merge_at(task):
if self._branch_is_complete(task):
completed += 1
elif self._branch_is_complete(task):
elif not self._branch_may_merge(task):
completed += 1
else:
waiting_tasks.append(task)
# If the threshold was reached, get ready to fire.
return force or completed >= threshold, waiting_tasks
def _start(self, my_task, force=False):
"""
Checks whether the preconditions for going to READY state are met.
Returns True if the threshold was reached, False otherwise.
Also returns the list of tasks that yet need to be completed.
"""
if my_task.has_state(TaskState.FINISHED_MASK):
return False, None
if my_task.has_state(TaskState.READY):
return True, None
# Check whether we may fire.
if self.split_task is None:
return self._check_threshold_unstructured(my_task, force)
else:
return self._check_threshold_structured(my_task, force)
return completed >= threshold, waiting_tasks
def _update_hook(self, my_task):
# Check whether enough incoming branches have completed.
my_task._inherit_data()
may_fire, waiting_tasks = self._start(my_task)
if self.split_task is None:
may_fire, waiting_tasks = self._check_threshold_unstructured(my_task)
else:
may_fire, waiting_tasks = self._check_threshold_structured(my_task)
if may_fire:
# If this is a cancelling join, cancel all incoming branches except for the one that just completed.
if self.cancel_remaining:
@ -223,16 +196,17 @@ class Join(TaskSpec):
task.cancel()
# Update the state of our child objects.
self._do_join(my_task)
return True
elif not my_task.has_state(TaskState.FINISHED_MASK):
my_task._set_state(TaskState.WAITING)
return may_fire
def _find_tasks(self, my_task):
split_task = self._get_split_task(my_task) or my_task.workflow.task_tree
split_task = my_task.find_ancestor(self.split_task) or my_task.workflow.task_tree
# Identify all corresponding task instances within the thread.
thread_tasks = []
for task in TaskIterator(split_task, spec_name=self.name):
for task in TaskIterator(split_task, spec_name=self.name, end_at_spec=self.name):
# Ignore tasks from other threads.
if task.thread_id != my_task.thread_id:
continue

View File

@ -112,8 +112,7 @@ class ThreadMerge(Join):
if self.split_task and task.is_descendant_of(my_task):
continue
changed = task.parent.last_state_change
if last_changed is None \
or changed > last_changed.parent.last_state_change:
if last_changed is None or changed > last_changed.parent.last_state_change:
last_changed = task
tasks.append(task)

View File

@ -313,7 +313,7 @@ class Task(object):
def _inherit_data(self):
"""Copies the data from the parent."""
self.set_data(**self.parent.data)
self.set_data(**deepcopy(self.parent.data))
def _set_internal_data(self, **kwargs):
"""Defines the given attribute/value pairs in this task's internal data."""

View File

@ -218,13 +218,11 @@ class TaskIterator:
raise StopIteration()
task = self.task_list.pop(0)
if task.task_spec.name == self.end_at_spec:
self.task_list = []
elif all([
if all([
len(task._children) > 0,
task.state >= self.min_state,
self.depth < self.max_depth,
task.task_spec.name != self.end_at_spec,
]):
if self.depth_first:
self.task_list = task.children + self.task_list

View File

@ -16,6 +16,13 @@ We'll mainly focus on the engine, as it contains the interface with the library,
the other components. The engine is quite small and simple compared to the code required to handle user input and
display information in a terminal.
.. warning::
This application is *not* robust and is not suitable for displaying large amounts of data, which
may cause it to crash. The application won't run unless your terminal is at least 13 lines high, and it may
randomly crash at other times as well. While I'll make improvements as I add more examples, and bug reports and/or
fixes are always welcome, my focus is on using the library rather than the UI.
Configuration is set up in a Python module and passed into the application with the `-e` argument, which loads the
configured engine from this file. This setup should make it relatively easy to change the behavior of the engine. The
following configurations are included:
@ -123,7 +130,7 @@ We initialize a scripting environment:
.. code-block:: python
script_env = TaskDataEnvironment({'datetime': datetime })
>script_engine = PythonScriptEngine(script_env)
script_engine = PythonScriptEngine(script_env)
The :code:`PythonScriptEngine` handles execution of script tasks and evaluation of gateway and DMN conditions.
We'll create the script engine based on it; execution and evaluation will occur in the context of this environment.
@ -159,5 +166,6 @@ We then create our BPMN engine (:app:`engine/engine.py`) using each of these com
.. code-block:: python
from ..engine import BpmnEngine
engine = BpmnEngine(parser, serializer, handlers, script_env)
engine = BpmnEngine(parser, serializer, script_env)
The handlers are automatically passed to the curses UI by the main runner.
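For reference, the handlers are simply a mapping from task spec class to handler class. Here is a minimal sketch, assuming handler classes along the lines of those in the example app (the handler names are illustrative, not part of the library):

.. code-block:: python

from SpiffWorkflow.bpmn.specs.defaults import ManualTask, NoneTask, UserTask

# UserTaskHandler and ManualTaskHandler stand in for the curses UI handler
# classes provided by the example application.
handlers = {
    UserTask: UserTaskHandler,
    ManualTask: ManualTaskHandler,
    NoneTask: ManualTaskHandler,  # NoneTasks are treated as manual tasks
}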

View File

@ -7,7 +7,27 @@ starts with a timer, the timer waits until the event occurs; this might be days
Of course, we can always check that it's waiting and serialize the workflow until that time. However, we might decide that
we don't want SpiffWorkflow to manage this at all. We could do this with a custom task spec.
First we'll create a new class
The code for this example can be found in :app:`misc/custom_start_event.py`.
There is a very simple diagram, :bpmn:`timer_start.bpmn` (process ID `timer_start`), with a Start Event
with a Duration Timer of one day that can be used to illustrate how the custom task works. If you run this workflow
with any of the configurations provided, you'll see a `WAITING` Start Event; if you use the parser and serializer we
create below, you'll be prompted to complete the User Task immediately.
To run this model with the custom spec:
.. code:: python
./runner.py -e spiff_example.misc.custom_start_event add -p timer_start -b bpmn/tutorial/timer_start.bpmn
./runner.py -e spiff_example.misc.custom_start_event
First we'll create a new class.
.. note::
It might be better to have the class's init method take both the event definition to use *and* the timer event
definition. Unfortunately, our parser is not terribly intuitive or easily extendable, so I've done it this
way to make this a little easier to follow.
.. code:: python
@ -27,7 +47,7 @@ First we'll create a new class
super().__init__(wf_spec, bpmn_id, event_definition, **kwargs)
self.timer_event = None
When we create our custom event, we'll check to see if we're creating a Start Event with a :code:`TimerEventDefinition`, and
When we create our custom spec, we'll check to see if we're creating a Start Event with a :code:`TimerEventDefinition`, and
if so, we'll replace it with a :code:`NoneEventDefinition`. There are three different types of Timer Events, so we'll use
the base class for all three to make sure we account for TimeDate, Duration, and Cycle.
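A minimal sketch of that constructor logic follows (the event definition classes are real, but the import paths are assumptions and may vary by SpiffWorkflow version):

.. code:: python

from SpiffWorkflow.bpmn.specs.event_definitions import NoneEventDefinition, TimerEventDefinition
from SpiffWorkflow.spiff.specs.defaults import StartEvent

class CustomStartEvent(StartEvent):
    def __init__(self, wf_spec, bpmn_id, event_definition, **kwargs):
        # Swap a timer definition for a "none" definition, but remember the timer
        if isinstance(event_definition, TimerEventDefinition):
            super().__init__(wf_spec, bpmn_id, NoneEventDefinition(), **kwargs)
            self.timer_event = event_definition
        else:
            super().__init__(wf_spec, bpmn_id, event_definition, **kwargs)
            self.timer_event = None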
@ -47,57 +67,44 @@ Whenever we create a custom task spec, we'll need to create a converter for it s
.. code:: python
from SpiffWorkflow.bpmn.serializer import BpmnWorkflowSerializer
from SpiffWorkflow.bpmn.serializer.default import EventConverter
from SpiffWorkflow.spiff.serializer.task_spec import SpiffBpmnTaskConverter
from SpiffWorkflow.spiff.serializer import DEFAULT_CONFIG
class CustomStartEventConverter(SpiffBpmnTaskConverter):
def __init__(self, registry):
super().__init__(CustomStartEvent, registry)
def to_dict(self, spec):
dct = super().to_dict(spec)
if spec.timer_event is not None:
dct['event_definition'] = self.registry.convert(spec.timer_event)
else:
dct['event_definition'] = self.registry.convert(spec.event_definition)
dct['event_definition'] = self.registry.convert(spec.event_definition)
dct['timer_event'] = self.registry.convert(spec.timer_event)
return dct
DEFAULT_CONFIG['task_specs'].remove(StartEventConverter)
DEFAULT_CONFIG['task_specs'].append(CustomStartEventConverter)
registry = BpmnWorkflowSerializer.configure(DEFAULT_CONFIG)
serializer = BpmnWorkflowSerializer(registry)
def from_dict(self, dct):
spec = super().from_dict(dct)
spec.event_definition = self.registry.restore(dct['event_definition'])
spec.timer_event = self.registry.restore(dct['timer_event'])
return spec
Our converter will inherit from the :code:`SpiffBpmnTaskConverter`, since that's our base generic BPMN mixin class.
The parent converter will handle serializing the standard BPMN attributes, as well as attributes added in the
:code:`spiff` package. There is a similar base converter in the :code:`bpmn.serializer.helpers` package.
The :code:`SpiffBpmnTaskConverter` itself inherits from
:code:`SpiffWorkflow.bpmn.serializer.helpers.task_spec.BpmnTaskSpecConverter`, which provides some helper methods for
extracting standard attributes from tasks; the :code:`SpiffBpmnTaskConverter` does the same for extensions from the
:code:`spiff` package.
A converter needs to implement two methods: :code:`to_dict` (which takes a task spec and returns a JSON-serializable
dictionary of its attributes) and :code:`from_dict` (which takes the dictionary and returns a task spec of the
appropriate type). We call the base method to do most of the work, and then update the result to reflect the changes
we made, in this case ensuring that both event definitions are handled. The parent converter also provides :code:`convert`
and :code:`restore` methods to serialize any object that Spiff's serializer knows how to handle. For more details about
the serializer, see :doc:`serialization`.
We don't have to do much -- all we do is replace the event definition with the original. The timer event will be
moved when the task is restored, and this saves us from having to write a custom parser.
When we create our serializer, we need to tell it about this task. The serializer is initialized with a mapping
of object class to converter class, so we just need to add an entry for this mapping.
.. note::
.. code:: python
It might be better to have the class's init method take both the event definition to use *and* the timer event
definition. Unfortunately, our parser is not terribly intuitive or easily extendable, so I've done it this
way to make this a little easier to follow.
SPIFF_CONFIG[CustomStartEvent] = CustomStartEventConverter
registry = FileSerializer.configure(SPIFF_CONFIG)
serializer = FileSerializer(dirname, registry=registry)
When we create our serializer, we need to tell it about this task. We'll remove the converter for the standard Start
Event and add the one we created to the configuration. We then get a registry of all the classes the serializer knows
about, including our custom spec as well as all the other specs, and initialize the serializer with it.
.. note::
The reason there are two steps involved (returning a registry and *then* passing it to the serializer) rather
than using the configuration directly is to allow further customization of the :code:`registry`. Workflows
can contain arbitrary data, and we want to give developers the ability to write serialization code for any object. See
:ref:`serializing_custom_objects` for more information about how this works.
Finally, we have to update our parser:
We also have to tell the parser to use our class instead of the standard class.
.. code:: python
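# The parser configuration was elided here; what follows is a sketch based on
# SpiffWorkflow's parser API (names are assumptions). BpmnParser subclasses keep
# an OVERRIDE_PARSER_CLASSES mapping of BPMN tag -> (TaskParser, TaskSpec), so we
# can reuse the existing StartEventParser with our custom spec class.
from SpiffWorkflow.bpmn.parser.util import full_tag
from SpiffWorkflow.bpmn.parser.event_parsers import StartEventParser
from SpiffWorkflow.spiff.parser import SpiffBpmnParser

parser = SpiffBpmnParser()
parser.OVERRIDE_PARSER_CLASSES[full_tag('startEvent')] = (StartEventParser, CustomStartEvent)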
@ -114,10 +121,3 @@ will use. This is a bit unintuitive, but that's how it works.
Fortunately, we were able to reuse an existing Task Spec parser, which simplifies the process quite a bit.
Having created a parser and serializer, we could create a configuration module and instantiate an engine with these
components.
There is a very simple diagram :bpmn:`timer_start.bpmn` with the process ID `timer_start` with a Start Event
with a Duration Timer of one day that can be used to illustrate how the custom task works. If you run this workflow
with any of the configurations provided, you'll see a `WAITING` Start Event; if you use the parser and serializer we
just created, you'll be prompted to complete the User Task immediately.

View File

@ -124,7 +124,7 @@ Examples
--------
- :doc:`serialization`
- :doc:`custom_task_specs`
- :doc:`custom_task_spec`
DMN
===

View File

@ -16,15 +16,14 @@ Restricting the Script Environment
The following example replaces the default global environment with the one provided by
`RestrictedPython <https://restrictedpython.readthedocs.io/en/latest/>`_
We've modified our engine configuration to use the restricted environment in :app:`spiff/restricted.py`
We've modified our engine configuration to use the restricted environment in :app:`misc/restricted.py`
.. code:: python
from RestrictedPython import safe_globals
from SpiffWorkflow.bpmn.PythonScriptEngineEnvironment import TaskDataEnvironment
from SpiffWorkflow.bpmn.script_engine import TaskDataEnvironment
restricted_env = TaskDataEnvironment(safe_globals)
restricted_script_engine = PythonScriptEngine(environment=restricted_env)
script_env = TaskDataEnvironment(safe_globals)
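As a quick sanity check, you can exercise the restriction directly, outside of any workflow (a sketch; :code:`compile_restricted` and :code:`safe_globals` are part of RestrictedPython's public API):

.. code:: python

from RestrictedPython import compile_restricted, safe_globals

# The import statement compiles, but safe_globals does not provide __import__,
# so executing the code raises an ImportError at runtime.
code = compile_restricted('import os', filename='<inline>', mode='exec')
try:
    exec(code, dict(safe_globals))
except ImportError as exc:
    print(f'Blocked: {exc}')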
We've also included a dangerous process in :bpmn:`dangerous.bpmn`
@ -48,7 +47,8 @@ You'll get an error, because imports have been restricted.
.. note::
Since we used exactly the same parser and serializer, we can simply switch back and forth between these
two script engines (that is the only difference between the two configurations).
two script engines (that is the only difference between the two configurations). If you've made any
serializer or parser customizations, this is not likely to be possible.
Making Custom Classes and Functions Available
=============================================
@ -74,7 +74,8 @@ We are not going to actually include a database or API and write code for connec
it, but since we only have 7 products, we can model our database with a simple dictionary lookup
and just return the same static shipping info for the purposes of the tutorial.
We'll customize our scripting environment in :app:`spiff/custom_object.py`:
We'll create these "services" along with serialization methods in :app:`spiff/product_info.py` (see
:ref:`serializing_custom_objects` for more information about serialization):
.. code:: python
@ -97,12 +98,15 @@ We'll customize our scripting environment in :app:`spiff/custom_object.py`:
def lookup_shipping_cost(shipping_method):
return 25.00 if shipping_method == 'Overnight' else 5.00
We'll then make the "services" available to our scripting environment.
.. code:: python
script_env = TaskDataEnvironment({
'datetime': datetime,
'lookup_product_info': lookup_product_info,
'lookup_shipping_cost': lookup_shipping_cost,
})
script_engine = PythonScriptEngine(script_env)
.. note::
@ -131,19 +135,17 @@ engine, but through a different method, with the help of some custom extensions
The advantage of a Service Task is that it makes what is happening a bit more transparent (at least at a conceptual level)
than function calls embedded in a Script Task.
We implement the :code:`PythonScriptEngine.call_service` method in :app:`spiff/service_task.py`:
We customize a scripting environment to implement the :code:`call_service` method in :app:`spiff/service_task.py`:
.. code:: python
service_task_env = TaskDataEnvironment({
'product_info_from_dict': product_info_from_dict,
'datetime': datetime,
})
class ServiceTaskEngine(PythonScriptEngine):
class ServiceTaskEnvironment(TaskDataEnvironment):
def __init__(self):
super().__init__(environment=service_task_env)
super().__init__({
'datetime': datetime,
'product_info_from_dict': product_info_from_dict,
})
def call_service(self, operation_name, operation_params, task_data):
if operation_name == 'lookup_product_info':
@ -155,7 +157,7 @@ We implement the :code:`PythonScriptEngine.call_service` method in :app:`spiff/s
raise Exception("Unknown Service!")
return json.dumps(result)
service_task_engine = ServiceTaskEngine()
script_env = ServiceTaskEnvironment()
Instead of adding our custom functions to the environment, we'll override :code:`call_service` and call them directly
according to the `operation_name` that was given. The :code:`spiff` Service Task also evaluates the parameters
@ -207,3 +209,282 @@ To run this workflow:
./runner.py -e spiff_example.spiff.service_task add -p order_product \
-b bpmn/tutorial/{top_level_service_task,call_activity_service_task}.bpmn
Generating BPMN Events Inside the Scripting Environment
=======================================================
When calling external services, there is of course a possibility that a failure could occur, and you might want to be
able to pass that information back into the workflow and define how to handle it there.
In this example, we'll have a service that displays the contents of a file and handles :code:`FileNotFoundError`. We'll
use the diagram :bpmn:`event_handler.bpmn` and the code in :app:`misc/event_handler.py`.
As in the previous section, we'll use the :code:`ServiceTask` from the :code:`spiff` package, but we'll need to extend
it. This is where we'll handle errors.
We define the following error in our XML (we can do this in our
`modeler <https://github.com/sartography/bpmn-js-spiffworkflow>`_):
.. code:: xml
<bpmn:error id="file_not_found" name="file_not_found" errorCode="1">
<bpmn:extensionElements>
<spiffworkflow:variableName>filename</spiffworkflow:variableName>
</bpmn:extensionElements>
</bpmn:error>
In our scripting environment, we'll implement a "read_file" service. This will of course raise an exception if the
requested file is missing, but will otherwise return the contents.
.. code:: python
class ServiceTaskEnvironment(TaskDataEnvironment):
def call_service(self, operation_name, operation_params, context):
if operation_name == 'read_file':
return open(operation_params['filename']).read()
else:
raise ValueError('Unknown Service')
And here is the code for our task spec.
.. code:: python
class EventHandlingServiceTask(ServiceTask):
def _execute(self, my_task):
script_engine = my_task.workflow.script_engine
# The param also has a type, but I don't need it
params = dict((name, script_engine.evaluate(my_task, p['value'])) for name, p in self.operation_params.items())
try:
result = script_engine.call_service(self.operation_name, params, my_task.data)
my_task.data[self._result_variable(my_task)] = result
return True
except FileNotFoundError as exc:
event_definition = ErrorEventDefinition('file_not_found', code='1')
event = BpmnEvent(event_definition, payload=params['filename'])
my_task.workflow.top_workflow.catch(event)
return False
except Exception as exc:
raise WorkflowTaskException('Service Task execution error', task=my_task, exception=exc)
If the file was read successfully, we'll set a variable in our task data with the result (the name of the result variable
is optionally specified in the XML; the :code:`_result_variable` method returns either the specified name or a
calculated name). We return :code:`True` because the operation was a success (see :doc:`../concepts` for more information
about state transitions).
We'll catch :code:`FileNotFoundError` and construct an event to send it back to the workflow. What we generate needs
to match what's in the XML.
.. note::
If you are building an application, you'll probably need to manage known exceptions in a way that is accessible to
both your modeler and your execution engine, but here we'll just show how to build the event so that it can be
caught in the diagram in the task spec.
We have to construct an :code:`EventDefinition` that matches what will be generated from the parsed XML (see
:ref:`events` for a general overview of BPMN event handling). SpiffWorkflow uses the :code:`EventDefinition` to
determine whether a particular task handles an event. The BPMN spec allows certain events, including Error Events, to
optionally contain a payload. In this case, we'll set the payload to be the name of the missing file, which can then be
displayed to the user.
We pass our constructed event to the workflow's :code:`catch` method, which will check to see if there are any tasks
waiting for this event. Each task has a reference to its workflow, but this task occurs in a subworkflow. Event
handling is done at the outermost level so we'll use :code:`my_task.workflow.top_workflow` to get access to the top
level.
We'll return :code:`False`, since the operation was not a success; this will prevent task execution on that branch,
but will not halt overall workflow execution. An unhandled exception, as in the last case, will cause the entire
workflow to halt.
.. note::
The task spec is not the only place error handling could be implemented. I kind of like this approach, as the task
spec defines the behavior for a particular type of task and this is part of that. It would also be possible to extend
the :code:`PythonScriptEngine` to handle the errors. The main reason I didn't do that here is that this example
application can be made less complex if only a scripting environment is supplied. The script engine, unlike the script
environment, has access to the task and workflow (via the task), and the same thing could be done there as well.
To load this example:
.. code:: console
./runner.py -e spiff_example.misc.event_handler add -p read_file -b bpmn/tutorial/event_handler.bpmn
./runner.py -e spiff_example.misc.event_handler
.. note::
When running this example, it will probably be useful to change the task filter so that all tasks are visible. Set
the state to `ANY_MASK` to see all tasks.
Threaded Service Task
=====================
Suppose that we have some potentially time-consuming tasks and we want to execute them in threads so that we aren't
blocking the entire workflow from executing while it runs (the default behavior). In this section, we'll customize a
scripting environment that contains a thread pool.
First let's write a simple "service" that just waits.
.. code:: python
def wait(seconds, job_id):
time.sleep(seconds)
return f'{job_id} slept {seconds} seconds'
We'll make this "service" available in our environment:
.. code:: python
class ServiceTaskEnvironment(TaskDataEnvironment):
def __init__(self):
super().__init__()
self.pool = ThreadPoolExecutor(max_workers=10)
self.futures = {}
def call_service(self, operation_name, operation_params, context):
if operation_name == 'wait':
seconds = randrange(1, 30)
return self.pool.submit(wait, seconds, operation_params['job_id'])
else:
raise ValueError("Unknown Service!")
Our service will return a future, and we'll manage these futures via a custom task spec. The parent class is the
Service Task of the :code:`spiff` package, which provides us with an :code:`operation_name` and
:code:`operation_params`. Each parameter has a name and a type, but I don't need the type, so I'll just get the
values. The values are expressions that we evaluate against the task data. We'll map the future to the task in the script
environment.
.. code:: python
class ThreadedServiceTask(ServiceTask):
def _execute(self, my_task):
script_engine = my_task.workflow.script_engine
params = dict((name, script_engine.evaluate(my_task, p['value'])) for name, p in self.operation_params.items())
try:
future = script_engine.call_service(self.operation_name, params, my_task.data)
script_engine.environment.futures[future] = my_task
except Exception as exc:
raise WorkflowTaskException('Service Task execution error', task=my_task, exception=exc)
Since our :code:`_execute` method returns :code:`None`, our task will transition to a :code:`STARTED` state (see
:doc:`../concepts` for more information about state transitions). SpiffWorkflow will ignore this task from this point on;
this means our engine has to take over.
We'll extend the :code:`Instance` class (defined in :app:`engine/instance.py`) to also check these futures when waiting
tasks are refreshed. As jobs complete, we'll call :code:`task.complete` to mark the task :code:`COMPLETED`. The workflow
will then be able to continue down that branch.
.. code:: python
class ThreadInstance(Instance):
def update_completed_futures(self):
futures = self.workflow.script_engine.environment.futures
finished = [f for f in futures if f.done()]
for future in finished:
task = futures.pop(future)
result = future.result()
task.data[task.task_spec._result_variable(task)] = result
task.complete()
def run_ready_events(self):
self.update_completed_futures()
super().run_ready_events()
.. note::
In a real application, you would probably want a separate service keeping track of the jobs and checking the
futures rather than polling in the engine, but that can't be easily set up in this example application.
To load and run this example (as in the previous example, it is probably a good idea to update the task filter to show all
tasks with the `ANY_MASK` state):
.. code:: console
./runner.py -e spiff_example.misc.threaded_service_task add -p threaded_service -b bpmn/tutorial/threaded_service_task.bpmn
./runner.py -e spiff_example.misc.threaded_service_task
Executing Scripts in a Subprocess
=================================
In this section, we'll show how you might execute your scripts outside of the workflow execution context. This is a little
contrived and there are undoubtedly better ways to accomplish it, but this has the advantage of being very simple.
First we'll create an executable that can take a JSON-serialized context and an expression to evaluate or a script to execute
(see :app:`spiff/subprocess_engine.py`). This little program simply replicates the behavior of the default
script engine.
We import our custom functions here rather than our workflow's engine. We'll also import the registry used by our serializer;
we need to be able to generate JSON when we write our output, so we might as well reuse what we have.
.. code:: python
from .custom_exec import (
lookup_product_info,
lookup_shipping_cost,
registry,
)
This emulates how the default script engine handles evaluation and execution.
.. code:: python
local_ctx = registry.restore(json.loads(args.context))
global_ctx = globals()
global_ctx.update(local_ctx)
if args.external is not None:
global_ctx.update(registry.restore(json.loads(args.external)))
if args.method == 'eval':
result = eval(args.expr, global_ctx, local_ctx)
elif args.method == 'exec':
exec(args.script, global_ctx, local_ctx)
result = local_ctx
print(json.dumps(registry.convert(result)))
Then we'll tell our scripting environment to use the script rather than directly invoking :code:`eval` and :code:`exec`.
.. code:: python
class SubprocessScriptingEnvironment(BasePythonScriptEngineEnvironment):
def __init__(self, executable, serializer, **kwargs):
super().__init__(**kwargs)
self.executable = executable
self.serializer = serializer
def evaluate(self, expression, context, external_context=None):
output = self.run(['eval', expression], context, external_context)
return self.parse_output(output)
def execute(self, script, context, external_context=None):
output = self.run(['exec', script], context, external_context)
DeepMerge.merge(context, self.parse_output(output))
return True
def run(self, args, context, external_context):
cmd = ['python', '-m', self.executable] + args + ['-c', json.dumps(registry.convert(context))]
if external_context is not None:
cmd.extend(['-x', json.dumps(registry.convert(external_context))])
return subprocess.run(cmd, capture_output=True)
def parse_output(self, output):
if output.stderr:
raise Exception(output.stderr.decode('utf-8'))
return registry.restore(json.loads(output.stdout))
executable = 'spiff_example.spiff.subprocess_engine'
script_env = SubprocessScriptingEnvironment(executable, serializer)
To load this example:
.. code:: console
./runner.py -e spiff_example.spiff.custom_exec add -p order_product \
-b bpmn/tutorial/{top_level_script,call_activity_script}.bpmn
./runner.py -e spiff_example.spiff.custom_exec

View File

@ -9,7 +9,7 @@ From the :code:`start_workflow` method of our BPMN engine (:app:`engine/engine.p
spec, sp_specs = self.serializer.get_workflow_spec(spec_id)
wf = BpmnWorkflow(spec, sp_specs, script_engine=self._script_engine)
wf_id = self.serializer.create_workflow(wf, spec_id)
return wf_id
return Instance(wf_id, wf)
We'll use our serializer to recreate the workflow spec based on the id. As discussed in :ref:`parsing_subprocesses`,
a process has a top level specification and a dictionary of process id -> spec containing any other processes referenced
@ -28,6 +28,8 @@ In the simplest case, running a workflow involves implementing the following loo
until there are no tasks left to complete.
We'll refer to code from :app:`engine/instance.py` in the next few sections.
Here are our engine methods:
.. code-block:: python
@ -54,7 +56,12 @@ it catches whatever event it is waiting on, at which point it becomes :code:`REA
:code:`workflow.refresh_waiting_tasks` method iterates over all the waiting tasks and changes the state to :code:`READY`
if the conditions for doing so have been met.
We'll cover using the `workflow.get_next_task` method and handling Human tasks later in this document.
We'll cover using the :code:`workflow.get_next_task` method and handling Human tasks later in this document.
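Putting these pieces together, the basic loop looks something like the following sketch (the example app's :code:`Instance` class wraps these calls and adds UI updates; the :code:`manual` filter argument is covered below):

.. code-block:: python

from SpiffWorkflow.util.task import TaskState

workflow.do_engine_steps()  # run all READY engine tasks
while not workflow.is_completed():
    for task in workflow.get_tasks(state=TaskState.READY, manual=True):
        # Present the task to a user and collect any input, then:
        task.run()
    workflow.refresh_waiting_tasks()  # move WAITING tasks to READY where possible
    workflow.do_engine_steps()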
.. note::
The :code:`Instance` class also has a task filter attribute and a list of filtered tasks, which are used
by the UI, so we update those in these methods as well.
Tasks
=====
@ -72,6 +79,14 @@ don't have to pay much attention to most of them. A few of the important ones a
* `description`: we use this attribute to provide a description of the BPMN task type
* `manual`: :code:`True` if human input is required to complete tasks associated with this Task Spec
The :code:`manual` attribute is particularly important, because SpiffWorkflow does not include built-in
handling of these tasks so you'll need to implement this as part of your application. We'll go over how this is
handled in this application in the next section.
.. note::
NoneTasks (BPMN tasks with no more specific type assigned) are treated as Manual Tasks by SpiffWorkflow.
BPMN Task Specs have the following additional attributes.
* `bpmn_id`: the ID of the BPMN Task (this will be :code:`None` if the task is not visible on the diagram)
@ -80,16 +95,17 @@ BPMN Task Specs have the following additional attributes.
* `documentation`: the contents of the BPMN `documentation` element for the Task
In the example application, we use the :code:`bpmn_name` (or :code:`name` when a :code:`bpmn_name` isn't specified),
and :code:`lane` to display information about the tasks in a workflow (see the :code:`update_task_tree` method of
:app:`curses_ui/workflow_view.py`).
and :code:`lane` to display information about the tasks in a workflow:
The :code:`manual` attribute is particularly important, because SpiffWorkflow does not include built-in
handling of these tasks so you'll need to implement this as part of your application. We'll go over how this is
handled in this application in the next section.
.. code:: python
.. note::
NoneTasks (BPMN tasks with no more specific type assigned) are treated as Manual Tasks by SpiffWorkflow.
def get_task_display_info(self, task):
    return {
        'depth': task.depth,
        'state': TaskState.get_name(task.state),
        'name': task.task_spec.bpmn_name or task.task_spec.name,
        'lane': task.task_spec.lane,
    }
Instantiated Tasks
------------------
@ -125,14 +141,14 @@ Our User and Manual Task handlers render the instructions (this code is from :ap
from jinja2 import Template
def get_instructions(self):
    instructions = f'{self.task.task_spec.bpmn_name}\n\n'
def set_instructions(self, task):
    user_input = self.ui._states['user_input']
    user_input.instructions = f'{self.task.task_spec.bpmn_name}\n\n'
    text = self.task.task_spec.extensions.get('instructionsForEndUser')
    if text is not None:
        template = Template(text)
        instructions += template.render(self.task.data)
        instructions += '\n\n'
    return instructions
        user_input.instructions += template.render(self.task.data)
        user_input.instructions += '\n\n'
We're not going to attempt to handle Markdown in a curses UI, so we'll assume we just have text. However, we do
want to be able to incorporate data specific to the workflow in information that is presented to a user; this is
@ -149,20 +165,19 @@ We won't go into the details about how the form screen works, as it's specific t
library itself; instead we'll skip to the code that runs the task after it has been presented to the user; any
application needs to do this.
Simply running the task is sufficient for Manual Tasks.
When our form is submitted, we ask our :code:`Instance` to update the task data (if applicable, as in the case of a
form) and run the task.
.. code-block:: python
def on_complete(self, results):
    self.task.run()
However, we need to extend this method for User Tasks, to incorporate the user-submitted data into the workflow:
.. code-block:: python
def on_complete(self, results):
    self.task.set_data(**results)
    super().on_complete(results)

def run_task(self, task, data=None):
    if data is not None:
        task.set_data(**data)
    task.run()
    if not self.step:
        self.run_until_user_input_required()
    else:
        self.update_task_filter()
Here we are setting a key for each field in the form. Other possible options here are to set one key that contains
all of the form data, or to map the schema to a Python class and use that in lieu of a dictionary. It's up to you to
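For example, a sketch of the single-key variant (:code:`form_data` is an arbitrary name we chose):

.. code-block:: python

    def run_task(self, task, data=None):
        # Store the whole form payload under a single key instead of one key per field
        if data is not None:
            task.set_data(form_data=data)
        task.run()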
@ -174,7 +189,7 @@ simple example next.
We'll refer to the process modeled in :bpmn:`task_types.bpmn`, which contains a simple form asking a user to input a
product and quantity, as well as a manual task presenting the order information at the end of the process (the form is
defined in :form:`select_product_and_quantity.json`
defined in :form:`select_product_and_quantity.json`)
After the user submits the form, we'll collect the results in the following dictionary:
@ -227,13 +242,15 @@ Filtering Tasks
SpiffWorkflow has two methods for retrieving tasks:
- :code:`workflow.get_tasks`: returns a list of matching tasks, or an empty list
- :code:`workflow.get_tasks`: returns an iterator over matching tasks (which may be empty)
- :code:`workflow.get_next_task`: returns the first matching task, or None
Both of these methods use the same helper classes and take the same arguments -- the only difference is the return
type.
Both of these methods use the same helper classes and take the same arguments -- the only difference is the return type.
These methods return a :code:`TaskIterator`, which in turn uses a :code:`TaskFilter` to determine what tasks match.
These methods create a :code:`TaskIterator`. They take an optional first argument, a task to begin the iteration from
(if it is not provided, iteration begins at the root). This is useful if you know you want to continue executing a
workflow from a particular place. The remaining arguments are keyword arguments that are passed directly to a
:code:`TaskFilter`, which determines which tasks match.
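A few illustrative calls (the spec name is hypothetical):

.. code-block:: python

    from SpiffWorkflow.util.task import TaskState

    ready_tasks = workflow.get_tasks(state=TaskState.READY)
    form_task = workflow.get_next_task(spec_name='order_product')  # first match, or None
    # Begin iterating from a known task rather than from the root
    remaining = workflow.get_tasks(form_task, state=TaskState.NOT_FINISHED_MASK)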
Tasks can be filtered by:
@ -260,6 +277,9 @@ correspond to which states).
from SpiffWorkflow.util.task import TaskState
We can use this object to translate an integer to a human-readable name using :code:`TaskState.get_name(task.state)`;
there is also a corresponding :code:`TaskState.get_value` method that goes from name to integer.
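For example:

.. code-block:: python

    from SpiffWorkflow.util.task import TaskState

    TaskState.get_name(TaskState.READY)  # -> 'READY'
    TaskState.get_value('READY')         # -> TaskState.READY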
Ready Human Tasks
^^^^^^^^^^^^^^^^^
@ -336,6 +356,8 @@ Additionally, the class has a few extra attributes to make it more convenient to
These methods exist on the top level workflow as well, and return :code:`None`.
.. _events:
Events
======
@ -373,3 +395,5 @@ of event and might be used to help determine this.
Once you have determined which workflow should receive the event, you can pass it to :code:`workflow.catch` to handle
it.
In :doc:`script_engine`, there is an example of how to create an event and pass it back to a workflow when executing
a Service Task; this shows how you might construct a :code:`BpmnEvent` to pass to :code:`workflow.catch`.
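As a rough sketch (the message name and payload are invented, and the import paths may differ between versions):

.. code-block:: python

    from SpiffWorkflow.bpmn.event import BpmnEvent
    from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition

    event = BpmnEvent(MessageEventDefinition('order_shipped'), {'order_id': 42})
    workflow.catch(event)
    workflow.refresh_waiting_tasks()  # the catching task can now become READY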

View File

@ -21,16 +21,17 @@ SpiffWorkflow consists of two different categories of objects:
- **Specification objects**, which represent definitions of structure and behavior and derive from :code:`WorkflowSpec` and :code:`TaskSpec`
- **Instance objects**, which represent the state of a running workflow (:code:`Workflow`/:code:`BpmnWorkflow` and :code:`Task`)
In the workflow context, a specification is model of the workflow, an abstraction that describes *every path that could
be taken whenever the workflow is executed*. An instance is a particular instantiation of a specification. It describes *the
current state* or *the path(s) that were actually taken when the workflow ran*.
For workflows, a specification is a model of the workflow, an abstraction that describes *every path that could
be taken whenever the workflow is executed*. An instance is an execution of a specification. It describes *the current state*
or *the path(s) that were actually taken when the workflow ran*.
In the task context, a specification is a model for how a task behaves. It describes the mechanisms for deciding *whether
For tasks, a specification is a model for how a task behaves. It describes the mechanisms for deciding *whether
there are preconditions for running an associated task*, *how to decide whether they are met*, and *what it means to complete
(successfully or unsuccessfully)*. An instance describes the *state of the task, as it pertains to a particular workflow* and
*contains the data used to manage that state*.
(successfully or unsuccessfully)*. An instance describes the *state of the task, as it pertains to the workflow it is part of*
and *contains the data used to manage that state*.
Specifications are unique, whereas instances are not. There is *one* model of a workflow, and *one* specification for a particular task.
Specifications are unique, whereas instances are not. There is *one* model of a workflow, and *one* specification for a
particular task. The model can be executed many times, and within one execution, a task spec may also be reached many times.
Imagine a workflow with a loop. The loop is defined once in the specification, but there can be many tasks associated with
each of the specs that comprise the loop.
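To make this concrete, the same spec name can appear on many task instances (:code:`loop_body` is a hypothetical
spec name):

.. code-block:: python

    # Count how many task instances were created from a single spec
    names = [t.task_spec.name for t in workflow.get_tasks()]
    names.count('loop_body')  # may be greater than 1 after several iterations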
@ -104,7 +105,7 @@ to Task State. These hooks are:
* `run_hook`: This method implements the task's behavior when it is run, returning:
- :code:`True` if the task completed successfully. The state will transition to **COMPLETED**.
- :code:`False` if the task completed unsucessfully. The state will transition to **ERRROR**.
- :code:`False` if the task completed unsuccessfully. The state will transition to **ERROR**.
- :code:`None` if the task has not completed. The state will transition to **STARTED**.
* `_on_complete_hook`: This method will be run when the task's state is changed to **COMPLETED**.
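A hypothetical spec illustrating the three :code:`_run_hook` return values (the class, and reading the status from
task data, are inventions of this sketch; the import path may vary by version):

.. code-block:: python

    from SpiffWorkflow.specs.base import TaskSpec

    class PollingTask(TaskSpec):
        def _run_hook(self, my_task):
            status = my_task.data.get('status')  # pretend an external system set this
            if status == 'done':
                return True   # -> COMPLETED
            if status == 'failed':
                return False  # -> ERROR
            return None       # not finished yet -> STARTED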

View File

@ -37,7 +37,7 @@ extensions = ['sphinx.ext.autodoc',
]
# Configure links to example repo
branch = 'improvement/better-interactive-workflow-runner'
branch = 'main'
extlinks = {
'example': (f'https://github.com/sartography/spiff-example-cli/tree/{branch}/' + '%s', '%s'),
'bpmn': (f'https://github.com/sartography/spiff-example-cli/tree/{branch}/bpmn/tutorial/' + '%s', '%s'),

View File

@ -22,7 +22,7 @@ What is SpiffWorkflow?
SpiffWorkflow is the workflow library underlying `Spiff Arena <https://github.com/sartography/spiff-arena>`_.
It consists of a generic core library, with packages supporting parsing and execution of BPMN diagrams that extend
It consists of a generic core library, with modules supporting parsing and execution of BPMN diagrams that extend
this core.
Extensive documentation about BPMN and how SpiffWorkflow interprets it, as well as information about custom extensions

View File

@ -16,7 +16,7 @@ capabilities.
- Specs implementations are in :code:`specs`
- Workflow implementation is in :code:`workflow.py`
- Task implementation is in :code:`task.py`, with utilities for iteration and filtering in :code:`util.task.py`
- Task implementation is in :code:`task.py`, with utilities for iteration and filtering in :code:`util/task.py`
It is documented in :doc:`core/index`.

View File

@ -35,7 +35,7 @@ class CollaborationTest(BpmnWorkflowTestCase):
buddy = self.workflow.get_next_task(spec_name='process_buddy')
self.assertIsInstance(buddy.task_spec, CallActivityMixin)
self.assertEqual(buddy.task_spec.spec, 'process_buddy')
self.assertEqual(buddy.state, TaskState.WAITING)
self.assertEqual(buddy.state, TaskState.STARTED)
def testBpmnMessage(self):

View File

@ -61,9 +61,8 @@ class CallActivityDataTest(BpmnWorkflowTestCase):
self.assertNotIn('unused', task.data)
self.complete_subprocess()
# Refreshing causes the subprocess to become ready
self.workflow.refresh_waiting_tasks()
task = self.workflow.get_next_task(state=TaskState.READY)
# This is the subprocess
task = self.workflow.get_next_task(spec_name='Activity_1wdjypm')
# Originals should not change
self.assertEqual(task.data['in_1'], 1)
self.assertEqual(task.data['in_2'], "hello world")
@ -74,11 +73,11 @@ class CallActivityDataTest(BpmnWorkflowTestCase):
def advance_to_subprocess(self):
# Once we enter the subworkflow it becomes a waiting task
waiting = self.workflow.get_tasks(state=TaskState.WAITING)
while len(waiting) == 0:
started = self.workflow.get_tasks(state=TaskState.STARTED)
while len(started) == 0:
next_task = self.workflow.get_next_task(state=TaskState.READY)
next_task.run()
waiting = self.workflow.get_tasks(state=TaskState.WAITING)
started = self.workflow.get_tasks(state=TaskState.STARTED)
def complete_subprocess(self):
# Complete the ready tasks in the subprocess

View File

@ -49,7 +49,7 @@ class NestedProcessesTest(BpmnWorkflowTestCase):
sub = [t for t in self.workflow.get_tasks() if t.task_spec.bpmn_name == 'Nested level 1'][0]
self.workflow.reset_from_task_id(task.id)
self.assertEqual(task.state, TaskState.READY)
self.assertEqual(sub.state, TaskState.WAITING)
self.assertEqual(sub.state, TaskState.STARTED)
self.assertEqual(len(self.workflow.subprocesses), 1)
task.run()
@ -68,7 +68,7 @@ class NestedProcessesTest(BpmnWorkflowTestCase):
self.workflow.do_engine_steps()
self.assertEqual(len(self.workflow.subprocesses), 1)
self.assertEqual(task.state, TaskState.WAITING)
self.assertEqual(task.state, TaskState.STARTED)
self.complete_task('Action2', True)
self.complete_task('Action3', True)
self.assertTrue(self.workflow.is_completed())

View File

@ -67,7 +67,7 @@ class ResetTokenOnBoundaryEventTest(BpmnWorkflowTestCase):
# The task we returned to should be ready, the subprocess should be waiting, the final task should be future
sub = self.workflow.get_next_task(spec_name='subprocess')
self.assertEqual(sub.state, TaskState.WAITING)
self.assertEqual(sub.state, TaskState.STARTED)
self.assertEqual(task.state, TaskState.READY)
final = self.workflow.get_next_task(spec_name='Final')
self.assertEqual(final.state, TaskState.FUTURE)

View File

@ -40,7 +40,8 @@ class ActionManagementTest(BpmnWorkflowTestCase):
time.sleep(self.START_TIME_DELTA)
self.workflow.refresh_waiting_tasks()
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_named_step("Start Work")
@ -62,18 +63,21 @@ class ActionManagementTest(BpmnWorkflowTestCase):
time.sleep(self.START_TIME_DELTA)
self.workflow.refresh_waiting_tasks()
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_named_step("Start Work")
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual('Finish Time', self.workflow.get_tasks(state=TaskState.WAITING)[1].task_spec.bpmn_name)
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual('Finish Time', self.workflow.get_next_task(state=TaskState.WAITING).task_spec.bpmn_name)
time.sleep(self.FINISH_TIME_DELTA)
self.workflow.refresh_waiting_tasks()
self.workflow.do_engine_steps()
self.assertEqual(3, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertNotEqual('Finish Time', self.workflow.get_tasks(state=TaskState.WAITING)[0].task_spec.bpmn_name)
overdue_escalation_task = [
@ -109,10 +113,12 @@ class ActionManagementTest(BpmnWorkflowTestCase):
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
time.sleep(self.START_TIME_DELTA)
self.workflow.refresh_waiting_tasks()
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_named_step("Start Work")

View File

@ -24,7 +24,8 @@ class MessageInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.do_next_exclusive_step('Do Something In a Subprocess')
self.workflow.do_engine_steps()
@ -35,7 +36,7 @@ class MessageInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughInterruptSaveAndRestore(self):
@ -46,7 +47,8 @@ class MessageInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.workflow.do_engine_steps()
@ -57,4 +59,4 @@ class MessageInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())

View File

@ -24,7 +24,8 @@ class MessageInterruptsTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.do_next_exclusive_step('Do Something That Takes A Long Time')
self.save_restore()
@ -35,7 +36,7 @@ class MessageInterruptsTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageInterruptSaveAndRestore(self):
@ -46,14 +47,15 @@ class MessageInterruptsTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.save_restore()
self.workflow.do_engine_steps()
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_exclusive_step('Acknowledge Interrupt Message')
@ -61,7 +63,7 @@ class MessageInterruptsTest(BpmnWorkflowTestCase):
self.workflow.do_engine_steps()
self.save_restore()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughHappy(self):
@ -70,7 +72,8 @@ class MessageInterruptsTest(BpmnWorkflowTestCase):
self.workflow.do_engine_steps()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.do_next_exclusive_step('Do Something That Takes A Long Time')
@ -78,7 +81,7 @@ class MessageInterruptsTest(BpmnWorkflowTestCase):
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageInterrupt(self):
@ -87,15 +90,16 @@ class MessageInterruptsTest(BpmnWorkflowTestCase):
self.workflow.do_engine_steps()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.workflow.do_engine_steps()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_exclusive_step('Acknowledge Interrupt Message')
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())

View File

@ -24,7 +24,8 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.do_next_exclusive_step('Do Something That Takes A Long Time')
self.save_restore()
@ -35,7 +36,7 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageInterruptSaveAndRestore(self):
@ -46,13 +47,15 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_named_step('Acknowledge Non-Interrupt Message')
@ -68,7 +71,7 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughHappy(self):
@ -77,7 +80,8 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.workflow.do_engine_steps()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.do_next_exclusive_step('Do Something That Takes A Long Time')
@ -85,7 +89,7 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageInterrupt(self):
@ -94,24 +98,27 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.workflow.do_engine_steps()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_named_step('Acknowledge Non-Interrupt Message')
self.workflow.do_engine_steps()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(3, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.do_next_named_step('Do Something That Takes A Long Time')
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageInterruptOtherOrder(self):
@ -120,12 +127,14 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.workflow.do_engine_steps()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_named_step('Do Something That Takes A Long Time')
@ -136,7 +145,7 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.do_next_named_step('Acknowledge Non-Interrupt Message')
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageInterruptOtherOrderSaveAndRestore(self):
@ -148,13 +157,15 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY)))
self.do_next_named_step('Do Something That Takes A Long Time')
@ -167,4 +178,4 @@ class MessageNonInterruptTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())

View File

@ -24,7 +24,8 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.do_next_exclusive_step('Do Something In a Subprocess')
self.workflow.do_engine_steps()
@ -35,7 +36,7 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageSaveAndRestore(self):
@ -46,7 +47,8 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
@ -63,7 +65,7 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageOrder2SaveAndRestore(self):
@ -74,7 +76,8 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
self.do_next_named_step('Do Something In a Subprocess')
@ -90,7 +93,7 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughMessageOrder3SaveAndRestore(self):
@ -101,7 +104,8 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY)))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
@ -118,4 +122,4 @@ class MessageNonInterruptsSpTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())

View File

@ -21,7 +21,8 @@ class MessagesTest(BpmnWorkflowTestCase):
self.do_next_exclusive_step('Select Test', choice='Messages')
self.workflow.do_engine_steps()
self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Wrong Message'), {}))
self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
@ -30,7 +31,7 @@ class MessagesTest(BpmnWorkflowTestCase):
self.assertEqual('Test Message', self.workflow.get_tasks(state=TaskState.READY)[0].task_spec.bpmn_name)
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())
def testRunThroughSaveAndRestore(self):
@ -41,7 +42,8 @@ class MessagesTest(BpmnWorkflowTestCase):
self.save_restore()
self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY))
self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED)))
self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING)))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Wrong Message'), {}))
self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY))
self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {}))
@ -50,4 +52,4 @@ class MessagesTest(BpmnWorkflowTestCase):
self.save_restore()
self.workflow.do_engine_steps()
self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING)))
self.assertTrue(self.workflow.is_completed())

View File

@ -22,13 +22,13 @@ class TimerDurationTest(BpmnWorkflowTestCase):
def actual_test(self,save_restore = False):
self.workflow.do_engine_steps()
ready_tasks = self.workflow.get_tasks(state=TaskState.READY)
ready_tasks[0].run()
ready_tasks = self.workflow.get_next_task(state=TaskState.READY)
ready_tasks.run()
self.workflow.do_engine_steps()
loopcount = 0
# test bpmn has a timeout of .03s; we should terminate loop before that.
while len(self.workflow.get_tasks(state=TaskState.WAITING)) == 2 and loopcount < 11:
while len(self.workflow.get_tasks(state=TaskState.WAITING)) == 1 and loopcount < 11:
if save_restore:
self.save_restore()
time.sleep(0.01)

View File

@ -30,9 +30,9 @@ class CallActivityMessageTest(BaseTestCase):
self.workflow.do_engine_steps()
ready_tasks = self.workflow.get_tasks(state=TaskState.READY)
waiting_tasks = self.workflow.get_tasks(state=TaskState.WAITING)
started_tasks = self.workflow.get_tasks(state=TaskState.STARTED)
self.assertEqual(1, len(ready_tasks),'Expected to have one ready task')
self.assertEqual(2, len(waiting_tasks), 'Expected to have two waiting tasks')
self.assertEqual(2, len(started_tasks), 'Expected to have two started tasks')
for step in steps:
current_task = ready_tasks[0]

View File

@ -43,7 +43,7 @@ class DepthFirstTest(IterationTest):
tasks = self.workflow.get_tasks(end_at_spec='c')
self.assertEqual(
[t.task_spec.name for t in tasks],
['Start', 'a', 'a1', 'last', 'End', 'a2', 'last', 'End', 'c']
['Start', 'a', 'a1', 'last', 'End', 'a2', 'last', 'End', 'c', 'b', 'b1', 'last', 'End', 'b2', 'last', 'End']
)
def test_get_tasks_max_depth(self):
@ -67,7 +67,7 @@ class BreadthFirstTest(IterationTest):
tasks = self.workflow.get_tasks(end_at_spec='c', depth_first=False)
self.assertEqual(
[t.task_spec.name for t in tasks],
['Start', 'a', 'b', 'a1', 'a2', 'c']
['Start', 'a', 'b', 'a1', 'a2', 'c', 'b1', 'b2', 'last', 'last', 'last', 'last', 'End', 'End', 'End', 'End']
)
def test_get_tasks_max_depth(self):

View File

@ -3,7 +3,8 @@ import os
from lxml import etree
from SpiffWorkflow.dmn.engine.DMNEngine import DMNEngine
from SpiffWorkflow.dmn.parser.DMNParser import DMNParser, get_dmn_ns
from SpiffWorkflow.dmn.parser.DMNParser import DMNParser
from SpiffWorkflow.bpmn.parser.node_parser import DEFAULT_NSMAP
class WorkflowSpec:
def __init__(self):
@ -38,7 +39,12 @@ class DecisionRunner:
with open(fn) as fh:
node = etree.parse(fh)
self.dmnParser = DMNParser(None, node.getroot(), get_dmn_ns(node.getroot()))
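# lxml stores the default namespace under the key None, which xpath
# rejects, so re-key it to the 'dmn' prefix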
nsmap = DEFAULT_NSMAP.copy()
nsmap.update(node.getroot().nsmap)
if None in nsmap:
nsmap['dmn'] = nsmap.pop(None)
self.dmnParser = DMNParser(None, node.getroot(), nsmap)
self.dmnParser.parse()
decision = self.dmnParser.decision

View File

@ -9,6 +9,7 @@ class DmnVersionTest(unittest.TestCase):
def setUp(self):
self.parser = BpmnDmnParser()
self.parser.namespaces.update({'dmn': 'https://www.omg.org/spec/DMN/20191111/MODEL/'})
def test_load_v1_0(self):
filename = os.path.join(data_dir, 'dmn_version_20151101_test.dmn')