An Enterprise Service Bus (ESB) is used to coordinate communication between multiple services in a service-oriented architecture (SOA). ESBs try to break large, complex problems into smaller independent problems that can be designed, built, and maintained independently.
A large portion of an ESB is a message broker that can queue messages and distribute them to the services that need to receive them. Many brokers exist today: ActiveMQ, RabbitMQ, Apache Kafka, etc. They can be set up to support the enterprise customer's needs by adding functionality such as persisting messages (guaranteed delivery) and providing redundancy. Since they concentrate on message delivery, their performance can be tuned to provide very high message delivery rates.
Several messaging protocols and APIs exist for sending messages to these brokers, including STOMP, JMS, and AMQP, and there are libraries for many languages to produce and consume messages to and from the brokers.
Applications can be written to communicate with the broker both synchronously and asynchronously and perform work based on the messages queued. One of the benefits of a message broker is that applications can be highly reactive simply by listening on message queues: when a message arrives at the broker, the broker pushes the message to the application.
Now along come microservices. Microservices try to solve the same problem by writing small, independent applications that each perform a specific function really well. The microservices are then brought together to solve a larger problem. Typically, microservices communicate with each other either via web services or by passing messages through a broker.
One of the benefits of microservices is that it is easier to write a collection of smaller applications, and the applications can be isolated from one another. One of the drawbacks, however, is that it becomes harder to leverage common functionality shared between services. Forklift allows the developer to write microservices very quickly while still benefiting from common functionality that all services can use.
Forklift performs the following prime duties of an ESB:
- An abstraction layer provided for sub-components
- Control deployment and versioning of services
- Marshal use of redundant services
- Data transformation and mapping
- Security and exception handling
The other requirements of an ESB are met by the use of a broker such as ActiveMQ. The following diagram shows how Forklift and a message broker combine to form an ESB.
Forklift is written in Java, and its consumers are written in Java (other JVM languages that can be loaded as a jar may also work). However, it can consume messages placed on a message queue by producers written in any language, and it can use any of the standard Java communication protocols to communicate with other services.
Forklift is purposely built to let developers quickly create consumers in code that perform actions based on messages. There is no XML configuration, no business process modeling, and no flow-design tools. It is built for developers who can read and write code to implement redundant, retry-capable, provable, testable, queued message processing.
Just like a Java application server, Forklift can be started up on its own and does nothing until a consumer is deployed to it. Consumers are written to identify which queue or topic they will consume. A method within the class must be designated to process the messages and is run each time a new message is available. Other features, such as designating methods to validate messages, can easily be added. Failed validation is handled by the Error Handler; see Standardized Exception/Error Handling.
Example:
@Queue("example")
public class MyConsumer {
@OnValidate
public List<String> validate() {
// code
return errors;
}
@OnMessage
public void handler() {
// code
}
}
Forklift provides easy configuration through a set of Java annotations that wrap your business logic with core code to handle the routine work for you. Here are a few examples of annotations that provide functionality:
- @Queue - Specified on a class with a queue name; Forklift begins processing any messages on the named queue, running the designated methods on each message.
- @OnMessage - Specified on a method; designates the method that should run each time a message is received.
- @Message - Specified on a property of the class; Forklift will try to inject the received message into the property so the message contents can be read easily.
Forklift provides a way to extend the base life-cycle functionality through plug-ins. Several plug-ins have already been developed, for example one that retries a message on error and one that creates audit logs storing the entire message so that it can be replayed.
Years of development experience led to Forklift. Forklift needed to process messages asynchronously, but end users wanted to know exactly what happened with each of their messages. After many discussions, the developers decided on the following minimal life-cycle for processing each message:
- Pending - The message has been picked up from the broker and is about to be passed on to the consumer.
- Validating - Forklift runs any @OnValidate methods to validate the message.
- Processing - Forklift runs any @OnMessage methods within the consumer to process the message.
- Complete - Forklift successfully validated and processed the message without error.
- Invalid - Validation did not succeed, so the message won't be processed.
- Error - Validation may have succeeded, but some other error occurred while the consumer was processing the message.
Since the life-cycle is built so that the developer can track processing, Forklift provides the ability to intercept each step. Components written and installed within Forklift can add the @LifeCycle annotation to methods to have targeted code run when messages reach each step. With this ability, it is quite easy to write specialized message life-cycle auditing. Plug-ins are available that intercept the life-cycle to replay messages or to retry a message that errors during processing.
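As a rough sketch, a life-cycle audit component might look like the following. This is illustrative only: it assumes @LifeCycle takes the ProcessStep to intercept (mirroring the @On annotation described later) and that the annotation and the ProcessStep enum live in the forklift.decorators and forklift.consumer packages used elsewhere in this guide; check the Forklift source for the exact signatures.
import forklift.consumer.ProcessStep;   // assumed package for ProcessStep
import forklift.decorators.LifeCycle;   // assumed package for @LifeCycle

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LifeCycleAuditor {
    private static final Logger log = LoggerFactory.getLogger(LifeCycleAuditor.class);

    // Run whenever a message reaches the Error step.
    @LifeCycle(ProcessStep.Error)
    public void onError() {
        log.warn("a message reached the Error step");
    }

    // Run whenever a message completes successfully.
    @LifeCycle(ProcessStep.Complete)
    public void onComplete() {
        log.info("a message completed processing");
    }
}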
The purpose of a consumer is to process messages off of queues or topics from the broker. To make this easier, Forklift lets the consumer be configured by annotating the consumer Java class (a combined sketch follows this list):
- @Queue - All messages placed on the named queue will be passed to this consumer.
- @Topic - All messages placed on the named topic will be passed to this consumer.
- @MultiThread - Forklift will run the specified number of consumers to process messages off the queue when needed. Helpful when consumers may be longer-running processes.
- @Order - Messages are guaranteed to run in order for an exclusive consumer.
- @RequireSystem - TBD
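As a rough sketch of combining these annotations (the topic name and thread count are examples only, and the imports assume @Topic and @MultiThread live in the same forklift.decorators package as the annotations shown earlier):
import forklift.decorators.MultiThread;
import forklift.decorators.OnMessage;
import forklift.decorators.Topic;

// Consume the "orders" topic with up to 4 consumer threads, which helps
// when each message takes a while to process.
@Topic("orders")
@MultiThread(4)
public class OrderConsumer {
    @OnMessage
    public void process() {
        // long-running work per message
    }
}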
While it is easy to consume messages, it is also easy to configure the consumer. Properties files containing data values can be deployed independently of the consumer and picked up at run-time. This provides a quick way to deploy the same consumer in different environments (such as production or test) and simply change the properties for different behavior. Files with the .properties extension placed in the deployment directory are scanned and made available to the consumers.
@Named, @Entity, and other Spring-configurable objects - TBD
A lot of the base code when dealing with JMS messages in Java involves marshaling the message into a usable object. Forklift provides annotations that can be placed on properties to do this marshaling for you (a combined sketch follows the list below):
- @Config("file") - Place this annotation on a java.util.Properties object. All properties in the deployed properties file are made available for use. If you don't specify a file, it is inferred from the field name.
- @Config("file", "field") - Injects the specified field from the property file. If you don't specify a field, it is inferred from the field name.
- @Message - Placed on different property types, this will try to marshal the message data into the object.
- @Headers - Placed on a Map of string and object; Forklift will marshal the message headers into the map.
- @Headers(Header…) - Injects a specific header.
- @Properties - Placed on a Map of string and object; Forklift will marshal the message properties into the map.
- @Properties("name") - Loads the specific property "name". If you just want to marshal a few specific properties but don't need the entire object, use this. If you don't provide a name, it is inferred from the field name.
@Inject - TBD
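A sketch combining several of these injections in one consumer. The queue name, the app.properties file, and its keys are illustrative assumptions, and the imports assume the annotations live in the forklift.decorators package used by the other examples in this guide.
import forklift.decorators.Config;
import forklift.decorators.Headers;
import forklift.decorators.Message;
import forklift.decorators.OnMessage;
import forklift.decorators.Queue;

import java.util.Map;
import java.util.Properties;

@Queue("orders")
public class ConfiguredConsumer {
    // Filled from a deployed app.properties file (hypothetical file name).
    @Config("app")
    Properties settings;

    // The message body marshaled into a key/value map.
    @Message
    Map<String, String> msg;

    // The message headers marshaled into a map.
    @Headers
    Map<String, Object> headers;

    @OnMessage
    public void handle() {
        // Combine configuration values with the message contents here.
    }
}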
To help developers organize their code and inject code into the life-cycle, Forklift makes it easy to write message validation routines that run before message processing starts. Forklift provides an annotation for this functionality:
- @OnValidate - Place this annotation on methods to validate the message. Any method that fails validation stops processing of the message and sends the life-cycle to the Invalid state, as in the sketch below.
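A minimal sketch of a validation method. The queue name and the required field are made up for illustration, and the imports assume the forklift.decorators package used elsewhere in this guide.
import forklift.decorators.Message;
import forklift.decorators.OnMessage;
import forklift.decorators.OnValidate;
import forklift.decorators.Queue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Queue("test")
public class ValidatingConsumer {
    @Message
    public Map<String, String> msg;

    // Returning a non-empty list of errors sends the message to the Invalid state.
    @OnValidate
    public List<String> validate() {
        List<String> errors = new ArrayList<>();
        if (msg == null || !msg.containsKey("who"))
            errors.add("missing required field: who");
        return errors;
    }

    @OnMessage
    public void process() {
        // Runs only if validation passed.
    }
}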
If a consumer errors (throwing an exception, checked or runtime), Forklift automatically routes the life-cycle to the Error state. This event can then be hooked to surface the errors. For more information on logging and auditing, see the plug-ins that have been built.
- Download the forklift-server-x.x.zip release.
- Unzip the download.
- Using a command line, cd into the unzipped directory.
- Create a directory named forklift in your home directory.
$ mkdir ~/forklift
- Create a sub-directory named deploy.
$ mkdir ~/forklift/deploy
- Start Forklift. Within the forklift-server-x.x directory run the following command:
$ bin/forklift-server -monitor1 ~/forklift/deploy -url embedded
At this point, you should see a lot of log output from Forklift showing that it is running, but it really isn’t doing anything other than waiting for you to start deploying consumers to it.
This guide will lead you through writing a quick consumer. Build scripts are provided for both Maven and sbt; please choose the method you are most comfortable using. You can also do this directly within an IDE such as Eclipse, but that is left to the developer as an exercise.
The Maven build file for your first consumer, if you’re into that kind of thing.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>forklift.consumer</groupId>
  <artifactId>MyExampleConsumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>My Example Consumer</name>
  <description>An example Forklift consumer.</description>
  <repositories>
    <repository>
      <id>oss-sonatype</id>
      <name>oss-sonatype</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>com.github.dcshock</groupId>
      <artifactId>forklift</artifactId>
      <version>[0.5,)</version>
    </dependency>
  </dependencies>
</project>
The Sbt build file for your first consumer, if you’re into that kind of thing.
import com.github.dcshock.SbtBinks._
organization := "forklift.consumer"
name := "MyExampleConsumer"
version := "0.1"
libraryDependencies ++= Seq(
"com.github.dcshock" % "forklift" % "[0.5,)" % "provided"
)
resolvers ++= Seq(
"Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
)
// Inform sbt-eclipse to not add Scala nature
EclipseKeys.projectFlavor := EclipseProjectFlavor.Java
// Remove scala dependency for pure Java libraries
autoScalaLibrary := false
// Remove the scala version from the generated/published artifact
crossPaths := false
// With this enabled, compiled jars are easier to debug in other projects
// variable names are visible.
javacOptions in compile ++= Seq("-source", "1.8", "-g:lines,vars,source", "-deprecation")
javacOptions in doc += "-Xdoclint:none"
addCommandAlias("dist", ";compile;binks")
binksSettings
For sbt, you will also need to add these lines to your project/plugins.sbt:
addSbtPlugin("com.github.dcshock" % "sbt-binks" % "0.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0")
The first example is very simple and should be easily understood by most developers. It listens to the "test" queue and then logs the message it receives.
The source code is as follows:
package forklift.consumer;

import forklift.decorators.Message;
import forklift.decorators.OnMessage;
import forklift.decorators.Queue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

@Queue("test") // (1)
public class MyExampleConsumer {
    Logger log = LoggerFactory.getLogger(MyExampleConsumer.class);

    @Message // (2)
    public Map<String, String> msg;

    @OnMessage // (3)
    public void processMyMessage() {
        log.info("My message was: {}", msg);
    }
}
1. Tells the consumer to listen to /queue/test for messages.
2. Injects the message into the msg property.
3. Tells Forklift to run this method when a message is received.
To build with Maven:
$ mvn package
Your jar will now be available in the target directory, named MyExampleConsumer-0.0.1-SNAPSHOT.jar.
To build with sbt:
$ sbt package
Your jar will now be available in the target directory, named myexampleconsumer-0.1.jar.
With your jar built, it is quite easy to deploy it into the running Forklift instance. Just copy the jar file into ~/forklift/deploy, and you should see log messages showing that the consumer is ready to consume messages on /queue/test. It may take a second or two since the deployment scanner runs on a timer thread. See the following output from the logger:
{"timestamp":"2015-06-15T17:18:32.439Z","level":"INFO","thread":"run-main-0","logger":"org.reflections.Reflections","message":"Reflections took 54 ms to scan 1 urls, producing 1 keys and 1 values ","context":"default"}
{"timestamp":"2015-06-15T17:18:32.444Z","level":"INFO","thread":"run-main-0","logger":"forklift.consumer.ConsumerDeploymentEvents","message":"Deploying: Deployment [queues=[class forklift.consumer.MyExampleConsumer], topics=[], cl=forklift.classloader.ChildFirstClassLoader@5cbdc534, deployedFile=/Users/dthompson/forklift/deploy/myexampleconsumer-0.1.jar, reflections=org.reflections.Reflections@4e6016c]","context":"default"}
{"timestamp":"2015-06-15T17:18:32.458Z","level":"INFO","thread":"consumer-deployment-events-1","logger":"consumer-thread-test:1","message":"starting consumer","context":"default"}
In this Quickstart guide, Forklift is using an embedded version of ActiveMQ, so there isn’t a built-in way to send messages to a queue or topic. Most languages have API libraries that allow the developer to send messages to a broker. Below is a list of several of the APIs available.
- stomp.py - A Python library for sending messages to ActiveMQ when the Stomp protocol is enabled. Works okay for Json objects, but needs modifications for K/V pairs since it's not possible to send newline characters.
- stomp-client - For node.js, this client library makes it easy to send messages to ActiveMQ. Our example will show how to use this library.
- Net::STOMP::Client - A Perl library for communicating with the Stomp protocol. Contributed by a developer at CERN, this API is quite robust.
- Forklift Producer - A somewhat more heavy-weight API for Java, but it uses the same ActiveMQ connector that Forklift uses. It is based on JMS, not the Stomp protocol. More information can be found outside the Quickstart guide.
- Install node.js. For a quick reference, please see this howto.
- Install npm (node package manager). Using the same method used to install node.js, you should be able to install npm as well.
- Create a directory where you can write a small node application and cd into that directory.
- Install the node stomp-client module:
npm install stomp-client
- Modify frames.js. There is a small issue in the stomp-client that needs to be fixed in order for our node application to work.
Within frames.js, modify the line
if (this.body.length > 0) {
to
if (this.body.length > 0 && !this.headers.hasOwnProperty("suppress-content-length")) {
Once you've completed the modification, create the node program below and save it as sendmessage.js:
var Stomp = require('stomp-client');
var dest = process.argv[2];
var client = new Stomp('localhost', 61613, null, null);

client.on('error', function(e) {
  console.log(e);
});

client.connect(function(sessionId) {
  var msg = process.argv[3];
  client.publish(dest, msg, {"suppress-content-length":"true", "persistent":"true"});
  client.disconnect();
});
From the command line, run the node program you just created:
$ node sendmessage.js /queue/test $'who=Batman\ntype=Bat signal\n'
Notice the $' syntax, which allows you to send newlines within your message.
At this point, you should be able to look at the log output of Forklift and see your message logged. For example:
{"timestamp":"2015-06-15T17:54:36.747Z","level":"INFO","thread":"consumer-deployment-events-1","logger":"forklift.consumer.MyExampleConsumer","message":"My message was: {type=Bat signal, who=Batman}","context":"default"}
You can now start playing with your example consumer to give it different behavior and send different types of messages. Try adding an @OnValidate method to make sure you have a valid message, or change the @Message property type to an object and send Json messages instead of k/v pairs.
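As a starting point, a sketch of that change might look like the following. It assumes Forklift can marshal a Json body into a plain Java object as described earlier; the BatSignal class and its fields are made up to match the example message, and the @OnValidate import assumes the forklift.decorators package used by the other annotations.
package forklift.consumer;

import forklift.decorators.Message;
import forklift.decorators.OnMessage;
import forklift.decorators.OnValidate;
import forklift.decorators.Queue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

@Queue("test")
public class MyExampleConsumer {
    // Hypothetical POJO for a Json body such as {"who":"Batman","type":"Bat signal"}.
    public static class BatSignal {
        public String who;
        public String type;
    }

    Logger log = LoggerFactory.getLogger(MyExampleConsumer.class);

    @Message
    public BatSignal msg;

    @OnValidate
    public List<String> validate() {
        List<String> errors = new ArrayList<>();
        if (msg == null || msg.who == null)
            errors.add("message is missing 'who'");
        return errors;
    }

    @OnMessage
    public void processMyMessage() {
        log.info("{} sent a {}", msg.who, msg.type);
    }
}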
TBD
@On(step) is an annotation that can be added to a method. Its parameter is any step except Pending. When a message enters that step, the method will be called. This can be used for easy error handling or chaining consumers.
Example:
@On(ProcessStep.Error)
@On(ProcessStep.Invalid)
public void sendEmail() {
    emailer.send(maintainer, "oops!");
}

@On(ProcessStep.Complete)
public void nextStep() {
    messageQueue.send("step2", msg);
}
A very popular broker in the world of the Java Message Service (JMS) is Apache ActiveMQ. Forklift has been used very successfully with ActiveMQ as its primary broker, and it provides an out-of-the-box connector for immediate use with ActiveMQ (located in connectors/ActiveMQ).
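For example, the Quickstart's start-up command could be pointed at a standalone ActiveMQ broker by swapping the embedded URL for a broker URL. The host and port below are just the ActiveMQ defaults, and this assumes the connector accepts a standard ActiveMQ broker URL via the -url flag:
$ bin/forklift-server -monitor1 ~/forklift/deploy -url tcp://localhost:61616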