Worker Reference

The ZeebeWorker class inherits from the ZeebeTaskRouter class, so every ZeebeTaskRouter method is also available on ZeebeWorker.

class pyzeebe.ZeebeTaskRouter(before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)
__init__(before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)
Parameters:
  • before (list[TaskDecorator]) – Decorators to be performed before each task

  • after (list[TaskDecorator]) – Decorators to be performed after each task

  • exception_handler (ExceptionHandler) – Handler that will be called when a job fails.

after(*decorators: Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]) None

Add decorators to be performed after a job is run

Parameters:

decorators (Iterable[TaskDecorator]) – The decorators to be performed after each job is run

before(*decorators: Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]) None

Add decorators to be performed before a job is run

Parameters:

decorators (Iterable[TaskDecorator]) – The decorators to be performed before each job is run
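
A minimal sketch of the decorator shape accepted by before() and after(): each decorator takes a job and returns a job, and may be synchronous or asynchronous. To stay runnable without pyzeebe or a gateway, the sketch uses a hypothetical stand-in FakeJob that models only the variables attribute, and a hypothetical run_decorators helper that is not part of the library. With the real library the same functions would receive a pyzeebe.Job and be registered through before(...) or after(...).

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeJob:
    """Hypothetical stand-in for pyzeebe.Job; only `variables` is modelled."""
    variables: Dict[str, Any] = field(default_factory=dict)


def add_default_currency(job: FakeJob) -> FakeJob:
    # Synchronous decorator: fill in a default value and hand the job back.
    job.variables.setdefault("currency", "EUR")
    return job


async def mark_audited(job: FakeJob) -> FakeJob:
    # Asynchronous decorator: awaitable decorators have the same shape.
    job.variables["audited"] = True
    return job


async def run_decorators(decorators, job: FakeJob) -> FakeJob:
    # Illustration only (not library code): apply each decorator in order,
    # awaiting the result when the decorator is asynchronous.
    for decorator in decorators:
        result = decorator(job)
        job = await result if inspect.isawaitable(result) else result
    return job


job = asyncio.run(
    run_decorators([add_default_currency, mark_audited], FakeJob({"amount": 10}))
)
print(job.variables)
```

Returning the job from every decorator keeps the two call shapes interchangeable, which is why both functions above end with return job.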

exception_handler(exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]]) None

Add exception handler to be called when a job fails

Parameters:

exception_handler (ExceptionHandler) – Handler that will be called when a job fails.

get_task(task_type: str) Task

Get a task by its type

Parameters:

task_type (str) – The type of the wanted task

Returns:

The wanted task

Return type:

Task

Raises:

TaskNotFoundError – If no task with specified type exists

remove_task(task_type: str) Task

Remove a task

Parameters:

task_type (str) – The type of the task to remove

Returns:

The task that was removed

Return type:

Task

Raises:

TaskNotFoundError – If no task with specified type exists

task(task_type: str, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None, variables_to_fetch: Iterable[str] | None = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, *, single_value: Literal[False] = False) Callable[[Callable[[P], RD] | Callable[[P], Awaitable[RD]]], Callable[[P], RD] | Callable[[P], Awaitable[RD]]]
task(task_type: str, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None, variables_to_fetch: Iterable[str] | None = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, *, single_value: Literal[True], variable_name: str) Callable[[Callable[[P], R] | Callable[[P], Awaitable[R]]], Callable[[P], R] | Callable[[P], Awaitable[R]]]

Decorator to create a task

Parameters:
  • task_type (str) – The task type

  • exception_handler (ExceptionHandler) – Handler that will be called when a job fails.

  • variables_to_fetch (Optional[Iterable[str]]) – The variables to request from Zeebe when activating jobs.

  • timeout_ms (int) – Maximum duration of the task in milliseconds. If the timeout is exceeded, Zeebe gives up on the worker and makes the job available for retry. Default: 10000 (10 seconds).

  • max_jobs_to_activate (int) – Maximum number of jobs the worker will activate in one request to the Zeebe gateway. Default: 32

  • max_running_jobs (int) – Maximum number of jobs that will run simultaneously. Default: 32

  • before (list[TaskDecorator]) – All decorators which should be performed before the task.

  • after (list[TaskDecorator]) – All decorators which should be performed after the task.

  • single_value (bool) – Set this to True if the function returns a single value (int, string, list) rather than a dictionary. Default: False

  • variable_name (str) – If single_value is True, the name of the variable sent to Zeebe: { <variable_name>: <function_return_value> }
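
The single_value and variable_name rule above can be sketched in a few lines. The helper to_zeebe_variables is hypothetical and is not part of pyzeebe; it only mirrors the documented mapping { <variable_name>: <function_return_value> } so the effect of the two parameters is visible without a running gateway. The task type "add" and variable name "sum" in the comment are illustrative.

```python
from typing import Any, Dict, Optional


def to_zeebe_variables(
    result: Any,
    single_value: bool = False,
    variable_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the documented return-value rule."""
    if single_value:
        if variable_name is None:
            raise ValueError("variable_name is required when single_value is True")
        # A bare return value is wrapped under the configured variable name.
        return {variable_name: result}
    # Without single_value the task function is expected to return a dictionary.
    return result


# With pyzeebe the rule is selected on the decorator itself, for example:
#   @router.task(task_type="add", single_value=True, variable_name="sum")
print(to_zeebe_variables(42, single_value=True, variable_name="answer"))
print(to_zeebe_variables({"a": 1}))
```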

Raises:
class pyzeebe.ZeebeWorker(grpc_channel: Channel, name: str | None = None, request_timeout: int = 0, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, max_connection_retries: int = 10, poll_retry_delay: int = 5, tenant_ids: list[str] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)

A Zeebe worker that can connect to a Zeebe instance and perform tasks.

__init__(grpc_channel: Channel, name: str | None = None, request_timeout: int = 0, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, max_connection_retries: int = 10, poll_retry_delay: int = 5, tenant_ids: list[str] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)
Parameters:
  • grpc_channel (grpc.aio.Channel) – gRPC channel connected to a Zeebe gateway

  • name (str) – Name of the Zeebe worker

  • request_timeout (int) – Long-polling timeout for getting tasks from Zeebe. If 0, the default value is used

  • before (list[TaskDecorator]) – Decorators to be performed before each task

  • after (list[TaskDecorator]) – Decorators to be performed after each task

  • exception_handler (ExceptionHandler) – Handler that will be called when a job fails.

  • max_connection_retries (int) – Number of connection retries before the worker gives up on connecting to Zeebe. For infinite retries, use -1

  • poll_retry_delay (int) – The number of seconds to wait before attempting to poll again after reaching the maximum number of running jobs

  • tenant_ids (list[str]) – A list of tenant IDs for which to activate jobs. New in Zeebe 8.3.

include_router(*routers: ZeebeTaskRouter) None

Adds all tasks from the given routers to the worker.

Raises:

DuplicateTaskTypeError – If a task from the router already exists in the worker

async stop() None

Stop the worker. This emits a signal asking each task to complete its current job and stop polling for new jobs.

async work() None

Start the worker. The worker polls Zeebe for jobs of each task in a separate asyncio task.
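
The work() and stop() lifecycle described above follows a common asyncio pattern: one polling coroutine per task type, plus a flag that asks the pollers to finish. The sketch below illustrates that pattern only. MiniWorker and its attributes are hypothetical stand-ins, not pyzeebe's implementation, and asyncio.sleep stands in for the request to the gateway.

```python
import asyncio


class MiniWorker:
    """Hypothetical stand-in: one polling coroutine per task type."""

    def __init__(self, task_types):
        self.task_types = list(task_types)
        self.polls = {task_type: 0 for task_type in self.task_types}
        self._stopping = False

    async def _poll(self, task_type):
        # Stand-in for activating jobs of one task type from the gateway.
        while not self._stopping:
            self.polls[task_type] += 1
            await asyncio.sleep(0.01)

    async def work(self):
        # Each task type is polled in its own asyncio task;
        # work() returns once every poller has stopped.
        await asyncio.gather(*(self._poll(t) for t in self.task_types))

    async def stop(self):
        # Ask the pollers to finish their current iteration and stop polling.
        self._stopping = True


async def main():
    worker = MiniWorker(["payment", "shipping"])
    work_task = asyncio.create_task(worker.work())
    await asyncio.sleep(0.05)  # let the pollers run for a moment
    await worker.stop()
    await work_task  # completes because every poller saw the stop flag
    return worker.polls


polls = asyncio.run(main())
print(sorted(polls))
```

In the same spirit, a real worker would typically be started with await worker.work() and stopped by awaiting worker.stop() from another coroutine, such as a shutdown hook.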

Raises:
class pyzeebe.Job(key: int, type: str, process_instance_key: int, bpmn_process_id: str, process_definition_version: int, process_definition_key: int, element_id: str, element_instance_key: int, custom_headers: Headers, worker: str, retries: int, deadline: int, variables: Variables, tenant_id: str | None = None, status: JobStatus = JobStatus.Running)
key: int
type: str
process_instance_key: int
bpmn_process_id: str
process_definition_version: int
process_definition_key: int
element_id: str
element_instance_key: int
custom_headers: Headers
worker: str
retries: int
deadline: int
variables: Variables
tenant_id: str | None = None
status: JobStatus = 'Running'
task_result = None
set_task_result(task_result: Any) None
class pyzeebe.JobController(job: Job, zeebe_adapter: ZeebeAdapter)
async set_error_status(message: str, error_code: str = '', variables: Variables | None = None) None

Error status means that the job could not be completed because of a business error and will never be able to complete. For example: a required parameter was not given. An error code can be added so that the error can be handled in the Zeebe process.

Parameters:
  • message (str) – The error message

  • error_code (str) – The error code that Zeebe will receive

  • variables (dict) – A dictionary of variables that will be set at the local scope of the job’s associated task. Must be JSON-serializable. New in Zeebe 8.2.

Raises:
async set_failure_status(message: str, retry_back_off_ms: int = 0, variables: Variables | None = None) None

Failure status means that a technical error has occurred. If retried, the job may succeed. For example: the connection to the database was lost.

Parameters:
  • message (str) – The failure message that Zeebe will receive

  • retry_back_off_ms (int) – The backoff timeout (in ms) for the next retry. New in Zeebe 8.1.

  • variables (dict) – A dictionary of variables that will be set at the local scope of the job’s associated task. Must be JSON-serializable. New in Zeebe 8.2.
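
One way to apply the error/failure distinction is inside an exception handler that takes the documented (exception, job, job_controller) arguments. The sketch below is illustrative only. RecordingController is a hypothetical stand-in for pyzeebe.JobController that records the call instead of contacting Zeebe, and MissingParameter and the error code "MISSING_PARAMETER" are invented for the example.

```python
import asyncio


class RecordingController:
    """Hypothetical stand-in for pyzeebe.JobController; records the status set."""

    def __init__(self):
        self.status = None

    async def set_error_status(self, message, error_code="", variables=None):
        self.status = ("error", error_code, message)

    async def set_failure_status(self, message, retry_back_off_ms=0, variables=None):
        self.status = ("failure", retry_back_off_ms, message)


class MissingParameter(Exception):
    """Hypothetical business error: retrying cannot fix it."""


async def handle(exception, job, job_controller):
    if isinstance(exception, MissingParameter):
        # Business error: report an error code the process can react to.
        await job_controller.set_error_status(
            str(exception), error_code="MISSING_PARAMETER"
        )
    else:
        # Technical error: a retry may succeed, so request a 5-second back-off.
        await job_controller.set_failure_status(
            str(exception), retry_back_off_ms=5000
        )


async def demo(exception):
    controller = RecordingController()
    await handle(exception, job=None, job_controller=controller)
    return controller.status


business = asyncio.run(demo(MissingParameter("order_id is required")))
technical = asyncio.run(demo(ConnectionError("connection to DB lost")))
print(business)
print(technical)
```

A handler with this shape could be passed as the exception_handler argument of a router, worker, or individual task.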

Raises:
async set_running_after_decorators_status() None

RunningAfterDecorators status means that the task has been completed as intended and the after decorators will now run.

async set_success_status(variables: Variables | None = None) None

Success status means that the job has been completed as intended.

Raises:
class pyzeebe.JobStatus(value)
Completed = 'Completed'
ErrorThrown = 'ErrorThrown'
Failed = 'Failed'
Running = 'Running'
RunningAfterDecorators = 'RunningAfterDecorators'