diff --git a/__tests__/hl7.client.test.ts b/__tests__/hl7.client.test.ts index 76c526f..59baa22 100644 --- a/__tests__/hl7.client.test.ts +++ b/__tests__/hl7.client.test.ts @@ -1,6 +1,6 @@ import portfinder from 'portfinder' -import {Server, Listener as ServerListener} from "../../node-hl7-server/src"; -import {Client, Listener, Message} from '../src' +import {Hl7Inbound, Server} from "../../node-hl7-server/src"; +import {Client, HL7Outbound, Message} from '../src' import {expectEvent} from "./__utils__/utils"; describe('node hl7 client', () => { @@ -10,7 +10,7 @@ describe('node hl7 client', () => { test(`error - hostname has to be string`, async () => { try { // @ts-expect-error this is not a string - new Client({hostname: 351123}) + new Client({host: 351123}) } catch (err: any) { expect(err.message).toBe('hostname is not valid string.') } @@ -18,7 +18,7 @@ describe('node hl7 client', () => { test(`error - ipv4 and ipv6 both can not be true exist`, async () => { try { - new Client({hostname: '5.8.6.1', ipv6: true, ipv4: true}) + new Client({host: '5.8.6.1', ipv6: true, ipv4: true}) } catch (err: any) { expect(err.message).toBe('ipv4 and ipv6 both can\'t be set to be both used exclusively.') } @@ -26,7 +26,7 @@ describe('node hl7 client', () => { test(`error - ipv4 not valid address`, async () => { try { - new Client({hostname: "123.34.52.455", ipv4: true}) + new Client({host: "123.34.52.455", ipv4: true}) } catch (err: any) { expect(err.message).toBe('hostname is not a valid IPv4 address.') } @@ -34,7 +34,7 @@ describe('node hl7 client', () => { test(`error - ipv4 valid address`, async () => { try { - new Client({hostname: "123.34.52.45", ipv4: true}) + new Client({host: "123.34.52.45", ipv4: true}) } catch (err: any) { expect(err.message).toBeUndefined() } @@ -42,7 +42,7 @@ describe('node hl7 client', () => { test(`error - ipv6 not valid address`, async () => { try { - new Client({hostname: "2001:0db8:85a3:0000:zz00:8a2e:0370:7334", ipv6: true}) + new Client({host: "2001:0db8:85a3:0000:zz00:8a2e:0370:7334", ipv6: true}) } catch (err: any) { expect(err.message).toBe('hostname is not a valid IPv6 address.') } @@ -50,15 +50,15 @@ describe('node hl7 client', () => { test(`error - ipv6 valid address`, async () => { try { - new Client({hostname: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", ipv6: true}) + new Client({host: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", ipv6: true}) } catch (err: any) { expect(err.message).toBeUndefined() } }) test(`properties exist`, async () => { - const client = new Client({ hostname: 'hl7.server.com'}) - expect(client).toHaveProperty("connectToListener") + const client = new Client({ host: 'hl7.server.com'}) + expect(client).toHaveProperty("createOutbound") }) }) @@ -68,14 +68,14 @@ describe('node hl7 client', () => { let client: Client beforeEach(() => { - client = new Client({hostname: 'localhost'}) + client = new Client({host: 'localhost'}) }) test('error - no port specified', async () => { try { // @ts-expect-error port is not specified - client.connectToListener() + client.createOutbound() } catch (err: any) { expect(err.message).toBe('port is not defined.') } @@ -85,7 +85,7 @@ describe('node hl7 client', () => { test('error - port not a number', async () => { try { // @ts-expect-error port is not specified as a number - client.connectToListener({ port: "12345"}, async () => {}) + client.createOutbound({ port: "12345"}, async () => {}) } catch (err: any) { expect(err.message).toBe('port is not valid number.') } @@ -93,7 +93,7 @@ describe('node hl7 client', () => { test('error - port less than 0', async () => { try { - client.connectToListener({ port: -1}, async () => {}) + client.createOutbound({ port: -1}, async () => {}) } catch (err: any) { expect(err.message).toBe('port must be a number (0, 65353).') } @@ -101,7 +101,7 @@ describe('node hl7 client', () => { test('error - port greater than 65353', async () => { try { - client.connectToListener({ port: 65354}, async () => {}) + client.createOutbound({ port: 65354}, async () => {}) } catch (err: any) { expect(err.message).toBe('port must be a number (0, 65353).') } @@ -119,10 +119,10 @@ describe('node hl7 client', () => { }) const server = new Server({ bindAddress: 'localhost'}) - const listener = server.createListener({port: LISTEN_PORT}, async () => {}) + const listener = server.createInbound({port: LISTEN_PORT}, async () => {}) - const client = new Client({ hostname: 'localhost'}) - const outGoing = client.connectToListener({ port: LISTEN_PORT }, () => {}) + const client = new Client({ host: 'localhost'}) + const outGoing = client.createOutbound({ port: LISTEN_PORT }, async () => {}) await expectEvent(listener, 'client.connect') await expectEvent(outGoing, 'connect') @@ -132,77 +132,17 @@ describe('node hl7 client', () => { }) - test('...should be able to connect to the same port from different clients', async () => { - - const LISTEN_PORT = await portfinder.getPortPromise({ - port: 3000, - stopPort: 65353 - }) - - const server = new Server({ bindAddress: 'localhost'}) - const listener = server.createListener({port: LISTEN_PORT}, async () => {}) - - const client = new Client({ hostname: 'localhost'}) - const outGoing = client.connectToListener({ port: LISTEN_PORT }, () => {}) - - const client_2 = new Client({ hostname: 'localhost'}) - const outGoing_2 = client_2.connectToListener({ port: LISTEN_PORT }, () => {}) - - await expectEvent(outGoing, 'connect') - await expectEvent(outGoing_2, 'connect') - - await outGoing.close() - await outGoing_2.close() - await listener.close() - - }) - - test('...two different ports', async () => { - - const LISTEN_PORT = await portfinder.getPortPromise({ - port: 3000, - stopPort: 65353 - }) - - const LISTEN_PORT_2 = await portfinder.getPortPromise({ - port: 3001, - stopPort: 65353 - }) - - const server = new Server({ bindAddress: 'localhost'}) - const listener = server.createListener({port: LISTEN_PORT}, async () => {}) - - const server_2 = new Server({ bindAddress: 'localhost'}) - const listener_2 = server_2.createListener({port: LISTEN_PORT_2}, async () => {}) - - - const client = new Client({ hostname: 'localhost'}) - const outGoing = client.connectToListener({ port: LISTEN_PORT }, () => {}) - - const client_2 = new Client({ hostname: 'localhost'}) - const outGoing_2 = client_2.connectToListener({ port: LISTEN_PORT_2 }, () => {}) - - await expectEvent(outGoing, 'connect') - await expectEvent(outGoing_2, 'connect') - - await outGoing.close() - await outGoing_2.close() - await listener.close() - await listener_2.close() - - }) - }) - describe('end to end testing', () => { + describe('server/client sanity checks', () => { let waitAck: number = 0 let server: Server - let listener: ServerListener + let listener: Hl7Inbound let client: Client - let outGoing: Listener + let outGoing: HL7Outbound beforeEach(async () => { @@ -212,10 +152,10 @@ describe('node hl7 client', () => { }) server = new Server({bindAddress: 'localhost'}) - listener = server.createListener({port: LISTEN_PORT}, async () => {}) + listener = server.createInbound({port: LISTEN_PORT}, async () => {}) - client = new Client({hostname: 'localhost'}) - outGoing = client.connectToListener({port: LISTEN_PORT, waitAck: waitAck !== 2}, async () => {}) + client = new Client({host: 'localhost'}) + outGoing = client.createOutbound({port: LISTEN_PORT, waitAck: waitAck !== 2}, async () => {}) }) @@ -295,4 +235,12 @@ describe('node hl7 client', () => { }) + describe('end to end tests', () => { + + test.skip('...send message, get proper ACK', async () => { + + }) + + }) + }) \ No newline at end of file diff --git a/src/client/client.ts b/src/client/client.ts index a4c8199..030c116 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,6 +1,6 @@ import EventEmitter from 'events' -import { Listener } from './listener.js' import { normalizeClientOptions, ClientListenerOptions, ClientOptions } from '../utils/normalizeClient.js' +import {HL7Outbound} from "./hl7Outbound"; /** * Client Class @@ -16,7 +16,12 @@ export class Client extends EventEmitter { /** Connect to a listener to a specified port. * @since 1.0.0 */ - connectToListener (props: ClientListenerOptions, handler: any): Listener { - return new Listener(this, props, handler) + createOutbound (props: ClientListenerOptions, handler?: any): HL7Outbound { + return new HL7Outbound(this, props, handler) } + + getHost(): string { + return this._opt.host + } + } diff --git a/src/client/hl7Outbound.ts b/src/client/hl7Outbound.ts new file mode 100644 index 0000000..8382f27 --- /dev/null +++ b/src/client/hl7Outbound.ts @@ -0,0 +1,176 @@ +import EventEmitter from 'events' +import * as net from 'net' +import * as tls from 'tls' +import {Batch} from '../builder/batch.js' +import {Message} from '../builder/message.js' +import {randomString} from "../utils"; +//import { CR, FS, VT } from '../utils/constants.js' +import {HL7FatalError} from '../utils/exception' +import {ClientListenerOptions, normalizeClientListenerOptions} from '../utils/normalizeClient.js' +import {Client} from './client.js' + +/** HL7 Outbound Class + * @since 1.0.0 */ +export class HL7Outbound extends EventEmitter { + /** @internal */ + private _awaitingResponse: boolean + /** @internal */ + _handler?: any | undefined // @todo is this needed? + /** @internal */ + private _main: Client + /** @internal */ + private _nodeId: string + /** @internal */ + private _opt: ReturnType + /** @internal */ + private _server: net.Socket | tls.TLSSocket + /** @internal */ + private _sockets: Map + + constructor (client: Client, props: ClientListenerOptions, handler?: any) { + super() + this._main = client + this._nodeId = randomString(5) + + this._awaitingResponse = false + + // process listener options + this._opt = normalizeClientListenerOptions(props) + + this._sockets = new Map() + this._handler = handler + + this._connect = this._connect.bind(this) + this._server = this._connect() + } + + getPort(): string { + return this._opt.port.toString() + } + + /** Send a HL7 Message to the Listener + * @since 1.0.0 + */ + async sendMessage (message: Message | Batch): Promise { + // if we are waiting for an ack before we can send something else, and we are in that process. + if (this._opt.waitAck && this._awaitingResponse) { + throw new HL7FatalError(500, 'Can\'t send message while we are waiting for a response.') + } + + // ok, if our options are to wait for an acknowledgement, set the var to "true" + if (this._opt.waitAck) { + this._awaitingResponse = true + } + + const toSendData = Buffer.from(message.toString()) + + return this._server?.write(toSendData, 'utf8', () => { + // console.log(toSendData) + }) + + } + + /** @internal */ + private _connect (): net.Socket | tls.TLSSocket { + let server: net.Socket | tls.TLSSocket + let host = this._main._opt.host + let port = this._opt.port + let _opt_tls = this._main._opt.tls + + if (typeof _opt_tls !== 'undefined') { + // @todo this needs to be expanded on for TLS options + server = tls.connect({host, port}) + } else { + + server = net.createConnection({host, port}, () => { + + // set no delay + server.setNoDelay(true) + + // add socket + this._addSocket(this._nodeId, server, true) + + // check to make sure we do not max out on connections, we shouldn't... + if (this._sockets.size > this._opt.maxConnections) { + this._manageConnections() + } + }) + } + + server.on('connect', () => { + this.emit('connect') + }) + + server.on('data', buffer => { + this.emit('data', buffer.toString()) + }) + + server.on('error', err => { + this._removeSocket(this._nodeId) + this.emit('client.error', err, this._nodeId) + throw new HL7FatalError(500, 'Unable to connect to remote host.') + }) + + server.on('end', () => { + this._removeSocket(this._nodeId) + this.emit('client.end') + }) + + server.unref() + + return server + + } + + /** Close Client Listener Instance. + * @since 1.0.0 */ + async close (): Promise { + this._sockets.forEach((socket) => { + if (typeof socket.destroyed !== 'undefined') { + socket.end() + socket.destroy() + } + }) + this._sockets.clear() + + this.emit('client.close') + + return true + } + + /** @internal */ + private _addSocket (nodeId: string, socket: any, b: boolean): void { + const s = this._sockets.get(nodeId) + if (!b && typeof s !== 'undefined' && typeof s.destroyed !== 'undefined') { + return + } + this._sockets.set(nodeId, socket) + } + + /** @internal */ + private _removeSocket (nodeId: string): void { + const socket = this._sockets.get(nodeId) + if (typeof socket !== 'undefined' && typeof socket.destroyed !== 'undefined') { + socket.destroy() + } + this._sockets.delete(nodeId) + } + + /**@internal */ + private _manageConnections() { + let count = this._sockets.size - this._opt.maxConnections; + if (count <= 0) { + return + } + + const list: { nodeID: any; lastUsed: any }[] = []; + this._sockets.forEach((socket, nodeID) => list.push({ nodeID, lastUsed: socket.lastUsed })); + list.sort((a, b) => a.lastUsed - b.lastUsed); + + count = Math.min(count, list.length - 1); + const removable = list.slice(0, count); + + removable.forEach(({ nodeID }) => this._removeSocket(nodeID)); + + } +} diff --git a/src/client/listener.ts b/src/client/listener.ts deleted file mode 100644 index 1e9b62b..0000000 --- a/src/client/listener.ts +++ /dev/null @@ -1,120 +0,0 @@ -import EventEmitter from 'events' -import { Socket } from 'net' -import * as net from 'net' -import * as tls from 'tls' -import { Batch } from '../builder/batch.js' -import { Message } from '../builder/message.js' -import { CR, FS, VT } from '../utils/constants.js' -import { HL7FatalError } from '../utils/exception' -import { Client } from './client.js' -import { ClientListenerOptions, normalizeClientListenerOptions } from '../utils/normalizeClient.js' - -/** Listener Class - * @since 1.0.0 */ -export class Listener extends EventEmitter { - /** @internal */ - _awaitingResponse: boolean - /** @internal */ - _handler?: any | undefined - /** @internal */ - _lastUsed?: Date - /** @internal */ - _main: Client - /** @internal */ - _opt: ReturnType - /** @internal */ - _server: net.Socket | tls.TLSSocket | undefined - /** @internal */ - _socket?: Socket | undefined - - constructor (client: Client, props: ClientListenerOptions, handler?: any) { - super() - this._main = client - this._awaitingResponse = false - - // process listener options - this._opt = normalizeClientListenerOptions(props) - - this._socket = undefined - this._handler = handler - - this._connect = this._connect.bind(this) - this._server = this._connect() - } - - /** Send a HL7 Message to the Listener - * @since 1.0.0 - */ - async sendMessage (message: Message | Batch): Promise { - // if we are waiting for an ack before we can send something else, and we are in that process. - if (this._opt.waitAck && this._awaitingResponse) { - throw new HL7FatalError(500, 'Can\'t send message while we are waiting for a response.') - } - - if (typeof this._socket !== 'undefined' && this._socket.destroyed) { - // if we have auto connection and retry, this might take a while to fire. - throw new HL7FatalError(500, 'The socket/connect has already been destroyed. Please reconnect.') - } - - if (typeof this._socket === 'undefined') { - throw new HL7FatalError(500, 'There is no valid connection.') - } - - // ok, if our options are to wait for an acknowledgement, set the var to "true" - if (this._opt.waitAck) { - this._awaitingResponse = true - } - - const toSendData = message.toString() - - this._server?.write(`${VT}${toSendData}${FS}${CR}`) - } - - /** @internal */ - private _connect (): Socket | tls.TLSSocket { - let server: Socket | tls.TLSSocket - - if (this._main._opt.tls != null) { - server = tls.connect({ port: this._opt.port }) - } else { - server = net.createConnection({ host: this._main._opt.hostname, port: this._opt.port }, () => { - this._lastUsed = new Date() - server.setNoDelay(true) - this._socket = server - this.emit('connect', server) - }) - } - - server.on('close', () => { - this.emit('close') - }) - - server.on('data', data => { - this.emit('data', data) - }) - - server.on('error', err => { - this.emit('error', err) - throw new HL7FatalError(500, 'Unable to connect to remote host.') - }) - - server.on('end', () => { - this.emit('end') - }) - - server.unref() - - return server - } - - /** Close Client Listener Instance. - * @since 1.0.0 */ - async close (): Promise { - if (typeof this._socket !== 'undefined') { - this._socket.end() - this._socket.destroy() - this.emit('client.close') - } - return true - } -} diff --git a/src/index.ts b/src/index.ts index 5720e7b..42e1cc3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,6 @@ import { Component } from './builder/modules/component.js' import { Delimiters } from './builder/decorators/delimiters.js' import { Field } from './builder/modules/field.js' import { FieldRepetition } from './builder/modules/fieldRepetition.js' -import { Listener } from './client/listener.js' import { Message } from './builder/message.js' import { Segment } from './builder/modules/segment.js' import { SegmentList } from './builder/modules/segmentList.js' @@ -12,9 +11,10 @@ import { HL7_SPEC, HL7_SPEC_BASE } from './specification/specification.js' import { SubComponent } from './builder/modules/subComponent.js' import { Batch } from './builder/batch.js' import { ParserPlan } from './utils/parserPlan.js' +import { HL7Outbound } from './client/hl7Outbound.js' export default Client -export { Client, Listener, ParserPlan, Batch, Message, Delimiters, Segment, SegmentList, Component, SubComponent, Field, FieldRepetition } +export { Client, HL7Outbound, ParserPlan, Batch, Message, Delimiters, Segment, SegmentList, Component, SubComponent, Field, FieldRepetition } export { assertNumber, isBatch, isFile, createHL7Date, validIPv6, validIPv4 } from './utils/index.js' /** HL7 Specs **/ diff --git a/src/utils/index.ts b/src/utils/index.ts index 4a9a7be..72e640f 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -122,3 +122,16 @@ export const decodeHexString = (value: string): string => { } return result.join('') } + +/** @internal */ +export const randomString = (length = 20): string => { + let result = '' + const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_' + const charactersLength = characters.length + let counter = 0 + while (counter < length) { + result += characters.charAt(Math.floor(Math.random() * charactersLength)) + counter += 1 + } + return result +} diff --git a/src/utils/normalizeClient.ts b/src/utils/normalizeClient.ts index 48078a7..3d86a18 100644 --- a/src/utils/normalizeClient.ts +++ b/src/utils/normalizeClient.ts @@ -5,6 +5,7 @@ import * as Util from './index.js' const DEFAULT_CLIENT_OPTS = { acquireTimeout: 20000, connectionTimeout: 10000, + maxConnections: 10, waitAck: true } @@ -20,8 +21,8 @@ export interface ClientOptions { /** Max wait time, in milliseconds, for a connection attempt * @default 10_000 */ connectionTimeout?: number - /** Hostname - You can do a FQDN or the IPv(4|6) address. */ - hostname: string + /** Host - You can do a FQDN or the IPv(4|6) address. */ + host: string /** IPv4 - If this is set to true, only IPv4 address will be used and also validated upon installation from the hostname property. * @default false */ ipv4?: boolean @@ -50,6 +51,10 @@ export interface ClientListenerOptions { /** Keep the connection alive after sending data and getting a response. * @default true */ keepAlive?: boolean + /** Max Connections this connection makes. + * Has to be greater than 1. + * @default 10 */ + maxConnections?: number /** Additional options when creating the TCP socket with net.connect(). */ socket?: TcpSocketConnectOpts /** The port we should connect on the server. */ @@ -61,25 +66,20 @@ export interface ClientListenerOptions { type ValidatedClientKeys = | 'acquireTimeout' | 'connectionTimeout' - | 'hostname' + | 'host' type ValidatedClientListenerKeys = | 'port' interface ValidatedClientOptions extends Pick, ValidatedClientKeys> { - hostname: string - socket?: TcpSocketConnectOpts - tls?: TLSOptions -} - -interface ValidatedClientOptions extends Pick, ValidatedClientKeys> { - hostname: string + host: string socket?: TcpSocketConnectOpts tls?: TLSOptions } interface ValidatedClientListenerOptions extends Pick, ValidatedClientListenerKeys> { port: number + maxConnections: number waitAck: boolean } @@ -87,7 +87,7 @@ interface ValidatedClientListenerOptions extends Pick