Skip to content

Commit

Permalink
feat: message queue
Browse files Browse the repository at this point in the history
- if the server happens to go offline while sending, it will attempt to send the message, or it will timeout
- future build in #21, that the messages could be saved to a RabbitMQ or REDDIS or a FILE for rehydrating

#15

[ci skip]
  • Loading branch information
Bugs5382 committed Dec 13, 2023
1 parent 631026a commit 5a1bb26
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 7 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ Benefits:
- No dependencies, making this ultra-fast.
- Automatically re-connect or retry sending
- Written in typescript and published with heavily commented type definitions
- Peer `node-hl7-server' npm package that in conjunction with this one could create a powerfull Hl7 system.
- Works in Windows or Linux based systems
- Peer `node-hl7-server' npm package that in conjunction with this one could create a powerful HL7 system.
- Works in Windows or Linux-based systems

## Table of Contents

Expand Down
66 changes: 62 additions & 4 deletions src/client/hl7Outbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export class HL7Outbound extends EventEmitter {
private readonly _sockets: Map<any, any>
/** @internal */
protected _readyState: READY_STATE
/** @internal */
_pendingSetup: Promise<boolean> | boolean

constructor (client: Client, props: ClientListenerOptions, handler: OutboundHandler) {
super()
Expand All @@ -45,6 +47,7 @@ export class HL7Outbound extends EventEmitter {
this._main = client
this._nodeId = randomString(5)
this._opt = normalizeClientListenerOptions(props)
this._pendingSetup = true
this._sockets = new Map()
this._retryCount = 1
this._retryTimer = undefined
Expand All @@ -66,17 +69,70 @@ export class HL7Outbound extends EventEmitter {
* @since 1.0.0
*/
async sendMessage (message: Message | Batch): Promise<boolean> {
// if we are waiting for an ack before we can send something else, and we are in that process.
if (this._opt.waitAck && this._awaitingResponse) {
throw new HL7FatalError(500, 'Can\'t send message while we are waiting for a response.')
let attempts = 0
const maxAttempts = typeof this._opt.maxAttempts === 'undefined' ? this._main._opt.maxAttempts : this._opt.maxAttempts

const checkConnection = async (): Promise<boolean> => {
return this._readyState === READY_STATE.CONNECTED
}

const checkAck = async (): Promise<boolean> => {
return this._awaitingResponse
}

const checkSend = async (_message: string): Promise<boolean> => {
// noinspection InfiniteLoopJS
while (true) {
try {
// first, if we are closed, sorry, no more sending messages
if ((this._readyState === READY_STATE.CLOSED) || (this._readyState === READY_STATE.CLOSING)) {
// noinspection ExceptionCaughtLocallyJS
throw new HL7FatalError(500, 'In an invalid state to be able to send message.')
}
if (this._readyState !== READY_STATE.CONNECTED) {
// if we are not connected,
// check to see if we are now connected.
if (this._pendingSetup === false) {
// @todo in the future, add here to store the messages in a file or a
this._pendingSetup = checkConnection().finally(() => { this._pendingSetup = false })
}
} else if (this._readyState === READY_STATE.CONNECTED && this._opt.waitAck && this._awaitingResponse) {
// Ok, we ar now conformed connected.
// However, since we are checking
// to make sure we wait for an ACKNOWLEDGEMENT from the server,
// that the message was gotten correctly from the last one we sent.
// We are still waiting, we need to recheck again
// if we are not connected,
// check to see if we are now connected.
if (this._pendingSetup === false) {
this._pendingSetup = checkAck().finally(() => { this._pendingSetup = false })
}
}
return await this._pendingSetup
} catch (err: any) {
Error.captureStackTrace(err)
if (++attempts >= maxAttempts) {
throw err
} else {
emitter.emit('retry', err)
}
}
}
}

const emitter = new EventEmitter()

const theMessage = message.toString()

// check to see if we should be sending
await checkSend(theMessage)

// ok, if our options are to wait for an acknowledgement, set the var to "true"
if (this._opt.waitAck) {
this._awaitingResponse = true
}

const messageToSend = Buffer.from(message.toString())
const messageToSend = Buffer.from(theMessage)

const header = Buffer.alloc(6)
header.writeInt32BE(messageToSend.length + 6, 1)
Expand Down Expand Up @@ -164,6 +220,8 @@ export class HL7Outbound extends EventEmitter {
async close (): Promise<boolean> {
// mark that we set our internal that we are closing, so we do not try to re-connect
this._readyState = READY_STATE.CLOSING
// @todo Remove all pending messages that might be pending sending.
// @todo Do we dare save them as a file so if the kube process fails and restarts up, it can send them again??
this._sockets.forEach((socket) => {
if (typeof socket.destroyed !== 'undefined') {
socket.end()
Expand Down
18 changes: 17 additions & 1 deletion src/utils/normalizedClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const DEFAULT_CLIENT_OPTS = {
const DEFAULT_LISTEN_CLIENT_OPTS = {
connectionTimeout: 10000,
encoding: 'utf-8',
maxAttempts: 10,
maxConnections: 10,
retryHigh: 30000,
retryLow: 1000,
Expand All @@ -39,6 +40,10 @@ export interface ClientOptions {
/** Keep the connection alive after sending data and getting a response.
* @default true */
keepAlive?: boolean
/** Max Connections this connection makes.
* Has to be greater than 1. You cannot exceed 50.
* @default 10 */
maxAttempts?: number
/** Max delay, in milliseconds, for exponential-backoff when reconnecting
* @default 30_000 */
retryHigh?: number
Expand All @@ -57,6 +62,10 @@ export interface ClientListenerOptions {
* @default "utf-8"
*/
encoding?: BufferEncoding
/** Max Connections this connection makes.
* Has to be greater than 1. You cannot exceed 50.
* @default 10 */
maxAttempts?: number
/** Max Connections this connection makes.
* Has to be greater than 1.
* @default 10 */
Expand All @@ -76,12 +85,16 @@ export interface ClientListenerOptions {
type ValidatedClientKeys =
| 'connectionTimeout'
| 'host'
| 'maxAttempts'

type ValidatedClientListenerKeys =
| 'port'
| 'maxAttempts'
| 'maxConnections'

interface ValidatedClientOptions extends Pick<Required<ClientOptions>, ValidatedClientKeys> {
host: string
maxAttempts: number
retryHigh: number
retryLow: number
socket?: TcpSocketConnectOpts
Expand All @@ -91,6 +104,7 @@ interface ValidatedClientOptions extends Pick<Required<ClientOptions>, Validated
interface ValidatedClientListenerOptions extends Pick<Required<ClientListenerOptions>, ValidatedClientListenerKeys> {
encoding: BufferEncoding
port: number
maxAttempts: number
maxConnections: number
retryHigh: number
retryLow: number
Expand Down Expand Up @@ -122,7 +136,7 @@ export function normalizeClientOptions (raw?: ClientOptions): ValidatedClientOpt
}

assertNumber(props, 'connectionTimeout', 0)
assertNumber(props, 'maxConnections', 1)
assertNumber(props, 'maxConnections', 1, 50)

if (props.tls === true) {
props.tls = {}
Expand All @@ -144,6 +158,8 @@ export function normalizeClientListenerOptions (raw?: ClientListenerOptions): Va
}

assertNumber(props, 'connectionTimeout', 0)
assertNumber(props, 'maxAttempts', 1, 50)
assertNumber(props, 'maxConnections', 1, 50)
assertNumber(props, 'port', 0, 65353)

return props
Expand Down

0 comments on commit 5a1bb26

Please sign in to comment.