Skip to content

Commit bf5d53f

Browse files
authored
docs(examples): Kafka protocol example (#625)
* added package.json for kafka example Signed-off-by: Femi <alayesanmifemi@gmail.com> * feat: added client and admin for kafka topics Signed-off-by: Femi <alayesanmifemi@gmail.com> * feat: added producer for kafka sample Signed-off-by: Femi <alayesanmifemi@gmail.com> * feat: added consumer for kafka sample Signed-off-by: Femi <alayesanmifemi@gmail.com> * chore: remove dotenv import in sample file for kafka client Signed-off-by: Femi <alayesanmifemi@gmail.com> * chore: added readme with usage and kafka installation guide for easy usage Signed-off-by: Femi <alayesanmifemi@gmail.com> --------- Signed-off-by: Femi <alayesanmifemi@gmail.com>
1 parent c238951 commit bf5d53f

File tree

8 files changed

+235
-0
lines changed

8 files changed

+235
-0
lines changed

examples/kafka-ex/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Kafka Example
2+
3+
## Summary
4+
This is an example of how to use the CloudEvents JavaScript SDK with Kafka in Node.js.
5+
6+
7+
## Description
8+
A simple cli application sending user input as a cloudevent message through a kafka producer to a topic. And eventually, the cloudevent message is handled and deserialized correctly by a consumer within a consumer group subscribed to the same topic.
9+
10+
## Dependencies
11+
- NodeJS (>18)
12+
- Kafka running locally or remotely
13+
14+
## Local Kafka Setup with Docker
15+
16+
#### Option 1: Run Zookeeper and Kafka Docker images sequentially with these commands
17+
18+
```bash
19+
docker run -d \
20+
--name zookeeper \
21+
-e ZOOKEEPER_CLIENT_PORT=2181 \
22+
-e ZOOKEEPER_TICK_TIME=2000 \
23+
confluentinc/cp-zookeeper:7.3.2
24+
25+
```
26+
```bash
27+
docker run -d \
28+
--name kafka \
29+
-p 9092:9092 \
30+
-e KAFKA_BROKER_ID=1 \
31+
-e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 \
32+
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
33+
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
34+
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
35+
--link zookeeper:zookeeper \
36+
confluentinc/cp-kafka:7.3.2
37+
38+
```
39+
40+
#### Option 2: Run both images using the docker compose file
41+
42+
```bash
43+
cd ${directory of the docker compose file}
44+
45+
docker compose up -d
46+
```
47+
48+
## Then, run the producer (cli) and consumer
49+
50+
#### To Start the Producer
51+
```bash
52+
npm run start:producer
53+
```
54+
55+
#### To Start the Consumer
56+
```bash
57+
npm run start:consumer ${groupId}
58+
```
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
---
2+
version: '3'
3+
services:
4+
zookeeper:
5+
image: confluentinc/cp-zookeeper:7.3.2
6+
container_name: zookeeper
7+
environment:
8+
ZOOKEEPER_CLIENT_PORT: 2181
9+
ZOOKEEPER_TICK_TIME: 2000
10+
11+
kafka:
12+
image: confluentinc/cp-kafka:7.3.2
13+
container_name: kafka
14+
depends_on:
15+
- zookeeper
16+
ports:
17+
- "9092:9092"
18+
environment:
19+
KAFKA_BROKER_ID: 1
20+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
21+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
22+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
23+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

examples/kafka-ex/package.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"name": "kafka-ex",
3+
"version": "1.0.0",
4+
"description": "kafka example using CloudEvents",
5+
"repository": "https://github.com/cloudevents/sdk-javascript.git",
6+
"scripts": {
7+
"build": "tsc",
8+
"start:producer": "ts-node src/producer.ts",
9+
"start:consumer": "ts-node src/consumer.ts"
10+
},
11+
"keywords": [],
12+
"author": "",
13+
"license": "ISC",
14+
"type": "commonjs",
15+
"dependencies": {
16+
"cloudevents": "^10.0.0",
17+
"kafkajs": "^2.2.4",
18+
"ts-node": "^10.9.2"
19+
},
20+
"devDependencies": {
21+
"@types/node": "^22.13.2",
22+
"typescript": "^5.7.3"
23+
}
24+
}

