From d08cec47f2b3de7c93a5980bab4068ca68f15b00 Mon Sep 17 00:00:00 2001 From: Alex Layton Date: Thu, 13 Oct 2022 17:42:01 -0400 Subject: [PATCH] feat(kafka): attempt to create topics on startup --- oada/.eslintrc.yaml | 1 + oada/libs/lib-kafka/src/{base.ts => Base.ts} | 60 +++------------ oada/libs/lib-kafka/src/Kafka.ts | 77 +++++++++++++++++++ oada/libs/lib-kafka/src/ReResponder.ts | 2 +- oada/libs/lib-kafka/src/Requester.ts | 13 +++- oada/libs/lib-kafka/src/Responder.ts | 14 ++-- oada/libs/lib-kafka/src/ResponderRequester.ts | 21 ++--- oada/libs/lib-kafka/src/index.ts | 3 +- oada/libs/lib-kafka/src/init.ts | 39 ++++++++++ oada/libs/lib-kafka/test/Requester.test.ts | 2 +- oada/libs/lib-kafka/test/Responder.test.ts | 2 +- oada/package.json | 1 + oada/services/startup/package.json | 1 + oada/services/startup/src/index.ts | 9 ++- oada/services/startup/tsconfig.json | 3 +- oada/yarn.lock | 2 + 16 files changed, 173 insertions(+), 77 deletions(-) rename oada/libs/lib-kafka/src/{base.ts => Base.ts} (78%) create mode 100644 oada/libs/lib-kafka/src/Kafka.ts create mode 100644 oada/libs/lib-kafka/src/init.ts diff --git a/oada/.eslintrc.yaml b/oada/.eslintrc.yaml index 97a91040..55b8b231 100644 --- a/oada/.eslintrc.yaml +++ b/oada/.eslintrc.yaml @@ -19,6 +19,7 @@ extends: plugins: - '@typescript-eslint' - node + - escompat - github - promise - regexp diff --git a/oada/libs/lib-kafka/src/base.ts b/oada/libs/lib-kafka/src/Base.ts similarity index 78% rename from oada/libs/lib-kafka/src/base.ts rename to oada/libs/lib-kafka/src/Base.ts index 5e58b5cd..96884bda 100644 --- a/oada/libs/lib-kafka/src/base.ts +++ b/oada/libs/lib-kafka/src/Base.ts @@ -20,11 +20,12 @@ import { config } from './config.js'; import { once } from 'node:events'; import process from 'node:process'; -import type { Consumer, EachMessagePayload, Producer, logLevel } from 'kafkajs'; +import type { Consumer, EachMessagePayload, Producer } from 'kafkajs'; import EventEmitter from 'eventemitter3'; -import { Kafka } from 'kafkajs'; import debug from 'debug'; +import Kafka from './Kafka.js'; + // Const info = debug('@oada/lib-kafka:info'); const error = debug('@oada/lib-kafka:error'); @@ -55,7 +56,7 @@ function die(reason: Error) { } export interface ConstructorOptions { - consumeTopic: string | string[]; + consumeTopic: string | readonly string[]; // eslint-disable-next-line @typescript-eslint/ban-types produceTopic?: string | null; group: string; @@ -86,34 +87,14 @@ export interface KafkaBase { domain?: string; } -/** - * Make kafkajs logging nicer? - */ -type KafkajsDebug = Record< - keyof Omit, - debug.Debugger ->; -const kafkajsDebugs = new Map(); -function getKafkajsDebug(namespace: string): KafkajsDebug { - const d = kafkajsDebugs.get(namespace); - if (d) { - return d; - } - - const newDebug = { - ERROR: debug(`kafkajs:${namespace}:error`), - WARN: debug(`kafkajs:${namespace}:warn`), - INFO: debug(`kafkajs:${namespace}:info`), - DEBUG: debug(`kafkajs:${namespace}:debug`), - }; - kafkajsDebugs.set(namespace, newDebug); - return newDebug; +function isArray(value: unknown): value is unknown[] | readonly unknown[] { + return Array.isArray(value); } export class Base extends EventEmitter { protected static done = Symbol('kafka-base-done'); - readonly consumeTopic; + readonly consumeTopics; readonly produceTopic; readonly group; readonly #kafka: Kafka; @@ -130,29 +111,11 @@ export class Base extends EventEmitter { }: ConstructorOptions) { super(); - this.consumeTopic = consumeTopic; + this.consumeTopics = isArray(consumeTopic) ? consumeTopic : [consumeTopic]; this.produceTopic = produceTopic; this.group = group; - this.#kafka = new Kafka({ - /** - * Make kafkajs logging nicer? - */ - logCreator() { - return ({ namespace, label, log }) => { - const l = label as keyof KafkajsDebug; - // eslint-disable-next-line security/detect-object-injection - const logger = getKafkajsDebug(namespace)[l]; - if (log instanceof Error) { - logger({ err: log }, log.message); - } else { - const { message, ...extra } = log; - logger(extra, message); - } - }; - }, - brokers: config.get('kafka.broker'), - }); + this.#kafka = new Kafka(); this.consumer = consumer ?? @@ -245,10 +208,7 @@ export class Base extends EventEmitter { await this.consumer.connect(); await this.producer.connect(); - for (const topic of Array.isArray(this.consumeTopic) - ? this.consumeTopic - : [this.consumeTopic]) { - // eslint-disable-next-line no-await-in-loop + for await (const topic of this.consumeTopics) { await this.consumer.subscribe({ topic }); } diff --git a/oada/libs/lib-kafka/src/Kafka.ts b/oada/libs/lib-kafka/src/Kafka.ts new file mode 100644 index 00000000..1b2def20 --- /dev/null +++ b/oada/libs/lib-kafka/src/Kafka.ts @@ -0,0 +1,77 @@ +/** + * @license + * Copyright 2022 Open Ag Data Alliance + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { config } from './config.js'; + +import { Kafka, type KafkaConfig, type logLevel } from 'kafkajs'; +import debug from 'debug'; + +/** + * Make kafkajs logging nicer? + */ +type KafkajsDebug = Record< + keyof Omit, + debug.Debugger +>; +const kafkajsDebugs = new Map(); +function getKafkajsDebug(namespace: string): KafkajsDebug { + const d = kafkajsDebugs.get(namespace); + if (d) { + return d; + } + + const newDebug = { + ERROR: debug(`kafkajs:${namespace}:error`), + WARN: debug(`kafkajs:${namespace}:warn`), + INFO: debug(`kafkajs:${namespace}:info`), + DEBUG: debug(`kafkajs:${namespace}:debug`), + }; + kafkajsDebugs.set(namespace, newDebug); + return newDebug; +} + +/** + * Wraps the `Kafka` client class to add our own defaults etc. + * @see {@link Kafka} + */ +export default class IKafka extends Kafka { + constructor({ + brokers = config.get('kafka.broker'), + ...rest + }: Partial = {}) { + super({ + ...rest, + /** + * Make kafkajs logging nicer? + */ + logCreator() { + return ({ namespace, label, log }) => { + const l = label as keyof KafkajsDebug; + // eslint-disable-next-line security/detect-object-injection + const logger = getKafkajsDebug(namespace)[l]; + if (log instanceof Error) { + logger({ err: log }, log.message); + } else { + const { message, ...extra } = log; + logger(extra, message); + } + }; + }, + brokers, + }); + } +} diff --git a/oada/libs/lib-kafka/src/ReResponder.ts b/oada/libs/lib-kafka/src/ReResponder.ts index 0937b9c8..bcc8f69f 100644 --- a/oada/libs/lib-kafka/src/ReResponder.ts +++ b/oada/libs/lib-kafka/src/ReResponder.ts @@ -15,7 +15,7 @@ * limitations under the License. */ -import { DATA, REQ_ID_KEY } from './base.js'; +import { DATA, REQ_ID_KEY } from './Base.js'; import { Responder } from './Responder.js'; import ksuid from 'ksuid'; diff --git a/oada/libs/lib-kafka/src/Requester.ts b/oada/libs/lib-kafka/src/Requester.ts index dde2fcee..df1af15a 100644 --- a/oada/libs/lib-kafka/src/Requester.ts +++ b/oada/libs/lib-kafka/src/Requester.ts @@ -21,8 +21,15 @@ import { setTimeout } from 'node:timers/promises'; import EventEmitter from 'eventemitter3'; import ksuid from 'ksuid'; -import { Base, CANCEL_KEY, DATA, REQ_ID_KEY, topicTimeout } from './base.js'; -import type { ConstructorOptions, KafkaBase } from './base.js'; +import { + Base, + CANCEL_KEY, + type ConstructorOptions, + DATA, + type KafkaBase, + REQ_ID_KEY, + topicTimeout, +} from './Base.js'; export class KafkaRequestTimeoutError extends Error {} @@ -132,4 +139,4 @@ export class Requester extends Base { } } -export type { ConstructorOptions } from './base.js'; +export type { ConstructorOptions } from './Base.js'; diff --git a/oada/libs/lib-kafka/src/Responder.ts b/oada/libs/lib-kafka/src/Responder.ts index 29820ef4..6dc46dc5 100644 --- a/oada/libs/lib-kafka/src/Responder.ts +++ b/oada/libs/lib-kafka/src/Responder.ts @@ -17,11 +17,15 @@ import util from 'node:util'; -import { Base, CANCEL_KEY, DATA, REQ_ID_KEY, topicTimeout } from './base.js'; -import type { - ConstructorOptions as BaseConstructorOptions, - KafkaBase, -} from './base.js'; +import { + Base, + type ConstructorOptions as BaseConstructorOptions, + CANCEL_KEY, + DATA, + type KafkaBase, + REQ_ID_KEY, + topicTimeout, +} from './Base.js'; import type { EachMessagePayload } from 'kafkajs'; import debug from 'debug'; diff --git a/oada/libs/lib-kafka/src/ResponderRequester.ts b/oada/libs/lib-kafka/src/ResponderRequester.ts index ae2ec714..4de35cf8 100644 --- a/oada/libs/lib-kafka/src/ResponderRequester.ts +++ b/oada/libs/lib-kafka/src/ResponderRequester.ts @@ -18,15 +18,16 @@ import type { EachMessagePayload } from 'kafkajs'; import type EventEmitter from 'eventemitter3'; -import { Base, DATA } from './base.js'; -import type { - ConstructorOptions as ResponderOptions, - Response, +import { Base, DATA, type KafkaBase } from './Base.js'; +import { + Requester, + type ConstructorOptions as RequesterOptions, +} from './Requester.js'; +import { + Responder, + type ConstructorOptions as ResponderOptions, + type Response, } from './Responder.js'; -import type { KafkaBase } from './base.js'; -import { Requester } from './Requester.js'; -import type { ConstructorOptions as RequesterOptions } from './Requester.js'; -import { Responder } from './Responder.js'; import debug from 'debug'; @@ -109,12 +110,12 @@ export class ResponderRequester extends Base { // Mux the consumer between requester and responder this.on(DATA, (value: KafkaBase, data, ...rest) => { trace(data, 'Received data: %o', value); - if (data.topic === this.#requester.consumeTopic) { + if (this.#requester.consumeTopics.includes(data.topic)) { trace('Muxing data to requester'); this.#requester.emit(DATA, value, data, ...rest); } - if (data.topic === this.#responder.consumeTopic) { + if (this.#responder.consumeTopics.includes(data.topic)) { if (!this.#respondOwn && value.group === this.group) { // Don't respond to own requests return; diff --git a/oada/libs/lib-kafka/src/index.ts b/oada/libs/lib-kafka/src/index.ts index f7b99669..f92cfdf0 100644 --- a/oada/libs/lib-kafka/src/index.ts +++ b/oada/libs/lib-kafka/src/index.ts @@ -18,7 +18,8 @@ // @ts-expect-error the types are not correct export { KafkaJSError as KafkaError } from 'kafkajs/src/errors.js'; -export type { KafkaBase } from './base.js'; +export * as init from './init.js'; +export type { KafkaBase } from './Base.js'; export { Responder } from './Responder.js'; export { ReResponder } from './ReResponder.js'; export { Requester } from './Requester.js'; diff --git a/oada/libs/lib-kafka/src/init.ts b/oada/libs/lib-kafka/src/init.ts new file mode 100644 index 00000000..2e6a7af2 --- /dev/null +++ b/oada/libs/lib-kafka/src/init.ts @@ -0,0 +1,39 @@ +/** + * @license + * Copyright 2022 Open Ag Data Alliance + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { config } from './config.js'; + +import Kafka from './Kafka.js'; + +/** + * Ensure our Kafka topics exist + */ +export async function run(): Promise { + const kafka = new Kafka(); + const topics = config.get('kafka.topics'); + + const admin = kafka.admin(); + await admin.connect(); + try { + await admin.createTopics({ + waitForLeaders: false, + topics: Object.values(topics).map((topic) => ({ topic })), + }); + } finally { + await admin.disconnect(); + } +} diff --git a/oada/libs/lib-kafka/test/Requester.test.ts b/oada/libs/lib-kafka/test/Requester.test.ts index 576d874a..c305459a 100644 --- a/oada/libs/lib-kafka/test/Requester.test.ts +++ b/oada/libs/lib-kafka/test/Requester.test.ts @@ -26,7 +26,7 @@ import { Kafka } from 'kafkajs'; import { v4 as uuid } from 'uuid'; import { KafkaRequestTimeoutError, Requester } from '../dist/Requester.js'; -import type { KafkaBase } from '../src/base.js'; +import type { KafkaBase } from '../src/Base.js'; const REQ_TOPIC = 'test_requests'; const RES_TOPIC = 'test_responses'; diff --git a/oada/libs/lib-kafka/test/Responder.test.ts b/oada/libs/lib-kafka/test/Responder.test.ts index a9024304..be13bc21 100644 --- a/oada/libs/lib-kafka/test/Responder.test.ts +++ b/oada/libs/lib-kafka/test/Responder.test.ts @@ -25,7 +25,7 @@ import type { Consumer, Producer } from 'kafkajs'; import { Kafka } from 'kafkajs'; import { v4 as uuid } from 'uuid'; -import type { KafkaBase } from '../src/base.js'; +import type { KafkaBase } from '../src/Base.js'; import { Responder } from '../dist/Responder.js'; const REQ_TOPIC = 'test_requests'; diff --git a/oada/package.json b/oada/package.json index 817c1b91..809f8260 100644 --- a/oada/package.json +++ b/oada/package.json @@ -30,6 +30,7 @@ "eslint-import-resolver-node": "^0.3.6", "eslint-plugin-array-func": "^3.1.7", "eslint-plugin-ava": "^13.2.0", + "eslint-plugin-escompat": "^3.3.3", "eslint-plugin-eslint-comments": "^3.2.0", "eslint-plugin-filenames": "^1.3.2", "eslint-plugin-github": "^4.4.0", diff --git a/oada/services/startup/package.json b/oada/services/startup/package.json index a92d3757..a0248c61 100644 --- a/oada/services/startup/package.json +++ b/oada/services/startup/package.json @@ -30,6 +30,7 @@ "homepage": "https://github.com/oada/oada-srvc-startup#readme", "dependencies": { "@oada/lib-arangodb": "^3.5.1", + "@oada/lib-kafka": "^3.5.1", "@oada/pino-debug": "^3.5.1", "debug": "^4.3.4", "tslib": "^2.4.0" diff --git a/oada/services/startup/src/index.ts b/oada/services/startup/src/index.ts index 9cf47520..5be2da90 100644 --- a/oada/services/startup/src/index.ts +++ b/oada/services/startup/src/index.ts @@ -17,7 +17,8 @@ import http from 'node:http'; -import { init } from '@oada/lib-arangodb'; +import { init as initArangoDB } from '@oada/lib-arangodb'; +import { init as initKafka } from '@oada/lib-kafka'; import debug from 'debug'; @@ -27,9 +28,9 @@ const info = debug('startup:info'); const port = process.env.PORT ?? 8080; const exit = process.env.EXIT ?? false; -info('Startup is creating the database'); -await init.run(); -info('Database created/ensured.'); +info('Startup is initializing ArangoDB and Kafka'); +await Promise.all([initArangoDB.run(), initKafka.run()]); +info('Initialization complete'); if (exit) { // eslint-disable-next-line no-process-exit, unicorn/no-process-exit diff --git a/oada/services/startup/tsconfig.json b/oada/services/startup/tsconfig.json index b0f6fd6e..bf16c0fa 100644 --- a/oada/services/startup/tsconfig.json +++ b/oada/services/startup/tsconfig.json @@ -7,6 +7,7 @@ "include": ["src"], "references": [ { "path": "../../libs/pino-debug" }, - { "path": "../../libs/lib-arangodb" } + { "path": "../../libs/lib-arangodb" }, + { "path": "../../libs/lib-kafka" } ] } diff --git a/oada/yarn.lock b/oada/yarn.lock index 40c71ad2..f87e897e 100644 --- a/oada/yarn.lock +++ b/oada/yarn.lock @@ -966,6 +966,7 @@ __metadata: resolution: "@oada/startup@workspace:services/startup" dependencies: "@oada/lib-arangodb": ^3.5.1 + "@oada/lib-kafka": ^3.5.1 "@oada/pino-debug": ^3.5.1 "@types/debug": ^4.1.7 "@types/node": ^18.8.5 @@ -7300,6 +7301,7 @@ __metadata: eslint-import-resolver-node: ^0.3.6 eslint-plugin-array-func: ^3.1.7 eslint-plugin-ava: ^13.2.0 + eslint-plugin-escompat: ^3.3.3 eslint-plugin-eslint-comments: ^3.2.0 eslint-plugin-filenames: ^1.3.2 eslint-plugin-github: ^4.4.0