Class ZBClient

description

A client for interacting with a Zeebe broker. With the connection credentials set in the environment, you can use a "zero-conf" constructor with no arguments.

example
const zbc = new ZBClient()
zbc.topology().then(info =>
    console.log(JSON.stringify(info, null, 2))
)

Constructors

constructor

  • Parameters

    • Optional options: ZBClientOptions

      Zero-conf constructor. The entire ZBClient connection config can be passed in via the environment.

    Returns ZBClient

  • Parameters

    • gatewayAddress: string
    • Optional options: ZBClientOptions

      Zero-conf constructor. The entire ZBClient connection config can be passed in via the environment.

    Returns ZBClient

Properties

Private Optional basicAuth

basicAuth: BasicAuthConfig

Private Optional closePromise

closePromise: Promise<any>

Private closing

closing: boolean = false

Optional connected

connected: boolean = ...

connectionTolerance

connectionTolerance: MaybeTimeDuration = ...

Private Optional customSSL

customSSL: CustomSSL

gatewayAddress

gatewayAddress: string

Private grpc

grpc: ZBGrpc

Private logger

logger: StatefulLogInterceptor

loglevel

loglevel: Loglevel

Private maxRetries

maxRetries: number = ...

Private maxRetryTimeout

maxRetryTimeout: MaybeTimeDuration = ...

Private Optional oAuth

oAuth: OAuthProvider

Optional onConnectionError

onConnectionError: (err: Error) => void

Type declaration

    • (err: Error): void
    • Parameters

      • err: Error

      Returns void

Optional onReady

onReady: () => void

Type declaration

    • (): void
    • Returns void

Private options

readied

readied: boolean = false

Private retry

retry: boolean

Private stdout

Private Optional tenantId

tenantId: string

Private useTLS

useTLS: boolean

Private workerCount

workerCount: number = 0

Private workers

workers: (ZBWorker<any, any, any> | ZBBatchWorker<any, any, any>)[] = []

Static Readonly DEFAULT_CONNECTION_TOLERANCE

DEFAULT_CONNECTION_TOLERANCE: Milliseconds = ...

Static Private Readonly DEFAULT_LONGPOLL_PERIOD

DEFAULT_LONGPOLL_PERIOD: Seconds = ...

Static Private Readonly DEFAULT_MAX_RETRIES

DEFAULT_MAX_RETRIES: -1 = -1

Static Private Readonly DEFAULT_MAX_RETRY_TIMEOUT

DEFAULT_MAX_RETRY_TIMEOUT: Seconds = ...

Static Private Readonly DEFAULT_POLL_INTERVAL

DEFAULT_POLL_INTERVAL: Milliseconds = ...

Methods

Private _onConnectionError

  • _onConnectionError(err: Error): void

activateJobs

  • description

    activateJobs allows you to manually activate jobs, effectively building your own worker rather than using the ZBWorker class.

    example
    const zbc = new ZBClient()
    zbc.activateJobs({
      maxJobsToActivate: 5,
      requestTimeout: 6000,
      timeout: 5 * 60 * 1000,
      type: 'process-payment',
      worker: 'my-worker-uuid'
    }).then(jobs =>
      jobs.forEach(job =>
        // business logic
        zbc.completeJob({
          jobKey: job.key,
          variables: {}
        })
      )
    )
    

    Type parameters

    Parameters

    Returns Promise<Job<IInputVariables, ICustomHeaders>[]>
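    A hand-built worker usually wraps the activate-then-complete cycle in a function that can be called repeatedly. The sketch below is a minimal illustration of that idea, not the library's worker implementation. `ManualWorkerClient`, `ActivatedJob`, and `pollOnce` are hypothetical names introduced here; the request fields mirror the example above.

```typescript
// Hypothetical minimal view of an activated job (field names taken from the example above).
interface ActivatedJob {
  key: string
  variables: Record<string, unknown>
}

// Hypothetical subset of the client surface that a hand-built worker needs.
interface ManualWorkerClient {
  activateJobs(req: {
    maxJobsToActivate: number
    requestTimeout: number
    timeout: number
    type: string
    worker: string
  }): Promise<ActivatedJob[]>
  completeJob(req: { jobKey: string; variables: Record<string, unknown> }): Promise<void>
}

// Activate one batch of jobs, run the handler on each, and complete each job
// with the variables the handler returns. Resolves to the number of jobs processed.
async function pollOnce(
  client: ManualWorkerClient,
  type: string,
  handler: (job: ActivatedJob) => Promise<Record<string, unknown>>
): Promise<number> {
  const jobs = await client.activateJobs({
    maxJobsToActivate: 5,
    requestTimeout: 6000,
    timeout: 5 * 60 * 1000,
    type,
    worker: 'my-worker-uuid',
  })
  for (const job of jobs) {
    const variables = await handler(job)
    await client.completeJob({ jobKey: job.key, variables })
  }
  return jobs.length
}
```

    Error handling is deliberately omitted: a real worker would also need to fail the job when the handler throws.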

broadcastSignal

cancelProcessInstance

  • cancelProcessInstance(processInstanceKey: string | number): Promise<void>
  • description

    Cancel a process instance by process instance key.

    example
    const zbc = new ZBClient()
    
    zbc.cancelProcessInstance(processInstanceKey)
        .catch(
            (e: any) => console.log(`Error cancelling instance: ${e.message}`)
        )
    

    Parameters

    • processInstanceKey: string | number

    Returns Promise<void>

close

  • close(timeout?: number): Promise<null>
  • description

    Gracefully shut down all workers, draining existing tasks, and return when it is safe to exit.

    example
    const zbc = new ZBClient()
    
    zbc.createWorker({
      taskType:
    })
    
    setTimeout(async () => {
      await zbc.close()
      console.log('All work completed.')
    }, 5 * 60 * 1000) // 5 mins
    

    Parameters

    • Optional timeout: number

    Returns Promise<null>

completeJob

  • description

    Explicitly complete a job. The method is useful for manually constructing a worker.

    example
    const zbc = new ZBClient()
    zbc.activateJobs({
      maxJobsToActivate: 5,
      requestTimeout: 6000,
      timeout: 5 * 60 * 1000,
      type: 'process-payment',
      worker: 'my-worker-uuid'
    }).then(jobs =>
      jobs.forEach(job =>
        // business logic
        zbc.completeJob({
          jobKey: job.key,
          variables: {}
        })
      )
    )
    

    Parameters

    Returns Promise<void>

Private constructGrpcClient

  • constructGrpcClient(__namedParameters: { grpcConfig: { namespace: string; tasktype?: string; onConnectionError?: any; onReady?: any }; logConfig: ZBLoggerConfig }): { grpcClient: ZBGrpc; log: StatefulLogInterceptor }
  • Parameters

    • __namedParameters: { grpcConfig: { namespace: string; tasktype?: string; onConnectionError?: any; onReady?: any }; logConfig: ZBLoggerConfig }
      • grpcConfig: { namespace: string; tasktype?: string; onConnectionError?: any; onReady?: any }
        • namespace: string
        • Optional tasktype?: string
        • onConnectionError: function
          • onConnectionError(): void
        • onReady: function
          • onReady(): void
      • logConfig: ZBLoggerConfig

    Returns { grpcClient: ZBGrpc; log: StatefulLogInterceptor }

    • grpcClient: ZBGrpc
    • log: StatefulLogInterceptor

createBatchWorker

  • createBatchWorker<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>(conf: ZBBatchWorkerConfig<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>): ZBBatchWorker<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>
  • description

    Create a new Batch Worker. This is useful when you need to rate limit access to an external resource.

    example
    const zbc = new ZBClient()
    // Helper function to find a job by its key
    const findJobByKey = jobs => key => jobs.find(job => job.key === key)
    
    const handler = async (jobs: BatchedJob[]) => {
      console.log("Let's do this!")
      // Construct some hypothetical payload with correlation ids and requests
      const req = jobs.map(({ key, variables }) => ({ id: key, data: variables.request }))
      // An uncaught exception will not be managed by the library
      try {
        // Our API wrapper turns that into a request, and returns
        // an array of results with ids
        const outcomes = await API.post(req)
        // Construct a find function for these jobs
        const getJob = findJobByKey(jobs)
        // Iterate over the results and call the complete method on the corresponding job,
        // passing in the correlated outcome of the API call
        outcomes.forEach(res => getJob(res.id)?.complete(res.data))
      } catch (e) {
        // Fail every job in the batch if the API call throws
        jobs.forEach(job => job.fail(e.message))
      }
    }
    
    const batchWorker = zbc.createBatchWorker({
      taskType: 'get-data-from-external-api',
      taskHandler: handler,
      jobBatchMinSize: 10, // at least 10 at a time
      jobBatchMaxTime: 60, // or every 60 seconds, whichever comes first
      timeout: Duration.seconds.of(80) // 80 second timeout means we have 20 seconds to process at least
    })
    

    Type parameters

    Parameters

    Returns ZBBatchWorker<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>
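    The comments on `jobBatchMinSize` and `jobBatchMaxTime` in the example describe a "whichever comes first" rule. The sketch below models that rule as a pure function to make the two conditions explicit. It is a toy model based only on those comments, not the library's batching code; `BatchPolicy` and `shouldFlushBatch` are hypothetical names.

```typescript
// Hypothetical policy object using the two option names from the example above.
interface BatchPolicy {
  jobBatchMinSize: number // hand over the batch once this many jobs are queued
  jobBatchMaxTime: number // or once this many seconds have passed
}

// Decide whether a queued batch should be handed to the handler now.
function shouldFlushBatch(
  queuedJobs: number,
  secondsSinceFirstJob: number,
  policy: BatchPolicy
): boolean {
  // Nothing to hand over if no jobs are queued.
  if (queuedJobs === 0) return false
  return (
    queuedJobs >= policy.jobBatchMinSize ||
    secondsSinceFirstJob >= policy.jobBatchMaxTime
  )
}
```

    With the example's settings (`jobBatchMinSize: 10`, `jobBatchMaxTime: 60`), a batch of 3 jobs would be held until 60 seconds have passed, while a batch of 10 would be handed over immediately.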

createProcessInstance

  • description

    Create a new process instance. Asynchronously returns a process instance id.

    example
    const zbc = new ZBClient()
    
    zbc.createProcessInstance({
      bpmnProcessId: 'onboarding-process',
      variables: {
        customerId: 'uuid-3455'
      },
      version: 5 // optional, will use latest by default
    }).then(res => console.log(JSON.stringify(res, null, 2)))
    
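    // Start the instance at a specific element instead of the start event
    // (`random` and `id` are assumed to be defined by the caller)
    zbc.createProcessInstance({
      bpmnProcessId: 'SkipFirstTask',
      variables: { id: random },
      startInstructions: [{elementId: 'second_service_task'}]
    }).then(res => (id = res.processInstanceKey))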
    

    Type parameters

    Parameters

    Returns Promise<CreateProcessInstanceResponse>

createProcessInstanceWithResult

createWorker

  • createWorker<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>(config: ZBWorkerConfig<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>): ZBWorker<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>
  • description

    Create a worker that polls the gateway for jobs and executes a job handler when units of work are available.

    example
    const zbc = new ZBClient()
    
    const zbWorker = zbc.createWorker({
      taskType: 'demo-service',
      taskHandler: myTaskHandler,
    })
    
    // A job handler must return one of job.complete, job.fail, job.error, or job.forward
    // Note: unhandled exceptions in the job handler cause the library to call job.fail
    async function myTaskHandler(job) {
      zbWorker.log('Task variables', job.variables)
    
      // Task worker business logic goes here
      const updateToBrokerVariables = {
        updatedProperty: 'newValue',
      }
    
      const res = await callExternalSystem(job.variables)
    
      if (res.code === 'SUCCESS') {
        return job.complete({
           ...updateToBrokerVariables,
           ...res.values
        })
      }
      if (res.code === 'BUSINESS_ERROR') {
        return job.error({
          code: res.errorCode,
          message: res.message
        })
      }
      if (res.code === 'ERROR') {
        return job.fail({
           errorMessage: res.message,
           retryBackOff: 2000
        })
      }
    }
    

    Type parameters

    Parameters

    • config: ZBWorkerConfig<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>

    Returns ZBWorker<WorkerInputVariables, CustomHeaderShape, WorkerOutputVariables>

deployProcess

  • description

    Deploy a process model.

    example
    import { readFileSync } from 'fs'
    import { join } from 'path'
    
    const zbc = new ZBClient()
    const bpmnFilePath = join(process.cwd(), 'bpmn', 'onboarding.bpmn')
    
    // Loading the process model allows you to perform modifications or analysis
    const bpmn = readFileSync(bpmnFilePath, 'utf8')
    
    zbc.deployProcess({
       definition: bpmn,
       name: 'onboarding.bpmn'
    })
    
    // If you don't need access to model contents, simply pass a file path
    zbc.deployProcess(bpmnFilePath)
    

    Parameters

    Returns Promise<DeployProcessResponse>

deployResource

  • description

    Deploys one or more resources (e.g. processes or decision models) to Zeebe. Note that this is an atomic call, i.e. either all resources are deployed, or none of them are.

    Errors:

    PERMISSION_DENIED:

    • if a deployment to an unauthorized tenant is performed

    INVALID_ARGUMENT:

    • no resources given.
    • if at least one resource is invalid. A resource is considered invalid if:
      • the content is not deserializable (e.g. detected as BPMN, but it's broken XML)
      • the content is invalid (e.g. an event-based gateway has an outgoing sequence flow to a task)
    • if multi-tenancy is enabled, and:
      • a tenant id is not provided
      • a tenant id with an invalid format is provided
    • if multi-tenancy is disabled and a tenant id is provided

    example
    import {join} from 'path'
    const zbc = new ZBClient()
    
    zbc.deployResource({ processFilename: join(process.cwd(), 'bpmn', 'onboarding.bpmn') })
    zbc.deployResource({ decisionFilename: join(process.cwd(), 'dmn', 'approval.dmn') })
    

    Parameters

    • resource: { processFilename: string; tenantId?: string } | { name: string; process: Buffer; tenantId?: string }

    Returns Promise<DeployResourceResponse<ProcessDeployment>>

  • Parameters

    • resource: { decisionFilename: string; tenantId?: string } | { decision: Buffer; name: string; tenantId?: string }

    Returns Promise<DeployResourceResponse<DecisionDeployment>>

  • Parameters

    • resource: { formFilename: string; tenantId?: string } | { form: Buffer; name: string; tenantId?: string }

    Returns Promise<DeployResourceResponse<FormDeployment>>
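    The three overloads are distinguished by the key used for the file: `processFilename`, `decisionFilename`, or `formFilename`. When deploying a mixed directory of files, a small helper can pick the right key. The sketch below is one possible approach; `resourceArgFor` and `ResourceArg` are hypothetical names, and the mapping from file extension to resource type is an assumption, not something the library does for you.

```typescript
// The filename-based argument shapes listed in the overloads above.
type ResourceArg =
  | { processFilename: string; tenantId?: string }
  | { decisionFilename: string; tenantId?: string }
  | { formFilename: string; tenantId?: string }

// Pick the argument shape from the file extension (assumed convention:
// .bpmn for processes, .dmn for decisions, .form for forms).
function resourceArgFor(filename: string, tenantId?: string): ResourceArg {
  // Only include tenantId when one was supplied.
  const tenant = tenantId !== undefined ? { tenantId } : {}
  const lower = filename.toLowerCase()
  if (lower.endsWith('.bpmn')) return { processFilename: filename, ...tenant }
  if (lower.endsWith('.dmn')) return { decisionFilename: filename, ...tenant }
  if (lower.endsWith('.form')) return { formFilename: filename, ...tenant }
  throw new Error(`Unsupported resource type: ${filename}`)
}
```

    Each returned object could then be passed to the matching `deployResource` overload.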

emit

  • emit<K>(eventName: K, params?: { close: "close"; connectionError: "connectionError"; ready: "ready"; unknown: "unknown" }[K]): void
  • Type parameters

    • K: EventKey<{ close: "close"; connectionError: "connectionError"; ready: "ready"; unknown: "unknown" }>

    Parameters

    • eventName: K
    • Optional params: { close: "close"; connectionError: "connectionError"; ready: "ready"; unknown: "unknown" }[K]

    Returns void

evaluateDecision

  • description

    Evaluates a decision. The decision to evaluate can be specified either by using its unique key (as returned by DeployResource), or using the decision ID. When using the decision ID, the latest deployed version of the decision is used.

    example
    const zbc = new ZBClient()
    zbc.evaluateDecision({
      decisionId: 'my-decision',
      variables: { season: "Fall" }
    }).then(res => console.log(JSON.stringify(res, null, 2)))
    

    Parameters

    Returns Promise<EvaluateDecisionResponse>

Private executeOperation

  • executeOperation<T>(operationName: string, operation: () => Promise<T>, retries?: number): Promise<T>
  • If this.retry is set to true, the operation will be wrapped in a configurable retry on exceptions with gRPC error code 14 (UNAVAILABLE), which indicates a transient network failure. See: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md. If this.retry is false, the operation will be executed with no retry, and the application should handle the exception.

    Type parameters

    • T

    Parameters

    • operationName: string
    • operation: () => Promise<T>

      A gRPC command operation

        • (): Promise<T>
        • Returns Promise<T>

    • Optional retries: number

    Returns Promise<T>

failJob

  • description

    Fail a job. This is useful if you are using the decoupled completion pattern or building your own worker. For the retry count, the current count is available in the job metadata.

    example
    const zbc = new ZBClient()
    zbc.failJob( {
      jobKey: '345424343451',
      retries: 3,
      errorMessage: 'Could not get a response from the order invoicing API',
      retryBackOff: 30 * 1000 // optional, otherwise available for reactivation immediately
    })
    

    Parameters

    Returns Promise<void>
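    Because the caller supplies the new retry count, a hand-built worker typically derives it from the count carried on the job. The sketch below shows one way to build the request. `JobLike` and `buildFailJobRequest` are hypothetical names, and the `key` and `retries` field names on the job are assumptions; the request fields match the example above.

```typescript
// Hypothetical minimal view of a job; `key` and `retries` are assumed field names.
interface JobLike {
  key: string
  retries: number
}

// Request shape mirroring the failJob example above.
interface FailJobRequest {
  jobKey: string
  retries: number
  errorMessage: string
  retryBackOff?: number
}

// Build a failJob request that uses up one retry, never going below zero.
function buildFailJobRequest(
  job: JobLike,
  errorMessage: string,
  retryBackOff?: number
): FailJobRequest {
  return {
    jobKey: job.key,
    retries: Math.max(job.retries - 1, 0),
    errorMessage,
    // Only include the back-off when the caller asked for one.
    ...(retryBackOff !== undefined ? { retryBackOff } : {}),
  }
}
```

    The result could be passed directly to `zbc.failJob(...)`.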

getServiceTypesFromBpmn

  • getServiceTypesFromBpmn(files: string | string[]): Promise<string[]>
  • description

    Return an array of task types contained in a BPMN file or array of BPMN files. This can be useful, for example, to do

    example
    const zbc = new ZBClient()
    zbc.getServiceTypesFromBpmn(['bpmn/onboarding.bpmn', 'bpmn/process-sale.bpmn'])
      .then(tasktypes => console.log('The task types are:', tasktypes))
    
    

    Parameters

    • files: string | string[]

    Returns Promise<string[]>
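    To illustrate what "task types contained in a BPMN file" means, the sketch below pulls the `type` attribute out of each `zeebe:taskDefinition` element in a BPMN XML string. It is a simplified regex-based illustration under that assumption, not the library's implementation, and `extractTaskTypes` is a hypothetical name.

```typescript
// Collect the distinct task types declared in a BPMN XML string.
// A regex is used for brevity; a real implementation would use an XML parser.
function extractTaskTypes(bpmnXml: string): string[] {
  const pattern = /<zeebe:taskDefinition\b[^>]*\btype="([^"]*)"/g
  const types = new Set<string>()
  let match: RegExpExecArray | null
  while ((match = pattern.exec(bpmnXml)) !== null) {
    types.add(match[1])
  }
  // Set iteration preserves first-seen order.
  return Array.from(types)
}
```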

modifyProcessInstance

  • description

    Modify a running process instance. This allows you to move the execution tokens, and change the variables. Added in 8.1. See the gRPC protocol documentation.

    example
    zbc.createProcessInstance({
      bpmnProcessId: 'SkipFirstTask',
      variables: {}
    }).then(res =>
      zbc.modifyProcessInstance({
        processInstanceKey: res.processInstanceKey,
        activateInstructions: [{
          elementId: 'second_service_task',
          ancestorElementInstanceKey: "-1",
          variableInstructions: [{
            scopeId: '',
            variables: { second: 1 }
          }]
        }]
      })
    )
    

    Parameters

    Returns Promise<ModifyProcessInstanceResponse>

off

  • off<K>(eventName: K, fn: EventReceiver): void
  • Type parameters

    • K: EventKey<{ close: "close"; connectionError: "connectionError"; ready: "ready"; unknown: "unknown" }>

    Parameters

    • eventName: K
    • fn: EventReceiver

    Returns void

on

  • on<K>(eventName: K, fn: EventReceiver): ZBClient
  • Type parameters

    • K: EventKey<{ close: "close"; connectionError: "connectionError"; ready: "ready"; unknown: "unknown" }>

    Parameters

    • eventName: K
    • fn: EventReceiver

    Returns ZBClient
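    Since `on` returns the client, listeners can be chained. The sketch below tracks connection state from the `ready`, `connectionError`, and `close` events listed in the type parameter above. `EventedClient` and `trackConnection` are hypothetical names, and the listener signature is simplified to `() => void` because the `EventReceiver` type is not shown on this page.

```typescript
// Event names taken from the EventKey type parameter above.
type ClientEventName = 'ready' | 'connectionError' | 'close' | 'unknown'

// Hypothetical minimal view of a client that supports chained `on` calls.
interface EventedClient {
  on(eventName: ClientEventName, fn: () => void): EventedClient
}

// Register listeners and expose the most recently observed connection state.
function trackConnection(client: EventedClient): { isConnected: () => boolean } {
  let connected = false
  client
    .on('ready', () => {
      connected = true
    })
    .on('connectionError', () => {
      connected = false
    })
    .on('close', () => {
      connected = false
    })
  return { isConnected: () => connected }
}
```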

publishMessage

  • description

    Publish a message to the broker for correlation with a workflow instance. See this tutorial for a detailed description of message correlation.

    example
    const zbc = new ZBClient()
    
    zbc.publishMessage({
      // Should match the "Message Name" in a BPMN Message Catch
      name: 'order_status',
      correlationKey: 'uuid-124-532-5432',
      variables: {
        event: 'PROCESSED'
      }
    })
    

    Type parameters

    Parameters

    Returns Promise<PublishMessageResponse>

publishStartMessage

  • description

    Publish a message to the broker for correlation with a workflow message start event. For a message targeting a start event, the correlation key is not needed to target a specific running process instance. However, the hash of the correlationKey is used to determine the partition where this workflow will start. So we assign a random uuid to balance workflow instances created via start message across partitions.

    We make the correlationKey optional, because the caller can specify a correlationKey + messageId to guarantee an idempotent message.

    Multiple messages with the same correlationKey + messageId combination will only start a workflow once. See: https://github.com/zeebe-io/zeebe/issues/1012 and https://github.com/zeebe-io/zeebe/issues/1022

    example
    const zbc = new ZBClient()
    zbc.publishStartMessage({
      name: 'Start_New_Onboarding_Flow',
      variables: {
        customerId: 'uuid-348-234-8908'
      }
    })
    
    // To do the same in an idempotent fashion - note: only idempotent during the lifetime of the created instance.
    zbc.publishStartMessage({
      name: 'Start_New_Onboarding_Flow',
      messageId: 'uuid-348-234-8908', // use customerId to make process idempotent per customer
      variables: {
        customerId: 'uuid-348-234-8908'
      }
    })
    

    Type parameters

    Parameters

    Returns Promise<PublishMessageResponse>
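    The idempotency rule described above (the same correlationKey + messageId combination starts a workflow only once) can be pictured with a small in-memory model. The sketch below is a toy illustration of that rule only; it is not broker or library code, and `StartMessage` and `StartMessageDeduplicator` are hypothetical names.

```typescript
// Hypothetical minimal view of a start message.
interface StartMessage {
  name: string
  correlationKey?: string
  messageId?: string
}

// Toy model: remembers correlationKey + messageId pairs it has already seen.
class StartMessageDeduplicator {
  private seen = new Set<string>()

  // Returns true if, in this toy model, the message would start a new instance.
  accept(msg: StartMessage): boolean {
    // Without both values there is nothing to deduplicate on.
    if (msg.correlationKey === undefined || msg.messageId === undefined) {
      return true
    }
    const id = JSON.stringify([msg.correlationKey, msg.messageId])
    if (this.seen.has(id)) return false
    this.seen.add(id)
    return true
  }
}
```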

removeAllListeners

  • removeAllListeners(): void

resolveIncident

  • description

    Resolve an incident by incident key.

    example
    type JSONObject = {[key: string]: string | number | boolean | JSONObject}
    
    const zbc = new ZBClient()
    
    async function updateAndResolveIncident({
      processInstanceId,
      incidentKey,
      jobKey,
      variables
    } : {
      processInstanceId: string,
      incidentKey: string,
      jobKey: string,
      variables: JSONObject
    }) {
      await zbc.setVariables({
        elementInstanceKey: processInstanceId,
        variables
      })
      // Give the failed job a retry before resolving the incident
      await zbc.updateJobRetries({
        jobKey,
        retries: 1
      })
      return zbc.resolveIncident({
        incidentKey
      })
    }
    
    

    Parameters

    Returns Promise<void>

Private retryOnFailure

  • retryOnFailure<T>(operationName: string, operation: () => Promise<T>, retries?: number): Promise<T>
  • This function takes a gRPC operation that returns a Promise as a function, and invokes it. If the operation throws gRPC error 14, this function will continue to try it until it succeeds or retries are exhausted.

    Type parameters

    • T

    Parameters

    • operationName: string
    • operation: () => Promise<T>

      A gRPC command operation that may fail if the broker is not available

        • (): Promise<T>
        • Returns Promise<T>

    • retries: number = ...

    Returns Promise<T>
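    The retry behaviour described above can be sketched as a small wrapper. This is a minimal illustration under stated assumptions, not the library's implementation: `retryOnUnavailable` and `GrpcLikeError` are hypothetical names, the error is assumed to carry a numeric `code` property, and a fixed delay stands in for whatever back-off the client actually uses.

```typescript
// Hypothetical error shape: only the numeric gRPC status code matters here.
interface GrpcLikeError extends Error {
  code?: number
}

// gRPC status code 14 is UNAVAILABLE.
const GRPC_UNAVAILABLE = 14

// Invoke `operation`; on code 14, try again up to `retries` more times.
// Any other error, or running out of retries, rethrows to the caller.
async function retryOnUnavailable<T>(
  operation: () => Promise<T>,
  retries: number,
  delayMs: number = 0
): Promise<T> {
  let attempt = 0
  while (true) {
    try {
      return await operation()
    } catch (e) {
      const err = e as GrpcLikeError
      if (err.code !== GRPC_UNAVAILABLE || attempt >= retries) {
        throw err
      }
      attempt++
      // Fixed delay for simplicity (an assumption of this sketch).
      await new Promise<void>(resolve => setTimeout(resolve, delayMs))
    }
  }
}
```

    Errors with any other status code are rethrown immediately, which matches the description of leaving non-transient failures to the application.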

setVariables

  • description

    Directly modify the variables in a process instance. This can be used with resolveIncident to update the process and resolve an incident.

    example
    type JSONObject = {[key: string]: string | number | boolean | JSONObject}
    
    const zbc = new ZBClient()
    
    async function updateAndResolveIncident({
      incidentKey,
      processInstanceKey,
      jobKey,
      variableUpdate
    } : {
      incidentKey: string
      processInstanceKey: string
      jobKey: string
      variableUpdate: JSONObject
    }) {
      await zbc.setVariables({
        elementInstanceKey: processInstanceKey,
        variables: variableUpdate
      })
      await zbc.updateJobRetries({
        jobKey,
        retries: 1
      })
      return zbc.resolveIncident({
        incidentKey
      })
    }
    

    Type parameters

    Parameters

    Returns Promise<void>

throwError

  • description

    Fail a job by throwing a business error (i.e. non-technical) that occurs while processing a job. The error is handled in the workflow by an error catch event. If there is no error catch event with the specified errorCode then an incident will be raised instead. This method is useful when building a worker, for example for the decoupled completion pattern.

    example
    type JSONObject = {[key: string]: string | number | boolean | JSONObject}
    
    interface errorResult {
      resultType: 'ERROR'
      errorCode: string
      errorMessage: string
    }
    
    interface successResult {
      resultType: 'SUCCESS'
      variableUpdate: JSONObject
    }
    
    type Result = errorResult | successResult
    
    const zbc = new ZBClient()
    
    
    // This could be a listener on a return queue from an external system
    async function handleJob(jobKey: string, result: Result) {
      if (result.resultType === 'ERROR') {
        const { errorMessage, errorCode } = result
        zbc.throwError({
          jobKey,
          errorCode,
          errorMessage
        })
      } else {
        zbc.completeJob({
          jobKey,
          variables: result.variableUpdate
        })
      }
    }
    

    Parameters

    Returns Promise<void>

topology

  • description

    Return the broker cluster topology.

    example
    const zbc = new ZBClient()
    
    zbc.topology().then(res => console.log(JSON.stringify(res, null, 2)))
    

    Returns Promise<TopologyResponse>

updateJobRetries

  • description

    Update the number of retries for a Job. This is useful if a job has zero remaining retries and fails, raising an incident.

    example
    type JSONObject = {[key: string]: string | number | boolean | JSONObject}
    
    const zbc = new ZBClient()
    
    async function updateAndResolveIncident({
      incidentKey,
      processInstanceKey,
      jobKey,
      variableUpdate
    } : {
      incidentKey: string
      processInstanceKey: string
      jobKey: string
      variableUpdate: JSONObject
    }) {
      await zbc.setVariables({
        elementInstanceKey: processInstanceKey,
        variables: variableUpdate
      })
      await zbc.updateJobRetries({
        jobKey,
        retries: 1
      })
      return zbc.resolveIncident({
        incidentKey
      })
    }
    

    Parameters

    Returns Promise<void>