examples/kafka-ex/src/admin.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/* eslint-disable */
2+
import kafka from "./client";
3+
4+
(async () => {
5+
const admin = kafka.admin();
6+
await admin.connect();
7+
await admin.createTopics({
8+
topics: [
9+
{
10+
topic: "events.cloudevents.test",
11+
numPartitions: 2,
12+
},
13+
],
14+
});
15+
await admin.disconnect();
16+
})();

examples/kafka-ex/src/client.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/* eslint-disable */
2+
import { Kafka } from "kafkajs";
3+
4+
const kafka = new Kafka({
5+
clientId: 'kafka-ex-client-id',
6+
brokers: ['localhost:9092'],
7+
});
8+
9+
export default kafka;

examples/kafka-ex/src/consumer.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/* eslint-disable */
2+
3+
import { Headers, Kafka, Message } from "cloudevents";
4+
import kafka from "./client";
5+
6+
const groupId = process.argv[2];
7+
8+
(async () => {
9+
const consumer = kafka.consumer({ groupId });
10+
await consumer.connect();
11+
12+
consumer.subscribe({ topic: "events.cloudevents.test" });
13+
14+
consumer.run({
15+
eachMessage: async ({ topic, partition, message }) => {
16+
console.log("Raw Kafka message:", {
17+
topic,
18+
partition,
19+
offset: message.offset,
20+
headers: message.headers,
21+
value: message.value?.toString(),
22+
});
23+
24+
try {
25+
const newHeaders: Headers = {};
26+
Object.keys(message.headers as Headers).forEach((key) => {
27+
// this is needed here because the headers are buffer values
28+
// when it gets to the consumer and the buffer headers are not valid for the
29+
// toEvent api from cloudevents, so this converts each key value to a string
30+
// as expected by the toEvent api
31+
newHeaders[key] = message!.headers![key]?.toString() ?? "";
32+
});
33+
34+
message.headers = newHeaders;
35+
const messageValue = Kafka.toEvent(
36+
message as unknown as Message<string>
37+
);
38+
39+
console.log("Deserialized CloudEvent:", messageValue);
40+
// message is automatically acknowledged when the callback is finished
41+
} catch (error) {
42+
console.error("Error deserializing CloudEvent:", error);
43+
console.log("Raw message value:", message.value?.toString());
44+
}
45+
},
46+
});
47+
})();

examples/kafka-ex/src/producer.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/* eslint-disable */
2+
3+
import { CloudEvent, Kafka } from "cloudevents";
4+
import readline from "readline";
5+
import kafka from "./client";
6+
7+
const rl = readline.createInterface({
8+
input: process.stdin,
9+
output: process.stdout,
10+
});
11+
12+
(async () => {
13+
const producer = kafka.producer();
14+
await producer.connect();
15+
16+
rl.setPrompt("Enter message > ");
17+
rl.prompt();
18+
rl.on("line", async (line) => {
19+
const event = new CloudEvent({
20+
source: "cloudevents-producer",
21+
type: "events.cloudevents.test",
22+
datacontenttype: "text/plain",
23+
partitionkey: "1",
24+
data: line,
25+
});
26+
27+
const message = Kafka.structured(event);
28+
29+
console.log("Sending CloudEvent:", message);
30+
31+
await producer.send({
32+
topic: "events.cloudevents.test",
33+
messages: [message],
34+
});
35+
rl.prompt();
36+
});
37+
38+
rl.on("close", async () => {
39+
await producer.disconnect();
40+
});
41+
})();

examples/kafka-ex/tsconfig.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"compilerOptions": {
3+
"target": "ES2020",
4+
"module": "commonjs",
5+
"allowJs": true,
6+
"checkJs": false,
7+
"strict": true,
8+
"noImplicitAny": true,
9+
"moduleResolution": "node",
10+
"esModuleInterop": true
11+
},
12+
"include": [
13+
"src/**/*.ts",
14+
"src/**/*.js",
15+
],
16+
"exclude": ["node_modules"]
17+
}

0 commit comments

Comments
 (0)