Skip to content

Commit

Permalink
feat(kafka): attempt to create topics on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
awlayton committed Oct 13, 2022
1 parent 62c180e commit d08cec4
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 77 deletions.
1 change: 1 addition & 0 deletions oada/.eslintrc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extends:
plugins:
- '@typescript-eslint'
- node
- escompat
- github
- promise
- regexp
Expand Down
60 changes: 10 additions & 50 deletions oada/libs/lib-kafka/src/base.ts → oada/libs/lib-kafka/src/Base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import { config } from './config.js';
import { once } from 'node:events';
import process from 'node:process';

import type { Consumer, EachMessagePayload, Producer, logLevel } from 'kafkajs';
import type { Consumer, EachMessagePayload, Producer } from 'kafkajs';
import EventEmitter from 'eventemitter3';
import { Kafka } from 'kafkajs';
import debug from 'debug';

import Kafka from './Kafka.js';

// Const info = debug('@oada/lib-kafka:info');
const error = debug('@oada/lib-kafka:error');

Expand Down Expand Up @@ -55,7 +56,7 @@ function die(reason: Error) {
}

export interface ConstructorOptions {
consumeTopic: string | string[];
consumeTopic: string | readonly string[];
// eslint-disable-next-line @typescript-eslint/ban-types
produceTopic?: string | null;
group: string;
Expand Down Expand Up @@ -86,34 +87,14 @@ export interface KafkaBase {
domain?: string;
}

/**
* Make kafkajs logging nicer?
*/
type KafkajsDebug = Record<
keyof Omit<typeof logLevel, 'NOTHING'>,
debug.Debugger
>;
const kafkajsDebugs = new Map<string, KafkajsDebug>();
function getKafkajsDebug(namespace: string): KafkajsDebug {
const d = kafkajsDebugs.get(namespace);
if (d) {
return d;
}

const newDebug = {
ERROR: debug(`kafkajs:${namespace}:error`),
WARN: debug(`kafkajs:${namespace}:warn`),
INFO: debug(`kafkajs:${namespace}:info`),
DEBUG: debug(`kafkajs:${namespace}:debug`),
};
kafkajsDebugs.set(namespace, newDebug);
return newDebug;
function isArray(value: unknown): value is unknown[] | readonly unknown[] {
return Array.isArray(value);
}

export class Base extends EventEmitter {
protected static done = Symbol('kafka-base-done');

readonly consumeTopic;
readonly consumeTopics;
readonly produceTopic;
readonly group;
readonly #kafka: Kafka;
Expand All @@ -130,29 +111,11 @@ export class Base extends EventEmitter {
}: ConstructorOptions) {
super();

this.consumeTopic = consumeTopic;
this.consumeTopics = isArray(consumeTopic) ? consumeTopic : [consumeTopic];
this.produceTopic = produceTopic;
this.group = group;

this.#kafka = new Kafka({
/**
* Make kafkajs logging nicer?
*/
logCreator() {
return ({ namespace, label, log }) => {
const l = label as keyof KafkajsDebug;
// eslint-disable-next-line security/detect-object-injection
const logger = getKafkajsDebug(namespace)[l];
if (log instanceof Error) {
logger({ err: log }, log.message);
} else {
const { message, ...extra } = log;
logger(extra, message);
}
};
},
brokers: config.get('kafka.broker'),
});
this.#kafka = new Kafka();

this.consumer =
consumer ??
Expand Down Expand Up @@ -245,10 +208,7 @@ export class Base extends EventEmitter {
await this.consumer.connect();
await this.producer.connect();

for (const topic of Array.isArray(this.consumeTopic)
? this.consumeTopic
: [this.consumeTopic]) {
// eslint-disable-next-line no-await-in-loop
for await (const topic of this.consumeTopics) {
await this.consumer.subscribe({ topic });
}

Expand Down
77 changes: 77 additions & 0 deletions oada/libs/lib-kafka/src/Kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* @license
* Copyright 2022 Open Ag Data Alliance
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { config } from './config.js';

import { Kafka, type KafkaConfig, type logLevel } from 'kafkajs';
import debug from 'debug';

/**
* Make kafkajs logging nicer?
*/
type KafkajsDebug = Record<
keyof Omit<typeof logLevel, 'NOTHING'>,
debug.Debugger
>;
const kafkajsDebugs = new Map<string, KafkajsDebug>();
function getKafkajsDebug(namespace: string): KafkajsDebug {
const d = kafkajsDebugs.get(namespace);
if (d) {
return d;
}

const newDebug = {
ERROR: debug(`kafkajs:${namespace}:error`),
WARN: debug(`kafkajs:${namespace}:warn`),
INFO: debug(`kafkajs:${namespace}:info`),
DEBUG: debug(`kafkajs:${namespace}:debug`),
};
kafkajsDebugs.set(namespace, newDebug);
return newDebug;
}

/**
* Wraps the `Kafka` client class to add our own defaults etc.
* @see {@link Kafka}
*/
export default class IKafka extends Kafka {
constructor({
brokers = config.get('kafka.broker'),
...rest
}: Partial<KafkaConfig> = {}) {
super({
...rest,
/**
* Make kafkajs logging nicer?
*/
logCreator() {
return ({ namespace, label, log }) => {
const l = label as keyof KafkajsDebug;
// eslint-disable-next-line security/detect-object-injection
const logger = getKafkajsDebug(namespace)[l];
if (log instanceof Error) {
logger({ err: log }, log.message);
} else {
const { message, ...extra } = log;
logger(extra, message);
}
};
},
brokers,
});
}
}
2 changes: 1 addition & 1 deletion oada/libs/lib-kafka/src/ReResponder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

import { DATA, REQ_ID_KEY } from './base.js';
import { DATA, REQ_ID_KEY } from './Base.js';
import { Responder } from './Responder.js';

import ksuid from 'ksuid';
Expand Down
13 changes: 10 additions & 3 deletions oada/libs/lib-kafka/src/Requester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@ import { setTimeout } from 'node:timers/promises';
import EventEmitter from 'eventemitter3';
import ksuid from 'ksuid';

import { Base, CANCEL_KEY, DATA, REQ_ID_KEY, topicTimeout } from './base.js';
import type { ConstructorOptions, KafkaBase } from './base.js';
import {
Base,
CANCEL_KEY,
type ConstructorOptions,
DATA,
type KafkaBase,
REQ_ID_KEY,
topicTimeout,
} from './Base.js';

export class KafkaRequestTimeoutError extends Error {}

Expand Down Expand Up @@ -132,4 +139,4 @@ export class Requester extends Base {
}
}

export type { ConstructorOptions } from './base.js';
export type { ConstructorOptions } from './Base.js';
14 changes: 9 additions & 5 deletions oada/libs/lib-kafka/src/Responder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

import util from 'node:util';

import { Base, CANCEL_KEY, DATA, REQ_ID_KEY, topicTimeout } from './base.js';
import type {
ConstructorOptions as BaseConstructorOptions,
KafkaBase,
} from './base.js';
import {
Base,
type ConstructorOptions as BaseConstructorOptions,
CANCEL_KEY,
DATA,
type KafkaBase,
REQ_ID_KEY,
topicTimeout,
} from './Base.js';

import type { EachMessagePayload } from 'kafkajs';
import debug from 'debug';
Expand Down
21 changes: 11 additions & 10 deletions oada/libs/lib-kafka/src/ResponderRequester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
import type { EachMessagePayload } from 'kafkajs';
import type EventEmitter from 'eventemitter3';

import { Base, DATA } from './base.js';
import type {
ConstructorOptions as ResponderOptions,
Response,
import { Base, DATA, type KafkaBase } from './Base.js';
import {
Requester,
type ConstructorOptions as RequesterOptions,
} from './Requester.js';
import {
Responder,
type ConstructorOptions as ResponderOptions,
type Response,
} from './Responder.js';
import type { KafkaBase } from './base.js';
import { Requester } from './Requester.js';
import type { ConstructorOptions as RequesterOptions } from './Requester.js';
import { Responder } from './Responder.js';

import debug from 'debug';

Expand Down Expand Up @@ -109,12 +110,12 @@ export class ResponderRequester extends Base {
// Mux the consumer between requester and responder
this.on(DATA, (value: KafkaBase, data, ...rest) => {
trace(data, 'Received data: %o', value);
if (data.topic === this.#requester.consumeTopic) {
if (this.#requester.consumeTopics.includes(data.topic)) {
trace('Muxing data to requester');
this.#requester.emit(DATA, value, data, ...rest);
}

if (data.topic === this.#responder.consumeTopic) {
if (this.#responder.consumeTopics.includes(data.topic)) {
if (!this.#respondOwn && value.group === this.group) {
// Don't respond to own requests
return;
Expand Down
3 changes: 2 additions & 1 deletion oada/libs/lib-kafka/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// @ts-expect-error the types are not correct
export { KafkaJSError as KafkaError } from 'kafkajs/src/errors.js';

export type { KafkaBase } from './base.js';
export * as init from './init.js';
export type { KafkaBase } from './Base.js';
export { Responder } from './Responder.js';
export { ReResponder } from './ReResponder.js';
export { Requester } from './Requester.js';
Expand Down
39 changes: 39 additions & 0 deletions oada/libs/lib-kafka/src/init.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* @license
* Copyright 2022 Open Ag Data Alliance
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { config } from './config.js';

import Kafka from './Kafka.js';

/**
* Ensure our Kafka topics exist
*/
export async function run(): Promise<void> {
const kafka = new Kafka();
const topics = config.get('kafka.topics');

const admin = kafka.admin();
await admin.connect();
try {
await admin.createTopics({
waitForLeaders: false,
topics: Object.values(topics).map((topic) => ({ topic })),
});
} finally {
await admin.disconnect();
}
}
2 changes: 1 addition & 1 deletion oada/libs/lib-kafka/test/Requester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { Kafka } from 'kafkajs';
import { v4 as uuid } from 'uuid';

import { KafkaRequestTimeoutError, Requester } from '../dist/Requester.js';
import type { KafkaBase } from '../src/base.js';
import type { KafkaBase } from '../src/Base.js';

const REQ_TOPIC = 'test_requests';
const RES_TOPIC = 'test_responses';
Expand Down
2 changes: 1 addition & 1 deletion oada/libs/lib-kafka/test/Responder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type { Consumer, Producer } from 'kafkajs';
import { Kafka } from 'kafkajs';
import { v4 as uuid } from 'uuid';

import type { KafkaBase } from '../src/base.js';
import type { KafkaBase } from '../src/Base.js';
import { Responder } from '../dist/Responder.js';

const REQ_TOPIC = 'test_requests';
Expand Down
1 change: 1 addition & 0 deletions oada/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"eslint-import-resolver-node": "^0.3.6",
"eslint-plugin-array-func": "^3.1.7",
"eslint-plugin-ava": "^13.2.0",
"eslint-plugin-escompat": "^3.3.3",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-filenames": "^1.3.2",
"eslint-plugin-github": "^4.4.0",
Expand Down
1 change: 1 addition & 0 deletions oada/services/startup/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"homepage": "https://github.com/oada/oada-srvc-startup#readme",
"dependencies": {
"@oada/lib-arangodb": "^3.5.1",
"@oada/lib-kafka": "^3.5.1",
"@oada/pino-debug": "^3.5.1",
"debug": "^4.3.4",
"tslib": "^2.4.0"
Expand Down
9 changes: 5 additions & 4 deletions oada/services/startup/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import http from 'node:http';

import { init } from '@oada/lib-arangodb';
import { init as initArangoDB } from '@oada/lib-arangodb';
import { init as initKafka } from '@oada/lib-kafka';

import debug from 'debug';

Expand All @@ -27,9 +28,9 @@ const info = debug('startup:info');
const port = process.env.PORT ?? 8080;
const exit = process.env.EXIT ?? false;

info('Startup is creating the database');
await init.run();
info('Database created/ensured.');
info('Startup is initializing ArangoDB and Kafka');
await Promise.all([initArangoDB.run(), initKafka.run()]);
info('Initialization complete');

if (exit) {
// eslint-disable-next-line no-process-exit, unicorn/no-process-exit
Expand Down
Loading

0 comments on commit d08cec4

Please sign in to comment.