From 76410fd0ebef35bd914042ceade29df4db8635e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CBugs5382=E2=80=9D?= Date: Fri, 22 Dec 2023 13:06:50 -0500 Subject: [PATCH] fix: many MSH sends * now the client parses the messages correctly when it gets a response back from a server --- __tests__/hl7.end2end.test.ts | 136 +++++++++++++++++++++++++++++++++- src/client/hl7Outbound.ts | 27 +++++-- 2 files changed, 154 insertions(+), 9 deletions(-) diff --git a/__tests__/hl7.end2end.test.ts b/__tests__/hl7.end2end.test.ts index 1219337..db4045c 100644 --- a/__tests__/hl7.end2end.test.ts +++ b/__tests__/hl7.end2end.test.ts @@ -5,7 +5,7 @@ import path from "node:path"; import portfinder from "portfinder"; import {createDeferred, Deferred, expectEvent, sleep} from "./__utils__"; -describe('node hl7 end to end', () => { +describe('node hl7 end to end - client', () => { let dfd: Deferred @@ -270,7 +270,7 @@ describe('node hl7 end to end', () => { await res.sendResponse("AA") }) - //await expectEvent(IB_ADT, 'listen') + //await expectEvent(IB_ADT, 'listen') const client = new Client({host: '0.0.0.0'}) const OB_ADT = client.createOutbound({ port: LISTEN_PORT }, async (res) => { @@ -368,6 +368,138 @@ describe('node hl7 end to end', () => { }) + describe('...send batch with two message, get proper ACK', () => { + + let LISTEN_PORT: number + beforeEach(async () => { + LISTEN_PORT = await portfinder.getPortPromise({ + port: 3000, + stopPort: 65353 + }) + + dfd = createDeferred() + + }) + + test('...no tls', async () => { + + const server = new Server({bindAddress: '0.0.0.0'}) + const IB_ADT = server.createInbound({ port: LISTEN_PORT }, async (req, res) => { + const messageReq = req.getMessage() + const messageType = req.getType() + expect(messageType).toBe('batch') + expect(messageReq.get('MSH.12').toString()).toBe('2.7') + await res.sendResponse("AA") + }) + + // await expectEvent(IB_ADT, 'listen') + + let count: number = 0 + const client = new Client({host: '0.0.0.0'}) + const OB_ADT = client.createOutbound({ port: LISTEN_PORT }, async (res) => { + const messageRes = res.getMessage() + expect(messageRes.get('MSA.1').toString()).toBe('AA') + count = count + 1 + if (count == 2) { + dfd.resolve() + } + }) + + await expectEvent(OB_ADT, 'connect') + + let batch = new Batch() + batch.start() + + let message = new Message({ + messageHeader: { + msh_9_1: "ADT", + msh_9_2: "A01", + msh_10: 'CONTROL_ID1', + msh_11_1: "D" + } + }) + + batch.add(message) + batch.add(message) + + batch.end() + + await OB_ADT.sendMessage(batch) + + await sleep(10) + + dfd.promise + + await OB_ADT.close() + await IB_ADT.close() + + }) + + test('...tls', async () => { + + const server = new Server( + { + bindAddress: '0.0.0.0', + tls: + { + key: fs.readFileSync(path.join('certs/', 'server-key.pem')), + cert: fs.readFileSync(path.join('certs/', 'server-crt.pem')), + rejectUnauthorized: false + } + }) + const IB_ADT = server.createInbound({port: LISTEN_PORT}, async (req, res) => { + const messageReq = req.getMessage() + const messageType = req.getType() + expect(messageType).toBe('batch') + expect(messageReq.get('MSH.12').toString()).toBe('2.7') + await res.sendResponse("AA") + }) + + // await expectEvent(IB_ADT, 'listen') + + let count: number = 0 + const client = new Client({host: '0.0.0.0', tls: { rejectUnauthorized: false }}) + const OB_ADT = client.createOutbound({ port: LISTEN_PORT }, async (res) => { + const messageRes = res.getMessage() + expect(messageRes.get('MSA.1').toString()).toBe('AA') + count = count + 1 + if (count == 2) { + dfd.resolve() + } + }) + + await expectEvent(OB_ADT, 'connect') + + let batch = new Batch() + batch.start() + + let message = new Message({ + messageHeader: { + msh_9_1: "ADT", + msh_9_2: "A01", + msh_10: 'CONTROL_ID', + msh_11_1: "D" + } + }) + + batch.add(message) + batch.add(message) + + batch.end() + + await OB_ADT.sendMessage(batch) + + await sleep(10) + + dfd.promise + + await OB_ADT.close() + await IB_ADT.close() + + }) + + }) + describe('...send file with one message, get proper ACK', () => { let LISTEN_PORT: number diff --git a/src/client/hl7Outbound.ts b/src/client/hl7Outbound.ts index dca21fc..3c62159 100644 --- a/src/client/hl7Outbound.ts +++ b/src/client/hl7Outbound.ts @@ -41,6 +41,8 @@ export class HL7Outbound extends EventEmitter { protected _readyState: ReadyState /** @internal */ _pendingSetup: Promise | boolean + /** @internal */ + private _responseBuffer: string /** * @since 1.0.0 @@ -66,6 +68,7 @@ export class HL7Outbound extends EventEmitter { this._retryCount = 1 this._retryTimer = undefined this._readyState = ReadyState.CONNECTING + this._responseBuffer = '' this._connect = this._connect.bind(this) this._socket = this._connect() @@ -260,13 +263,23 @@ export class HL7Outbound extends EventEmitter { socket.on('data', (buffer: Buffer) => { this._awaitingResponse = false - let loadedMessage = buffer.toString().replace(VT, '') - // is there is F5 and CR in this message? - if (loadedMessage.includes(FS + CR)) { - // strip them out - loadedMessage = loadedMessage.replace(FS + CR, '') - const response = new InboundResponse(loadedMessage) - this._handler(response) + this._responseBuffer += buffer.toString() + + while (this._responseBuffer !== '') { + const indexOfVT = this._responseBuffer.indexOf(VT) + const indexOfFSCR = this._responseBuffer.indexOf(FS + CR) + + let loadedMessage = this._responseBuffer.substring(indexOfVT, indexOfFSCR + 2) + this._responseBuffer = this._responseBuffer.slice(indexOfFSCR + 2, this._responseBuffer.length) + + loadedMessage = loadedMessage.replace(VT, '') + // is there is F5 and CR in this message? + if (loadedMessage.includes(FS + CR)) { + // strip them out + loadedMessage = loadedMessage.replace(FS + CR, '') + const response = new InboundResponse(loadedMessage) + this._handler(response) + } } })