Spring Cloud Stream includes integration with Spring Cloud Function’s function-based programming model that lets the
business logic of an application be modeled as a java.util.Supplier
, a java.util.Function
, and a java.util.Consumer
,
representing the roles of a Source
, a Processor
, and a Sink
, respectively.
Building on this foundation, we can extend existing Source
and Sink
applications by importing the configuration of an
existing Source
or Sink
and adding code that defines a java.util.Function
— this delivers a lot of powerful composition
possibilities.
Take for instance, the Source
applications are auto-configured with functions, which may optionally
be included in a composite function definition.
With this, the same Source application can potentially do one or all of the following without having to build it out as a standalone processor.
-
execute SpEL transformations
-
enrich message headers
-
filter events
-
produce task launch requests on upstream events
For example, the time
source when it is running, as shown below, will perform a series of internal transformations to
finally publish a task launch request every second to the rabbit exchange with the name time-test
.
java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar \
--spring.cloud.stream.bindings.output.destination=time-test \
--spring.cloud.stream.function.definition="timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction" \
--spel.function.expression="payload.length()" \
--header.enricher.headers=task-id=payload*2 \
--task.launch.request.task-name-expression="'task-'+headers['task-id']"
Now, the transformed message would look like:
headers:
task-id: 34
content_type: application/json
Payload
49 bytes
Encoding: string
{"args":[],"deploymentProps":{},"name":"task-34"}
Let us unpack what is happening behind the scenes. We will start with the following function definition.
timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction
-
Here, the function definition creates a composite
Supplier
beginning with the defaulttimeSupplier
Java function included in this repository, which is the foundation fortime-source
. -
The
spelFunction
applies to aMessage
from which we can extract and transform thepayload
orheaders
, by following standard Spring Integration conventions. -
The output of
spelFunction
is the length of the date-time String,17
. -
From here, we apply the header-enricher Java function to add a Message header,
task-id
with the value ofpayload*2
. That would be34
. -
We use the
task-id
in the header to generate the "task name", and to programmatically derive the task launch request using the SpEL expression "'task-'+headers['task-id']", ortask-34
.
This somewhat contrived example, but the goal here was to highlight the power of function composition.
If you have had a task definition in Spring Cloud Data Flow with the name task-34
, you could build a time | tasklauncher
streaming data pipeline to launch that task every second.
Before the 3.0
release of Stream Applications, this composition required extensive customization. And a lot more manual
configuration changes, extensions, and custom build of the applications.
Note
|
Support for composite functions includes auto-configuration for conventional binding name mappings (input and output )
derived from the function definition and the presence of spring.cloud.stream.bindings.output… .
|
In this example, --spring.cloud.stream.bindings.output.destination=time-test
is enabled behind the scenes by the auto-configured
property
--spring.cloud.stream.function.bindings.timeSupplierspelFunctionheaderEnricherFunctiontaskLaunchRequestFunction-out-0=output
.