diff --git a/README.md b/README.md index c6f77f07..a069aa60 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,8 @@ const consumerOptions = { stream: "stream-name", offset: Offset.next() } - Offset.offset(x) ---> Start reading from the specified offset. The parameter has to be a bigint. - Offset.timestamp(t) ---> Start reading from the messages stored after the timestamp t. + Optionally a consumer identifier can be set in the consumer option. + It's an optional property called consumerTag. */ const consumer = await client.declareConsumer(consumerOptions, (message: Message) => { diff --git a/src/client.ts b/src/client.ts index 2ac42865..bb972d88 100644 --- a/src/client.ts +++ b/src/client.ts @@ -209,6 +209,7 @@ export class Client { stream: params.stream, consumerId, consumerRef: params.consumerRef, + consumerTag: params.consumerTag, offset: params.offset, creditPolicy: params.creditPolicy, }, @@ -503,6 +504,9 @@ export class Client { } properties["match-unfiltered"] = `${params.filter.matchUnfiltered}` } + if (params.consumerTag) { + properties["identifier"] = params.consumerTag + } const creditPolicy = params.creditPolicy || defaultCreditPolicy @@ -774,6 +778,7 @@ export interface DeclareConsumerParams { singleActive?: boolean filter?: ConsumerFilter creditPolicy?: ConsumerCreditPolicy + consumerTag?: string } export interface DeclareSuperStreamConsumerParams { diff --git a/src/consumer.ts b/src/consumer.ts index 1cdbbfe3..0197a3f5 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -25,6 +25,7 @@ export class StreamConsumer implements Consumer { private stream: string public consumerId: number public consumerRef?: string + public consumerTag?: string public offset: Offset private clientLocalOffset: Offset private creditsHandler: ConsumerCreditPolicy @@ -38,6 +39,7 @@ export class StreamConsumer implements Consumer { stream: string consumerId: number consumerRef?: string + consumerTag?: string offset: Offset creditPolicy?: ConsumerCreditPolicy }, diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index cb145df8..f835f74a 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -84,6 +84,24 @@ describe("declare consumer", () => { await eventually(() => expect(messages).eql([Buffer.from("hello")])) }).timeout(10000) + it("declaring a consumer on an existing stream with identifiers", async () => { + const messages: Buffer[] = [] + await publisher.send(Buffer.from("hello")) + + await client.declareConsumer( + { stream: streamName, offset: Offset.first(), consumerTag: "test-id" }, + (message: Message) => { + messages.push(message.content) + } + ) + + await eventually(async () => { + const ids = await rabbit.returnConsumersIdentifiers() + expect(ids).length(1) + expect(ids[0]).eql("test-id") + }) + }).timeout(10000) + it("declaring an async consumer on an existing stream - the consumer should handle the message", async () => { const messages: Buffer[] = [] await publisher.send(Buffer.from("hello")) diff --git a/test/support/rabbit.ts b/test/support/rabbit.ts index 0b27a79c..a0c1177d 100644 --- a/test/support/rabbit.ts +++ b/test/support/rabbit.ts @@ -50,9 +50,14 @@ interface RabbitChannelDetails { user: string } +interface RabbitConsumerProperties { + identifier?: string +} + interface RabbitConsumersResponse { queue: RabbitConsumersResponseQueue consumer_tag: string + properties: RabbitConsumerProperties channel_details: RabbitChannelDetails } @@ -218,6 +223,18 @@ export class Rabbit { return resp.body.map((p) => p.consumer_tag) } + async returnConsumersIdentifiers(): Promise { + const resp = await got.get( + `http://${this.firstNode.host}:${this.port}/api/stream/consumers/%2F/`, + { + username: this.username, + password: this.password, + responseType: "json", + } + ) + return resp.body.map((p) => p.properties.identifier ?? "") + } + async returnConsumersCredits(): Promise { const allConsumerCredits: RabbitConsumerCredits[] = [] const allConsumersResp = await got.get(