Project Flogo is an ultra-light, Go-based open source ecosystem for building event-driven apps, and it provides several capabilities for building them.
In this tutorial you will learn how to use the Stream Processing capability in Flogo.
This demo makes use of the Flogo CLI. If you don’t have it running yet, please check out Getting Started with the Flogo CLI.
If you have any questions, feel free to post an issue on GitHub and tag it as a question, or chat with the team and community.
The first step in creating a Flogo streams app is to create a quick, barebones Flogo app using the Flogo CLI. You’ll need to specify that it should get the master branch of both flogo-contrib and flogo-lib to make sure you can build a streaming app correctly. The command to execute is:
flogo create -flv <branch you need> <appname>
So in your case, using the name streamfilter, the command will be:
flogo create -flv github.com/TIBCOSoftware/flogo-contrib/activity/log@master,github.com/TIBCOSoftware/flogo-lib/app/resource@master streamfilter
Open up the flogo.json file that was created in the streamfilter directory and delete all the contents. For this tutorial, you’ll be guided through the different sections of the flogo.json and what they mean.
Before you can define the app itself, you’ll need to define some metadata that the app model needs. The fields name, type, version, and appModel describe the metadata of the app and the JSON model you’re building through this tutorial. For this tutorial, that part of the file will look like:
{
  "name": "streamfilter",
  "type": "flogo:app",
  "version": "0.0.1",
  "appModel": "1.0.0"
}
Flogo is an event-driven framework. A trigger is the entrypoint for events. A trigger can be a subscriber on an MQTT topic, a Kafka topic, an HTTP REST interface, or a specific IoT sensor. The trigger is responsible for accepting the incoming event and invoking one or more defined actions. In this case, the trigger will be the REST trigger that comes out-of-the-box with Flogo. The trigger has a few settings that are important:
- It listens on port 9234
- It has a GET method registered for the endpoint /filter/:val
- It invokes the action with the id simple_filter
- The input of the action is called input and is assigned the value of the PATH parameter val
Since there could be multiple triggers, the triggers element is an array. In this tutorial, though, you’ll only use one REST trigger:
{
  "triggers": [
    {
      "id": "receive_http_message",
      "ref": "github.com/TIBCOSoftware/flogo-contrib/trigger/rest",
      "name": "Receive HTTP Message",
      "settings": {
        "port": "9234"
      },
      "handlers": [
        {
          "settings": {
            "method": "GET",
            "path": "/filter/:val"
          },
          "action": {
            "id": "simple_filter",
            "mappings": {
              "input": [
                {
                  "mapTo": "input",
                  "type": "assign",
                  "value": "$.pathParams.val"
                }
              ]
            }
          }
        }
      ]
    }
  ]
}
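To make the handler mapping a bit more concrete, here is a rough Python sketch of how a path template such as /filter/:val could be matched against a request path, and how the captured parameter could end up as the action input. This is not the REST trigger's actual code (the trigger is written in Go); the function names match_path and map_to_action_input are invented purely for illustration.

```python
def match_path(template, path):
    """Return the path parameters if `path` fits `template`, else None.

    Template segments that start with ':' are treated as named parameters.
    """
    t_parts = template.strip("/").split("/")
    p_parts = path.strip("/").split("/")
    if len(t_parts) != len(p_parts):
        return None
    params = {}
    for t, p in zip(t_parts, p_parts):
        if t.startswith(":"):
            params[t[1:]] = p      # capture the value under the parameter name
        elif t != p:
            return None            # a literal segment did not match
    return params


def map_to_action_input(path_params):
    """Mimic the handler mapping: assign pathParams.val to the input named 'input'."""
    return {"input": path_params["val"]}


params = match_path("/filter/:val", "/filter/1")
print(params)                                   # {'val': '1'}
print(map_to_action_input(params))              # {'input': '1'}
print(match_path("/filter/:val", "/other/1"))   # None
```

Note that in this sketch the captured value is still a string; any conversion to the type the pipeline expects is left out.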
An action is a generic implementation for processing the incoming event. Different types of actions can be implemented, thus defining different methods by which an incoming event can be processed. In your case, a pipeline is needed (which is implemented by the github.com/project-flogo/stream action) and it will dispatch the event to the resource with the URI res://pipeline:simple_filter:
{
  "actions": [
    {
      "id": "simple_filter",
      "ref": "github.com/project-flogo/stream",
      "settings": {
        "pipelineURI": "res://pipeline:simple_filter"
      }
    }
  ]
}
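Judging from the ids used in this tutorial, the part of the pipelineURI after res:// is the id of the resource the action dispatches to. The following Python sketch splits such a URI into its pieces under that assumption; parse_resource_uri is a hypothetical helper and not part of Flogo.

```python
def parse_resource_uri(uri):
    """Split 'res://<type>:<name>' into (resource_id, type, name).

    Assumes the resource id is everything after the 'res://' prefix.
    """
    prefix = "res://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a resource URI: {uri!r}")
    resource_id = uri[len(prefix):]
    res_type, sep, name = resource_id.partition(":")
    if not sep or not res_type or not name:
        raise ValueError(f"expected res://<type>:<name>, got {uri!r}")
    return resource_id, res_type, name


print(parse_resource_uri("res://pipeline:simple_filter"))
# ('pipeline:simple_filter', 'pipeline', 'simple_filter')
```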
The resources are the actual workhorses of the Flogo app. They define, among many other things, the sequences in which activities have to be executed, rules that need to be followed, and parameters that need to be mapped. First, let’s look at the metadata of the resource that defines the input and the output. In this case the input is an integer called input (and there is no output).
{
  "resources": [
    {
      "id": "pipeline:simple_filter",
      "data": {
        "metadata": {
          "input": [
            {
              "name": "input",
              "type": "integer"
            }
          ]
        }
      }
    }
  ]
}
The stages, as the name implies, define the sequential steps that a pipeline needs to perform. The first stage is the filter activity, which:
- uses the filter type non-zero
- lets the pipeline proceed to the next stage only when it emits a value (proceedOnlyOnEmit is true)
- takes the pipeline input as the value to filter
The second activity is a log activity, where the message field is mapped straight from the value field of the filter activity. Note that in stream actions, unlike flow actions, only the output of the preceding activity is available and not all other outputs.
{
  "stages": [
    {
      "ref": "github.com/TIBCOSoftware/flogo-contrib/activity/filter",
      "settings": {
        "type": "non-zero",
        "proceedOnlyOnEmit": true
      },
      "input": {
        "value": "=$.input"
      }
    },
    {
      "ref": "github.com/TIBCOSoftware/flogo-contrib/activity/log",
      "input": {
        "message": "=$.value"
      }
    }
  ]
}
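The behavior of these two stages can be sketched in a few lines of Python. This is only a simplified model of what the tutorial describes (a non-zero filter followed by a log stage), not the Go implementation of either activity; non_zero_filter and run_pipeline are names made up for this sketch.

```python
def non_zero_filter(value):
    """Sketch of a 'non-zero' filter: emit the value only when it is not zero.

    Returns (emitted, value); emitted is False when the value is filtered out.
    """
    return (value != 0, value)


def run_pipeline(value, log):
    """Run the two stages in order for a single event."""
    emitted, out = non_zero_filter(value)
    # With proceedOnlyOnEmit set to true, a filtered-out event stops here.
    if not emitted:
        return False
    # The log stage only sees the output of the preceding stage.
    log.append(str(out))
    return True


log_lines = []
for event in [1, 0, 1]:
    run_pipeline(event, log_lines)

print(log_lines)  # ['1', '1']
```

Three events go in, but only the two non-zero ones reach the log stage.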
The complete flogo.json will look like:
{
  "name": "streamfilter",
  "type": "flogo:app",
  "version": "0.0.1",
  "appModel": "1.0.0",
  "triggers": [
    {
      "id": "receive_http_message",
      "ref": "github.com/TIBCOSoftware/flogo-contrib/trigger/rest",
      "name": "Receive HTTP Message",
      "settings": {
        "port": "9234"
      },
      "handlers": [
        {
          "settings": {
            "method": "GET",
            "path": "/filter/:val"
          },
          "action": {
            "id": "simple_filter",
            "mappings": {
              "input": [
                {
                  "mapTo": "input",
                  "type": "assign",
                  "value": "$.pathParams.val"
                }
              ]
            }
          }
        }
      ]
    }
  ],
  "actions": [
    {
      "id": "simple_filter",
      "ref": "github.com/project-flogo/stream",
      "settings": {
        "pipelineURI": "res://pipeline:simple_filter"
      }
    }
  ],
  "resources": [
    {
      "id": "pipeline:simple_filter",
      "data": {
        "metadata": {
          "input": [
            {
              "name": "input",
              "type": "integer"
            }
          ]
        },
        "stages": [
          {
            "ref": "github.com/TIBCOSoftware/flogo-contrib/activity/filter",
            "settings": {
              "type": "non-zero",
              "proceedOnlyOnEmit": true
            },
            "input": {
              "value": "=$.input"
            }
          },
          {
            "ref": "github.com/TIBCOSoftware/flogo-contrib/activity/log",
            "input": {
              "message": "=$.value"
            }
          }
        ]
      }
    }
  ]
}
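Because the trigger, the action, and the resource refer to each other by id, a typo in any of them is easy to make. As a rough aid, the Python sketch below checks that those ids line up. It works on a trimmed copy of the file that keeps only the id fields; check_references is a hypothetical helper, and the rule that a pipelineURI points at the resource id after res:// is an assumption based on the ids used in this tutorial.

```python
import json

# Trimmed copy of the flogo.json above, keeping only the cross-referenced ids.
APP_JSON = """
{
  "triggers": [
    {"id": "receive_http_message",
     "handlers": [{"action": {"id": "simple_filter"}}]}
  ],
  "actions": [
    {"id": "simple_filter",
     "settings": {"pipelineURI": "res://pipeline:simple_filter"}}
  ],
  "resources": [
    {"id": "pipeline:simple_filter"}
  ]
}
"""


def check_references(app):
    """Return a list of problems; an empty list means all ids line up."""
    problems = []
    action_ids = {a["id"] for a in app.get("actions", [])}
    resource_ids = {r["id"] for r in app.get("resources", [])}

    # Every handler must point at an action that is defined.
    for trigger in app.get("triggers", []):
        for handler in trigger.get("handlers", []):
            action_id = handler["action"]["id"]
            if action_id not in action_ids:
                problems.append(f"trigger {trigger['id']}: unknown action {action_id}")

    # Every pipelineURI must point at a resource that is defined.
    for action in app.get("actions", []):
        uri = action.get("settings", {}).get("pipelineURI", "")
        resource_id = uri[len("res://"):] if uri.startswith("res://") else uri
        if resource_id not in resource_ids:
            problems.append(f"action {action['id']}: no resource for {uri}")
    return problems


app = json.loads(APP_JSON)
print(check_references(app))  # []
```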
In your app, you’re making use of a few external dependencies that you need to install into your app for the Go compiler to be able to successfully build the app. To install external dependencies, you need to run:
flogo install <dependency location>
So in the case of this tutorial, you’ll need to run:
flogo install github.com/project-flogo/stream
flogo install github.com/TIBCOSoftware/flogo-lib/app/resource
flogo install github.com/TIBCOSoftware/flogo-contrib/activity/filter
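If you want a quick overview of which contributions an app refers to, one option is to collect every "ref" value from the flogo.json. The Python sketch below does that on a trimmed copy of this tutorial's file; collect_refs is a hypothetical helper, and the sketch says nothing about which of these are already present in a freshly created app.

```python
def collect_refs(node, found=None):
    """Walk a parsed flogo.json and gather every string stored under a "ref" key."""
    if found is None:
        found = set()
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "ref" and isinstance(value, str):
                found.add(value)
            else:
                collect_refs(value, found)
    elif isinstance(node, list):
        for item in node:
            collect_refs(item, found)
    return found


# Trimmed copy of this tutorial's flogo.json; only the parts carrying "ref" matter here.
app = {
    "triggers": [{"ref": "github.com/TIBCOSoftware/flogo-contrib/trigger/rest"}],
    "actions": [{"ref": "github.com/project-flogo/stream"}],
    "resources": [{
        "data": {
            "stages": [
                {"ref": "github.com/TIBCOSoftware/flogo-contrib/activity/filter"},
                {"ref": "github.com/TIBCOSoftware/flogo-contrib/activity/log"},
            ]
        }
    }],
}

refs = collect_refs(app)
for ref in sorted(refs):
    print(ref)
```

Note that github.com/TIBCOSoftware/flogo-lib/app/resource does not appear as a ref in the file, so a scan like this would not surface it.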
The next step is to build the executable. To build a Flogo app from the source you can execute the command flogo build -e, which tells the Flogo CLI to build the app (and place it in a bin directory) and embed all configuration into a single executable:
flogo build -e
To run the app you just built, open a terminal and run:
cd bin
./streamfilter
You can send requests to it using cURL. Values that are non-zero should result in a log message; events with a value of 0 will not.
curl --request GET --url http://localhost:9234/filter/1
curl --request GET --url http://localhost:9234/filter/0
curl --request GET --url http://localhost:9234/filter/1
The above requests will result in the log activity being executed only twice:
2018-10-04 08:16:46.048 INFO [engine] - Engine Starting...
2018-10-04 08:16:46.049 INFO [engine] - Starting Services...
2018-10-04 08:16:46.049 INFO [engine] - Started Services
2018-10-04 08:16:46.049 INFO [engine] - Starting Triggers...
2018-10-04 08:16:46.050 INFO [engine] - Trigger [ receive_http_message ]: Started
2018-10-04 08:16:46.050 INFO [engine] - Triggers Started
2018-10-04 08:16:46.050 INFO [engine] - Engine Started
2018-10-04 08:16:51.514 INFO [trigger-flogo-rest] - Received request for id 'receive_http_message'
2018-10-04 08:16:51.515 INFO [activity-flogo-log] - 1
2018-10-04 08:16:51.530 INFO [trigger-flogo-rest] - Received request for id 'receive_http_message'
2018-10-04 08:16:51.545 INFO [trigger-flogo-rest] - Received request for id 'receive_http_message'
2018-10-04 08:16:51.545 INFO [activity-flogo-log] - 1
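To double-check the count without reading the log by eye, you could tally the lines per logger. The Python sketch below does this for the request-handling part of the output above; the regular expression is an assumption based on the line layout shown here, and count_by_logger is a hypothetical helper.

```python
import re

# The request-handling lines from the output above.
LOG = """\
2018-10-04 08:16:51.514 INFO [trigger-flogo-rest] - Received request for id 'receive_http_message'
2018-10-04 08:16:51.515 INFO [activity-flogo-log] - 1
2018-10-04 08:16:51.530 INFO [trigger-flogo-rest] - Received request for id 'receive_http_message'
2018-10-04 08:16:51.545 INFO [trigger-flogo-rest] - Received request for id 'receive_http_message'
2018-10-04 08:16:51.545 INFO [activity-flogo-log] - 1
"""

# <date> <time> <level> [<logger>] - <message>
LINE = re.compile(r"^(\S+ \S+) (\w+) \[([^\]]+)\] - (.*)$")


def count_by_logger(text):
    """Count how many log lines each logger produced."""
    counts = {}
    for line in text.splitlines():
        match = LINE.match(line)
        if match:
            logger = match.group(3)
            counts[logger] = counts.get(logger, 0) + 1
    return counts


counts = count_by_logger(LOG)
print(counts["trigger-flogo-rest"])  # 3
print(counts["activity-flogo-log"])  # 2
```

Three requests were received, but the log activity ran only for the two non-zero values.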