Worker Reference
The ZeebeWorker class inherits from the ZeebeTaskRouter class. This means that all methods that ZeebeTaskRouter has will also appear in ZeebeWorker.
- class pyzeebe.ZeebeTaskRouter(before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)
- __init__(before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)
- after(*decorators: Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]) None
Add decorators to be performed after a job is run
- Parameters:
decorators (Iterable[TaskDecorator]) – The decorators to be performed after each job is run
- before(*decorators: Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]) None
Add decorators to be performed before a job is run
- Parameters:
decorators (Iterable[TaskDecorator]) – The decorators to be performed before each job is run
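As a minimal sketch of the decorator contract described above: a decorator takes a Job and returns a Job, and may be a plain function or a coroutine function. The function names, the logger name, and the build_router helper are hypothetical; only ZeebeTaskRouter, its before argument, and its after method come from this reference.

```python
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # pyzeebe is only needed here for type hints
    from pyzeebe import Job

logger = logging.getLogger("worker.audit")  # hypothetical logger name


def log_job_start(job: Job) -> Job:
    """Before decorator: runs before the task function and must return the job."""
    logger.info("starting job %s of type %s", job.key, job.type)
    return job


async def log_job_end(job: Job) -> Job:
    """After decorator: coroutine functions are accepted as well."""
    logger.info("finished job %s with variables %s", job.key, job.variables)
    return job


def build_router():
    """Hypothetical helper that wires the decorators into a router."""
    # Imported lazily so the decorators above can be tried without pyzeebe installed.
    from pyzeebe import ZeebeTaskRouter

    router = ZeebeTaskRouter(before=[log_job_start])  # set at construction time
    router.after(log_job_end)  # or added later; accepts several decorators
    return router
```

Because ZeebeWorker inherits these methods, the same calls should work on a worker instance.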
- exception_handler(exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]]) None
Add exception handler to be called when a job fails
- Parameters:
exception_handler (ExceptionHandler) – Handler that will be called when a job fails.
- get_task(task_type: str) Task
Get a task by its type
- Parameters:
task_type (str) – The type of the wanted task
- Returns:
The wanted task
- Return type:
Task
- Raises:
TaskNotFoundError – If no task with specified type exists
- remove_task(task_type: str) Task
Remove a task
- Parameters:
task_type (str) – The type of the wanted task
- Returns:
The task that was removed
- Return type:
Task
- Raises:
TaskNotFoundError – If no task with specified type exists
- task(task_type: str, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None, variables_to_fetch: Iterable[str] | None = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, *, single_value: Literal[False] = False) Callable[[Callable[[P], RD] | Callable[[P], Awaitable[RD]]], Callable[[P], RD] | Callable[[P], Awaitable[RD]]]
- task(task_type: str, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None, variables_to_fetch: Iterable[str] | None = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, *, single_value: Literal[True], variable_name: str) Callable[[Callable[[P], R] | Callable[[P], Awaitable[R]]], Callable[[P], R] | Callable[[P], Awaitable[R]]]
Decorator to create a task
- Parameters:
task_type (str) – The task type
exception_handler (ExceptionHandler) – Handler that will be called when a job fails.
variables_to_fetch (Optional[Iterable[str]]) – The variables to request from Zeebe when activating jobs.
timeout_ms (int) – Maximum duration of the task in milliseconds. If the timeout is exceeded, Zeebe gives up on the worker and makes the job available to be retried. Default: 10000 (10 seconds).
max_jobs_to_activate (int) – Maximum amount of jobs the worker will activate in one request to the Zeebe gateway. Default: 32
max_running_jobs (int) – Maximum amount of jobs that will run simultaneously. Default: 32
before (list[TaskDecorator]) – All decorators which should be performed before the task.
after (list[TaskDecorator]) – All decorators which should be performed after the task.
single_value (bool) – If the function returns a single value (int, string, list) and not a dictionary set this to True. Default: False
variable_name (str) – If single_value then this will be the variable name given to zeebe: { <variable_name>: <function_return_value> }
- Raises:
DuplicateTaskTypeError – If a task with the same task type already exists
NoVariableNameGivenError – When single_value is set, but no variable_name is given
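The difference between a dictionary return value and single_value can be sketched as follows. The task functions, task types, and the build_router helper are made up for illustration. The call form router.task(...)(function) is equivalent to the usual @router.task(...) decorator syntax and is used here only so the plain functions stay importable on their own.

```python
from __future__ import annotations


async def greet(name: str) -> dict:
    # Returns a dictionary: each key becomes a variable in Zeebe.
    return {"greeting": f"Hello, {name}"}


def add(x: int, y: int) -> int:
    # Returns a bare value: needs single_value=True and a variable_name.
    return x + y


def build_router():
    """Hypothetical helper that registers both functions as tasks."""
    # Imported lazily so the functions above can be tried without pyzeebe installed.
    from pyzeebe import ZeebeTaskRouter

    router = ZeebeTaskRouter()
    # Allow each "greet" job up to 30 seconds before Zeebe times it out.
    router.task(task_type="greet", timeout_ms=30_000)(greet)
    # The result is reported to Zeebe as {"sum": <function_return_value>}.
    router.task(task_type="add", single_value=True, variable_name="sum")(add)
    return router
```

Setting single_value=True without variable_name is the case that NoVariableNameGivenError covers.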
- class pyzeebe.ZeebeWorker(grpc_channel: Channel, name: str | None = None, request_timeout: int = 0, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, max_connection_retries: int = 10, poll_retry_delay: int = 5, tenant_ids: list[str] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)
A zeebe worker that can connect to a zeebe instance and perform tasks.
- __init__(grpc_channel: Channel, name: str | None = None, request_timeout: int = 0, before: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, after: list[Callable[[Job], Job] | Callable[[Job], Awaitable[Job]]] | None = None, max_connection_retries: int = 10, poll_retry_delay: int = 5, tenant_ids: list[str] | None = None, exception_handler: Callable[[Exception, Job, JobController], Awaitable[None]] | None = None)
- Parameters:
grpc_channel (grpc.aio.Channel) – GRPC Channel connected to a Zeebe gateway
name (str) – Name of zeebe worker
request_timeout (int) – Long-polling timeout for getting tasks from Zeebe. If 0, the default value is used
before (list[TaskDecorator]) – Decorators to be performed before each task
after (list[TaskDecorator]) – Decorators to be performed after each task
exception_handler (ExceptionHandler) – Handler that will be called when a job fails.
max_connection_retries (int) – Number of connection retries before the worker gives up on connecting to Zeebe. Use -1 for infinite retries
poll_retry_delay (int) – The number of seconds to wait before attempting to poll again when reaching max amount of running jobs
tenant_ids (list[str]) – A list of tenant IDs for which to activate jobs. New in Zeebe 8.3.
- include_router(*routers: ZeebeTaskRouter) None
Adds all tasks of the given routers to the worker.
- Raises:
DuplicateTaskTypeError – If a task from the router already exists in the worker
- async stop() None
Stop the worker. This emits a signal asking each task to complete its current job and stop polling for new ones.
- async work() None
Start the worker. The worker will poll zeebe for jobs of each task in a different asyncio task.
- Raises:
ActivateJobsRequestInvalidError – If one of the worker’s tasks has invalid types
ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
ZeebeInternalError – If Zeebe experiences an internal error
UnknownGrpcStatusCodeError – If Zeebe returns an unexpected status code
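One possible way to combine include_router, work and stop is sketched below. The helpers build_worker and run_for are hypothetical, the channel is assumed to be a grpc.aio.Channel created elsewhere, and the sketch assumes that work() returns once stop() has been called.

```python
import asyncio


def build_worker(channel, *routers):
    """Hypothetical helper: create a worker and attach the routers' tasks."""
    # Imported lazily so run_for below can be tried without pyzeebe installed.
    from pyzeebe import ZeebeWorker

    worker = ZeebeWorker(channel, name="example-worker", max_connection_retries=-1)
    worker.include_router(*routers)  # raises DuplicateTaskTypeError on clashes
    return worker


async def run_for(worker, seconds: float) -> None:
    """Run worker.work() for a limited time, then shut down via worker.stop()."""
    work = asyncio.create_task(worker.work())
    try:
        await asyncio.sleep(seconds)
    finally:
        await worker.stop()  # ask the pollers to finish and stop polling
        await work  # wait for work() to return; re-raises its errors, if any
```

A long-running service would typically await worker.work() directly and call stop() from a shutdown hook instead of using a fixed duration.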
- class pyzeebe.Job(key: 'int', type: 'str', process_instance_key: 'int', bpmn_process_id: 'str', process_definition_version: 'int', process_definition_key: 'int', element_id: 'str', element_instance_key: 'int', custom_headers: 'Headers', worker: 'str', retries: 'int', deadline: 'int', variables: 'Variables', tenant_id: 'str | None' = None, status: 'JobStatus' = <JobStatus.Running: 'Running'>)
- custom_headers: Headers
- variables: Variables
- task_result = None
- class pyzeebe.JobController(job: Job, zeebe_adapter: ZeebeAdapter)
- async set_error_status(message: str, error_code: str = '', variables: Variables | None = None) None
Error status means that the job could not be completed because of a business error and won’t ever be able to be completed. For example: a required parameter was not given. An error code can be added to handle the error in the Zeebe process.
- Parameters:
- Raises:
ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
ZeebeInternalError – If Zeebe experiences an internal error
- async set_failure_status(message: str, retry_back_off_ms: int = 0, variables: Variables | None = None) None
Failure status means a technical error has occurred. If retried the job may succeed. For example: connection to DB lost
- Parameters:
message (str) – The failure message that Zeebe will receive
retry_back_off_ms (int) – The backoff timeout (in ms) for the next retry. New in Zeebe 8.1.
variables (dict) – A dictionary containing variables that will instantiate the variables at the local scope of the job’s associated task. Must be JSONable. New in Zeebe 8.2.
- Raises:
ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
ZeebeInternalError – If Zeebe experiences an internal error
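The distinction between error status (business error, no point in retrying) and failure status (technical error, a retry may succeed) can be sketched as an exception handler. The exception class, the error code, and the back-off value are assumptions chosen for illustration; the handler signature and the JobController methods are those listed in this reference.

```python
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # pyzeebe is only needed here for type hints
    from pyzeebe import Job, JobController

logger = logging.getLogger(__name__)

RETRY_BACK_OFF_MS = 5_000  # assumed delay before Zeebe retries the job


class BusinessRuleViolation(Exception):
    """Hypothetical application error that a retry cannot fix."""


async def handle_job_exception(
    exception: Exception, job: Job, job_controller: JobController
) -> None:
    logger.warning("job %s raised %r", job.key, exception)
    if isinstance(exception, BusinessRuleViolation):
        # Business error: report an error code the process can react to.
        await job_controller.set_error_status(
            str(exception), error_code="business-rule-violation"
        )
    else:
        # Technical error: mark the job as failed so that it can be retried.
        await job_controller.set_failure_status(
            f"Failed job. Error: {exception}",
            retry_back_off_ms=RETRY_BACK_OFF_MS,
        )
```

Such a handler would be passed as the exception_handler argument of ZeebeWorker, ZeebeTaskRouter, or an individual task.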
- async set_running_after_decorators_status() None
RunningAfterDecorators status means that the task has been completed as intended and the after decorators will now run.
- async set_success_status(variables: Variables | None = None) None
Success status means that the job has been completed as intended.
- Raises:
ZeebeBackPressureError – If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError – If the Zeebe gateway is unavailable
ZeebeInternalError – If Zeebe experiences an internal error