Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 4 additions & 25 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"@types/node": "^20.11.5",
"@typescript-eslint/eslint-plugin": "^6.19.0",
"@typescript-eslint/parser": "^6.19.0",
"amqplib": "^0.10.3",
"amqplib": "^0.10.5",
"chai": "^4.3.7",
"chai-as-promised": "^7.1.1",
"chai-spies": "^1.1.0",
Expand Down
3 changes: 1 addition & 2 deletions src/amqp10/applicationProperties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ export class ApplicationProperties {
return range(numEntries).reduce((acc: MessageApplicationProperties, _) => {
const propertyKey = readUTF8String(dataReader)
const nextByteType = dataReader.readUInt8()
dataReader.rewind(1)
const propertyValue = decodeFormatCode(dataReader, nextByteType, true)
const propertyValue = decodeFormatCode(dataReader, nextByteType)
if (!propertyValue) throw new Error(`invalid nextByteType %#02x: ${nextByteType}`)
acc[propertyKey] = propertyValue as string | number
return acc
Expand Down
3 changes: 1 addition & 2 deletions src/amqp10/messageAnnotations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ export class Annotations {
return range(numEntries).reduce((acc: MessageAnnotations, _) => {
const propertyKey = readUTF8String(dataReader)
const nextByteType = dataReader.readUInt8()
dataReader.rewind(1)
const propertyValue = decodeFormatCode(dataReader, nextByteType, true)
const propertyValue = decodeFormatCode(dataReader, nextByteType)
if (propertyValue === undefined) throw new Error(`invalid nextByteType %#02x: ${nextByteType}`)
acc[propertyKey] = propertyValue as string | number
return acc
Expand Down
47 changes: 23 additions & 24 deletions src/amqp10/messageHeader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,34 @@ import { DataReader } from "../responses/raw_response"
import { MessageHeader } from "../publisher"
import { range } from "../util"
import { decodeFormatCode, decodeBooleanType } from "../response_decoder"
import { FormatCode } from "./decoder"

export class Header {
public static parse(dataResponse: DataReader, fields: number): MessageHeader {
return range(fields).reduce((acc: MessageHeader, index) => {
if (dataResponse.isAtEnd()) return acc
switch (index) {
case 0:
const formatCode = dataResponse.readUInt8()
dataResponse.rewind(1)
const decodedBoolean = decodeFormatCode(dataResponse, formatCode)
if (!decodedBoolean) throw new Error(`invalid formatCode %#02x: ${formatCode}`)
acc.durable = decodedBoolean as boolean
break
case 1:
dataResponse.readUInt8() // Read type
acc.priority = dataResponse.readUInt8()
break
case 2:
const type = dataResponse.readUInt8()
acc.ttl = decodeFormatCode(dataResponse, type) as number
break
case 3:
acc.firstAcquirer = decodeBooleanType(dataResponse, true)
break
case 4:
acc.deliveryCount = dataResponse.readUInt32()
break
default:
throw new Error(`PropertiesError`)

const type = dataResponse.readUInt8()
if (type !== FormatCode.Null) {
switch (index) {
case 0:
acc.durable = decodeBooleanType(dataResponse, type)
break
case 1:
acc.priority = decodeFormatCode(dataResponse, type) as number
break
case 2:
acc.ttl = decodeFormatCode(dataResponse, type) as number
break
case 3:
acc.firstAcquirer = decodeBooleanType(dataResponse, type)
break
case 4:
acc.deliveryCount = decodeFormatCode(dataResponse, type) as number
break
default:
throw new Error(`HeaderError`)
}
}
return acc
}, {})
Expand Down
25 changes: 10 additions & 15 deletions src/amqp10/properties.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { MessageProperties } from "../publisher"
import { readUTF8String } from "../response_decoder"
import { decodeFormatCode } from "../response_decoder"
import { DataReader } from "../responses/raw_response"
import { range } from "../util"
import { FormatCode } from "./decoder"
Expand All @@ -12,52 +12,47 @@ export class Properties {
if (formatCode === FormatCode.Null) {
return acc
}
dataResponse.rewind(1)
switch (index) {
case 0:
acc.messageId = readUTF8String(dataResponse)
acc.messageId = decodeFormatCode(dataResponse, formatCode) as string
break
case 1:
// Reading of binary type
dataResponse.readUInt8()
const userIdLength = dataResponse.readUInt8()
acc.userId = dataResponse.readBufferOf(userIdLength)
break
case 2:
acc.to = readUTF8String(dataResponse)
acc.to = decodeFormatCode(dataResponse, formatCode) as string
break
case 3:
acc.subject = readUTF8String(dataResponse)
acc.subject = decodeFormatCode(dataResponse, formatCode) as string
break
case 4:
acc.replyTo = readUTF8String(dataResponse)
acc.replyTo = decodeFormatCode(dataResponse, formatCode) as string
break
case 5:
acc.correlationId = readUTF8String(dataResponse)
acc.correlationId = decodeFormatCode(dataResponse, formatCode) as string
break
case 6:
acc.contentType = readUTF8String(dataResponse)
acc.contentType = decodeFormatCode(dataResponse, formatCode) as string
break
case 7:
acc.contentEncoding = readUTF8String(dataResponse)
acc.contentEncoding = decodeFormatCode(dataResponse, formatCode) as string
break
case 8:
dataResponse.readUInt8()
acc.absoluteExpiryTime = new Date(Number(dataResponse.readInt64()))
break
case 9:
dataResponse.readUInt8()
acc.creationTime = new Date(Number(dataResponse.readInt64()))
break
case 10:
acc.groupId = readUTF8String(dataResponse)
acc.groupId = decodeFormatCode(dataResponse, formatCode) as string
break
case 11:
dataResponse.readUInt8()
acc.groupSequence = dataResponse.readUInt32()
break
case 12:
acc.replyToGroupId = readUTF8String(dataResponse)
acc.replyToGroupId = decodeFormatCode(dataResponse, formatCode) as string
break
default:
throw new Error(`PropertiesError`)
Expand Down
31 changes: 11 additions & 20 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ function decodeMessageProperties(dataResponse: DataReader) {
dataResponse.rewind(3)
const type = dataResponse.readInt8()
if (type !== 0) {
throw new Error(`invalid composite header: ${type}`)
throw new Error(`invalid message properties: ${type}`)
}

const nextType = dataResponse.readInt8()
Expand All @@ -347,8 +347,7 @@ function decodeMessageHeader(dataResponse: DataReader) {
throw new Error(`invalid composite header: ${type}`)
}

// next, the composite type is encoded as an AMQP uint8
dataResponse.readUInt64()
decodeAmqpValue(dataResponse)

const formatCode = dataResponse.readUInt8()
const headerLength = decodeFormatCode(dataResponse, formatCode)
Expand All @@ -365,8 +364,7 @@ function decodeApplicationData(dataResponse: DataReader) {

function decodeAmqpValue(dataResponse: DataReader) {
const amqpFormatCode = dataResponse.readUInt8()
dataResponse.rewind(1)
return decodeFormatCode(dataResponse, amqpFormatCode, true) as string
return decodeFormatCode(dataResponse, amqpFormatCode) as string
}

function readFormatCodeType(dataResponse: DataReader) {
Expand All @@ -379,13 +377,12 @@ function readFormatCodeType(dataResponse: DataReader) {
export function readUTF8String(dataResponse: DataReader) {
const formatCode = dataResponse.readUInt8()
const decodedString = decodeFormatCode(dataResponse, formatCode)
if (!decodedString) throw new Error(`invalid formatCode %#02x: ${formatCode}`)
if (!decodedString) throw new Error(`invalid formatCode 0x${formatCode.toString(16)}`)

return decodedString as string
}

export function decodeBooleanType(dataResponse: DataReader, defaultValue: boolean) {
const boolType = dataResponse.readInt8()
export function decodeBooleanType(dataResponse: DataReader, boolType: number) {
switch (boolType) {
case FormatCode.Bool:
const boolValue = dataResponse.readInt8()
Expand All @@ -395,11 +392,11 @@ export function decodeBooleanType(dataResponse: DataReader, defaultValue: boolea
case FormatCode.BoolFalse:
return false
default:
return defaultValue
throw new Error(`Expected boolean format code, got 0x${boolType.toString(16)}`)
}
}

export function decodeFormatCode(dataResponse: DataReader, formatCode: number, skipByte = false) {
export function decodeFormatCode(dataResponse: DataReader, formatCode: number) {
switch (formatCode) {
case FormatCode.Map8:
// Read first empty byte
Expand All @@ -412,7 +409,6 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
case FormatCode.SmallUlong:
return dataResponse.readInt8() // Read a SmallUlong
case FormatCode.Ubyte:
dataResponse.forward(1)
return dataResponse.readUInt8()
case FormatCode.ULong:
return dataResponse.readUInt64() // Read an ULong
Expand All @@ -430,35 +426,30 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
return dataResponse.readUInt32()
case FormatCode.Str8:
case FormatCode.Sym8:
if (skipByte) dataResponse.forward(1)
return dataResponse.readString8()
case FormatCode.Str32:
case FormatCode.Sym32:
if (skipByte) dataResponse.forward(1)
return dataResponse.readString32()
case FormatCode.Uint0:
return 0
case FormatCode.SmallUint:
dataResponse.forward(1) // Skipping formatCode
return dataResponse.readUInt8()
case FormatCode.Uint:
dataResponse.forward(1) // Skipping formatCode
return dataResponse.readUInt32()
case FormatCode.SmallInt:
dataResponse.forward(1) // Skipping formatCode
return dataResponse.readInt8()
case FormatCode.Int:
dataResponse.forward(1) // Skipping formatCode
return dataResponse.readInt32()
case FormatCode.Bool:
case FormatCode.BoolTrue:
case FormatCode.BoolFalse:
return decodeBooleanType(dataResponse, true)
return decodeBooleanType(dataResponse, formatCode)
case FormatCode.Null:
dataResponse.forward(1) // Skipping formatCode
return 0
case FormatCode.ULong0:
return 0
default:
throw new Error(`ReadCompositeHeader Invalid type ${formatCode}`)
throw new Error(`FormatCode Invalid type ${formatCode}`)
}
}

Expand Down
Loading