Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ const consumerOptions = { stream: "stream-name", offset: Offset.next() }
- Offset.offset(x) ---> Start reading from the specified offset. The parameter has to be a bigint.
- Offset.timestamp(t) ---> Start reading from the messages stored after the timestamp t.

Optionally a consumer identifier can be set in the consumer option.
It's an optional property called consumerTag.
*/

const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {
Expand Down
5 changes: 5 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ export class Client {
stream: params.stream,
consumerId,
consumerRef: params.consumerRef,
consumerTag: params.consumerTag,
offset: params.offset,
creditPolicy: params.creditPolicy,
},
Expand Down Expand Up @@ -503,6 +504,9 @@ export class Client {
}
properties["match-unfiltered"] = `${params.filter.matchUnfiltered}`
}
if (params.consumerTag) {
properties["identifier"] = params.consumerTag
}

const creditPolicy = params.creditPolicy || defaultCreditPolicy

Expand Down Expand Up @@ -774,6 +778,7 @@ export interface DeclareConsumerParams {
singleActive?: boolean
filter?: ConsumerFilter
creditPolicy?: ConsumerCreditPolicy
consumerTag?: string
}

export interface DeclareSuperStreamConsumerParams {
Expand Down
2 changes: 2 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class StreamConsumer implements Consumer {
private stream: string
public consumerId: number
public consumerRef?: string
public consumerTag?: string
public offset: Offset
private clientLocalOffset: Offset
private creditsHandler: ConsumerCreditPolicy
Expand All @@ -38,6 +39,7 @@ export class StreamConsumer implements Consumer {
stream: string
consumerId: number
consumerRef?: string
consumerTag?: string
offset: Offset
creditPolicy?: ConsumerCreditPolicy
},
Expand Down
18 changes: 18 additions & 0 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ describe("declare consumer", () => {
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
}).timeout(10000)

it("declaring a consumer on an existing stream with identifiers", async () => {
const messages: Buffer[] = []
await publisher.send(Buffer.from("hello"))

await client.declareConsumer(
{ stream: streamName, offset: Offset.first(), consumerTag: "test-id" },
(message: Message) => {
messages.push(message.content)
}
)

await eventually(async () => {
const ids = await rabbit.returnConsumersIdentifiers()
expect(ids).length(1)
expect(ids[0]).eql("test-id")
})
}).timeout(10000)

it("declaring an async consumer on an existing stream - the consumer should handle the message", async () => {
const messages: Buffer[] = []
await publisher.send(Buffer.from("hello"))
Expand Down
17 changes: 17 additions & 0 deletions test/support/rabbit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ interface RabbitChannelDetails {
user: string
}

interface RabbitConsumerProperties {
identifier?: string
}

interface RabbitConsumersResponse {
queue: RabbitConsumersResponseQueue
consumer_tag: string
properties: RabbitConsumerProperties
channel_details: RabbitChannelDetails
}

Expand Down Expand Up @@ -218,6 +223,18 @@ export class Rabbit {
return resp.body.map((p) => p.consumer_tag)
}

async returnConsumersIdentifiers(): Promise<string[]> {
const resp = await got.get<RabbitConsumersResponse[]>(
`http://${this.firstNode.host}:${this.port}/api/stream/consumers/%2F/`,
{
username: this.username,
password: this.password,
responseType: "json",
}
)
return resp.body.map((p) => p.properties.identifier ?? "")
}

async returnConsumersCredits(): Promise<RabbitConsumerCredits[]> {
const allConsumerCredits: RabbitConsumerCredits[] = []
const allConsumersResp = await got.get<RabbitConsumersResponse[]>(
Expand Down