diff --git a/package-lock.json b/package-lock.json index 6e66d669..56f663f4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,7 +21,7 @@ "@types/node": "^20.11.5", "@typescript-eslint/eslint-plugin": "^6.19.0", "@typescript-eslint/parser": "^6.19.0", - "amqplib": "^0.10.3", + "amqplib": "^0.10.5", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", "chai-spies": "^1.1.0", @@ -1163,13 +1163,13 @@ } }, "node_modules/amqplib": { - "version": "0.10.4", + "version": "0.10.5", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.5.tgz", + "integrity": "sha512-Dx5zmy0Ur+Q7LPPdhz+jx5IzmJBoHd15tOeAfQ8SuvEtyPJ20hBemhOBA4b1WeORCRa0ENM/kHCzmem1w/zHvQ==", "dev": true, - "license": "MIT", "dependencies": { "@acuminous/bitsyntax": "^0.1.2", "buffer-more-ints": "~1.0.0", - "readable-stream": "1.x >=1.1.9", "url-parse": "~1.5.10" }, "engines": { @@ -3775,11 +3775,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/isarray": { - "version": "0.0.1", - "dev": true, - "license": "MIT" - }, "node_modules/isexe": { "version": "2.0.0", "dev": true, @@ -4476,17 +4471,6 @@ "safe-buffer": "^5.1.0" } }, - "node_modules/readable-stream": { - "version": "1.1.14", - "dev": true, - "license": "MIT", - "dependencies": { - "core-util-is": "~1.0.0", - "inherits": "~2.0.1", - "isarray": "0.0.1", - "string_decoder": "~0.10.x" - } - }, "node_modules/readdirp": { "version": "3.6.0", "dev": true, @@ -4827,11 +4811,6 @@ "node": "*" } }, - "node_modules/string_decoder": { - "version": "0.10.31", - "dev": true, - "license": "MIT" - }, "node_modules/string-width": { "version": "4.2.3", "dev": true, diff --git a/package.json b/package.json index 4f2804c9..392d74aa 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,7 @@ "@types/node": "^20.11.5", "@typescript-eslint/eslint-plugin": "^6.19.0", "@typescript-eslint/parser": "^6.19.0", - "amqplib": "^0.10.3", + "amqplib": "^0.10.5", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", "chai-spies": "^1.1.0", diff --git a/src/amqp10/applicationProperties.ts b/src/amqp10/applicationProperties.ts index 400507a3..5fe49522 100644 --- a/src/amqp10/applicationProperties.ts +++ b/src/amqp10/applicationProperties.ts @@ -10,8 +10,7 @@ export class ApplicationProperties { return range(numEntries).reduce((acc: MessageApplicationProperties, _) => { const propertyKey = readUTF8String(dataReader) const nextByteType = dataReader.readUInt8() - dataReader.rewind(1) - const propertyValue = decodeFormatCode(dataReader, nextByteType, true) + const propertyValue = decodeFormatCode(dataReader, nextByteType) if (!propertyValue) throw new Error(`invalid nextByteType %#02x: ${nextByteType}`) acc[propertyKey] = propertyValue as string | number return acc diff --git a/src/amqp10/messageAnnotations.ts b/src/amqp10/messageAnnotations.ts index 642e5d0a..0679b0cf 100644 --- a/src/amqp10/messageAnnotations.ts +++ b/src/amqp10/messageAnnotations.ts @@ -10,8 +10,7 @@ export class Annotations { return range(numEntries).reduce((acc: MessageAnnotations, _) => { const propertyKey = readUTF8String(dataReader) const nextByteType = dataReader.readUInt8() - dataReader.rewind(1) - const propertyValue = decodeFormatCode(dataReader, nextByteType, true) + const propertyValue = decodeFormatCode(dataReader, nextByteType) if (propertyValue === undefined) throw new Error(`invalid nextByteType %#02x: ${nextByteType}`) acc[propertyKey] = propertyValue as string | number return acc diff --git a/src/amqp10/messageHeader.ts b/src/amqp10/messageHeader.ts index ef7f2ed7..1b7b925d 100644 --- a/src/amqp10/messageHeader.ts +++ b/src/amqp10/messageHeader.ts @@ -2,35 +2,34 @@ import { DataReader } from "../responses/raw_response" import { MessageHeader } from "../publisher" import { range } from "../util" import { decodeFormatCode, decodeBooleanType } from "../response_decoder" +import { FormatCode } from "./decoder" export class Header { public static parse(dataResponse: DataReader, fields: number): MessageHeader { return range(fields).reduce((acc: MessageHeader, index) => { if (dataResponse.isAtEnd()) return acc - switch (index) { - case 0: - const formatCode = dataResponse.readUInt8() - dataResponse.rewind(1) - const decodedBoolean = decodeFormatCode(dataResponse, formatCode) - if (!decodedBoolean) throw new Error(`invalid formatCode %#02x: ${formatCode}`) - acc.durable = decodedBoolean as boolean - break - case 1: - dataResponse.readUInt8() // Read type - acc.priority = dataResponse.readUInt8() - break - case 2: - const type = dataResponse.readUInt8() - acc.ttl = decodeFormatCode(dataResponse, type) as number - break - case 3: - acc.firstAcquirer = decodeBooleanType(dataResponse, true) - break - case 4: - acc.deliveryCount = dataResponse.readUInt32() - break - default: - throw new Error(`PropertiesError`) + + const type = dataResponse.readUInt8() + if (type !== FormatCode.Null) { + switch (index) { + case 0: + acc.durable = decodeBooleanType(dataResponse, type) + break + case 1: + acc.priority = decodeFormatCode(dataResponse, type) as number + break + case 2: + acc.ttl = decodeFormatCode(dataResponse, type) as number + break + case 3: + acc.firstAcquirer = decodeBooleanType(dataResponse, type) + break + case 4: + acc.deliveryCount = decodeFormatCode(dataResponse, type) as number + break + default: + throw new Error(`HeaderError`) + } } return acc }, {}) diff --git a/src/amqp10/properties.ts b/src/amqp10/properties.ts index 22ff7b0c..bbbcfaa0 100644 --- a/src/amqp10/properties.ts +++ b/src/amqp10/properties.ts @@ -1,5 +1,5 @@ import { MessageProperties } from "../publisher" -import { readUTF8String } from "../response_decoder" +import { decodeFormatCode } from "../response_decoder" import { DataReader } from "../responses/raw_response" import { range } from "../util" import { FormatCode } from "./decoder" @@ -12,52 +12,47 @@ export class Properties { if (formatCode === FormatCode.Null) { return acc } - dataResponse.rewind(1) switch (index) { case 0: - acc.messageId = readUTF8String(dataResponse) + acc.messageId = decodeFormatCode(dataResponse, formatCode) as string break case 1: // Reading of binary type - dataResponse.readUInt8() const userIdLength = dataResponse.readUInt8() acc.userId = dataResponse.readBufferOf(userIdLength) break case 2: - acc.to = readUTF8String(dataResponse) + acc.to = decodeFormatCode(dataResponse, formatCode) as string break case 3: - acc.subject = readUTF8String(dataResponse) + acc.subject = decodeFormatCode(dataResponse, formatCode) as string break case 4: - acc.replyTo = readUTF8String(dataResponse) + acc.replyTo = decodeFormatCode(dataResponse, formatCode) as string break case 5: - acc.correlationId = readUTF8String(dataResponse) + acc.correlationId = decodeFormatCode(dataResponse, formatCode) as string break case 6: - acc.contentType = readUTF8String(dataResponse) + acc.contentType = decodeFormatCode(dataResponse, formatCode) as string break case 7: - acc.contentEncoding = readUTF8String(dataResponse) + acc.contentEncoding = decodeFormatCode(dataResponse, formatCode) as string break case 8: - dataResponse.readUInt8() acc.absoluteExpiryTime = new Date(Number(dataResponse.readInt64())) break case 9: - dataResponse.readUInt8() acc.creationTime = new Date(Number(dataResponse.readInt64())) break case 10: - acc.groupId = readUTF8String(dataResponse) + acc.groupId = decodeFormatCode(dataResponse, formatCode) as string break case 11: - dataResponse.readUInt8() acc.groupSequence = dataResponse.readUInt32() break case 12: - acc.replyToGroupId = readUTF8String(dataResponse) + acc.replyToGroupId = decodeFormatCode(dataResponse, formatCode) as string break default: throw new Error(`PropertiesError`) diff --git a/src/response_decoder.ts b/src/response_decoder.ts index 795864e3..ac6a1be4 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -328,7 +328,7 @@ function decodeMessageProperties(dataResponse: DataReader) { dataResponse.rewind(3) const type = dataResponse.readInt8() if (type !== 0) { - throw new Error(`invalid composite header: ${type}`) + throw new Error(`invalid message properties: ${type}`) } const nextType = dataResponse.readInt8() @@ -347,8 +347,7 @@ function decodeMessageHeader(dataResponse: DataReader) { throw new Error(`invalid composite header: ${type}`) } - // next, the composite type is encoded as an AMQP uint8 - dataResponse.readUInt64() + decodeAmqpValue(dataResponse) const formatCode = dataResponse.readUInt8() const headerLength = decodeFormatCode(dataResponse, formatCode) @@ -365,8 +364,7 @@ function decodeApplicationData(dataResponse: DataReader) { function decodeAmqpValue(dataResponse: DataReader) { const amqpFormatCode = dataResponse.readUInt8() - dataResponse.rewind(1) - return decodeFormatCode(dataResponse, amqpFormatCode, true) as string + return decodeFormatCode(dataResponse, amqpFormatCode) as string } function readFormatCodeType(dataResponse: DataReader) { @@ -379,13 +377,12 @@ function readFormatCodeType(dataResponse: DataReader) { export function readUTF8String(dataResponse: DataReader) { const formatCode = dataResponse.readUInt8() const decodedString = decodeFormatCode(dataResponse, formatCode) - if (!decodedString) throw new Error(`invalid formatCode %#02x: ${formatCode}`) + if (!decodedString) throw new Error(`invalid formatCode 0x${formatCode.toString(16)}`) return decodedString as string } -export function decodeBooleanType(dataResponse: DataReader, defaultValue: boolean) { - const boolType = dataResponse.readInt8() +export function decodeBooleanType(dataResponse: DataReader, boolType: number) { switch (boolType) { case FormatCode.Bool: const boolValue = dataResponse.readInt8() @@ -395,11 +392,11 @@ export function decodeBooleanType(dataResponse: DataReader, defaultValue: boolea case FormatCode.BoolFalse: return false default: - return defaultValue + throw new Error(`Expected boolean format code, got 0x${boolType.toString(16)}`) } } -export function decodeFormatCode(dataResponse: DataReader, formatCode: number, skipByte = false) { +export function decodeFormatCode(dataResponse: DataReader, formatCode: number) { switch (formatCode) { case FormatCode.Map8: // Read first empty byte @@ -412,7 +409,6 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s case FormatCode.SmallUlong: return dataResponse.readInt8() // Read a SmallUlong case FormatCode.Ubyte: - dataResponse.forward(1) return dataResponse.readUInt8() case FormatCode.ULong: return dataResponse.readUInt64() // Read an ULong @@ -430,35 +426,30 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s return dataResponse.readUInt32() case FormatCode.Str8: case FormatCode.Sym8: - if (skipByte) dataResponse.forward(1) return dataResponse.readString8() case FormatCode.Str32: case FormatCode.Sym32: - if (skipByte) dataResponse.forward(1) return dataResponse.readString32() case FormatCode.Uint0: return 0 case FormatCode.SmallUint: - dataResponse.forward(1) // Skipping formatCode return dataResponse.readUInt8() case FormatCode.Uint: - dataResponse.forward(1) // Skipping formatCode return dataResponse.readUInt32() case FormatCode.SmallInt: - dataResponse.forward(1) // Skipping formatCode return dataResponse.readInt8() case FormatCode.Int: - dataResponse.forward(1) // Skipping formatCode return dataResponse.readInt32() case FormatCode.Bool: case FormatCode.BoolTrue: case FormatCode.BoolFalse: - return decodeBooleanType(dataResponse, true) + return decodeBooleanType(dataResponse, formatCode) case FormatCode.Null: - dataResponse.forward(1) // Skipping formatCode + return 0 + case FormatCode.ULong0: return 0 default: - throw new Error(`ReadCompositeHeader Invalid type ${formatCode}`) + throw new Error(`FormatCode Invalid type ${formatCode}`) } } diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 22d8b03b..cb145df8 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -20,6 +20,7 @@ import { createConsumerRef, createPublisher, createStreamName, + createAmqpClient, } from "../support/fake_data" import { Rabbit, RabbitConnectionResponse } from "../support/rabbit" import { @@ -31,6 +32,7 @@ import { username, waitSleeping, } from "../support/util" +import { Connection, Channel } from "amqplib" describe("declare consumer", () => { let streamName: string @@ -38,6 +40,8 @@ describe("declare consumer", () => { const rabbit = new Rabbit(username, password) let client: Client let publisher: Publisher + let amqpClient: Connection + let amqpChannel: Channel const previousMaxSharedClientInstances = process.env.MAX_SHARED_CLIENT_INSTANCES before(() => { @@ -382,6 +386,50 @@ describe("declare consumer", () => { }, 5000) }).timeout(6000) }) + + describe("when receiving a message published from amqplib", () => { + beforeEach(async () => { + amqpClient = await createAmqpClient(username, password) + amqpChannel = await amqpClient.createChannel() + }) + + afterEach(async () => { + try { + await amqpChannel.close() + await amqpClient.close() + } catch (_e) {} + }) + + it("the message should be handled", async () => { + const message = "helloworld" + amqpChannel.sendToQueue(streamName, Buffer.from(message)) + + let receivedMessage: Message + await client.declareConsumer({ stream: streamName, offset: Offset.first() }, async (msg) => { + receivedMessage = msg + }) + + await eventually(async () => { + expect(receivedMessage?.content.toString()).eql(message) + }) + }).timeout(10000) + + it("the message with headers should be handled", async () => { + const message = "helloworld" + const headers = { priority: 5 } + amqpChannel.sendToQueue(streamName, Buffer.from(message), headers) + + let receivedMessage: Message + await client.declareConsumer({ stream: streamName, offset: Offset.first() }, async (msg) => { + receivedMessage = msg + }) + + await eventually(async () => { + expect(receivedMessage?.content.toString()).eql(message) + expect(receivedMessage?.messageHeader!.priority).deep.equal(headers.priority) + }) + }).timeout(10000) + }) }) function createProperties(): MessageProperties { @@ -421,7 +469,6 @@ function createMessageHeader(): MessageHeader { return { deliveryCount: 300, durable: true, - ttl: 0, firstAcquirer: true, priority: 100, } diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index 55658afd..dc34c910 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -7,6 +7,7 @@ import { Consumer, Publisher } from "../../src" import { getTestNodesFromEnv } from "./util" import { createLogger, format, transports } from "winston" import { inspect } from "util" +import { connect as amqpConnect, Connection } from "amqplib" export function createProperties(): MessageProperties { return { @@ -96,3 +97,8 @@ export const testLogger = createLogger({ ), transports: new transports.Console(), }) + +export async function createAmqpClient(username: string, password: string): Promise { + const [firstNode] = getTestNodesFromEnv() + return await amqpConnect(`amqp://${username}:${password}@${firstNode.host}:5672/`) +} diff --git a/test/support/util.ts b/test/support/util.ts index 3839ec85..2c922d95 100644 --- a/test/support/util.ts +++ b/test/support/util.ts @@ -201,8 +201,7 @@ export function decodeMessageTesting(dataResponse: DataReader, length: number): break case FormatCodeType.AmqpValue: const amqpFormatCode = dataResponse.readUInt8() - dataResponse.rewind(1) - amqpValue = decodeFormatCode(dataResponse, amqpFormatCode, true) as string + amqpValue = decodeFormatCode(dataResponse, amqpFormatCode) as string break default: throw new Error(`Not supported format code ${formatCode}`)