v2 event receivers; Confluent Kafka (#699)
chuck-dbos authored Jan 9, 2025
1 parent 94ff5d2 commit eb311d8
Showing 13 changed files with 1,026 additions and 31 deletions.
394 changes: 388 additions & 6 deletions package-lock.json

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions packages/dbos-confluent-kafka/README.md
@@ -0,0 +1,59 @@
# DBOS Kafka Client Library (Confluent Version)

Publish/subscribe message queues are a common building block for distributed systems. They allow processing to occur at a different place or time, possibly in multiple client programming environments. Due to its performance, flexibility, and simple, scalable design, [Kafka](https://www.confluent.io/cloud-kafka) is a popular choice for publish/subscribe.

This package includes a [DBOS](https://docs.dbos.dev/) [step](https://docs.dbos.dev/typescript/tutorials/step-tutorial) for sending Kafka messages, as well as an event receiver for exactly-once processing of incoming messages (even using standard queues).

This package is based on [Confluent's JavaScript client for Apache Kafka](https://github.com/confluentinc/confluent-kafka-javascript) (`@confluentinc/kafka-javascript`). We are working on other client libraries for Kafka; please reach out to [us](https://www.dbos.dev/) if you are interested in a different client library.

## Configuring a DBOS Application with Kafka
Ensure that the DBOS Kafka (Confluent version) package is installed into the application:
```
npm install --save @dbos-inc/dbos-confluent-kafka
```

## Sending Messages

### Imports
First, ensure that the package classes are imported:
```typescript
import { DBOS } from "@dbos-inc/dbos-sdk";

import {
  KafkaProducer,
  KafkaConfig,
  logLevel,
} from "@dbos-inc/dbos-confluent-kafka";
```

### Selecting a Configuration
`KafkaProducer` is a configured class, meaning that a configuration (or a configuration file key name) must be provided when a class instance is created, for example:
```typescript
const kafkaConfig: KafkaConfig = {
  clientId: 'dbos-kafka-test',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
  requestTimeout: 100, // FOR TESTING
  retry: { // FOR TESTING
    retries: 5
  },
  logLevel: logLevel.INFO, // FOR TESTING
}

const kafkaTopic = 'dbos-test-topic'; // the topic this producer instance will send to
const kafkaCfg = DBOS.configureInstance(KafkaProducer, 'defKafka', kafkaConfig, kafkaTopic);
```
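The same `KafkaProducer` class can be configured multiple times under different instance names, each bound to its own topic; the test file below registers two instances (`wfKafka` and `wf2Kafka`) this way.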

### Sending
Within a [DBOS Transact workflow](https://docs.dbos.dev/typescript/tutorials/workflow-tutorial), send a message by calling `sendMessage` on the configured `KafkaProducer` instance:
```typescript
const sendRes = await kafkaCfg.sendMessage({value: ourMessage});
```
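Putting the pieces together, here is a minimal sketch of sending from inside a workflow. It builds on the `kafkaCfg` instance configured above; the class name, method name, and message value are illustrative:
```typescript
class ProducerExample {
  @DBOS.workflow()
  static async produceGreeting(name: string) {
    // Sending is implemented as a DBOS step (per the package description
    // above), so the send is recorded as part of the durable workflow.
    await kafkaCfg.sendMessage({ value: `hello, ${name}` });
  }
}
```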

## Receiving Messages
A tutorial for receiving and processing Kafka messages can be found [here](https://docs.dbos.dev/tutorials/requestsandevents/kafka-integration). This library provides an alternate implementation of the Kafka consumer that can be updated independently of the DBOS Transact core packages.
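As a sketch of what a consumer looks like with this package (mirroring the decorator patterns in the test file below; the class name and topic are illustrative):
```typescript
import { DBOS } from "@dbos-inc/dbos-sdk";
import { CKafka, CKafkaConsume, KafkaConfig, Message } from "@dbos-inc/dbos-confluent-kafka";

const consumerConfig: KafkaConfig = {
  clientId: 'dbos-kafka-consumer',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
};

@CKafka(consumerConfig)
class MessageHandler {
  // @CKafkaConsume also accepts an array of topics or a RegExp pattern,
  // as demonstrated in the test file below.
  @CKafkaConsume('my-topic')
  @DBOS.workflow()
  static async handleMessage(topic: string, _partition: number, message: Message) {
    console.log(`received ${message.value?.toString()} on ${topic}`);
  }
}
```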

## Simple Testing
The `confluent-kafka.test.ts` file included in the source repository demonstrates setting up topics and sending and processing Kafka messages. Before running, set the following environment variable:
- `KAFKA_BROKER`: Broker URL
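
With the docker-compose setup shown in the test file running locally, `KAFKA_BROKER` would typically be set to `localhost:9092` (the broker's advertised host listener).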

## Next Steps
- To start a DBOS app from a template, visit our [quickstart](https://docs.dbos.dev/quickstart).
- For DBOS Transact programming tutorials, check out our [programming guide](https://docs.dbos.dev/typescript/programming-guide).
- To learn more about DBOS, take a look at [our documentation](https://docs.dbos.dev/) or our [source code](https://github.com/dbos-inc/dbos-transact).
230 changes: 230 additions & 0 deletions packages/dbos-confluent-kafka/confluent-kafka.test.ts
@@ -0,0 +1,230 @@
import {
  DBOS,
  parseConfigFile,
} from "@dbos-inc/dbos-sdk";

import {
  KafkaProducer,
  CKafkaConsume,
  CKafka,
  KafkaConfig,
  Message,
  logLevel,
} from "./index";

import {
  KafkaJS
} from "@confluentinc/kafka-javascript";

// These tests require local Kafka to run.
// Without it, they're automatically skipped.
// Here's a docker-compose script you can use to set up local Kafka:

const _ = `
version: "3.7"
services:
  broker:
    image: bitnami/kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - '9092:9092'
      - '29093:29093'
      - '19092:19092'
    environment:
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092'
      KAFKA_CFG_PROCESS_ROLES: 'broker,controller'
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_CFG_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
`;

const kafkaConfig: KafkaConfig = {
  clientId: 'dbos-kafka-test',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
  //requestTimeout: 100, // FOR TESTING
  retry: { // FOR TESTING
    retries: 5
  },
  logLevel: logLevel.INFO,
};

const ensureTopicExists = async (topicName: string) => {
  const admin = new KafkaJS.Kafka({"bootstrap.servers": `${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`}).admin();

  try {
    // Connect to the admin client
    await admin.connect();

    // Check existing topics
    const topics = await admin.listTopics();
    if (topics.includes(topicName)) {
      console.log(`Topic "${topicName}" already exists.`);
      return;
    }

    // Create the topic
    console.log(`Creating topic "${topicName}"...`);
    await admin.createTopics({
      topics: [
        {
          topic: topicName,
          numPartitions: 1,
          replicationFactor: 1,
        },
      ],
    });

    console.log(`Topic "${topicName}" created successfully.`);
  } catch (e) {
    const error = e as Error;
    console.error(`Failed to ensure topic exists: ${error.message}`);
  } finally {
    // Disconnect from the admin client
    await admin.disconnect();
  }
};

const wf1Topic = 'dbos-test-wf-topic';
const wf2Topic = 'dbos-test-wf-topic2';
const wfMessage = 'dbos-wf';
let wfCounter = 0;

const patternTopic = new RegExp(/^dbos-test-.*/);
let patternTopicCounter = 0;

const arrayTopics = [wf1Topic, wf2Topic];
let arrayTopicsCounter = 0;

describe("kafka-tests", () => {
let kafkaIsAvailable = true;
let wfKafkaCfg: KafkaProducer | undefined = undefined;
let wf2KafkaCfg: KafkaProducer | undefined = undefined;

beforeAll(async () => {
// Check if Kafka is available, skip the test if it's not
if (process.env['KAFKA_BROKER']) {
kafkaIsAvailable = true;
} else {
kafkaIsAvailable = false;
return;
}

await ensureTopicExists(wf1Topic);
await ensureTopicExists(wf2Topic);

const [cfg, rtCfg] = parseConfigFile({configfile: 'confluentkafka-test-dbos-config.yaml'});
DBOS.setConfig(cfg, rtCfg);

// This would normally be a global or static or something
wfKafkaCfg = DBOS.configureInstance(KafkaProducer, 'wfKafka', kafkaConfig, wf1Topic);
wf2KafkaCfg = DBOS.configureInstance(KafkaProducer, 'wf2Kafka', kafkaConfig, wf2Topic);
await DBOS.launch();
}, 30000);

afterAll(async() => {
await wfKafkaCfg?.disconnect();
await wf2KafkaCfg?.disconnect();
await DBOS.shutdown();
}, 30000);

test("txn-kafka", async () => {
if (!kafkaIsAvailable) {
console.log("Kafka unavailable, skipping Kafka tests")
return
}
else {
console.log("Kafka tests running...")
}
// Create a producer to send a message
await wf2KafkaCfg!.sendMessage({
value: wfMessage,
});
await wfKafkaCfg!.sendMessage({
value: wfMessage,
});
console.log("Messages sent");

// Check that both messages are consumed
console.log("Waiting for regular topic");
await DBOSTestClass.wfPromise;
expect(wfCounter).toBe(1);

console.log("Waiting for pattern topic");
await DBOSTestClass.patternTopicPromise;
expect(patternTopicCounter).toBe(2);

console.log("Waiting for array topic");
await DBOSTestClass.arrayTopicsPromise;
expect(arrayTopicsCounter).toBe(2);

console.log("Done");
}, 30000);
});

@CKafka(kafkaConfig)
class DBOSTestClass {
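  // Each consumer workflow resolves its promise once the expected number of
  // messages has arrived, letting the test above await consumption.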
  static wfResolve: () => void;
  static wfPromise = new Promise<void>((r) => {
    DBOSTestClass.wfResolve = r;
  });

  static patternTopicResolve: () => void;
  static patternTopicPromise = new Promise<void>((r) => {
    DBOSTestClass.patternTopicResolve = r;
  });

  static arrayTopicsResolve: () => void;
  static arrayTopicsPromise = new Promise<void>((r) => {
    DBOSTestClass.arrayTopicsResolve = r;
  });

  @CKafkaConsume(wf1Topic)
  @DBOS.workflow()
  static async testWorkflow(topic: string, _partition: number, message: Message) {
    console.log(`got something 1 ${topic} ${message.value?.toString()}`);
    if (topic === wf1Topic && message.value?.toString() === wfMessage) {
      wfCounter = wfCounter + 1;
      DBOSTestClass.wfResolve();
    } else {
      console.warn(`Got strange message on wf1Topic: ${JSON.stringify(message)}`);
    }
    await DBOSTestClass.wfPromise;
  }

  @DBOS.workflow()
  @CKafkaConsume(patternTopic)
  static async testConsumeTopicsByPattern(topic: string, _partition: number, message: Message) {
    console.log(`got something 2 ${topic}`);
    const isWfMessage = topic === wf1Topic && message.value?.toString() === wfMessage;
    const isWf2Message = topic === wf2Topic && message.value?.toString() === wfMessage;
    if (isWfMessage || isWf2Message) {
      patternTopicCounter = patternTopicCounter + 1;
      if (patternTopicCounter === 2) {
        DBOSTestClass.patternTopicResolve();
      }
    }
    await DBOSTestClass.patternTopicPromise;
  }

  @CKafkaConsume(arrayTopics)
  @DBOS.workflow()
  static async testConsumeTopicsArray(topic: string, _partition: number, message: Message) {
    console.log(`got something 3 ${topic}`);
    const isWfMessage = topic === wf1Topic && message.value?.toString() === wfMessage;
    const isWf2Message = topic === wf2Topic && message.value?.toString() === wfMessage;
    if (isWfMessage || isWf2Message) {
      arrayTopicsCounter = arrayTopicsCounter + 1;
      if (arrayTopicsCounter === 2) {
        DBOSTestClass.arrayTopicsResolve();
      }
    }
    await DBOSTestClass.arrayTopicsPromise;
  }
}
17 changes: 17 additions & 0 deletions packages/dbos-confluent-kafka/confluentkafka-test-dbos-config.yaml
@@ -0,0 +1,17 @@
# To enable auto-completion and validation for this file in VSCode, install the RedHat YAML extension
# https://marketplace.visualstudio.com/items?itemName=redhat.vscode-yaml

# yaml-language-server: $schema=https://raw.githubusercontent.com/dbos-inc/dbos-transact/main/dbos-config.schema.json

database:
  hostname: 'localhost'
  port: 5432
  username: 'postgres'
  password: ${PGPASSWORD}
  app_db_name: 'hello'
  connectionTimeoutMillis: 3000
  app_db_client: 'knex'
  migrate:
    - npx knex migrate:latest
  rollback:
    - npx knex migrate:rollback