diff --git a/src/client.ts b/src/client.ts index 4805d676..2197b7f7 100644 --- a/src/client.ts +++ b/src/client.ts @@ -4,7 +4,7 @@ import { Compression, CompressionType, GzipCompression, NoneCompression } from " import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer" import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes" import { Logger, NullLogger } from "./logger" -import { Message, Publisher, StreamPublisher } from "./publisher" +import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher" import { ConsumerUpdateResponse } from "./requests/consumer_update_response" import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request" import { CreditRequest, CreditRequestParams } from "./requests/credit_request" @@ -41,6 +41,7 @@ import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request import { lt, coerce } from "semver" import { ConnectionInfo, Connection, errorMessageOf } from "./connection" import { ConnectionPool } from "./connection_pool" +import { DeliverResponseV2 } from "./responses/deliver_response_v2" export type ConnectionClosedListener = (hadError: boolean) => void @@ -125,7 +126,7 @@ export class Client { return res.streams } - public async declarePublisher(params: DeclarePublisherParams): Promise { + public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise { const { stream, publisherRef } = params const publisherId = this.incPublisherId() @@ -137,16 +138,22 @@ export class Client { await connection.close() throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - const publisher = new StreamPublisher({ - connection: connection, - stream: params.stream, - publisherId: publisherId, - publisherRef: params.publisherRef, - boot: params.boot, - maxFrameSize: this.maxFrameSize, - maxChunkLength: params.maxChunkLength, - logger: this.logger, - }) + if (filter && !connection.isFilteringEnabled) { + throw new Error(`Broker does not support message filtering.`) + } + const publisher = new StreamPublisher( + { + connection: connection, + stream: params.stream, + publisherId: publisherId, + publisherRef: params.publisherRef, + boot: params.boot, + maxFrameSize: this.maxFrameSize, + maxChunkLength: params.maxChunkLength, + logger: this.logger, + }, + filter + ) this.publishers.set(publisherId, { publisher: publisher, connection: connection }) this.logger.info( `New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}` @@ -410,14 +417,28 @@ export class Client { return consumerId } - private getDeliverCallback() { + private getDeliverV1Callback() { return async (response: DeliverResponse) => { const consumer = this.consumers.get(response.subscriptionId) if (!consumer) { - this.logger.error(`On deliver no consumer found`) + this.logger.error(`On deliverV1 no consumer found`) + return + } + this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`) + this.logger.debug(`response.messages.length: ${response.messages.length}`) + await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) + response.messages.map((x) => consumer.handle(x)) + } + } + + private getDeliverV2Callback() { + return async (response: DeliverResponseV2) => { + const consumer = this.consumers.get(response.subscriptionId) + if (!consumer) { + this.logger.error(`On deliverV2 no consumer found`) return } - this.logger.debug(`on deliver -> ${consumer.consumerRef}`) + this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`) this.logger.debug(`response.messages.length: ${response.messages.length}`) await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) response.messages.map((x) => consumer.handle(x)) @@ -493,7 +514,8 @@ export class Client { const connectionListeners = { ...this.params.listeners, connection_closed: connectionClosedListener, - deliver: this.getDeliverCallback(), + deliverV1: this.getDeliverV1Callback(), + deliverV2: this.getDeliverV2Callback(), consumer_update_query: this.getConsumerUpdateCallback(), } return { ...this.params, listeners: connectionListeners, leader: leader, streamName: streamName } diff --git a/src/connection.ts b/src/connection.ts index ab4299c6..343061ef 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -16,6 +16,7 @@ import { TuneRequest } from "./requests/tune_request" import { ConsumerUpdateQueryListener, DeliverListener, + DeliverV2Listener, MetadataUpdateListener, PublishConfirmListener, PublishErrorListener, @@ -29,7 +30,7 @@ import { Response } from "./responses/response" import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response" import { SaslHandshakeResponse } from "./responses/sasl_handshake_response" import { TuneResponse } from "./responses/tune_response" -import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, removeFrom } from "./util" +import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, removeFrom } from "./util" import { Version, checkServerDeclaredVersions, getClientSupportedVersions } from "./versions" import { WaitingResponse } from "./waiting_response" import { ClientListenersParams, ClientParams, ClosingParams, QueryOffsetParams, StoreOffsetParams } from "./client" @@ -38,11 +39,13 @@ import { QueryPublisherRequest } from "./requests/query_publisher_request" import { StoreOffsetRequest } from "./requests/store_offset_request" import { QueryOffsetResponse } from "./responses/query_offset_response" import { QueryOffsetRequest } from "./requests/query_offset_request" +import { coerce, lt } from "semver" export type ConnectionClosedListener = (hadError: boolean) => void export type ConnectionProxyListenersParams = ClientListenersParams & { - deliver?: DeliverListener + deliverV1?: DeliverListener + deliverV2?: DeliverV2Listener consumer_update_query?: ConsumerUpdateQueryListener } @@ -82,6 +85,7 @@ export class Connection { private serverEndpoint: { host: string; port: number } = { host: "", port: 5552 } private readonly serverDeclaredVersions: Version[] = [] private refs: number = 0 + private filteringEnabled: boolean = false constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) { this.hostname = params.hostname @@ -125,6 +129,7 @@ export class Connection { this.socket.on("connect", async () => { this.logger.info(`Connected to RabbitMQ ${this.params.hostname}:${this.params.port}`) this.peerProperties = (await this.exchangeProperties()).properties + this.filteringEnabled = lt(coerce(this.rabbitManagementVersion)!, REQUIRED_MANAGEMENT_VERSION) ? false : true await this.auth({ username: this.params.username, password: this.params.password }) const { heartbeat } = await this.tune(this.params.heartbeat ?? 0) await this.open({ virtualHost: this.params.vhost }) @@ -151,15 +156,23 @@ export class Connection { public on(event: "metadata_update", listener: MetadataUpdateListener): void public on(event: "publish_confirm", listener: PublishConfirmListener): void public on(event: "publish_error", listener: PublishErrorListener): void - public on(event: "deliver", listener: DeliverListener): void + public on(event: "deliverV1", listener: DeliverListener): void + public on(event: "deliverV2", listener: DeliverV2Listener): void public on(event: "consumer_update_query", listener: ConsumerUpdateQueryListener): void public on( - event: "metadata_update" | "publish_confirm" | "publish_error" | "deliver" | "consumer_update_query", + event: + | "metadata_update" + | "publish_confirm" + | "publish_error" + | "deliverV1" + | "deliverV2" + | "consumer_update_query", listener: | MetadataUpdateListener | PublishConfirmListener | PublishErrorListener | DeliverListener + | DeliverV2Listener | ConsumerUpdateQueryListener ) { switch (event) { @@ -172,8 +185,11 @@ export class Connection { case "publish_error": this.decoder.on("publish_error", listener as PublishErrorListener) break - case "deliver": - this.decoder.on("deliver", listener as DeliverListener) + case "deliverV1": + this.decoder.on("deliverV1", listener as DeliverListener) + break + case "deliverV2": + this.decoder.on("deliverV2", listener as DeliverV2Listener) break case "consumer_update_query": this.decoder.on("consumer_update_query", listener as ConsumerUpdateQueryListener) @@ -187,7 +203,8 @@ export class Connection { if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update) if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm) if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error) - if (listeners?.deliver) this.decoder.on("deliver", listeners.deliver) + if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1) + if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2) if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query) } @@ -332,6 +349,10 @@ export class Connection { return this.peerProperties.version } + public get isFilteringEnabled() { + return this.filteringEnabled + } + private async auth(params: { username: string; password: string }) { this.logger.debug(`Start authentication process ...`) this.logger.debug(`Start SASL handshake ...`) diff --git a/src/publisher.ts b/src/publisher.ts index fbad5e19..b6f08355 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -11,6 +11,7 @@ import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util" import { MetadataUpdateListener } from "./response_decoder" import { ConnectionInfo, Connection } from "./connection" import { ConnectionPool } from "./connection_pool" +import { PublishRequestV2 } from "./requests/publish_request_v2" export type MessageApplicationProperties = Record @@ -70,6 +71,7 @@ export interface Publisher { readonly publisherId: number } +export type FilterFunc = (msg: Message) => string | undefined type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void export class StreamPublisher implements Publisher { private connection: Connection @@ -85,16 +87,19 @@ export class StreamPublisher implements Publisher { private maxChunkLength: number private closed = false - constructor(params: { - connection: Connection - stream: string - publisherId: number - publisherRef?: string - boot?: boolean - maxFrameSize: number - maxChunkLength?: number - logger: Logger - }) { + constructor( + params: { + connection: Connection + stream: string + publisherId: number + publisherRef?: string + boot?: boolean + maxFrameSize: number + maxChunkLength?: number + logger: Logger + }, + private readonly filter?: FilterFunc + ) { this.connection = params.connection this.stream = params.stream this.publisherId = params.publisherId @@ -187,6 +192,12 @@ export class StreamPublisher implements Publisher { } private async enqueue(publishRequestMessage: PublishRequestMessage) { + if (this.filter) { + publishRequestMessage.filterValue = this.filter(publishRequestMessage.message) + } + if (!this.connection.isFilteringEnabled && this.filter) { + throw new Error(`Your rabbit server management version does not support filtering.`) + } this.checkMessageSize(publishRequestMessage) const sendCycleNeeded = this.add(publishRequestMessage) let sent = false @@ -211,12 +222,19 @@ export class StreamPublisher implements Publisher { private async sendBuffer() { const chunk = this.popChunk() if (chunk.length > 0) { - await this.connection.send( - new PublishRequest({ - publisherId: this.publisherId, - messages: chunk, - }) - ) + this.filter + ? await this.connection.send( + new PublishRequestV2({ + publisherId: this.publisherId, + messages: chunk, + }) + ) + : await this.connection.send( + new PublishRequest({ + publisherId: this.publisherId, + messages: chunk, + }) + ) } } diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index ae248352..b2adb430 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -44,6 +44,12 @@ export class BufferDataWriter implements DataWriter { this._offset = this.buffer.writeInt8(data, this._offset) } + writeInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt16BE(data, this._offset) + } + writeUInt8(data: number): void { const bytes = 1 this.growIfNeeded(bytes) @@ -113,7 +119,9 @@ export class BufferDataWriter implements DataWriter { export abstract class AbstractRequest implements Request { abstract get key(): number abstract get responseKey(): number - readonly version = 1 + get version() { + return 1 + } toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer { const initialSize = bufferSizeParams?.initialSize ?? 65536 diff --git a/src/requests/buffer_data_writer.ts b/src/requests/buffer_data_writer.ts new file mode 100644 index 00000000..556871f8 --- /dev/null +++ b/src/requests/buffer_data_writer.ts @@ -0,0 +1,118 @@ +import { DEFAULT_UNLIMITED_FRAME_MAX } from "../util" +import { DataWriter } from "./data_writer" +import { BufferSizeParams } from "./request" + +export class BufferDataWriter implements DataWriter { + private _offset = 0 + private readonly maxBufferSize: number + private readonly growthTriggerRatio: number + private readonly sizeMultiplier: number + + constructor(private buffer: Buffer, startFrom: number, bufferSizeParameters?: BufferSizeParams) { + this._offset = startFrom + this.maxBufferSize = bufferSizeParameters?.maxSize ?? 1048576 + this.growthTriggerRatio = bufferSizeParameters?.maxRatio ?? 0.9 + this.sizeMultiplier = bufferSizeParameters?.multiplier ?? 2 + } + + get offset() { + return this._offset + } + + writePrefixSize() { + this.buffer.writeUInt32BE(this._offset - 4, 0) + } + + writeData(data: string | Buffer): void { + this.growIfNeeded(Buffer.byteLength(data, "utf-8")) + if (Buffer.isBuffer(data)) { + this._offset += data.copy(this.buffer, this._offset) + return + } + this._offset += this.buffer.write(data, this._offset) + } + + writeByte(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt8(data, this._offset) + } + + writeInt8(data: number) { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt8(data, this._offset) + } + + writeUInt8(data: number): void { + const bytes = 1 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt8(data, this._offset) + } + + writeInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt16BE(data, this._offset) + } + + writeUInt16(data: number) { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt16BE(data, this._offset) + } + + writeUInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeUInt32BE(data, this._offset) + } + + writeInt32(data: number): void { + const bytes = 4 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt32BE(data, this._offset) + } + + writeUInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeBigUInt64BE(data, this._offset) + } + + writeInt64(data: bigint): void { + const bytes = 8 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeBigInt64BE(data, this._offset) + } + + writeString(data: string): void { + const bytes = 2 + this.growIfNeeded(bytes) + this._offset = this.buffer.writeInt16BE(data.length, this._offset) + this.writeData(data) + } + + toBuffer(): Buffer { + return this.buffer.slice(0, this._offset) + } + + private growIfNeeded(additionalBytes: number) { + if ((this._offset + additionalBytes) / this.buffer.length > this.growthTriggerRatio) { + this.growBuffer(additionalBytes) + } + } + + private growBuffer(requiredBytes: number) { + const newSize = this.getNewSize(requiredBytes) + const data = Buffer.from(this.buffer) + this.buffer = Buffer.alloc(newSize) + data.copy(this.buffer, 0) + } + + private getNewSize(requiredBytes: number) { + const requiredNewSize = this.buffer.length * this.sizeMultiplier + this._offset + requiredBytes + if (this.maxBufferSize === DEFAULT_UNLIMITED_FRAME_MAX) return requiredNewSize + return Math.min(requiredNewSize, this.maxBufferSize) + } +} diff --git a/src/requests/data_writer.ts b/src/requests/data_writer.ts index 22474fc4..a7a46afb 100644 --- a/src/requests/data_writer.ts +++ b/src/requests/data_writer.ts @@ -1,6 +1,7 @@ export interface DataWriter { writeByte(Described: number): void writeInt8(data: number): void + writeInt16(data: number): void writeUInt8(data: number): void writeUInt16(data: number): void writeUInt32(data: number): void diff --git a/src/requests/publish_request.ts b/src/requests/publish_request.ts index da39f44d..8bfe0f4b 100644 --- a/src/requests/publish_request.ts +++ b/src/requests/publish_request.ts @@ -5,6 +5,7 @@ import { DataWriter } from "./data_writer" export type PublishRequestMessage = { publishingId: bigint + filterValue?: string message: Message } diff --git a/src/requests/publish_request_v2.ts b/src/requests/publish_request_v2.ts new file mode 100644 index 00000000..b6158de5 --- /dev/null +++ b/src/requests/publish_request_v2.ts @@ -0,0 +1,34 @@ +import { amqpEncode } from "../amqp10/encoder" +import { AbstractRequest } from "./abstract_request" +import { DataWriter } from "./data_writer" +import { PublishRequestMessage } from "./publish_request" + +interface PublishRequestParams { + publisherId: number + messages: Array +} + +export class PublishRequestV2 extends AbstractRequest { + static readonly Key = 0x02 + static readonly Version = 2 + readonly key = PublishRequestV2.Key + readonly responseKey = -1 + + constructor(private params: PublishRequestParams) { + super() + } + + protected writeContent(writer: DataWriter): void { + writer.writeUInt8(this.params.publisherId) + writer.writeUInt32(this.params.messages.length) + this.params.messages.forEach(({ publishingId, filterValue, message }) => { + writer.writeUInt64(publishingId) + filterValue ? writer.writeString(filterValue) : writer.writeInt16(-1) + amqpEncode(writer, message) + }) + } + + get version(): number { + return PublishRequestV2.Version + } +} diff --git a/src/requests/request.ts b/src/requests/request.ts index 3b8ebbd1..93f8f447 100644 --- a/src/requests/request.ts +++ b/src/requests/request.ts @@ -10,5 +10,5 @@ export interface Request { toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer readonly responseKey: number readonly key: number - readonly version: 1 + readonly version: number } diff --git a/src/requests/requests.ts b/src/requests/requests.ts index bd0fa27f..1596bab4 100644 --- a/src/requests/requests.ts +++ b/src/requests/requests.ts @@ -13,6 +13,7 @@ export { MetadataUpdateRequest } from "./metadata_update_request" export { OpenRequest } from "./open_request" export { PeerPropertiesRequest } from "./peer_properties_request" export { PublishRequest } from "./publish_request" +export { PublishRequestV2 } from "./publish_request_v2" export { QueryOffsetRequest } from "./query_offset_request" export { QueryPublisherRequest } from "./query_publisher_request" export { SaslAuthenticateRequest } from "./sasl_authenticate_request" diff --git a/src/response_decoder.ts b/src/response_decoder.ts index 20429b9c..b438b5c4 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -36,6 +36,7 @@ import { RawConsumerUpdateQueryResponse as RawConsumerUpdateQuery, RawCreditResponse, RawDeliverResponse, + RawDeliverResponseV2, RawHeartbeatResponse, RawMetadataUpdateResponse, RawPublishConfirmResponse, @@ -57,6 +58,7 @@ import { PartitionsResponse } from "./responses/partitions_response" import { ConsumerUpdateQuery } from "./responses/consumer_update_query" import { CreateSuperStreamResponse } from "./responses/create_super_stream_response" import { DeleteSuperStreamResponse } from "./responses/delete_super_stream_response" +import { DeliverResponseV2 } from "./responses/deliver_response_v2" // Frame => Size (Request | Response | Command) // Size => uint32 (size without the 4 bytes of the size element) @@ -71,12 +73,14 @@ const UINT32_SIZE = 4 export type MetadataUpdateListener = (metadata: MetadataUpdateResponse) => void export type CreditListener = (creditResponse: CreditResponse) => void export type DeliverListener = (response: DeliverResponse) => void +export type DeliverV2Listener = (response: DeliverResponseV2) => void export type PublishConfirmListener = (confirm: PublishConfirmResponse) => void export type PublishErrorListener = (confirm: PublishErrorResponse) => void export type ConsumerUpdateQueryListener = (metadata: ConsumerUpdateQuery) => void type DeliveryResponseDecoded = { subscriptionId: number + committedChunkId?: bigint messages: Message[] } @@ -86,6 +90,7 @@ type PossibleRawResponses = | RawHeartbeatResponse | RawMetadataUpdateResponse | RawDeliverResponse + | RawDeliverResponseV2 | RawCreditResponse | RawPublishConfirmResponse | RawPublishErrorResponse @@ -116,15 +121,20 @@ function decodeResponse( const key = dataResponse.readUInt16() const version = dataResponse.readUInt16() if (key === DeliverResponse.key) { - const { subscriptionId, messages } = decodeDeliverResponse(dataResponse, getCompressionBy, logger) - const response: RawDeliverResponse = { + const { subscriptionId, committedChunkId, messages } = decodeDeliverResponse( + dataResponse, + getCompressionBy, + logger, + version + ) + return { size, key: key as DeliverResponse["key"], version, subscriptionId, + committedChunkId, messages, } - return response } if (key === TuneResponse.key) { const frameMax = dataResponse.readUInt32() @@ -193,9 +203,11 @@ function decodeResponse( function decodeDeliverResponse( dataResponse: DataReader, getCompressionBy: (type: CompressionType) => Compression, - logger: Logger + logger: Logger, + version = 1 ): DeliveryResponseDecoded { const subscriptionId = dataResponse.readUInt8() + const committedChunkId = version === 2 ? dataResponse.readUInt64() : undefined const magicVersion = dataResponse.readInt8() const chunkType = dataResponse.readInt8() const numEntries = dataResponse.readUInt16() @@ -211,6 +223,7 @@ function decodeDeliverResponse( const messages: Message[] = [] const data = { + committedChunkId, magicVersion, chunkType, numEntries, @@ -237,7 +250,7 @@ function decodeDeliverResponse( messages.push(...decodeSubEntries(dataResponse, compression, logger)) } - return { subscriptionId, messages } + return { subscriptionId, committedChunkId, messages } } const EmptyBuffer = Buffer.from("") @@ -581,8 +594,12 @@ function isMetadataUpdateResponse(params: PossibleRawResponses): params is RawMe return params.key === MetadataUpdateResponse.key } -function isDeliverResponse(params: PossibleRawResponses): params is RawDeliverResponse { - return params.key === DeliverResponse.key +function isDeliverResponseV1(params: PossibleRawResponses): params is RawDeliverResponse { + return params.key === DeliverResponse.key && params.version === DeliverResponse.Version +} + +function isDeliverResponseV2(params: PossibleRawResponses): params is RawDeliverResponseV2 { + return params.key === DeliverResponseV2.key && params.version === DeliverResponseV2.Version } function isCreditResponse(params: PossibleRawResponses): params is RawCreditResponse { @@ -649,9 +666,12 @@ export class ResponseDecoder { } else if (isMetadataUpdateResponse(response)) { this.emitter.emit("metadata_update", new MetadataUpdateResponse(response)) this.logger.debug(`metadata update received from the server: ${inspect(response)}`) - } else if (isDeliverResponse(response)) { - this.emitter.emit("deliver", new DeliverResponse(response)) - this.logger.debug(`deliver received from the server: ${inspect(response)}`) + } else if (isDeliverResponseV1(response)) { + this.emitter.emit("deliverV1", new DeliverResponse(response)) + this.logger.debug(`deliverV1 received from the server: ${inspect(response)}`) + } else if (isDeliverResponseV2(response)) { + this.emitter.emit("deliverV2", new DeliverResponseV2(response)) + this.logger.debug(`deliverV2 received from the server: ${inspect(response)}`) } else if (isCreditResponse(response)) { this.logger.debug(`credit received from the server: ${inspect(response)}`) this.emitter.emit("credit_response", new CreditResponse(response)) @@ -672,18 +692,21 @@ export class ResponseDecoder { public on(event: "credit_response", listener: CreditListener): void public on(event: "publish_confirm", listener: PublishConfirmListener): void public on(event: "publish_error", listener: PublishErrorListener): void - public on(event: "deliver", listener: DeliverListener): void + public on(event: "deliverV1", listener: DeliverListener): void + public on(event: "deliverV2", listener: DeliverV2Listener): void public on( event: | "metadata_update" | "credit_response" | "publish_confirm" | "publish_error" - | "deliver" + | "deliverV1" + | "deliverV2" | "consumer_update_query", listener: | MetadataUpdateListener | DeliverListener + | DeliverV2Listener | CreditListener | PublishConfirmListener | PublishErrorListener diff --git a/src/responses/consumer_update_query.ts b/src/responses/consumer_update_query.ts index 9d64770e..80e8cc0b 100644 --- a/src/responses/consumer_update_query.ts +++ b/src/responses/consumer_update_query.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawConsumerUpdateQueryResponse as RawConsumerUpdateQuery } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/credit_response.ts b/src/responses/credit_response.ts index 444457d4..28ce1adf 100644 --- a/src/responses/credit_response.ts +++ b/src/responses/credit_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawCreditResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/deliver_response.ts b/src/responses/deliver_response.ts index 6b7840e5..234370f6 100755 --- a/src/responses/deliver_response.ts +++ b/src/responses/deliver_response.ts @@ -1,5 +1,5 @@ import { Message } from "../publisher" -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawDeliverResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/deliver_response_v2.ts b/src/responses/deliver_response_v2.ts new file mode 100755 index 00000000..1afc41bd --- /dev/null +++ b/src/responses/deliver_response_v2.ts @@ -0,0 +1,55 @@ +import { Message } from "../publisher" +import { BufferDataWriter } from "../requests/buffer_data_writer" +import { RawDeliverResponseV2 } from "./raw_response" +import { Response } from "./response" + +export class DeliverResponseV2 implements Response { + static key = 0x0008 + static readonly Version = 2 + + constructor(private response: RawDeliverResponseV2) { + if (this.response.key !== DeliverResponseV2.key) { + throw new Error(`Unable to create ${DeliverResponseV2.name} from data of type ${this.response.key}`) + } + } + + toBuffer(): Buffer { + const bufferSize = 1024 + const bufferSizeParams = { maxSize: bufferSize } + const dw = new BufferDataWriter(Buffer.alloc(bufferSize), 4, bufferSizeParams) + dw.writeUInt16(DeliverResponseV2.key) + dw.writeUInt16(2) + dw.writeUInt8(this.response.subscriptionId) + dw.writeUInt64(this.response.committedChunkId) + dw.writePrefixSize() + return dw.toBuffer() + } + + get key() { + return this.response.key + } + + get correlationId(): number { + return -1 + } + + get code(): number { + return -1 + } + + get ok(): boolean { + return true + } + + get subscriptionId(): number { + return this.response.subscriptionId + } + + get committedChunkId(): bigint { + return this.response.committedChunkId + } + + get messages(): Message[] { + return this.response.messages + } +} diff --git a/src/responses/metadata_update_response.ts b/src/responses/metadata_update_response.ts index 9a005bbd..a122ea1f 100644 --- a/src/responses/metadata_update_response.ts +++ b/src/responses/metadata_update_response.ts @@ -1,5 +1,5 @@ +import { BufferDataWriter } from "../requests/buffer_data_writer" import { MetadataInfo, RawMetadataUpdateResponse } from "./raw_response" -import { BufferDataWriter } from "../requests/abstract_request" import { Response } from "./response" export class MetadataUpdateResponse implements Response { diff --git a/src/responses/publish_confirm_response.ts b/src/responses/publish_confirm_response.ts index a1f25364..a5164db5 100644 --- a/src/responses/publish_confirm_response.ts +++ b/src/responses/publish_confirm_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawPublishConfirmResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/publish_error_response.ts b/src/responses/publish_error_response.ts index fd196963..56928feb 100644 --- a/src/responses/publish_error_response.ts +++ b/src/responses/publish_error_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawPublishErrorResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/responses/raw_response.ts b/src/responses/raw_response.ts index 5d13a632..ca6c1f73 100644 --- a/src/responses/raw_response.ts +++ b/src/responses/raw_response.ts @@ -55,6 +55,15 @@ export interface RawDeliverResponse { messages: Message[] } +export interface RawDeliverResponseV2 { + size: number + key: 0x0008 + version: number + subscriptionId: number + committedChunkId: bigint + messages: Message[] +} + export interface RawMetadataUpdateResponse { size: number key: 0x0010 diff --git a/src/responses/responses.ts b/src/responses/responses.ts index 2efefa79..36f001f2 100644 --- a/src/responses/responses.ts +++ b/src/responses/responses.ts @@ -7,6 +7,7 @@ export { DeletePublisherResponse } from "./delete_publisher_response" export { DeleteStreamResponse } from "./delete_stream_response" export { DeleteSuperStreamResponse } from "./delete_super_stream_response" export { DeliverResponse } from "./deliver_response" +export { DeliverResponseV2 } from "./deliver_response_v2" export { ExchangeCommandVersionsResponse } from "./exchange_command_versions_response" export { HeartbeatResponse } from "./heartbeat_response" export { MetadataResponse } from "./metadata_response" diff --git a/src/responses/tune_response.ts b/src/responses/tune_response.ts index b544415d..9342ff00 100644 --- a/src/responses/tune_response.ts +++ b/src/responses/tune_response.ts @@ -1,4 +1,4 @@ -import { BufferDataWriter } from "../requests/abstract_request" +import { BufferDataWriter } from "../requests/buffer_data_writer" import { RawTuneResponse } from "./raw_response" import { Response } from "./response" diff --git a/src/versions.ts b/src/versions.ts index 689f95fe..a499a81b 100644 --- a/src/versions.ts +++ b/src/versions.ts @@ -25,6 +25,7 @@ const supportedRequests = [ requests.OpenRequest, requests.PeerPropertiesRequest, requests.PublishRequest, + requests.PublishRequestV2, requests.QueryOffsetRequest, requests.QueryPublisherRequest, requests.SaslAuthenticateRequest, @@ -40,6 +41,7 @@ const supportedRequests = [ const supportedResponses = [ responses.DeliverResponse, + responses.DeliverResponseV2, responses.PublishConfirmResponse, responses.PublishErrorResponse, responses.ConsumerUpdateQuery, @@ -76,9 +78,15 @@ export function getClientSupportedVersions(serverVersion?: string) { } if (serverVersion && lt(coerce(serverVersion)!, REQUIRED_MANAGEMENT_VERSION)) { - return result.filter( - (r) => r.key !== requests.CreateSuperStreamRequest.Key && r.key !== requests.DeleteSuperStreamRequest.Key + const filteredResult = result.filter( + (r) => ![requests.CreateSuperStreamRequest.Key, requests.DeleteSuperStreamRequest.Key].includes(r.key) ) + return filteredResult.map((r) => { + if (r.key === requests.PublishRequest.Key || r.key === responses.DeliverResponse.key) { + return { key: r.key, minVersion: r.minVersion, maxVersion: 1 } + } + return r + }) } return result diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts new file mode 100644 index 00000000..8d02b15f --- /dev/null +++ b/test/e2e/filtering.test.ts @@ -0,0 +1,50 @@ +import { expect } from "chai" +import { randomUUID } from "crypto" +import { Client } from "../../src" +import { createClient, createStreamName } from "../support/fake_data" +import { Rabbit } from "../support/rabbit" +import { eventually, username, password } from "../support/util" +import { coerce, lt } from "semver" + +describe("filtering", () => { + const rabbit = new Rabbit(username, password) + let client: Client + let streamName: string + + beforeEach(async function () { + client = await createClient(username, password) + // eslint-disable-next-line no-invalid-this + if (lt(coerce(client.rabbitManagementVersion)!, "3.13.0")) this.skip() + streamName = createStreamName() + await client.createStream({ stream: streamName, arguments: {} }) + }) + + afterEach(async () => { + try { + await client.close() + await client.deleteStream({ stream: streamName }) + await rabbit.closeAllConnections() + await rabbit.deleteAllQueues({ match: /my-stream-/ }) + } catch (e) {} + }) + + it("can publish with filter value", async () => { + const publisher = await client.declarePublisher( + { stream: streamName, publisherRef: `my-publisher-${randomUUID()}` }, + (msg) => msg.applicationProperties!["test"].toString() + ) + const message1 = "test1" + const message2 = "test2" + const message3 = "test3" + const applicationProperties1 = { test: "A" } + const applicationProperties2 = { test: "B" } + + await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 }) + await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties1 }) + await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties2 }) + + await eventually(async () => { + expect((await rabbit.getQueueInfo(streamName)).messages).eql(3) + }, 10000) + }).timeout(10000) +}) diff --git a/test/unit/buffer_data_writer.test.ts b/test/unit/buffer_data_writer.test.ts index d958e943..37e6e34b 100644 --- a/test/unit/buffer_data_writer.test.ts +++ b/test/unit/buffer_data_writer.test.ts @@ -1,6 +1,6 @@ import { expect } from "chai" -import { BufferDataWriter } from "../../src/requests/abstract_request" import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX } from "../../src/util" +import { BufferDataWriter } from "../../src/requests/buffer_data_writer" describe("Buffer Data Writer functionalities", () => { const bufferMaxSize = 1024 const bufferInitialSize = 1 diff --git a/test/unit/response_decoder.test.ts b/test/unit/response_decoder.test.ts index 99f3d1b1..a916382b 100644 --- a/test/unit/response_decoder.test.ts +++ b/test/unit/response_decoder.test.ts @@ -1,11 +1,11 @@ import { expect } from "chai" import { NoneCompression } from "../../src/compression" import { DecoderListenerFunc } from "../../src/decoder_listener" -import { BufferDataWriter } from "../../src/requests/abstract_request" import { ResponseDecoder } from "../../src/response_decoder" import { PeerPropertiesResponse } from "../../src/responses/peer_properties_response" import { Response } from "../../src/responses/response" import { createConsoleLog } from "../support/util" +import { BufferDataWriter } from "../../src/requests/buffer_data_writer" class MockDecoderListener { readonly responses: Response[] = [] diff --git a/test/unit/versions.test.ts b/test/unit/versions.test.ts index 4277385b..d65ea7a9 100644 --- a/test/unit/versions.test.ts +++ b/test/unit/versions.test.ts @@ -21,7 +21,7 @@ describe("Versions", () => { { key: 16, maxVersion: 1, minVersion: 1 }, { key: 21, maxVersion: 1, minVersion: 1 }, { key: 17, maxVersion: 1, minVersion: 1 }, - { key: 2, maxVersion: 1, minVersion: 1 }, + { key: 2, maxVersion: 2, minVersion: 1 }, { key: 11, maxVersion: 1, minVersion: 1 }, { key: 5, maxVersion: 1, minVersion: 1 }, { key: 19, maxVersion: 1, minVersion: 1 }, @@ -33,7 +33,7 @@ describe("Versions", () => { { key: 12, maxVersion: 1, minVersion: 1 }, { key: 24, maxVersion: 1, minVersion: 1 }, { key: 25, maxVersion: 1, minVersion: 1 }, - { key: 8, maxVersion: 1, minVersion: 1 }, + { key: 8, maxVersion: 2, minVersion: 1 }, { key: 3, maxVersion: 1, minVersion: 1 }, { key: 4, maxVersion: 1, minVersion: 1 }, { key: 26, maxVersion: 1, minVersion: 1 },