Event Script Syntax
Event Script uses YAML to represent an end-to-end transaction flow. A transaction is a business use case, and the flow can be an API service, a batch job or a real-time transaction.
Flow list
This configuration file sits in the project's "resources" folder and contains a list of filenames.
The default flow list is "flows.yaml" under the "resources" folder. It may look like this.
flows:
- 'get-profile.yml'
- 'create-profile.yml'
- 'delete-profile.yml'
location: 'classpath:/flows/'
The "location" parameter is optional. If present, you can tell the system to load the flow config files from another folder location.
Multiple flow lists
You can provide more than one flow list to your application, which is handy in several situations. For instance, to achieve better modularity in a complex application, flows can be grouped into categories of the development team's choice and managed in multiple flow lists. Multiple flow lists are also useful for including external libraries that contain pre-defined flow lists. In the following example, an application loads the flows defined in "flows.yaml" and additional flows defined in the "more-flows.yaml" file of a composable library.
yaml.flow.automation=classpath:/flows.yaml, classpath:/more-flows.yaml
Writing new REST endpoint and function
You can use the "composable-example" subproject as a template to write your own composable application.
For each filename in flows.yaml, you should create a corresponding configuration file under the "resources/flows" folder.
Let's write a new flow called "greetings". You can copy-n-paste the following into a file called "greetings.yml" under the "resources/flows" folder.
flow:
  id: 'greetings'
  description: 'Simplest flow'
  ttl: 10s
first.task: 'greeting.demo'
tasks:
  - input:
      - 'input.path_parameter.user -> user'
    process: 'greeting.demo'
    output:
      - 'text(application/json) -> output.header.content-type'
      - 'result -> output.body'
    description: 'Hello World'
    execution: end
In the application.properties, you can specify the following parameter:
yaml.flow.automation=classpath:/flows.yaml
and update the "flows.yaml" file in the resources folder as follows:
flows:
- 'get-profile.yml'
- 'create-profile.yml'
- 'delete-profile.yml'
- 'greetings.yml'
Then, you can add a new REST endpoint in the "rest.yaml" configuration file like this.
  - service: "http.flow.adapter"
    methods: ['GET']
    url: "/api/greetings/{user}"
    flow: 'greetings'
    timeout: 10s
    cors: cors_1
    headers: header_1
The above REST endpoint takes the path parameter "user". The task executor will map the path parameter to the input arguments (headers and body) in your function. Now you can write your new function with the named route "greeting.demo". Please copy-n-paste the following into a Java class called "Greetings" and save in the package under "my.organization.tasks" in the source project.
Note: "my.organization" package name is just an example. Please replace it with your organization package path.
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
// PreLoad and TypedLambdaFunction are provided by the platform-core library

@PreLoad(route="greeting.demo", instances=10, isPrivate = false)
public class Greetings implements TypedLambdaFunction<Map<String, Object>, Map<String, Object>> {
    private static final String USER = "user";

    @Override
    public Map<String, Object> handleEvent(Map<String, String> headers, Map<String, Object> input, int instance) {
        // the "user" key is set by the input data mapping in greetings.yml
        if (input.containsKey(USER)) {
            String user = input.get(USER).toString();
            Map<String, Object> result = new HashMap<>();
            result.put(USER, user);
            result.put("message", "Welcome");
            result.put("time", new Date());
            return result;
        } else {
            throw new IllegalArgumentException("Missing path parameter 'user'");
        }
    }
}
For the flow-engine to find your new function, please update the key-value for "web.component.scan" in application.properties:
web.component.scan=my.organization
To test your new REST endpoint, flow configuration and function, please point your browser to
http://127.0.0.1:8100/api/greetings/my_name
You can replace "my_name" with your first name to see the response in the browser.
Flow configuration syntax
In your "greetings.yml" file above, you find the following key-values:
flow.id
- Each flow must have a unique flow ID. The flow ID usually originates from a user-facing endpoint through an event adapter. For example, you may write an adapter to listen to a cloud event in a serverless deployment. The most common one is the HTTP adapter.
For the HTTP adapter, the flow ID comes from the "flow" tag in "rest.yaml". The flow engine will find the corresponding flow configuration and create a new flow instance to process the user request.
flow.description
- this describes the purpose of the flow
flow.ttl
- "Time to live (TTL)" timer for each flow. You can define the maximum time for a flow to finish processing.
All events are delivered asynchronously and there is no timeout value for each event. The TTL defines the time budget
for a complete end-to-end flow. Upon expiry, an unfinished flow will be aborted. You can use suffix "s" for seconds,
"m" for minutes and "h" for hours. e.g. "30s" for 30 seconds.
Note: When using the HTTP Flow Adapter, the flow.ttl value can be higher than the REST endpoint's timeout value. This would happen when one of your tasks in the event flow responds to the caller and the event flow continues to execute the rest of the flow. This type of task is called "response" task.
first.task
- this points to the route name of a function (aka "task") to which the flow engine will deliver
the incoming event.
The configuration file contains a list of task entries where each task is defined by "input", "process", "output" and "execution" type. In the above example, the execution type is "end", meaning that it is the end of a transaction and its result set will be delivered to the user.
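The "s", "m" and "h" suffixes described for flow.ttl can be illustrated with a small, self-contained Java sketch. The class name TtlSketch and its parse method are hypothetical and are not part of the Event Script API; the flow engine's own parser may accept other forms.

```java
import java.time.Duration;

// Hypothetical helper for illustration only, not the flow engine's parser.
final class TtlSketch {

    // Converts a TTL such as "10s", "5m" or "1h" into a Duration.
    static Duration parse(String ttl) {
        String text = ttl.trim().toLowerCase();
        if (text.isEmpty()) {
            throw new IllegalArgumentException("TTL must not be empty");
        }
        char unit = text.charAt(text.length() - 1);
        long value = Long.parseLong(text.substring(0, text.length() - 1).trim());
        switch (unit) {
            case 's': return Duration.ofSeconds(value);
            case 'm': return Duration.ofMinutes(value);
            case 'h': return Duration.ofHours(value);
            default: throw new IllegalArgumentException("Unsupported TTL suffix: " + unit);
        }
    }

    public static void main(String[] args) {
        // "10s" is the ttl used in the greetings flow above
        System.out.println(parse("10s").getSeconds());
    }
}
```

A value parsed this way would represent the time budget for the whole flow, not for an individual event.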
Underlying Event System
The Event Script system uses platform-core as its event system, which encapsulates Java virtual threads, Eclipse Vert.x, Kotlin coroutines and suspend functions.
The integration points are intentionally minimalist. For most use cases, the user application does not need to make any API calls to the underlying event system.
REST automation and HTTP flow adapter
The most common transaction entry point is a REST endpoint. The event flow may look like this:
REQUEST -> "http.request" -> "task.executor" -> user defined tasks
-> "async.http.response" -> RESPONSE
REST automation is part of the platform-core library. It contains a non-blocking HTTP server that converts HTTP requests and responses into events.
It routes an HTTP request event to the HTTP adapter if the "flow" tag is provided.
In the following example, the REST endpoint definition is declared in a "rest.yaml" configuration. It will route the URI "/api/decision" to the HTTP flow adapter that exposes its service route name as "http.flow.adapter".
rest:
  - service: "http.flow.adapter"
    methods: ['GET']
    url: "/api/decision?decision=_"
    flow: 'decision-test'
    timeout: 10s
    cors: cors_1
    headers: header_1
    tracing: true
The "cors" and "headers" sections are optional. When specified, the REST endpoint will insert CORS headers and HTTP request headers accordingly.
For REST automation syntax, please refer to Chapter 3
The HTTP flow adapter maps the HTTP request dataset and the flow ID into a standard event envelope for delivery to the flow engine.
The HTTP request dataset, addressable with the "input." namespace, contains the following:
Key | Values |
---|---|
method | HTTP method |
uri | URI path |
header | HTTP headers |
cookie | HTTP cookies |
path_parameter | Path parameters if any |
query | HTTP query parameters if any |
body | HTTP request body if any |
stream | input stream route ID if any |
ip | remote IP address |
filename | filename if request is a multipart file upload |
session | authenticated session key-values if any |
For easy matching, please use lower case for headers, cookies, query and path parameters.
Regular API payloads in JSON or XML are converted to a hash map in the event's body.
For special use cases like file upload/download, your application logic may invoke a streaming API to retrieve the binary payload. Please refer to Appendix-III.
Task and its corresponding function
Each task in a flow must have a corresponding function. You can assign a task name to the function using the "PreLoad" annotation like this.
@PreLoad(route="greeting.demo", instances=10)
public class Greetings implements TypedLambdaFunction<Map<String, Object>, Map<String, Object>> {
    @Override
    public Map<String, Object> handleEvent(Map<String, String> headers, Map<String, Object> input, int instance) {
        // business logic here
        return someOutput;
    }
}
The "route" in the "PreLoad" annotation is the task name. The "instances" parameter defines the maximum number of "workers" that the function can handle concurrently. The system is designed to be reactive and the function does not consume memory and CPU resources until an event arrives.
You may also define concurrency using an environment variable. You can replace "instances" with "envInstances" using standard environment variable syntax like ${SOME_ENV_VARIABLE:default_value}.
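To make the ${SOME_ENV_VARIABLE:default_value} convention concrete, here is a minimal Java sketch of how such an expression could be resolved. The class EnvSyntaxSketch and its resolve method are hypothetical names used for illustration; the actual resolution is done by the platform and may differ in detail.

```java
import java.util.Map;

// Hypothetical helper for illustration only, not the platform's resolver.
final class EnvSyntaxSketch {

    // Resolves "${NAME:default}" against the given environment map.
    // Text that does not use the ${...} form is returned unchanged.
    static String resolve(String expression, Map<String, String> env) {
        String text = expression.trim();
        if (!text.startsWith("${") || !text.endsWith("}")) {
            return text;
        }
        String inner = text.substring(2, text.length() - 1);
        int colon = inner.indexOf(':');
        String name = colon < 0 ? inner : inner.substring(0, colon);
        String fallback = colon < 0 ? null : inner.substring(colon + 1);
        String value = env.get(name.trim());
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // Uses the real process environment; prints the default when the variable is not set
        System.out.println(resolve("${SOME_ENV_VARIABLE:10}", System.getenv()));
    }
}
```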
Unique task naming
Composable functions are designed to be reusable. By changing some input data mapping to feed different parameters and payload, your function can behave differently.
Therefore, it is quite common to use the same function (i.e. the "process" parameter) more than once in a single event flow.
When a task is not named, the "process" parameter is used to name the task.
Since each task must have a unique name for event routing, we cannot use the same "process" name more than once in an event flow. To handle this use case, you can create unique names for the same function using the "name" parameter like this:
flow:
  id: 'greetings'
  description: 'Simplest flow'
  ttl: 10s
first.task: 'my.first.task'
tasks:
  - name: 'my.first.task'
    input:
      - 'input.path_parameter.user -> user'
    process: 'greeting.demo'
    output:
      - 'text(application/json) -> output.header.content-type'
      - 'result -> output.body'
    description: 'Hello World'
    execution: sequential
    next:
      - 'another.task'
The above event flow configuration uses "my.first.task" as a named route for "greeting.demo" by adding the "name" parameter to the composable function.
Note: The Event Manager performs event choreography using the unique task name. Therefore, when the "process" name for the function is not unique, you must create unique task "names" for the same function to ensure correct routing.
Assigning multiple route names to a single function
The built-in distributed tracing system tracks the actual composable functions using the "process" name and not the task names.
When there is a need to track the task names in distributed trace, you can tell the system to create additional instances of the same function with different route names.
You can use a comma separated list as the route name like this:
@PreLoad(route="greeting.case.1, greeting.case.2", instances=10)
public class Greetings implements TypedLambdaFunction<Map<String, Object>, Map<String, Object>> {
    @Override
    public Map<String, Object> handleEvent(Map<String, String> headers, Map<String, Object> input, int instance) {
        // business logic here
        return someResult;
    }
}
Note: The "unique task naming" method is more memory efficient than creating additional route names
Preload overrides
Once a composable function is published as a reusable library in the artifactory, its route name and number of instances are fixed using the "PreLoad" annotation in the function class.
Without refactoring your library, you can override its route name and instances using a preload override file like this:
preload:
  - original: 'greeting.demo'
    routes:
      - 'greeting.case.1'
      - 'greeting.case.2'
    # the "instances" tag is optional
    instances: 20
  - original: 'v1.another.reusable.function'
    keep-original: true
    routes:
      - 'v1.reusable.1'
      - 'v1.reusable.2'
In the above example, the function associated with "greeting.demo" will be preloaded as "greeting.case.1" and "greeting.case.2". The number of maximum concurrent instances is also changed from 10 to 20.
In the second example, "v1.another.reusable.function" is updated as "v1.reusable.1" and "v1.reusable.2" and the number of concurrent instances is not changed. The original route "v1.another.reusable.function" is preserved when the "keep-original" parameter is set to true.
Assuming the above file is "preload-override.yaml" in the "resources" folder of the application source code project, you should add the following parameter in application.properties to activate this preload override feature.
yaml.preload.override=classpath:/preload-override.yaml
Multiple preload override config files
When you publish a composable function as a library, you may want to ensure the route names of the functions are merged properly. In this case, you can bundle a library specific preload override config file.
For example, if your library contains a "preload-kafka.yaml" file to override some route names, you can add it to the yaml.preload.override parameter like this:
yaml.preload.override=classpath:/preload-override.yaml, classpath:/preload-kafka.yaml
The system will then merge the two preload override config files.
The concurrency value of a function is overwritten using the "instances" parameter in the first preload override file. Subsequent override of the "instances" parameter is ignored. i.e. the first preload override file will take precedence.
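The "first file wins" rule for the "instances" parameter can be sketched in plain Java as follows. This is only an illustration of the precedence rule stated above; the class PreloadMergeSketch and the map-based representation of an override file are assumptions, not the platform's internal data model.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
final class PreloadMergeSketch {

    // Each map stands for the "original route -> instances" entries of one override file,
    // listed in the same order as in the yaml.preload.override parameter.
    static Map<String, Integer> mergeInstances(List<Map<String, Integer>> filesInOrder) {
        Map<String, Integer> merged = new LinkedHashMap<>();
        for (Map<String, Integer> file : filesInOrder) {
            for (Map.Entry<String, Integer> entry : file.entrySet()) {
                // keep the value from the earliest file; later overrides are ignored
                merged.putIfAbsent(entry.getKey(), entry.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> first = Map.of("greeting.demo", 20);
        Map<String, Integer> second = Map.of("greeting.demo", 5);
        System.out.println(mergeInstances(List.of(first, second)));
    }
}
```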
Hierarchy of flows
Inside a flow, you can run one or more sub-flows.
To do this, you can use the flow protocol identifier (flow://) to indicate that the task is a flow.
For example, when running the following task, "flow://my-sub-flow" will be executed like a regular task.
tasks:
  - input:
      - 'input.path_parameter.user -> header.user'
      - 'input.body -> body'
    process: 'flow://my-sub-flow'
    output:
      - 'result -> model.pojo'
    description: 'Execute a sub-flow'
    execution: sequential
    next:
      - 'my.next.function'
If the sub-flow is not available, the system will throw an error stating that it is not found.
Hierarchy of flows would reduce the complexity of a single flow configuration file. The "time-to-live (TTL)" value of the parent flow should be set to a value that covers the complete flow including the time used in the sub-flows.
Note: For simplicity, the input data mapping for a sub-flow should contain only the "header" and "body" arguments.
Tasks and data mapping
All tasks for a flow are defined in the "tasks" section.
Input/Output data mapping
A function is self-contained. This modularity reduces application complexity because the developer only needs interface contract details for a specific function.
To handle this level of modularity, the system provides configurable input/output data mapping.
Namespaces for I/O data mapping
Type | Keyword and/or namespace | LHS / RHS | Mappings |
---|---|---|---|
Flow input dataset | input. | left | input |
Flow output dataset | output. | right | output |
Function input body | no namespace required | right | input |
Function input or output headers | header or header. | right | I/O |
Function output result set | result. | left | output |
Function output status code | status | left | output |
Decision value | decision | right | output |
State machine dataset | model. | left/right | I/O |
External state machine key-value | ext: | right | I/O |
Note: The external state machine namespace uses the colon character (:) to indicate that the key-value is external.
Constants for input data mapping
Type | Keyword for the left-hand-side argument |
---|---|
String | text(example_value) |
Integer | int(number) |
Long | long(number) |
Float | float(number) |
Double | double(number) |
Boolean | boolean(true or false) |
Map | map(k1=v1, k2=v2) or map(base.config.parameter) |
File | file(text:file_path) or file(binary:file_path) |
Classpath | classpath(text:file_path) or classpath(binary:file_path) |
For input data mapping, the "file" constant type is used to load file content as an argument of a user function. You can tell the system to render the file as "text" or "binary". Similarly, the "classpath" constant type refers to a static file in the "resources" folder of the application source code.
The "map" constant type is used for two purposes:
1. Map of key-values
The following example illustrates creation of a map of key-values. In the first entry, a map of 2 key-values is set as the input argument "myMap" of a user function. In the second entry, the map's values are retrieved from the key "some.key" in base configuration and the environment variable "ENV_VAR_ONE".
'map(k1=v1, k2=v2) -> myMap'
'map(k1=${some.key}, k2=${ENV_VAR_ONE}) -> myMap'
Note: The comma character is used as a separator for each key-value pair. If the value contains a comma, the system cannot parse the key-values correctly. In this case, please use the 2nd method below.
2. Mapping values from application.yml
The following input data mapping sets the value of "my.key" from the application.yml base configuration file to the input argument "myKey" of a user function.
'map(my.key) -> myKey'
Since the system uses both application.properties and application.yml as base configuration files, you can use either configuration file depending on the data type of the value.
For application.properties, "map(my.key)" is the same as "text(${my.key})".
For application.yml, "map(my.key)" would set a primitive value (text, integer, float, boolean), a hash map of key-values or an array of values.
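The comma limitation of the first form, map(k1=v1, k2=v2), is easier to see with a small parsing sketch. The following Java code is an assumption about how such a constant could be split and is not the system's parser; MapConstantSketch is a hypothetical name, and the sketch does not cover the map(my.key) form or ${...} substitution. Where the note above says the real system cannot parse a value containing a comma correctly, this sketch makes the problem visible by rejecting the stray fragment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only, not the system's parser.
final class MapConstantSketch {

    // Splits "map(k1=v1, k2=v2)" into key-values using the comma as the pair separator.
    static Map<String, String> parse(String constant) {
        String text = constant.trim();
        if (!text.startsWith("map(") || !text.endsWith(")")) {
            throw new IllegalArgumentException("Expected map(...) but got: " + text);
        }
        String inner = text.substring(4, text.length() - 1);
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : inner.split(",")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                // a comma inside a value leaves a fragment without '=' behind
                throw new IllegalArgumentException("Not a key=value pair: " + pair.trim());
            }
            result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("map(k1=v1, k2=v2)"));
    }
}
```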
Special content type for output data mapping
Type | Keyword for the right-hand-side argument |
---|---|
File | file(file_path) |
For output data mapping, the "file" content type is used to save some data from the output of a user function to a file in the local file system.
Decision value
The "decision" keyword applies to "right hand side" of output data mapping statement in a decision task only (See "Decision" in the task section).
Each flow has its own input and output
Each function has its input headers, input body and output result set. Optionally, a function can return an EventEnvelope object to hold its result set in the "body", a "status" code and one or more header key-values.
Since each function is stateless, a state machine (with namespace "model.") is available as a temporary memory store for transaction states that can be passed from one task to another.
All variables are addressable using the standard dot-bracket convention.
For example, "hello.world" will retrieve the value 100 from this data structure:
{
"hello": {
"world": 100
}
}
and "numbers[1]" will retrieve the value 200 below:
{ "numbers": [100, 200] }
The assignment is done using the assignment (->) syntax.
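The dot-bracket convention can be sketched with a short resolver over nested maps and lists. This is an illustrative approximation only: DotBracketSketch is a hypothetical class, it supports one bracket per path segment, and the state machine's real resolver may handle more cases.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only, not the flow engine's resolver.
final class DotBracketSketch {

    // Walks a path such as "hello.world" or "numbers[1]" through nested maps and lists.
    // Returns null when any part of the path is missing.
    static Object get(Map<String, Object> data, String path) {
        Object current = data;
        for (String segment : path.split("\\.")) {
            String key = segment;
            int index = -1;
            int open = segment.indexOf('[');
            if (open >= 0 && segment.endsWith("]")) {
                key = segment.substring(0, open);
                index = Integer.parseInt(segment.substring(open + 1, segment.length() - 1));
            }
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
            if (index >= 0) {
                if (!(current instanceof List)) {
                    return null;
                }
                List<?> list = (List<?>) current;
                current = index < list.size() ? list.get(index) : null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> data = Map.of("hello", Map.of("world", 100), "numbers", List.of(100, 200));
        System.out.println(get(data, "hello.world"));
        System.out.println(get(data, "numbers[1]"));
    }
}
```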
In the following example, the HTTP input query parameter 'amount' is passed as input body argument 'amount' to the task 'simple.decision'. The result (function "return value") from the task will be mapped to the special "decision" variable that the flow engine will evaluate. This assumes the result is a boolean or numeric value.
The "decision" value is also saved to the state machine ("model") for subsequent tasks to evaluate.
  - input:
      - 'input.query.amount -> amount'
    process: 'simple.decision'
    output:
      - 'result -> decision'
      - 'result -> model.decision'
Metadata for each flow instance
For each flow instance, the state machine in the "model" namespace provides the following metadata that you can use in the input/output data mapping. For example, you can set this for an exception handler to log additional information.
Type | Keyword | Comment |
---|---|---|
Flow ID | model.flow | The ID of the event flow config |
Trace ID | model.trace | Optional traceId when tracing is turned on |
Correlation ID | model.cid | Correlation ID of the inbound request |
Special handling for header
When the function input keyword "header" is specified on the right-hand side of an input data mapping statement, it refers to the input event envelope's headers. The left-hand side must therefore resolve to a Map of key-values. Otherwise, the system rejects the input data mapping statement with an error like this:
Invalid input mapping 'text(ok) -> header', expect: Map, Actual: String
When the function input namespace "header." is used, the system maps the value resolved from the left-hand side into the specific header.
For example, the input data mapping statement 'text(ok) -> header.demo' will set "demo=ok" in the input event envelope's headers.
When the function output keyword "header" is specified on the left-hand side of an output data mapping statement, it resolves to a Map of the function output event envelope's headers.
Similarly, when the function output namespace "header." is used, the system resolves the value from a specific key of the function output event envelope's headers.
Function input and output
To support flexible input data mapping, the input to a function must be either Map<String, Object> or PoJo.
However, the output (i.e. result set) of a function can be Map, PoJo or Java primitive.
Your function should implement the TypedLambdaFunction interface to configure input and output.
Since a data structure is passed to your function's input argument as key-values, you may create a PoJo class to deserialize the data structure.
To tell the system that your function is expecting input as a PoJo, you can use the special notation * on the right-hand side.
For example, the following entry tells the system to set the value in "model.dataset" as a PoJo input.
  - input:
      - 'model.dataset -> *'
Note: If the value from the left hand side is not a map, the system will ignore the input mapping command and print out an error message in the application log.
Setting function input headers
When function input body is used to hold a PoJo, we may use function input headers to pass other arguments to the function without changing the data structure of a user defined PoJo.
In the following example, the HTTP query parameter "userid" will be mapped to the function input header key "user" and the HTTP request body will be mapped to the function input body.
  - input:
      - 'input.query.userid -> header.user'
      - 'input.body -> *'
    process: 'my.user.function'
    output:
      - 'text(application/json) -> output.header.content-type'
      - 'result -> output.body'
Task types
Decision task
A decision task makes a decision to select the next task to execute. It has the tag execution=decision.
In the output data mapping section, it must map the corresponding result set or its key-value to the "decision" object.
The "next" tag contains a list of tasks to be selected based on the decision value.
If the decision value is boolean, a true value will select the first task. Otherwise, the second task will be selected.
If the decision value is an integer, the number should start from 1 where the corresponding "next" task will be selected.
tasks:
  - input:
      - 'input.query.decision -> decision'
    process: 'simple.decision'
    output:
      - 'result -> model.decision'
      - 'result -> decision'
    description: 'Simple decision test'
    execution: decision
    next:
      - 'decision.case.one'
      - 'decision.case.two'
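The selection rule for a decision value (boolean true selects the first task, false the second, and an integer selects the 1-based position) can be sketched in plain Java. DecisionSketch is a hypothetical class written for illustration; it is not the flow engine's routing code, and its error handling for out-of-range values is an assumption.

```java
import java.util.List;

// Hypothetical helper for illustration only, not the flow engine's routing code.
final class DecisionSketch {

    // Picks the entry of the "next" list that corresponds to the decision value.
    static String selectNext(Object decision, List<String> next) {
        int position;
        if (decision instanceof Boolean) {
            // true selects the first task, false selects the second
            position = ((Boolean) decision) ? 1 : 2;
        } else if (decision instanceof Number) {
            // integer decisions start from 1
            position = ((Number) decision).intValue();
        } else {
            throw new IllegalArgumentException("Decision must be boolean or numeric");
        }
        if (position < 1 || position > next.size()) {
            throw new IllegalArgumentException("Decision " + position + " is outside the 'next' list");
        }
        return next.get(position - 1);
    }

    public static void main(String[] args) {
        List<String> next = List.of("decision.case.one", "decision.case.two");
        System.out.println(selectNext(true, next));
        System.out.println(selectNext(2, next));
    }
}
```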
Response task
A response task will provide result set as a flow output or "response". A response task allows the flow to respond to the user or caller immediately and then move on to the next task asynchronously. For example, telling the user that it has accepted a request and then moving on to process the request that may take longer time to run.
A response task has the tag execution=response and a "next" task.
tasks:
  - input:
      - 'input.path_parameter.user -> user'
      - 'input.query.seq -> sequence'
    process: 'sequential.one'
    output:
      - 'result -> model.pojo'
      - 'result -> output.body'
    description: 'Pass a pojo to another task'
    execution: response
    next:
      - 'sequential.two'
End task
An end task indicates that it is the last task of the transaction processing in a flow. If the flow has not executed a response task, the end task will generate the response. Response is defined by output data mapping.
This task has the tag execution=end.
For example, the greeting task in the unit tests is an end task.
  - input:
      - 'input.path_parameter.user -> user'
    process: 'greeting.demo'
    output:
      - 'text(application/json) -> output.header.content-type'
      - 'result -> output.body'
    description: 'Hello World'
    execution: end
Sequential task
Upon completion of a sequential task, the next task will be executed.
This task has the tag execution=sequential.
In the following example, sequential.two will be executed after sequential.one.
tasks:
  - input:
      - 'input.path_parameter.user -> user'
      - 'input.query.seq -> sequence'
    process: 'sequential.one'
    output:
      - 'result -> model.pojo'
    description: 'Pass a pojo to another task'
    execution: sequential
    next:
      - 'sequential.two'
Parallel task
Upon completion of a parallel task, all tasks in the "next" task list will be executed in parallel.
This task has the tag execution=parallel.
In this example, parallel.one and parallel.two will run after begin.parallel.test.
tasks:
  - input:
      - 'int(2) -> count'
    process: 'begin.parallel.test'
    output: []
    description: 'Setup counter for two parallel tasks'
    execution: parallel
    next:
      - 'parallel.one'
      - 'parallel.two'
Fork-n-join task
Fork-n-join is a parallel processing pattern.
A "fork" task will execute multiple "next" tasks in parallel and then wait for the result sets before running the "join" task.
This task has the tag execution=fork. It must have a list of "next" tasks and a "join" task.
It may look like this:
tasks:
  - input:
      - 'input.path_parameter.user -> user'
      - 'input.query.seq -> sequence'
    process: 'sequential.one'
    output:
      - 'result -> model.pojo'
    description: 'Pass a pojo to another task'
    execution: fork
    next:
      - 'echo.one'
      - 'echo.two'
    join: 'join.task'
Sink task
A sink task is a task without any next tasks. Sink tasks are used by fork-n-join and pipeline tasks as reusable modules.
This task has the tag execution=sink.
  - input:
      - 'text(hello-world-two) -> key2'
    process: 'echo.two'
    output:
      - 'result.key2 -> model.key2'
    description: 'Hello world'
    execution: sink
Pipeline feature
Pipeline is an advanced feature of Event Script.
Pipeline task
A pipeline is a list of tasks that will be executed in order within the current task.
When the pipeline is done, the system will execute the "next" task.
This task has the tag execution=pipeline.
tasks:
  - input:
      - 'input.path_parameter.user -> user'
      - 'input.query.seq -> sequence'
    process: 'sequential.one'
    output:
      - 'result -> model.pojo'
    description: 'Pass a pojo to another task'
    execution: pipeline
    pipeline:
      - 'echo.one'
      - 'echo.two'
    next:
      - 'echo.three'
Some special uses of pipelines include "for/while-loop" and "continue/break" features.
Simple for-loop
In the following example, the loop.statement contains a for-loop that uses a variable in the state machine to evaluate the loop.
In this example, the pipeline will be executed three times before passing control to the "next" task.
tasks:
  - input:
      - 'input.path_parameter.user -> user'
      - 'input.query.seq -> sequence'
    process: 'sequential.one'
    output:
      - 'result -> model.pojo'
    description: 'Pass a pojo to another task'
    execution: pipeline
    loop:
      statement: 'for (model.n = 0; model.n < 3; model.n++)'
    pipeline:
      - 'echo.one'
      - 'echo.two'
      - 'echo.three'
    next:
      - 'echo.four'
Simple while loop
The loop.statement may use a "while loop" syntax like this:
    loop:
      statement: 'while (model.running)'
To exit the above while loop, one of the functions in the pipeline should return a boolean "false" value with output data mapping to the model.running variable.
For loop with break/continue decision
In the following example, the system will evaluate if the model.quit variable is true. If yes, the break or continue condition will be executed.
The state variable is obtained after output data mapping and any task in the pipeline can set a key-value for mapping into the state variable.
tasks:
  - input:
      - 'input.path_parameter.user -> user'
      - 'input.query.seq -> sequence'
    process: 'sequential.one'
    output:
      - 'result -> model.pojo'
    description: 'Pass a pojo to another task'
    execution: pipeline
    loop:
      statement: 'for (model.n = 0; model.n < 3; model.n++)'
      condition: 'if (model.quit) break'
    pipeline:
      - 'echo.one'
      - 'echo.two'
      - 'echo.three'
    next:
      - 'echo.four'
Note that the "condition" parameter can be a single condition or a list of conditions. In the following example, the system will evaluate both the model.quit and model.jump values.
    loop:
      statement: 'for (model.n = 0; model.n < 3; model.n++)'
      condition:
        - 'if (model.quit) break'
        - 'if (model.jump) break'
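The interaction between a for-loop statement and a "break" condition can be simulated in plain Java. The sketch below mimics 'for (model.n = 0; model.n < 3; model.n++)' with 'if (model.quit) break' using an in-memory map as the state machine. PipelineLoopSketch is a hypothetical class, and checking the condition after every task in the pipeline is an assumption made for this illustration, not a statement about the engine's exact evaluation order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical simulation for illustration only, not the flow engine.
final class PipelineLoopSketch {

    // Runs the pipeline up to "limit" times and returns how many tasks were executed.
    // The loop stops early as soon as a task sets model.quit to true.
    static int run(Map<String, Object> model, int limit, List<Consumer<Map<String, Object>>> pipeline) {
        int executed = 0;
        for (int n = 0; n < limit; n++) {
            model.put("n", n);
            boolean stopped = false;
            for (Consumer<Map<String, Object>> task : pipeline) {
                task.accept(model);
                executed++;
                // assumed: the break condition is evaluated after each task
                if (Boolean.TRUE.equals(model.get("quit"))) {
                    stopped = true;
                    break;
                }
            }
            if (stopped) {
                break;
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        Consumer<Map<String, Object>> echo = m -> { };
        System.out.println(run(new HashMap<>(), 3, List.of(echo, echo, echo)));
    }
}
```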
Handling exception
You can define an exception handler at the top level or at the task level.
An exception is said to occur when a user function throws an exception or returns an EventEnvelope object with a status code equal to or larger than 400.
The event status uses the same numbering scheme as HTTP status codes. Therefore, a status code less than 400 is not considered an exception.
Top-level exception handler
Top-level exception handler is a "catch-all" handler. You can define it like this:
flow:
  id: 'greetings'
  description: 'Simplest flow of one task'
  ttl: 10s
  exception: 'v1.my.exception.handler'
In this example, the v1.my.exception.handler should point to a corresponding exception handler that you provide.
The following input arguments will be delivered to your function when exception happens.
Key | Description |
---|---|
status | Exception status code |
message | Error message |
stack | Stack trace in a text string |
The exception handler function can be an "end" task to abort the transaction or a decision task to take care of the exception. For example, the exception handler can be a "circuit-breaker" to retry a request.
Note: for efficiency, stack trace transport is limited to the first 10 lines.
Task-level exception handler
You can attach an exception handler to a task. One typical use is the "circuit breaker" pattern. In the following example, the user function "breakable.function" may throw an exception for some error condition. The exception will be caught by the "v1.circuit.breaker" function.
  - input:
      - 'input.path_parameter.accept -> accept'
      - 'model.attempt -> attempt'
    process: 'breakable.function'
    output:
      - 'int(0) -> model.attempt'
      - 'text(application/json) -> output.header.content-type'
      - 'result -> output.body'
    description: 'This demo function will break until the "accept" number is reached'
    execution: end
    exception: 'v1.circuit.breaker'
The configuration for the circuit breaker function may look like this:
  - input:
      - 'model.attempt -> attempt'
      - 'int(2) -> max_attempts'
      - 'error.code -> status'
      - 'error.message -> message'
      - 'error.stack -> stack'
    process: 'v1.circuit.breaker'
    output:
      - 'result.attempt -> model.attempt'
      - 'result.decision -> decision'
      - 'result.status -> model.status'
      - 'result.message -> model.message'
    description: 'Just a demo circuit breaker'
    execution: decision
    next:
      - 'breakable.function'
      - 'abort.request'
An exception handler will be provided with the "error" object that contains the error code, error message and a stack trace. The exception handler can inspect the error object to decide the next step.
For circuit breaker, we can keep the number of retry attempts in the state machine under "model.attempt" or any key name that you prefer. In the above example, it sets an integer constant of 2 for the maximum attempts.
The circuit breaker can then evaluate if the number of attempts is less than the maximum attempts. If yes, it will return a decision of "true" value to tell the system to route to the "breakable.function" again. Otherwise, it will return a decision of "false" value to abort the request.
A more sophisticated circuit breaker may be configured with "alternative execution paths" depending on the error status and stack trace. In this case, the decision value can be a number from 1 to n that corresponds to the "next" task list.
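The retry decision described above can be sketched as a plain Java method that takes the mapped input (attempt, max_attempts, status, message) and returns a result set with "attempt", "decision", "status" and "message". CircuitBreakerSketch is a hypothetical class and is not the demo circuit breaker shipped with the project. In a real application this logic would live inside a composable function registered under the "v1.circuit.breaker" route. Incrementing the attempt counter after the comparison is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical logic for illustration only, not the project's demo circuit breaker.
final class CircuitBreakerSketch {

    // Returns decision=true to retry while the attempt count is below the maximum.
    static Map<String, Object> decide(Map<String, Object> input) {
        int attempt = toInt(input.get("attempt"), 0);
        int maxAttempts = toInt(input.get("max_attempts"), 1);
        Map<String, Object> result = new HashMap<>();
        result.put("decision", attempt < maxAttempts);
        // assumed: the counter is incremented so that model.attempt grows on each retry
        result.put("attempt", attempt + 1);
        result.put("status", input.get("status"));
        result.put("message", input.get("message"));
        return result;
    }

    // Converts a number or numeric text to int, using the fallback when it is missing or invalid.
    private static int toInt(Object value, int fallback) {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> input = Map.of("attempt", 0, "max_attempts", 2, "status", 500, "message", "demo error");
        System.out.println(decide(input).get("decision"));
    }
}
```

With the output data mapping shown earlier, "result.decision" would drive the choice between "breakable.function" and "abort.request".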
Exception handlers may be used in both queries and transactions. For a complex transaction, the exception handler may implement database rollback logic or recovery mechanism.
Best practice
When a task-level exception handler throws an exception, it will be caught by the top-level exception handler, if any.
A top-level exception handler should not throw an exception. Otherwise it may go into an exception loop.
Therefore, we recommend that an exception handler return a regular result set in a POJO or a Map object.
An example of a task-level exception handler is shown in "HelloException.class" in the "task" folder. It sets the status code in the result set so that the system can map the status code from the result set to the next task or to the HTTP output status code.
Advanced features
Simple type matching and conversion
Event Script's state machine supports simple type matching and conversion. This "impedance matching" feature allows us to accommodate minor interface contract changes without refactoring the business logic of a user function.
It is supported on both the left-hand side and the right-hand side of input and output data mappings.
For the left-hand-side, the state machine's model value is matched or converted to the target data type before setting the value of the right-hand-side. The state machine values are unchanged.
For the right-hand-side, the matched or converted value is applied to the state machine's model value.
The syntax is model.someKey:type
where "type" is one of the following:
Type | Match value as | Example |
---|---|---|
text | text string | model.someKey:text |
binary | byte array | model.someKey:binary |
int | integer or -1 if not numeric | model.someKey:int |
long | long or -1 if not numeric | model.someKey:long |
float | float or -1 if not numeric | model.someKey:float |
double | double or -1 if not numeric | model.someKey:double |
boolean | true or false | model.someKey:boolean |
boolean(value) | true if value matches | model.someKey:boolean(positive) |
boolean(value=true) | true if value matches | model.someKey:boolean(positive=true) |
boolean(value=false) | false if value matches | model.someKey:boolean(negative=false) |
and(model.key) | boolean AND of 2 model keys | model.someKey:and(model.another) |
or(model.key) | boolean OR of 2 model keys | model.someKey:or(model.another) |
substring(start, end) | extract a substring | model.someKey:substring(0, 5) |
substring(start) | extract a substring | model.someKey:substring(5) |
b64 | byte-array to Base64 text | model.someKey:b64 |
b64 | Base64 text to byte-array | model.someKey:b64 |
For boolean with value matching, the value can be null. This allows your function to test if the key-value in the left-hand-side is a null value.
For Base64 type matching, if the key-value is a text string, the system will assume it is a Base64 text string and convert it to a byte-array. If the key-value is a byte-array, the system will encode it into a Base64 text string.
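To make the conversion rules in the table more concrete, here is a minimal Java sketch of three of them: "int", "boolean(value=...)" and "b64". It is not the engine's code. The class `TypeMatchSketch` and its method names are hypothetical, and the handling of edge cases (for example, returning unsupported values unchanged for "b64", or returning the opposite flag when a boolean value does not match) is an assumption.

```java
import java.util.Base64;

// Hypothetical illustration of the "int", "boolean(value=...)" and "b64" matching rules.
final class TypeMatchSketch {

    // "int": integer value, or -1 if the value is not numeric
    static int matchInt(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value == null) {
            return -1;
        }
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    // "boolean(expected=whenMatched)": a null value is compared as the text "null"
    static boolean matchBoolean(Object value, String expected, boolean whenMatched) {
        String text = value == null ? "null" : value.toString();
        // assumption: a non-matching value yields the opposite of the configured flag
        return text.equals(expected) ? whenMatched : !whenMatched;
    }

    // "b64": byte array -> Base64 text, text -> byte array
    static Object matchB64(Object value) {
        if (value instanceof byte[]) {
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        if (value instanceof String) {
            // throws IllegalArgumentException if the text is not valid Base64
            return Base64.getDecoder().decode((String) value);
        }
        return value; // assumption: other types are passed through unchanged
    }
}
```

Under these assumptions, `matchBoolean(null, "null", true)` corresponds to a mapping such as `model.someKey:boolean(null=true)`, which yields true when the key has no value.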
An interesting use case of type matching is a simple decision task using the built-in no-op function. For example, when a control file for the application is not available, your application will switch to run in dev mode.
A sample task may look like this:
first.task: 'no.op'

tasks:
  - input:
      - 'file(binary:/tmp/interesting-config-file) -> model.is-local:boolean(null=true)'
    process: 'no.op'
    output:
      - 'model.is-local -> decision'
    execution: decision
    next:
      - 'start.in.dev.mode'
      - 'start.in.cloud'
External state machine
The in-memory state machine is created for each query or transaction flow and it is transient.
For complex transactions or long-running workflows, you would typically want to externalize some transaction states to a persistent store such as a distributed cache system or a high-performance key-value data store.
In these use cases, you can implement an external state machine function and configure it in a flow.
Below is an example from a unit test. When you externalize a key-value to an external state machine, you must configure the route name (aka level-3 functional topic) of the external state machine.
Note: Passing a null value to a key of an external state machine means "removal".
external.state.machine: 'v1.ext.state.machine'

tasks:
  - input:
      # A function can call an external state machine using input or output mapping.
      # In this example, it calls external state machine from input data mapping.
      - 'input.path_parameter.user -> ext:/${app.id}/user'
      - 'input.body -> model.body'
      # demonstrate saving constant to state machine and remove it using model.none
      - 'text(world) -> ext:hello'
      - 'model.none -> ext:hello'
    process: 'no.op'
    output:
      - 'text(application/json) -> output.header.content-type'
      # It calls external state machine again from output data mapping
      - 'input.body -> ext:/${app.id}/body'
      - 'input.body -> output.body'
      - 'text(message) -> ext:test'
      - 'model.none -> ext:test'
    description: 'Hello World'
    execution: end
The "external.state.machine" parameter is optional.
When present, the system will send a key-value from the current flow instance's state machine to the function implementing the external state machine. The system uses the "ext:" namespace to externalize a state machine's key-value.
Note: The delivery of key-values to the external state machine is asynchronous. Therefore, please assume eventual consistency.
You should implement a user function as the external state machine.
The input interface contract to the external state machine for saving a key-value is:
header.type = 'put'
header.key = key
body = value
Your function should save the input key-value to a persistent store.
In another flow that requires the key-value, you can add an initial task to retrieve from the persistent store and do "output data mapping" to save to the in-memory state machine so that your transaction flow can use the persisted key-values to continue processing.
In the unit tests of the event-script-engine subproject, these two flows work together:
externalize-put-key-value
externalize-get-key-value
IMPORTANT: Events to an external state machine are delivered asynchronously. If you want to guarantee message sequencing, please do not set the "instances" parameter in the PreLoad annotation.
To illustrate a minimalist implementation, below is an example of an external state machine in the event-script-engine's unit test section.
@PreLoad(route = "v1.ext.state.machine")
public class ExternalStateMachine implements LambdaFunction {
    private static final Logger log = LoggerFactory.getLogger(ExternalStateMachine.class);
    private static final ManagedCache store = ManagedCache.createCache("state.machine", 5000);
    private static final String TYPE = "type";
    private static final String PUT = "put";
    private static final String GET = "get";
    private static final String REMOVE = "remove";
    private static final String KEY = "key";

    @Override
    public Object handleEvent(Map<String, String> headers, Object input, int instance) {
        if (!headers.containsKey(KEY)) {
            throw new IllegalArgumentException("Missing key in headers");
        }
        String type = headers.get(TYPE);
        String key = headers.get(KEY);
        if (PUT.equals(type) && input != null) {
            log.info("Saving {} to store", key);
            store.put(key, input);
            return true;
        }
        if (GET.equals(type)) {
            Object v = store.get(key);
            if (v != null) {
                log.info("Retrieve {} from store", key);
                return v;
            } else {
                return null;
            }
        }
        if (REMOVE.equals(type)) {
            if (store.exists(key)) {
                store.remove(key);
                log.info("Removed {} from store", key);
                return true;
            } else {
                return false;
            }
        }
        return false;
    }
}
Future task scheduling
You may add a "delay" tag in a task so that it will be executed later. This feature is usually used for unit tests or "future task scheduling".
Since the system is event-driven and non-blocking, the delay is simulated by event scheduling. It does not block the processing flow.
Type | Value | Example |
---|---|---|
Fixed delay | Milliseconds | delay: '1000 ms' |
Variable delay | State machine variable | delay: model.delay |
Note that the "ms" suffix is optional and serves only as documentation. The delay value is in milliseconds with or without it.
When the delay refers to a state machine variable whose value has not been set by a prior data mapping, the delay command is ignored.
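The idea of a delay that is scheduled rather than slept on can be illustrated with the standard Java library. The sketch below is an analogy only and does not show how the engine schedules events internally. The class `DelaySketch` and its methods are hypothetical names, and the parser simply assumes that the value is a whole number of milliseconds with an optional "ms" suffix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: parse a delay value and run a task later without blocking the caller.
final class DelaySketch {

    // Accepts "1000 ms", "1000ms" or "1000" and returns the delay in milliseconds
    static long parseDelayMillis(String text) {
        String value = text.trim();
        if (value.endsWith("ms")) {
            value = value.substring(0, value.length() - 2).trim();
        }
        return Long.parseLong(value);
    }

    // Schedules the task on a delayed executor and returns immediately with a future
    static CompletableFuture<String> runLater(String delay, String taskName) {
        Executor later = CompletableFuture.delayedExecutor(
                parseDelayMillis(delay), TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> taskName + " executed", later);
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = runLater("200 ms", "greeting.test");
        // The calling thread continues here while the task is still waiting to run
        System.out.println("caller is not blocked");
        // join() is used only so that the demo does not exit before the task runs
        System.out.println(pending.join());
    }
}
```

`CompletableFuture.delayedExecutor` requires Java 9 or later.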
An example task that has an artificial delay of 2 seconds:
tasks:
- input:
- 'input.path_parameter.user -> user'
- 'input.query.ex -> exception'
- 'text(hello world) -> greeting'
process: 'greeting.test'
output:
- 'text(application/json) -> output.header.content-type'
- 'result -> output.body'
description: 'Hello World'
execution: end
delay: '2000 ms'