Tasks
Tasks are the building blocks of processes
Creating a Task
To create a task you must first create a ZeebeWorker
or ZeebeTaskRouter
instance.
@worker.task(task_type="my_task")
async def my_task():
return {}
This is a task that does nothing. It receives no parameters and also doesn’t return any.
Note
While this task indeed returns a python dictionary, it doesn’t return anything to Zeebe. To do that we have to fill the dictionary with values.
Async/Sync Tasks
Tasks can be regular or async functions. If given a regular function, pyzeebe will convert it into an async one by running asyncio.loop.run_in_executor()
Note
Make sure not to call any blocking function in an async task. This would slow the entire worker down.
Do:
@worker.task(task_type="my_task")
def my_task():
time.sleep(10) # Blocking call
return {}
Don’t:
@worker.task(task_type="my_task")
async def my_task():
time.sleep(10) # Blocking call
return {}
Task Exception Handler
An exception handler’s signature:
Callable[[Exception, Job, JobController], Awaitable[None]]
In other words: an exception handler is a function that receives an Exception
,
Job
instance and JobController
(a pyzeebe class).
The exception handler is called when the task has failed.
To add an exception handler to a task:
from pyzeebe import Job, JobController
async def my_exception_handler(exception: Exception, job: Job, job_controller: JobController) -> None:
print(exception)
await job_controller.set_failure_status(job, message=str(exception))
@worker.task(task_type="my_task", exception_handler=my_exception_handler)
def my_task():
raise Exception()
Now every time my_task
is called (and then fails), my_exception_handler
is called.
What does job_controller.set_failure_status do?
This tells Zeebe that the job failed. The job will then be retried (if configured in process definition).
Note
The exception handler can also be set via pyzeebe.ZeebeWorker
or pyzeebe.ZeebeTaskRouter
.
Pyzeebe will try to find the exception handler in the following order:
Worker
-> Router
-> Task
-> pyzeebe.default_exception_handler()
Task timeout
When creating a task one of the parameters we can specify is timeout
.
@worker.task(task_type="my_task", timeout=20000)
def my_task(input: str):
return {"output": f"Hello World, {input}!"}
Here we specify a timeout of 20000 milliseconds (20 seconds). If the job is not completed within this timeout, Zeebe will reactivate the job and another worker will take over.
The default value is 10000 milliseconds or 10 seconds.
Be sure to test your task’s time and adjust the timeout accordingly.
Tasks that don’t return a dictionary
Sometimes we want a task to return a singular JSON value (not a dictionary).
To do this we can set the single_value
parameter to True
.
@worker.task(task_type="my_task", single_value=True, variable_name="y")
def my_task(x: int) -> int:
return x + 1
This will create a task that receives parameter x
and returns an integer called y
.
So the above task is in fact equal to:
@worker.task(task_type="my_task")
def my_task(x: int) -> dict:
return {"y": x + 1}
This can be helpful when we don’t want to read return values from a dictionary each time we call the task (in tests for example).
Note
The parameter variable_name
must be supplied if single_value
is true. If not given a NoVariableNameGiven
will be raised.
Accessing the job object directly
It is possible to receive the job object as a parameter inside a task function. Simply annotate the parameter with the pyzeebe.Job
type.
Example:
from pyzeebe import Job
@worker.task(task_type="my_task")
async def my_task(job: Job):
print(job.process_instance_key)
return {**job.custom_headers}