dbos-kafkajs 1.29.2-preview

Install from the command line:
$ npm install @dbos-inc/dbos-kafkajs@1.29.2-preview
Install via package.json:
"@dbos-inc/dbos-kafkajs": "1.29.2-preview"

DBOS Kafka Library (KafkaJS Version)

Publish/subscribe message queues are a common building block for distributed systems. Message queues allow processing to occur at a different place or time, perhaps in multiple client programming environments. Due to its performance, flexibility, and simple, scalable design, Kafka is a popular choice for publish/subscribe.

This package includes a DBOS step for sending Kafka messages, as well as an event receiver for exactly-once processing of incoming messages.

This package is based on KafkaJS. We are working on other client libraries for Kafka; please reach out to us if you are interested in a different client library.

Configuring a DBOS Application with Kafka

Ensure that the DBOS Kafka package is installed into the application:

npm install --save @dbos-inc/dbos-kafkajs

Sending Messages

Imports

First, ensure that the package classes are imported:

import { configureInstance } from "@dbos-inc/dbos-sdk";
import {
  KafkaConfig,
  logLevel,
  KafkaProduceStep,
  Partitioners,
} from "@dbos-inc/dbos-kafkajs";

Selecting A Configuration

KafkaProduceStep is a configured class. This means that the configuration (or config file key name) must be provided when a class instance is created, for example:

const kafkaConfig: KafkaConfig = {
  clientId: 'dbos-kafka-test',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
  requestTimeout: 100, // FOR TESTING
  retry: { // FOR TESTING
    retries: 5
  },
  logLevel: logLevel.NOTHING, // FOR TESTING
}

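// defTopic is assumed to be a string, defined elsewhere, naming the default topic to send to
const kafkaCfg = configureInstance(KafkaProduceStep, 'defKafka', kafkaConfig, defTopic, {
  createPartitioner: Partitioners.DefaultPartitioner
});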

Sending

Within a DBOS Transact workflow, invoke sendMessage on the configured KafkaProduceStep instance through the workflow context:

const sendRes = await wfCtx.invoke(kafkaCfg).sendMessage({value: ourMessage});
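The sendMessage call above takes a message object with a value field. KafkaJS message values are strings or Buffers, so a structured payload generally has to be serialized before sending. The following is a minimal sketch of one way to do that; OutgoingMessage and buildMessage are hypothetical names introduced for illustration and are not part of this package.

```typescript
// Local mirror of a small subset of the KafkaJS message shape.
// Hypothetical type for illustration; the real type comes from KafkaJS.
interface OutgoingMessage {
  key?: string;
  value: string;
}

// Hypothetical helper: serialize a payload to JSON and attach an optional key.
function buildMessage<T>(payload: T, key?: string): OutgoingMessage {
  const message: OutgoingMessage = { value: JSON.stringify(payload) };
  if (key !== undefined) {
    // Only set the key when one was supplied, so keyless messages stay keyless.
    message.key = key;
  }
  return message;
}
```

A message built this way could then be passed where {value: ourMessage} appears above, assuming the consumer parses the value as JSON.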

Receiving Messages

A tutorial for receiving and processing Kafka messages can be found here. This library provides an alternate implementation of the Kafka consumer that can be updated independently of the DBOS Transact core packages.
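To illustrate what exactly-once processing means for a consumer, here is a minimal sketch of one common technique: deriving an idempotency key from the topic, partition, and offset of each record and skipping any record whose key has already been handled. This is an assumption about the general approach, not a description of how this library implements it. The names IncomingRecord, idempotencyKey, and OnceOnlyProcessor are hypothetical, and the in-memory Set stands in for the durable storage a real system would need.

```typescript
// Hypothetical record shape; KafkaJS reports offsets as strings.
interface IncomingRecord {
  topic: string;
  partition: number;
  offset: string;
  value: string;
}

// Hypothetical key format: topic, partition, and offset together identify one record.
function idempotencyKey(r: IncomingRecord): string {
  return `kafka-${r.topic}-${r.partition}-${r.offset}`;
}

class OnceOnlyProcessor {
  // In-memory only for illustration; this does not survive a restart.
  private readonly seen = new Set<string>();
  private readonly handler: (r: IncomingRecord) => void;

  constructor(handler: (r: IncomingRecord) => void) {
    this.handler = handler;
  }

  // Returns true if the handler ran, false if the record was a duplicate delivery.
  process(r: IncomingRecord): boolean {
    const key = idempotencyKey(r);
    if (this.seen.has(key)) {
      return false;
    }
    this.seen.add(key);
    this.handler(r);
    return true;
  }
}
```

With this sketch, redelivering the same record (same topic, partition, and offset) invokes the handler only once.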

Simple Testing

The kafkajs.test.ts file included in the source repository demonstrates sending and processing Kafka messages. Before running, set the following environment variables:

  • KAFKA_BROKER: Broker address (host:port); the example configuration above falls back to localhost:9092 when this is unset
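The broker lookup with a fallback can be sketched as a small function. This is illustrative only: resolveBrokers is a hypothetical helper, and accepting a comma-separated list in KAFKA_BROKER is an assumption that goes beyond the single-broker configuration shown earlier. The environment is passed in as a parameter so the sketch can be tried without touching process.env.

```typescript
const DEFAULT_BROKER = "localhost:9092";

// Hypothetical helper: read KAFKA_BROKER from the given environment map,
// split on commas (an assumed convention), and fall back to the default
// when the variable is unset or empty.
function resolveBrokers(env: Record<string, string | undefined>): string[] {
  const brokers = (env["KAFKA_BROKER"] ?? "")
    .split(",")
    .map((b) => b.trim())
    .filter((b) => b.length > 0);
  return brokers.length > 0 ? brokers : [DEFAULT_BROKER];
}
```

The result could be used as the brokers field of a KafkaConfig, for example by calling resolveBrokers(process.env) in a Node.js application.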
