Skip to content

Commit

Permalink
fix: many MSH sends
Browse files Browse the repository at this point in the history
* now the client parses the messages correctly when it gets a response back from a server
  • Loading branch information
Bugs5382 committed Dec 22, 2023
1 parent f88f4df commit 76410fd
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 9 deletions.
136 changes: 134 additions & 2 deletions __tests__/hl7.end2end.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import path from "node:path";
import portfinder from "portfinder";
import {createDeferred, Deferred, expectEvent, sleep} from "./__utils__";

describe('node hl7 end to end', () => {
describe('node hl7 end to end - client', () => {

let dfd: Deferred<void>

Expand Down Expand Up @@ -270,7 +270,7 @@ describe('node hl7 end to end', () => {
await res.sendResponse("AA")
})

//await expectEvent(IB_ADT, 'listen')
//await expectEvent(IB_ADT, 'listen')

const client = new Client({host: '0.0.0.0'})
const OB_ADT = client.createOutbound({ port: LISTEN_PORT }, async (res) => {
Expand Down Expand Up @@ -368,6 +368,138 @@ describe('node hl7 end to end', () => {

})

describe('...send batch with two message, get proper ACK', () => {

let LISTEN_PORT: number
beforeEach(async () => {
LISTEN_PORT = await portfinder.getPortPromise({
port: 3000,
stopPort: 65353
})

dfd = createDeferred<void>()

})

test('...no tls', async () => {

const server = new Server({bindAddress: '0.0.0.0'})
const IB_ADT = server.createInbound({ port: LISTEN_PORT }, async (req, res) => {
const messageReq = req.getMessage()
const messageType = req.getType()
expect(messageType).toBe('batch')
expect(messageReq.get('MSH.12').toString()).toBe('2.7')
await res.sendResponse("AA")
})

// await expectEvent(IB_ADT, 'listen')

let count: number = 0
const client = new Client({host: '0.0.0.0'})
const OB_ADT = client.createOutbound({ port: LISTEN_PORT }, async (res) => {
const messageRes = res.getMessage()
expect(messageRes.get('MSA.1').toString()).toBe('AA')
count = count + 1
if (count == 2) {
dfd.resolve()
}
})

await expectEvent(OB_ADT, 'connect')

let batch = new Batch()
batch.start()

let message = new Message({
messageHeader: {
msh_9_1: "ADT",
msh_9_2: "A01",
msh_10: 'CONTROL_ID1',
msh_11_1: "D"
}
})

batch.add(message)
batch.add(message)

batch.end()

await OB_ADT.sendMessage(batch)

await sleep(10)

dfd.promise

await OB_ADT.close()
await IB_ADT.close()

})

test('...tls', async () => {

const server = new Server(
{
bindAddress: '0.0.0.0',
tls:
{
key: fs.readFileSync(path.join('certs/', 'server-key.pem')),
cert: fs.readFileSync(path.join('certs/', 'server-crt.pem')),
rejectUnauthorized: false
}
})
const IB_ADT = server.createInbound({port: LISTEN_PORT}, async (req, res) => {
const messageReq = req.getMessage()
const messageType = req.getType()
expect(messageType).toBe('batch')
expect(messageReq.get('MSH.12').toString()).toBe('2.7')
await res.sendResponse("AA")
})

// await expectEvent(IB_ADT, 'listen')

let count: number = 0
const client = new Client({host: '0.0.0.0', tls: { rejectUnauthorized: false }})
const OB_ADT = client.createOutbound({ port: LISTEN_PORT }, async (res) => {
const messageRes = res.getMessage()
expect(messageRes.get('MSA.1').toString()).toBe('AA')
count = count + 1
if (count == 2) {
dfd.resolve()
}
})

await expectEvent(OB_ADT, 'connect')

let batch = new Batch()
batch.start()

let message = new Message({
messageHeader: {
msh_9_1: "ADT",
msh_9_2: "A01",
msh_10: 'CONTROL_ID',
msh_11_1: "D"
}
})

batch.add(message)
batch.add(message)

batch.end()

await OB_ADT.sendMessage(batch)

await sleep(10)

dfd.promise

await OB_ADT.close()
await IB_ADT.close()

})

})

describe('...send file with one message, get proper ACK', () => {

let LISTEN_PORT: number
Expand Down
27 changes: 20 additions & 7 deletions src/client/hl7Outbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export class HL7Outbound extends EventEmitter {
protected _readyState: ReadyState
/** @internal */
_pendingSetup: Promise<boolean> | boolean
/** @internal */
private _responseBuffer: string

/**
* @since 1.0.0
Expand All @@ -66,6 +68,7 @@ export class HL7Outbound extends EventEmitter {
this._retryCount = 1
this._retryTimer = undefined
this._readyState = ReadyState.CONNECTING
this._responseBuffer = ''

this._connect = this._connect.bind(this)
this._socket = this._connect()
Expand Down Expand Up @@ -260,13 +263,23 @@ export class HL7Outbound extends EventEmitter {

socket.on('data', (buffer: Buffer) => {
this._awaitingResponse = false
let loadedMessage = buffer.toString().replace(VT, '')
// is there is F5 and CR in this message?
if (loadedMessage.includes(FS + CR)) {
// strip them out
loadedMessage = loadedMessage.replace(FS + CR, '')
const response = new InboundResponse(loadedMessage)
this._handler(response)
this._responseBuffer += buffer.toString()

while (this._responseBuffer !== '') {
const indexOfVT = this._responseBuffer.indexOf(VT)
const indexOfFSCR = this._responseBuffer.indexOf(FS + CR)

let loadedMessage = this._responseBuffer.substring(indexOfVT, indexOfFSCR + 2)
this._responseBuffer = this._responseBuffer.slice(indexOfFSCR + 2, this._responseBuffer.length)

loadedMessage = loadedMessage.replace(VT, '')
// is there is F5 and CR in this message?
if (loadedMessage.includes(FS + CR)) {
// strip them out
loadedMessage = loadedMessage.replace(FS + CR, '')
const response = new InboundResponse(loadedMessage)
this._handler(response)
}
}
})

Expand Down

0 comments on commit 76410fd

Please sign in to comment.