From 35c6b0e50be35626305417f01f17e0cf96544c9c Mon Sep 17 00:00:00 2001 From: "geka.evk" Date: Thu, 25 Apr 2024 13:12:20 +0100 Subject: [PATCH 1/3] fix(mojaloop/#3067): removed disconnected producers from listOfProducers map --- .ncurc.js | 3 ++- audit-ci.jsonc | 3 ++- package-lock.json | 15 +++++++++++---- package.json | 3 ++- src/util/producer.js | 12 ++++++++++-- test/unit/util/producer.test.js | 19 +++++++++++++++++-- 6 files changed, 44 insertions(+), 11 deletions(-) diff --git a/.ncurc.js b/.ncurc.js index 7b003f1..c765348 100644 --- a/.ncurc.js +++ b/.ncurc.js @@ -1,6 +1,7 @@ module.exports = { reject: [ // TODO: Upgrading tape to v5+ causes tests to fail due to assert.end() being called multiple times. Will need to address this! Perhaps even move to Jest? - "tape" + "tape", + "node-rdkafka" // updating to the next major v3.0.0 should be done in a separate task ] } diff --git a/audit-ci.jsonc b/audit-ci.jsonc index 722b751..9870756 100644 --- a/audit-ci.jsonc +++ b/audit-ci.jsonc @@ -8,6 +8,7 @@ "GHSA-8cf7-32gw-wr33", "GHSA-hjrf-2m68-5959", "GHSA-qwph-4952-7xr6", - "GHSA-w5p7-h5w8-2hfq" + "GHSA-w5p7-h5w8-2hfq", + "GHSA-f5x3-32g6-xq36" // https://github.com/advisories/GHSA-f5x3-32g6-xq36 ] } diff --git a/package-lock.json b/package-lock.json index 7135151..9f2644d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ }, "devDependencies": { "audit-ci": "^6.6.1", - "npm-check-updates": "16.14.17", + "npm-check-updates": "16.14.18", "nyc": "15.1.0", "pre-commit": "1.2.2", "replace": "^1.2.2", @@ -1307,6 +1307,12 @@ "integrity": "sha512-Gj7cI7z+98M282Tqmp2K5EIsoouUEzbBJhQQzDE3jSIRk6r9gsz0oUokqIUR4u1R3dMHo0pDHM7sNOHyhulypw==", "dev": true }, + "node_modules/@types/semver-utils": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@types/semver-utils/-/semver-utils-1.1.3.tgz", + "integrity": "sha512-T+YwkslhsM+CeuhYUxyAjWm7mJ5am/K10UX40RuA6k6Lc7eGtq8iY2xOzy7Vq0GOqhl/xZl5l2FwURZMTPTUww==", + "dev": true + }, "node_modules/abbrev": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/abbrev/-/abbrev-1.1.1.tgz", @@ -7293,11 +7299,12 @@ } }, "node_modules/npm-check-updates": { - "version": "16.14.17", - "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-16.14.17.tgz", - "integrity": "sha512-ElnDdXKe60f8S6RhzFeaGuH2TFJmt2cU2HjLdowldabdm27nWFCxV2ebeP3xGbQkzp2+RPDQNdW9HqU1lcY8ag==", + "version": "16.14.18", + "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-16.14.18.tgz", + "integrity": "sha512-9iaRe9ohx9ykdbLjPRIYcq1A0RkrPYUx9HmQK1JIXhfxtJCNE/+497H9Z4PGH6GWRALbz5KF+1iZoySK2uSEpQ==", "dev": true, "dependencies": { + "@types/semver-utils": "^1.1.1", "chalk": "^5.3.0", "cli-table3": "^0.6.3", "commander": "^10.0.1", diff --git a/package.json b/package.json index aad4984..62dfe63 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "pre-commit": [ "lint", "dep:check", + "audit:check", "test" ], "scripts": { @@ -56,7 +57,7 @@ }, "devDependencies": { "audit-ci": "^6.6.1", - "npm-check-updates": "16.14.17", + "npm-check-updates": "16.14.18", "nyc": "15.1.0", "pre-commit": "1.2.2", "replace": "^1.2.2", diff --git a/src/util/producer.js b/src/util/producer.js index 305474b..baf0143 100644 --- a/src/util/producer.js +++ b/src/util/producer.js @@ -108,6 +108,11 @@ const connectAll = async (configs) => { } } +const disconnectAndRemoveProducer = async (topicName) => { + await getProducer(topicName).disconnect() + delete listOfProducers[topicName] +} + /** * @function Disconnect * @@ -120,7 +125,8 @@ const connectAll = async (configs) => { const disconnect = async (topicName = null) => { if (topicName && typeof topicName === 'string') { try { - await getProducer(topicName).disconnect() + // await getProducer(topicName).disconnect() + await disconnectAndRemoveProducer(topicName) } catch (err) { Logger.isErrorEnabled && Logger.error(err) throw ErrorHandler.Factory.reformatFSPIOPError(err) @@ -131,7 +137,8 @@ const disconnect = async (topicName = null) => { let tpName for (tpName in listOfProducers) { try { - await getProducer(tpName).disconnect() + // await getProducer(tpName).disconnect() + await disconnectAndRemoveProducer(tpName) } catch (e) { isError = true errorTopicList.push({ topic: tpName, error: e.toString() }) @@ -160,6 +167,7 @@ const getProducer = (topicName) => { return listOfProducers[topicName] } else { throw ErrorHandler.Factory.createInternalServerFSPIOPError(`No producer found for topic ${topicName}`) + // clarify, why we throw an error here and not just return null? } } diff --git a/test/unit/util/producer.test.js b/test/unit/util/producer.test.js index ce19e1c..f485dda 100644 --- a/test/unit/util/producer.test.js +++ b/test/unit/util/producer.test.js @@ -94,6 +94,15 @@ const topicConf = { opaqueKey: 0 } +const getProducerWithoutThrowError = (topicName) => { + try { + return Producer.getProducer(topicName) + } catch (err) { + Logger.warn(`getProducer error: ${err?.message}`) + return null + } +} + Test('Producer', producerTest => { let sandbox const config = {} @@ -205,6 +214,7 @@ Test('Producer', producerTest => { sandbox.restore() t.end() }) + disconnectTest.test('disconnect from kafka', async test => { await Producer.produceMessage({}, { topicName: 'test' }, {}) test.ok(Producer.disconnect('test')) @@ -217,9 +227,11 @@ Test('Producer', producerTest => { test.ok(await Producer.produceMessage({}, { topicName }, {})) await Producer.disconnect(topicName) test.pass('Disconnect specific topic successfully') + const producer = getProducerWithoutThrowError(topicName) + test.equal(producer, null, 'No disconnected producer') test.end() } catch (e) { - test.fail('Error thrown') + test.fail(`Error thrown: ${e.message}`) test.end() } }) @@ -233,9 +245,11 @@ Test('Producer', producerTest => { test.ok(await Producer.produceMessage({}, { topicName }, {})) await Producer.disconnect() test.pass('Disconnected all topics successfully') + const producer = getProducerWithoutThrowError(topicName) + test.equal(producer, null, 'No disconnected producer') test.end() } catch (e) { - test.fail('Error thrown') + test.fail(`Error thrown: ${e.message}`) test.end() } }) @@ -530,5 +544,6 @@ Test('Producer', producerTest => { connectAllTest.end() }) + producerTest.end() }) From f49fc23f6f80398acb43996d3ff8e9b3bcf64adb Mon Sep 17 00:00:00 2001 From: "geka.evk" Date: Thu, 25 Apr 2024 13:15:49 +0100 Subject: [PATCH 2/3] fix(mojaloop/#3067): removed disconnected producers from listOfProducers map --- src/util/producer.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/util/producer.js b/src/util/producer.js index baf0143..a5c468b 100644 --- a/src/util/producer.js +++ b/src/util/producer.js @@ -125,7 +125,6 @@ const disconnectAndRemoveProducer = async (topicName) => { const disconnect = async (topicName = null) => { if (topicName && typeof topicName === 'string') { try { - // await getProducer(topicName).disconnect() await disconnectAndRemoveProducer(topicName) } catch (err) { Logger.isErrorEnabled && Logger.error(err) @@ -137,7 +136,6 @@ const disconnect = async (topicName = null) => { let tpName for (tpName in listOfProducers) { try { - // await getProducer(tpName).disconnect() await disconnectAndRemoveProducer(tpName) } catch (e) { isError = true From 28b437dba59ec2218a76b32eeb32976707ab92e9 Mon Sep 17 00:00:00 2001 From: "geka.evk" Date: Thu, 25 Apr 2024 13:31:02 +0100 Subject: [PATCH 3/3] fix(mojaloop/#3067): removed disconnected producers from listOfProducers map --- test/unit/util/producer.test.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/unit/util/producer.test.js b/test/unit/util/producer.test.js index f485dda..c29bb04 100644 --- a/test/unit/util/producer.test.js +++ b/test/unit/util/producer.test.js @@ -229,6 +229,9 @@ Test('Producer', producerTest => { test.pass('Disconnect specific topic successfully') const producer = getProducerWithoutThrowError(topicName) test.equal(producer, null, 'No disconnected producer') + await Producer.produceMessage({}, { topicName }, {}) + test.pass('created a new producer for the same topic') + test.ok(Producer.getProducer(topicName)) test.end() } catch (e) { test.fail(`Error thrown: ${e.message}`)