- Workshop prerequisites and setup
- 0️⃣ Provisioning Confluent Cloud cluster
- 1️⃣ Loading referential data with Kafka Connect
- 2️⃣ Implementing a Stream Processor with Kafka Streams
- 3️⃣ Enrich transaction results with ksqlDB
- ✅ It’s a wrap!
- Special Thanks!
Developing Event-driven Microservices with Spring Boot, Confluent Cloud, and Java.
Ensure you install the following toolset on your computer:
-
Tip
You should have your login and password information handy after you sign up for Confluent Cloud. The ccloud init script will ask you for your login information.
-
Docker
-
Docker Compose (installed with Docker Desktop)
-
Git
-
Your favorite IDE or text editor
-
Personally, I recommend IntelliJ IDEA.
-
Before you proceed, be sure to complete the following steps:
git clone https://github.com/confluentinc/demo-scene #(1)
cd demo-scene/event-driven-microservices-workshop #(2)
-
Clone the repository
-
Change directory to the workshop folder
Alternatively, follow the steps below to check out only the directory that contains the source code relevant to this workshop.
mkdir -p ~/temp/demo-scene
cd ~/temp/demo-scene
git init .
git remote add origin -f https://github.com/confluentinc/demo-scene/
git config core.sparsecheckout true
echo "event-driven-microservices-workshop/*" >> .git/info/sparse-checkout
git pull --depth=2 origin master
cd event-driven-microservices-workshop
ls -lh
$ cd scripts/ccloud
$ ccloud login --save #(1)
$ ./ccloud_stack_create.sh #(2)
-
Log in to your Confluent Cloud account.
-
The CCloud Stack script will ask you to log in to your CCloud account.
It will automatically provision a Kafka cluster and a ksqlDB cluster.
Among other things, this script generates a config file that we need to pass to the docker-compose start command so that the connector container can connect to the cloud Kafka cluster.
When ready, move to the next section, where you will generate some reference data.
To leverage the full power of stream processing, it is best to preload the required data in topics. Kafka Streams and ksqlDB will then let you join your events with data from any other topic and look it up.
This section of the workshop will set up a Kafka Connect JDBC Source connector instance that will synchronize data from a PostgreSQL instance to an account
topic in Kafka.
This exercise simulates a Change Data Capture pattern where we bridge an existing data source to Kafka in real time.
./start_connect.sh stack-configs/java-service-account-103523.config #(1)
-
Replace with the actual service account ID you received during the «Provisioning Confluent Cloud cluster» step.
Within the workshop project, you will find a data-generator
folder containing an application designed to generate some random accounts in our PostgreSQL Account
DB.
This utility application will generate about 1000
test accounts.
The Data Generator also contains a REST endpoint to help us submit transaction requests to Kafka later during the workshop.
Note
|
Open a new terminal window in the workshop project folder. |
$ source ./scripts/ccloud/delta_configs/env.delta
$ ./gradlew :data-generator:build #(1)
$ java -jar data-generator/build/libs/data-generator-0.0.1-SNAPSHOT.jar #(2)
-
To build.
-
To run after build.
Note
|
To run the Data Generator application in your IDE launch the main method from
src/main/java/io/confluent/developer/ccloud/demo/kstream/DataGeneratorApplication.java.
Make sure you have environment variables set according to the delta_configs/env.delta file.
|
After the dataset is generated, you should see the following output:
2020-08-26 22:58:44.507 INFO 15959 --- [unt-Generator-1] Account Service : Generated account number 1000.
Open a new terminal window and run the following command from the root of the workshop project folder:
./scripts/connect/deploy-jdbc-connector.sh #(1)
-
This command will start a connector instance.
Note
|
To validate the status of the connector, you can run
|
-
Access Confluent Cloud user interface from https://confluent.cloud.
-
From the main screen, navigate to an environment that looks like `demo-env-<some-number>`.
-
Inside of this environment, you should see a cluster that looks like `demo-kafka-cluster-<some-number>`. On the left side, click on Topics.
-
Click on the account topic and access the messages tab.
-
Click on the offset textbox, type 0, and press Enter to load all messages from partition 0 starting from offset 0.
With the connector running, you should see account
events in the user interface.
In the next section, we will implement a highly scalable stream processing application using Kafka Streams.
Now is the time to get into the heart of the action. We will implement a Kafka Streams topology that atomically processes every request submitted to the transaction-request
topic.
Within the workshop project folder, you will find a kstreams-demo
subfolder representing a Kafka Streams application.
Spring Boot and the spring-kafka
project handle the boilerplate code required to connect to Kafka.
This workshop will focus on writing a Kafka Streams
topology that contains the processing logic for our use case.
Our business requirement states that we must check whether the funds are sufficient for every request received before updating the balance of the account being processed. We should never have two transactions being processed at the same time for the same account. That would create a race condition in which we could not guarantee that the balance check happens before funds are withdrawn.
The Data Generator writes transaction requests to the Kafka topic with a key equal to the transaction’s account number. Therefore, we can be sure all messages of an account will be processed by a single thread for our Transaction Service no matter how many instances are concurrently running.
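The keying guarantee above can be illustrated with a small, dependency-free sketch. It is not the workshop's code: `KeyPartitioningSketch` and `partitionFor` are hypothetical names, the partition count of 6 is an assumption, and `Arrays.hashCode` stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key. The only point is that the mapping is deterministic, so the same account number always lands on the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative sketch: records with the same key always map to the same partition. */
final class KeyPartitioningSketch {

    /**
     * Hypothetical stand-in for a producer partitioner. Kafka's default
     * partitioner hashes the serialized key with murmur2; Arrays.hashCode is
     * used here only to keep the sketch free of dependencies.
     */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Mask the sign bit so the modulo result is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6; // assumed partition count for the transaction-request topic
        for (String account : new String[] {"1", "2", "1", "2", "1"}) {
            System.out.printf("account %s -> partition %d%n",
                    account, partitionFor(account, partitions));
        }
    }
}
```

Because each partition is consumed by exactly one stream task at a time, a deterministic key-to-partition mapping is what lets us reason about one account's requests as a strictly ordered sequence.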
Kafka Streams won’t commit any message offset until it completes our business logic of managing a transaction request.
Because of our stream processor’s transactional nature, we require a specific component from Kafka Streams named a Transformer.
This utility allows us to process events one by one while interacting with a State Store,
another component of Kafka Streams that helps us persist our account balance in a local instance of an embedded database, RocksDB.
Open the io.confluent.developer.ccloud.demo.kstream.TransactionTransformer
Java class and implement the transform
function to return a TransactionResult
based on the validity of the transaction request.
The TransactionResult
contains a success
flag set to true
if the funds were successfully updated.
The transform
method also updates the store
State Store.
The class already has utility functions to help you execute our business logic.
@Override
public TransactionResult transform(Transaction transaction) {
    // Deposits always succeed: credit the account and report the new balance.
    if (transaction.getType().equals(Transaction.Type.DEPOSIT)) {
        return new TransactionResult(transaction,
                depositFunds(transaction),
                true,
                null);
    }
    // Withdrawals succeed only when the account holds enough funds.
    if (hasEnoughFunds(transaction)) {
        return new TransactionResult(transaction, withdrawFunds(transaction), true, null);
    }
    // Otherwise leave the balance untouched and flag the failure.
    log.info("Not enough funds for account {}.", transaction.getAccount());
    return new TransactionResult(transaction,
            getFunds(transaction.getAccount()),
            false,
            TransactionResult.ErrorType.INSUFFICIENT_FUNDS);
}
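To experiment with the balance-check rule outside of Kafka Streams, here is a minimal sketch that replaces the State Store with a plain in-memory map. `FundsCheckSketch` and its methods are hypothetical names for illustration and are not the helper functions shipped in `TransactionTransformer`; the use of `BigDecimal` for amounts is also an assumption.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of the deposit/withdraw rule, with a Map standing in for the State Store. */
final class FundsCheckSketch {

    // Stand-in for the Kafka Streams State Store: account number -> balance.
    private final Map<String, BigDecimal> balances = new HashMap<>();

    BigDecimal balanceOf(String account) {
        return balances.getOrDefault(account, BigDecimal.ZERO);
    }

    /** Deposits always succeed and increase the balance. */
    boolean deposit(String account, BigDecimal amount) {
        balances.put(account, balanceOf(account).add(amount));
        return true;
    }

    /** Withdrawals succeed only if the balance covers the amount; otherwise nothing changes. */
    boolean withdraw(String account, BigDecimal amount) {
        BigDecimal current = balanceOf(account);
        if (current.compareTo(amount) < 0) {
            return false; // insufficient funds
        }
        balances.put(account, current.subtract(amount));
        return true;
    }

    public static void main(String[] args) {
        FundsCheckSketch funds = new FundsCheckSketch();
        funds.deposit("1", new BigDecimal("100"));
        System.out.println("withdraw 300 accepted: " + funds.withdraw("1", new BigDecimal("300")));
        System.out.println("withdraw 60 accepted: " + funds.withdraw("1", new BigDecimal("60")));
        System.out.println("balance: " + funds.balanceOf("1"));
    }
}
```

The sketch is only safe because a single caller touches a given account at a time, which is the same property the keyed topic gives the real Transformer.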
In Kafka Streams, a Topology
is the definition of your data flow.
It’s a manifest for all operations and transformations to be applied to your data.
To start a stream processor, Kafka Streams only requires you to build a `Topology` and hand it over. Kafka Streams will take care of managing the underlying consumers and producers.
The io.confluent.developer.ccloud.demo.kstream.KStreamConfig
Java class already contains all the boilerplate code required by Kafka Streams to start our processor.
In this exercise, we will leverage a StreamsBuilder
to define and instantiate a Topology
that will handle our transaction processing.
Open the io.confluent.developer.ccloud.demo.kstream.KStreamConfig.defineStreams
method and get ready to write your first Kafka Streams Topology.
Use the stream
method of streamsBuilder
to turn a topic into a KStream.
KStream<String, Transaction> transactionStream = streamsBuilder.stream("transaction-request");
To inform Kafka Streams that we want to update the funds
State Store for all incoming requests atomically, we can leverage the transformValues
operator to plug in our TransactionTransformer.
This operator requires us to specify the funds
State Store that the Transformer
will use.
This also instructs Kafka Streams to keep track of events from our transaction-request
topic, since they will result in a change of state for our store.
KStream<String, TransactionResult> resultStream = transactionStream.transformValues(() -> new TransactionTransformer(storeName), storeName);
With a new derived stream containing TransactionResult,
we can now use the information contained in the payload to feed a success or failure topic.
We will achieve this by deriving two streams from our resultStream.
Each stream will be built by applying a filter
and filterNot
operator with a predicate on the success
flag from our TransactionResult
payload.
With the two derived streams, we can explicitly call the to
operator to instruct Kafka
Streams to write the mutated events to their respective topics.
resultStream
    .filter(this::success)
    .to("transaction-success");
resultStream
    .filterNot(this::success)
    .to("transaction-failed");
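The filter and filterNot split can be tried out with plain Java collections before wiring it into a topology. The sketch below is an analogy, not Kafka Streams code: `ResultRoutingSketch`, its nested `Result` class, and the two helper methods are hypothetical stand-ins for `TransactionResult` and the KStream operators.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Illustrative sketch: one predicate splits results into a success list and a failure list. */
final class ResultRoutingSketch {

    /** Hypothetical, simplified stand-in for TransactionResult. */
    static final class Result {
        final String account;
        final boolean success;

        Result(String account, boolean success) {
            this.account = account;
            this.success = success;
        }
    }

    /** Keeps the elements that match the predicate (analogous to KStream#filter). */
    static List<Result> filter(List<Result> results, Predicate<Result> predicate) {
        return results.stream().filter(predicate).collect(Collectors.toList());
    }

    /** Keeps the elements that do not match the predicate (analogous to KStream#filterNot). */
    static List<Result> filterNot(List<Result> results, Predicate<Result> predicate) {
        return filter(results, predicate.negate());
    }

    public static void main(String[] args) {
        List<Result> results = Arrays.asList(
                new Result("1", true), new Result("1", false), new Result("2", true));
        Predicate<Result> success = r -> r.success;
        System.out.println("to transaction-success: " + filter(results, success).size());
        System.out.println("to transaction-failed: " + filterNot(results, success).size());
    }
}
```

Using the same predicate for both branches guarantees that every result goes to exactly one of the two topics.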
Use this reference implementation to validate you have the right stream definition.
protected void defineStreams(StreamsBuilder streamsBuilder) {
    KStream<String, Transaction> transactionStream = streamsBuilder.stream(transactionRequestConfiguration.getName());
    final String storeName = fundsStoreConfig.getName();
    KStream<String, TransactionResult> resultStream = transactionStream.transformValues(() -> new TransactionTransformer(storeName), storeName);
    resultStream
        .filter(this::success)
        .to(transactionSuccessConfiguration.getName());
    resultStream
        .filterNot(this::success)
        .to(transactionFailedConfiguration.getName());
}
Note
|
If you are running the application from your IDE, launch the main method from io.confluent.developer.ccloud.demo.kstream.KStreamDemoApplication.
|
If you want to run with the CLI, you must build the application before launching it.
./gradlew :kstreams-demo:build
java -jar kstreams-demo/build/libs/kstreams-demo-0.0.1-SNAPSHOT.jar
Ensure your Data Generator application is still running from the previous section.
The utility script scripts/generate-transaction.sh
will let you generate transactions.
Generate a few transactions using the following commands:
scripts/generate-transaction.sh 1 DEPOSIT 100 CAD
scripts/generate-transaction.sh 1 DEPOSIT 200 CAD
scripts/generate-transaction.sh 1 DEPOSIT 300 CAD
scripts/generate-transaction.sh 1 WITHDRAW 300 CAD
scripts/generate-transaction.sh 1 WITHDRAW 10000 CAD
scripts/generate-transaction.sh 2 DEPOSIT 100 CAD
scripts/generate-transaction.sh 2 DEPOSIT 50 CAD
scripts/generate-transaction.sh 2 DEPOSIT 300 CAD
scripts/generate-transaction.sh 2 WITHDRAW 300 CAD
The script takes the following arguments, in this order:
-
The account number.
-
The type of operation (DEPOSIT or WITHDRAW).
-
The amount.
-
The currency.
-
Access Confluent Cloud user interface from https://confluent.cloud.
-
From the main screen, navigate to the environment that looks like
demo-env-<some-number>.
-
Inside of the environment, you should see a cluster that looks like
demo-kafka-cluster-<some-number>.
On the left side, click on Topics.
-
Click on the transaction-success topic and access the messages tab.
-
Click on the offset textbox, type 0, and press Enter to load all messages from partition 0 starting from offset 0.
You should see transaction-success events in the user interface.
If you don’t see any messages, try your luck with partition 1 starting from offset 0.
-
Click on the Topics tab from the cluster navigation menu.
-
Select the transaction-failed topic and access the messages tab.
-
Click on the offset textbox, type 0, and press Enter to load all messages from partition 0 starting from offset 0.
You should see transaction-failed events in the user interface.
If you don’t see any messages, try your luck with partition 1 starting from offset 0.
In the next section, we will explore how writing a stream processor can be simplified with ksqlDB.
In the first section of this workshop, we configured a JDBC Source Connector to load all account details into an account
topic.
In the next exercise, we will write a second Stream Processor to generate a detailed transaction statement enriched with account details.
Rather than writing this new service as another Kafka Streams application, we will leverage ksqlDB to declare a stream processor that will enrich our transaction data in real time with our referential data coming from the account
topic.
The objective of this section is to show how you can use an SQL-like query language to create stream processors equivalent to Kafka Streams applications, without having to compile and run any custom software.
Tip
|
Connect to ksqlDB with CLI
In this exercise, we’re going to use the ksqlDB Cloud UI. You can also run the CLI using Docker.
|
ksqlDB is built on top of Kafka Streams. As such, the KStream
and KTable
are both key constructs for defining stream processors.
The first step requires us to instruct ksqlDB that we wish to turn the account
topic into a Table.
This table will allow us to join each transaction-success
event with the latest account
event of the underlying topic.
Run the following command in your ksqlDB CLI terminal:
CREATE TABLE ACCOUNT (
    numkey STRING PRIMARY KEY,
    number INT,
    cityAddress STRING,
    countryAddress STRING,
    creationDate BIGINT,
    firstName STRING,
    lastName STRING,
    numberAddress STRING,
    streetAddress STRING,
    updateDate BIGINT
) WITH (
    KAFKA_TOPIC = 'account',
    VALUE_FORMAT = 'JSON'
);
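What "table" means here can be pictured with a short sketch: a table is the result of replaying a topic and keeping only the most recent value for each key. The code below is a conceptual analogy and not how ksqlDB is implemented; `TableSemanticsSketch`, `materialize`, and the sample account values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch: a table keeps only the latest value seen for each key. */
final class TableSemanticsSketch {

    /** Replays {key, value} records in order; a later record for a key overwrites the earlier one. */
    static Map<String, String> materialize(String[][] changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : changelog) {
            table.put(record[0], record[1]); // upsert by key
        }
        return table;
    }

    public static void main(String[] args) {
        // Made-up sample records: account "1" is updated once after creation.
        String[][] accountTopic = {
            {"1", "cityAddress=Montreal"},
            {"2", "cityAddress=Paris"},
            {"1", "cityAddress=Toronto"},
        };
        Map<String, String> accountTable = materialize(accountTopic);
        accountTable.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

A stream, by contrast, would keep all three records as independent events, which is why transaction-success is declared as a Stream and account as a Table.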
Note
|
If you are getting error about accessing the
where |
Before we create the Transaction Statement
stream processor, we must also inform ksqlDB that we wish to turn the transaction-success
topic into a Stream.
Run the following command in your ksqlDB CLI terminal:
CREATE STREAM TRANSACTION_SUCCESS (
    numkey STRING KEY,
    transaction STRUCT<guid STRING, account STRING, amount DOUBLE, type STRING, currency STRING, country STRING>,
    funds STRUCT<account STRING, balance DOUBLE>,
    success BOOLEAN,
    errorType STRING
) WITH (
    KAFKA_TOPIC = 'transaction-success',
    VALUE_FORMAT = 'JSON'
);
Now that we have all the ingredients of our Transaction Statement
stream processor, we can create a new stream derived from our transaction-success
events paired with the latest data from the account
topic.
We will instruct ksqlDB to create a new stream as a query.
By default, ksqlDB will publish any output to a new TRANSACTION_STATEMENT
topic.
The select query specifies which events to subscribe to and which table to join each event with.
The output of this new stream processor will be a mix of the transaction details coupled with all the matching account details.
The key from transaction-success
and account
will be used as matching criteria for the LEFT JOIN
command.
EMIT CHANGES
informs ksqlDB that the query is long-running and should be kept alive, as if it were a Kafka Streams application that must always be available to process every event.
Run the following command in your ksqlDB CLI prompt:
CREATE STREAM TRANSACTION_STATEMENT AS
    SELECT *
    FROM TRANSACTION_SUCCESS
    LEFT JOIN ACCOUNT ON TRANSACTION_SUCCESS.numkey = ACCOUNT.numkey
    EMIT CHANGES;
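The LEFT JOIN semantics of this query can be sketched in plain Java: every event from the stream side produces an output, and the table side contributes either the matching row or nothing. This is a conceptual analogy and not ksqlDB's implementation; `LeftJoinSketch`, `leftJoin`, and the sample values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch: a stream-table LEFT JOIN looks up each event's key in the table. */
final class LeftJoinSketch {

    /**
     * Joins one stream event with the current table row for the same key.
     * A missing row yields null on the table side instead of dropping the event.
     */
    static String leftJoin(String key, String transaction, Map<String, String> accountTable) {
        String account = accountTable.get(key); // null when no account matches
        return key + ": " + transaction + " | " + account;
    }

    public static void main(String[] args) {
        // Made-up table content, keyed by account number.
        Map<String, String> accountTable = new HashMap<>();
        accountTable.put("1", "firstName=Ada");

        System.out.println(leftJoin("1", "DEPOSIT 100 CAD", accountTable));
        System.out.println(leftJoin("3", "DEPOSIT 50 CAD", accountTable));
    }
}
```

An inner join would drop the second event entirely; the left join keeps it, which is why a statement is emitted even when no matching account row is found.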
-
Access Confluent Cloud user interface from https://confluent.cloud.
-
From the main screen, navigate to the environment that looks like demo-env-<some-number>.
-
Inside of this environment, you should see a cluster that looks like demo-kafka-cluster-<some-number>.
-
On the left side, click on Topics.
-
Click on the TRANSACTION_STATEMENT topic and access the messages tab.
-
Click on the offset textbox, type 0, and press Enter to load all messages from partition 0 starting from offset 0.
Congratulations! Now you know how to build event-driven microservices using Spring Boot, Kafka Streams, and ksqlDB.
The next section is very important!
Important
|
Don’t forget to clean up
|
This workshop is based on the work of Daniel Lavoie.
Much