Optional
options: ZBClientOptionsZero-conf constructor. The entire ZBClient connection config can be passed in via the environment.
Optional
options: ZBClientOptionsPrivate
_onPrivate
Optional
basicPrivate
Optional
closePrivate
closingOptional
connectedPrivate
constructPrivate
Optional
customSSLPrivate
executeIf this.retry is set true, the operation will be wrapped in an configurable retry on exceptions of gRPC error code 14 - Transient Network Failure. See: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md If this.retry is false, it will be executed with no retry, and the application should handle the exception.
A gRPC command operation
Private
grpcPrivate
loggerPrivate
maxPrivate
maxPrivate
Optional
oOptional
onOptional
onPrivate
optionsPrivate
retryPrivate
retryThis function takes a gRPC operation that returns a Promise as a function, and invokes it. If the operation throws gRPC error 14, this function will continue to try it until it succeeds or retries are exhausted.
A gRPC command operation that may fail if the broker is not available
Private
stdoutPrivate
useTLSPrivate
workerPrivate
workersStatic
Readonly
DEFAULT_Static
Private
Readonly
DEFAULT_Static
Private
Readonly
DEFAULT_Static
Private
Readonly
DEFAULT_Static
Private
Readonly
DEFAULT_activateJobs allows you to manually activate jobs, effectively building a worker; rather than using the ZBWorker class.
const zbc = new ZBClient()
zbc.activateJobs({
maxJobsToActivate: 5,
requestTimeout: 6000,
timeout: 5 * 60 * 1000,
type: 'process-payment',
worker: 'my-worker-uuid'
}).then(jobs =>
jobs.forEach(job =>
// business logic
zbc.completeJob({
jobKey: job.key,
variables: {}
))
)
})
Broadcast a Signal
``` const zbc = new ZBClient()
zbc.broadcastSignal({ signalName: 'my-signal', variables: { reasonCode: 3 } })
Cancel a process instance by process instance key.
const zbc = new ZBClient()
zbc.cancelProcessInstance(processInstanceId)
.catch(
(e: any) => console.log(`Error cancelling instance: ${e.message}`)
)
Gracefully shut down all workers, draining existing tasks, and return when it is safe to exit.
const zbc = new ZBClient()
zbc.createWorker({
taskType:
})
setTimeout(async () => {
await zbc.close()
console.log('All work completed.')
}),
5 * 60 * 1000 // 5 mins
)
Optional
timeout: numberExplicitly complete a job. The method is useful for manually constructing a worker.
const zbc = new ZBClient()
zbc.activateJobs({
maxJobsToActivate: 5,
requestTimeout: 6000,
timeout: 5 * 60 * 1000,
type: 'process-payment',
worker: 'my-worker-uuid'
}).then(jobs =>
jobs.forEach(job =>
// business logic
zbc.completeJob({
jobKey: job.key,
variables: {}
))
)
})
Create a new Batch Worker. This is useful when you need to rate limit access to an external resource.
const zbc = new ZBClient()
// Helper function to find a job by its key
const findJobByKey = jobs => key => jobs.filter(job => job.jobKey === id)?.[0] ?? []
const handler = async (jobs: BatchedJob[]) => {
console.log("Let's do this!")
const {jobKey, variables} = job
// Construct some hypothetical payload with correlation ids and requests
const req = jobs.map(job => ({id: jobKey, data: variables.request}))
// An uncaught exception will not be managed by the library
try {
// Our API wrapper turns that into a request, and returns
// an array of results with ids
const outcomes = await API.post(req)
// Construct a find function for these jobs
const getJob = findJobByKey(jobs)
// Iterate over the results and call the succeed method on the corresponding job,
// passing in the correlated outcome of the API call
outcomes.forEach(res => getJob(res.id)?.complete(res.data))
} catch (e) {
jobs.forEach(job => job.fail(e.message))
}
}
const batchWorker = zbc.createBatchWorker({
taskType: 'get-data-from-external-api',
taskHandler: handler,
jobBatchMinSize: 10, // at least 10 at a time
jobBatchMaxTime: 60, // or every 60 seconds, whichever comes first
timeout: Duration.seconds.of(80) // 80 second timeout means we have 20 seconds to process at least
})
Create a new process instance. Asynchronously returns a process instance id.
const zbc = new ZBClient()
zbc.createProcessInstance({
bpmnProcessId: 'onboarding-process',
variables: {
customerId: 'uuid-3455'
},
version: 5 // optional, will use latest by default
}).then(res => console.log(JSON.stringify(res, null, 2)))
zbc.createProcessInstance({
bpmnProcessId: 'SkipFirstTask',
variables: { id: random },
startInstructions: [{elementId: 'second_service_task'}]
}).then(res => (id = res.processInstanceKey))
Create a process instance, and return a Promise that returns the outcome of the process.
const zbc = new ZBClient()
zbc.createProcessInstanceWithResult('order-process', {
customerId: 123,
invoiceId: 567
})
.then(console.log)
Create a worker that polls the gateway for jobs and executes a job handler when units of work are available.
const zbc = new ZB.ZBClient()
const zbWorker = zbc.createWorker({
taskType: 'demo-service',
taskHandler: myTaskHandler,
})
// A job handler must return one of job.complete, job.fail, job.error, or job.forward
// Note: unhandled exceptions in the job handler cause the library to call job.fail
async function myTaskHandler(job) {
zbWorker.log('Task variables', job.variables)
// Task worker business logic goes here
const updateToBrokerVariables = {
updatedProperty: 'newValue',
}
const res = await callExternalSystem(job.variables)
if (res.code === 'SUCCESS') {
return job.complete({
...updateToBrokerVariables,
...res.values
})
}
if (res.code === 'BUSINESS_ERROR') {
return job.error({
code: res.errorCode,
message: res.message
})
}
if (res.code === 'ERROR') {
return job.fail({
errorMessage: res.message,
retryBackOff: 2000
})
}
}
Deploy a process model.
import { readFileSync } from 'fs'
import { join } from 'path'
const zbc = new ZBClient()
const bpmnFilePath = join(process.cwd(), 'bpmn', 'onboarding.bpmn')
// Loading the process model allows you to perform modifications or analysis
const bpmn = readFileSync(bpmnFilePath, 'utf8')
zbc.deployProcess({
definition: bpmn,
name: 'onboarding.bpmn'
})
// If you don't need access to model contents, simply pass a file path
zbc.deployProcess(bpmnFilePath)
Deploy a BPMN model or DMN table to the Zeebe cluster.
import {join} from 'path'
const zbc = new ZBClient()
zbc.deployResource({ processFilename: join(process.cwd(), 'bpmn', 'onboarding.bpmn' })
zbc.deployResource({ decisionFilename: join(process.cwd(), 'dmn', 'approval.dmn')})
Optional
params: { Evaluates a decision. The decision to evaluate can be specified either by using its unique key (as returned by DeployResource), or using the decision ID. When using the decision ID, the latest deployed version of the decision is used.
``` const zbc = new ZBClient() zbc.evaluateDecision({ decisionId: 'my-decision', variables: { season: "Fall" } }).then(res => console.log(JSON.stringify(res, null, 2)))
Fail a job. This is useful if you are using the decoupled completion pattern or building your own worker. For the retry count, the current count is available in the job metadata.
const zbc = new ZBClient()
zbc.failJob( {
jobKey: '345424343451',
retries: 3,
errorMessage: 'Could not get a response from the order invoicing API',
retryBackOff: 30 * 1000 // optional, otherwise available for reactivation immediately
})
Return an array of task types contained in a BPMN file or array of BPMN files. This can be useful, for example, to do
const zbc = new ZBClient()
zbc.getServiceTypesFromBpmn(['bpmn/onboarding.bpmn', 'bpmn/process-sale.bpmn'])
.then(tasktypes => console.log('The task types are:', tasktypes))
Modify a running process instance. This allows you to move the execution tokens, and change the variables. Added in 8.1. See the gRPC protocol documentation.
zbc.createProcessInstance('SkipFirstTask', {}).then(res =>
zbc.modifyProcessInstance({
processInstanceKey: res.processInstanceKey,
activateInstructions: [{
elementId: 'second_service_task',
ancestorElementInstanceKey: "-1",
variableInstructions: [{
scopeId: '',
variables: { second: 1}
}]
}]
})
)
Publish a message to the broker for correlation with a workflow instance. See this tutorial for a detailed description of message correlation.
const zbc = new ZBClient()
zbc.publishMessage({
// Should match the "Message Name" in a BPMN Message Catch
name: 'order_status',
correlationKey: 'uuid-124-532-5432',
variables: {
event: 'PROCESSED'
}
})
Publish a message to the broker for correlation with a workflow message start event. For a message targeting a start event, the correlation key is not needed to target a specific running process instance. However, the hash of the correlationKey is used to determine the partition where this workflow will start. So we assign a random uuid to balance workflow instances created via start message across partitions.
We make the correlationKey optional, because the caller can specify a correlationKey + messageId to guarantee an idempotent message.
Multiple messages with the same correlationKey + messageId combination will only start a workflow once. See: https://github.com/zeebe-io/zeebe/issues/1012 and https://github.com/zeebe-io/zeebe/issues/1022
const zbc = new ZBClient()
zbc.publishStartMessage({
name: 'Start_New_Onboarding_Flow',
variables: {
customerId: 'uuid-348-234-8908'
}
})
// To do the same in an idempotent fashion - note: only idempotent during the lifetime of the created instance.
zbc.publishStartMessage({
name: 'Start_New_Onboarding_Flow',
messageId: 'uuid-348-234-8908', // use customerId to make process idempotent per customer
variables: {
customerId: 'uuid-348-234-8908'
}
})
Resolve an incident by incident key.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}
const zbc = new ZBClient()
async updateAndResolveIncident({
processInstanceId,
incidentKey,
variables
} : {
processInstanceId: string,
incidentKey: string,
variables: JSONObject
}) {
await zbc.setVariables({
elementInstanceKey: processInstanceId,
variables
})
await zbc.updateRetries()
zbc.resolveIncident({
incidentKey
})
zbc.resolveIncident(incidentKey)
}
Directly modify the variables is a process instance. This can be used with resolveIncident
to update the process and resolve an incident.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}
const zbc = new ZBClient()
async function updateAndResolveIncident({
incidentKey,
processInstanceKey,
jobKey,
variableUpdate
} : {
incidentKey: string
processInstanceKey: string
jobKey: string
variableUpdate: JSONObject
}) {
await zbc.setVariables({
elementInstanceKey: processInstanceKey,
variables: variableUpdate
})
await zbc.updateJobRetries({
jobKey,
retries: 1
})
return zbc.resolveIncident({
incidentKey
})
}
Fail a job by throwing a business error (i.e. non-technical) that occurs while processing a job.
The error is handled in the workflow by an error catch event.
If there is no error catch event with the specified errorCode
then an incident will be raised instead.
This method is useful when building a worker, for example for the decoupled completion pattern.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}
interface errorResult {
resultType: 'ERROR' as 'ERROR'
errorCode: string
errorMessage: string
}
interface successResult {
resultType: 'SUCCESS' as 'SUCCESS'
variableUpdate: JSONObject
}
type Result = errorResult | successResult
const zbc = new ZBClient()
// This could be a listener on a return queue from an external system
async function handleJob(jobKey: string, result: Result) {
if (resultType === 'ERROR') {
const { errorMessage, errorCode } = result
zbc.throwError({
jobKey,
errorCode,
errorMessage
})
} else {
zbc.completeJob({
jobKey,
variables: result.variableUpdate
})
}
}
Return the broker cluster topology.
const zbc = new ZBClient()
zbc.topology().then(res => console.res(JSON.stringify(res, null, 2)))
Update the number of retries for a Job. This is useful if a job has zero remaining retries and fails, raising an incident.
type JSONObject = {[key: string]: string | number | boolean | JSONObject}
const zbc = new ZBClient()
async function updateAndResolveIncident({
incidentKey,
processInstanceKey,
jobKey,
variableUpdate
} : {
incidentKey: string
processInstanceKey: string
jobKey: string
variableUpdate: JSONObject
}) {
await zbc.setVariables({
elementInstanceKey: processInstanceKey,
variables: variableUpdate
})
await zbc.updateJobRetries({
jobKey,
retries: 1
})
return zbc.resolveIncident({
incidentKey
})
}
Generated using TypeDoc
Description
A client for interacting with a Zeebe broker. With the connection credentials set in the environment, you can use a "zero-conf" constructor with no arguments.
Example