v2 event receivers; Confluent Kafka #699

Merged · 12 commits · Jan 9, 2025
394 changes: 388 additions & 6 deletions package-lock.json

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions packages/dbos-confluent-kafka/README.md
@@ -0,0 +1,59 @@
# DBOS Kafka Client Library (Confluent Version)

Publish/subscribe message queues are a common building block for distributed systems. Message queues allow processing to occur at a different place or time, perhaps in multiple client programming environments. Due to its performance, flexibility, and simple, scalable design, [Kafka](https://www.confluent.io/cloud-kafka) is a popular choice for publish/subscribe.

This package includes a [DBOS](https://docs.dbos.dev/) [step](https://docs.dbos.dev/typescript/tutorials/step-tutorial) for sending Kafka messages, as well as an event receiver for exactly-once processing of incoming messages (even using standard queues).

This package is based on [Confluent's JavaScript client for Apache Kafka](https://github.com/confluentinc/confluent-kafka-javascript), which provides a KafkaJS-compatible API. We are working on other client libraries for Kafka; please reach out to [us](https://www.dbos.dev/) if you are interested in a different client library.

## Configuring a DBOS Application with Kafka
Ensure that the DBOS Kafka (Confluent version) package is installed into the application:
```
npm install --save @dbos-inc/dbos-confluent-kafka
```

## Sending Messages

### Imports
First, ensure that the package classes are imported:
```typescript
import {
  KafkaProducer,
  KafkaConfig,
  logLevel,
} from "@dbos-inc/dbos-confluent-kafka";
```

### Selecting a Configuration
`KafkaProducer` is a configured class, meaning that a configuration (or the name of a config file key) must be provided when a class instance is created, for example:
```typescript
const kafkaConfig: KafkaConfig = {
  clientId: 'dbos-kafka-test',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
  requestTimeout: 100, // FOR TESTING
  retry: { // FOR TESTING
    retries: 5
  },
  logLevel: logLevel.INFO, // FOR TESTING
};

// kafkaTopic is the name of the topic this producer will send to
const kafkaCfg = DBOS.configureInstance(KafkaProducer, 'defKafka', kafkaConfig, kafkaTopic);
```

### Sending
Within a [DBOS Transact workflow](https://docs.dbos.dev/typescript/tutorials/workflow-tutorial), send messages with the producer's `sendMessage` method:
```typescript
const sendRes = await kafkaCfg.sendMessage({ value: ourMessage });
```
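
Putting it together, here is a minimal sketch of a workflow that sends a message. It assumes the `kafkaCfg` producer instance configured above; the class name and message contents are illustrative:
```typescript
class KafkaSendExample {
  @DBOS.workflow()
  static async greetAndNotify(name: string) {
    // sendMessage runs as a DBOS step, so a recovered workflow resumes
    // after it rather than sending the message a second time
    await kafkaCfg.sendMessage({ value: `Hello, ${name}!` });
  }
}
```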

## Receiving Messages
A tutorial for receiving and processing Kafka messages can be found [here](https://docs.dbos.dev/tutorials/requestsandevents/kafka-integration). This library provides an alternate implementation of the Kafka consumer that can be updated independently of the DBOS Transact core packages.
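
For quick orientation, below is a minimal consumer sketch using the `@CKafka` and `@CKafkaConsume` decorators exported by this package (the same pattern used in its test file); the topic name and handler body are illustrative:
```typescript
import { DBOS } from "@dbos-inc/dbos-sdk";
import { CKafka, CKafkaConsume, KafkaConfig, Message } from "@dbos-inc/dbos-confluent-kafka";

const kafkaConfig: KafkaConfig = {
  brokers: [process.env['KAFKA_BROKER'] ?? 'localhost:9092'],
};

@CKafka(kafkaConfig)
class MessageProcessor {
  // Each incoming message starts this workflow; DBOS checkpointing ensures
  // it is processed exactly once even across application restarts
  @CKafkaConsume('my-topic') // hypothetical topic name
  @DBOS.workflow()
  static async processMessage(topic: string, partition: number, message: Message) {
    DBOS.logger.info(`Received "${message.value?.toString()}" from ${topic}:${partition}`);
  }
}
```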

## Simple Testing
The `confluent-kafka.test.ts` file included in the source repository demonstrates setting up topics and sending and processing Kafka messages. Before running, set the following environment variable:
- `KAFKA_BROKER`: Broker URL
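
With a broker reachable at that address (for example, one started from the docker-compose script embedded in the test file), the tests can then be run along these lines:
```
KAFKA_BROKER=localhost:9092 npm test
```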

## Next Steps
- To start a DBOS app from a template, visit our [quickstart](https://docs.dbos.dev/quickstart).
- For DBOS Transact programming tutorials, check out our [programming guide](https://docs.dbos.dev/typescript/programming-guide).
- To learn more about DBOS, take a look at [our documentation](https://docs.dbos.dev/) or our [source code](https://github.com/dbos-inc/dbos-transact).
230 changes: 230 additions & 0 deletions packages/dbos-confluent-kafka/confluent-kafka.test.ts
@@ -0,0 +1,230 @@
import {
  DBOS,
  parseConfigFile,
} from "@dbos-inc/dbos-sdk";

import {
  KafkaProducer,
  CKafkaConsume,
  CKafka,
  KafkaConfig,
  Message,
  logLevel,
} from "./index";

import { KafkaJS } from "@confluentinc/kafka-javascript";

// These tests require local Kafka to run.
// Without it, they're automatically skipped.
// Here's a docker-compose script you can use to set up local Kafka:

const _ = `
version: "3.7"
services:
  broker:
    image: bitnami/kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - '9092:9092'
      - '29093:29093'
      - '19092:19092'
    environment:
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092'
      KAFKA_CFG_PROCESS_ROLES: 'broker,controller'
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_CFG_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
`

const kafkaConfig: KafkaConfig = {
  clientId: 'dbos-kafka-test',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
  //requestTimeout: 100, // FOR TESTING
  retry: { // FOR TESTING
    retries: 5
  },
  logLevel: logLevel.INFO,
};

const ensureTopicExists = async (topicName: string) => {
  const admin = new KafkaJS.Kafka({"bootstrap.servers": `${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`}).admin();

  try {
    // Connect to the admin client
    await admin.connect();

    // Check existing topics
    const topics = await admin.listTopics();
    if (topics.includes(topicName)) {
      console.log(`Topic "${topicName}" already exists.`);
      return;
    }

    // Create the topic
    console.log(`Creating topic "${topicName}"...`);
    await admin.createTopics({
      topics: [
        {
          topic: topicName,
          numPartitions: 1,
          replicationFactor: 1,
        },
      ],
    });

    console.log(`Topic "${topicName}" created successfully.`);
  } catch (e) {
    const error = e as Error;
    console.error(`Failed to ensure topic exists: ${error.message}`);
  } finally {
    // Disconnect from the admin client
    await admin.disconnect();
  }
};

const wf1Topic = 'dbos-test-wf-topic';
const wf2Topic = 'dbos-test-wf-topic2';
const wfMessage = 'dbos-wf';
let wfCounter = 0;

const patternTopic = new RegExp(/^dbos-test-.*/);
let patternTopicCounter = 0;

const arrayTopics = [wf1Topic, wf2Topic];
let arrayTopicsCounter = 0;

describe("kafka-tests", () => {
let kafkaIsAvailable = true;
let wfKafkaCfg: KafkaProducer | undefined = undefined;
let wf2KafkaCfg: KafkaProducer | undefined = undefined;

beforeAll(async () => {
// Check if Kafka is available, skip the test if it's not
if (process.env['KAFKA_BROKER']) {
kafkaIsAvailable = true;
} else {
kafkaIsAvailable = false;
return;
}

await ensureTopicExists(wf1Topic);
await ensureTopicExists(wf2Topic);

const [cfg, rtCfg] = parseConfigFile({configfile: 'confluentkafka-test-dbos-config.yaml'});
DBOS.setConfig(cfg, rtCfg);

// This would normally be a global or static or something
wfKafkaCfg = DBOS.configureInstance(KafkaProducer, 'wfKafka', kafkaConfig, wf1Topic);
wf2KafkaCfg = DBOS.configureInstance(KafkaProducer, 'wf2Kafka', kafkaConfig, wf2Topic);
await DBOS.launch();
}, 30000);

afterAll(async() => {
await wfKafkaCfg?.disconnect();
await wf2KafkaCfg?.disconnect();
await DBOS.shutdown();
}, 30000);

test("txn-kafka", async () => {
if (!kafkaIsAvailable) {
console.log("Kafka unavailable, skipping Kafka tests")
return
}
else {
console.log("Kafka tests running...")
}
// Create a producer to send a message
await wf2KafkaCfg!.sendMessage({
value: wfMessage,
});
await wfKafkaCfg!.sendMessage({
value: wfMessage,
});
console.log("Messages sent");

// Check that both messages are consumed
console.log("Waiting for regular topic");
await DBOSTestClass.wfPromise;
expect(wfCounter).toBe(1);

console.log("Waiting for pattern topic");
await DBOSTestClass.patternTopicPromise;
expect(patternTopicCounter).toBe(2);

console.log("Waiting for array topic");
await DBOSTestClass.arrayTopicsPromise;
expect(arrayTopicsCounter).toBe(2);

console.log("Done");
}, 30000);
});

@CKafka(kafkaConfig)
class DBOSTestClass {
  static wfResolve: () => void;
  static wfPromise = new Promise<void>((r) => {
    DBOSTestClass.wfResolve = r;
  });

  static patternTopicResolve: () => void;
  static patternTopicPromise = new Promise<void>((r) => {
    DBOSTestClass.patternTopicResolve = r;
  });

  static arrayTopicsResolve: () => void;
  static arrayTopicsPromise = new Promise<void>((r) => {
    DBOSTestClass.arrayTopicsResolve = r;
  });

  @CKafkaConsume(wf1Topic)
  @DBOS.workflow()
  static async testWorkflow(topic: string, _partition: number, message: Message) {
    console.log(`got something 1 ${topic} ${message.value?.toString()}`);
    if (topic === wf1Topic && message.value?.toString() === wfMessage) {
      wfCounter = wfCounter + 1;
      DBOSTestClass.wfResolve();
    } else {
      console.warn(`Got strange message on wf1Topic: ${JSON.stringify(message)}`);
    }
    await DBOSTestClass.wfPromise;
  }

  @DBOS.workflow()
  @CKafkaConsume(patternTopic)
  static async testConsumeTopicsByPattern(topic: string, _partition: number, message: Message) {
    console.log(`got something 2 ${topic}`);
    const isWfMessage = topic === wf1Topic && message.value?.toString() === wfMessage;
    const isWf2Message = topic === wf2Topic && message.value?.toString() === wfMessage;
    if (isWfMessage || isWf2Message) {
      patternTopicCounter = patternTopicCounter + 1;
      if (patternTopicCounter === 2) {
        DBOSTestClass.patternTopicResolve();
      }
    }
    await DBOSTestClass.patternTopicPromise;
  }

  @CKafkaConsume(arrayTopics)
  @DBOS.workflow()
  static async testConsumeTopicsArray(topic: string, _partition: number, message: Message) {
    console.log(`got something 3 ${topic}`);
    const isWfMessage = topic === wf1Topic && message.value?.toString() === wfMessage;
    const isWf2Message = topic === wf2Topic && message.value?.toString() === wfMessage;
    if (isWfMessage || isWf2Message) {
      arrayTopicsCounter = arrayTopicsCounter + 1;
      if (arrayTopicsCounter === 2) {
        DBOSTestClass.arrayTopicsResolve();
      }
    }
    await DBOSTestClass.arrayTopicsPromise;
  }
}
17 changes: 17 additions & 0 deletions packages/dbos-confluent-kafka/confluentkafka-test-dbos-config.yaml
@@ -0,0 +1,17 @@
# To enable auto-completion and validation for this file in VSCode, install the RedHat YAML extension
# https://marketplace.visualstudio.com/items?itemName=redhat.vscode-yaml

# yaml-language-server: $schema=https://raw.githubusercontent.com/dbos-inc/dbos-transact/main/dbos-config.schema.json

database:
  hostname: 'localhost'
  port: 5432
  username: 'postgres'
  password: ${PGPASSWORD}
  app_db_name: 'hello'
  connectionTimeoutMillis: 3000
  app_db_client: 'knex'
  migrate:
    - npx knex migrate:latest
  rollback:
    - npx knex migrate:rollback