diff --git a/src/client.ts b/src/client.ts index 50d599c2..f0042d3c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -244,7 +244,7 @@ export class Client { } public async declareSuperStreamConsumer( - { superStream, offset, consumerRef }: DeclareSuperStreamConsumerParams, + { superStream, offset, consumerRef, creditPolicy }: DeclareSuperStreamConsumerParams, handle: ConsumerFunc ): Promise { const partitions = await this.queryPartitions({ superStream }) @@ -254,6 +254,7 @@ export class Client { consumerRef: consumerRef || `${superStream}-${randomUUID()}`, offset: offset || Offset.first(), partitions, + creditPolicy, }) } @@ -774,6 +775,7 @@ export interface DeclareSuperStreamConsumerParams { superStream: string consumerRef?: string offset?: Offset + creditPolicy?: ConsumerCreditPolicy } export interface SubscribeParams { diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index a8213739..46b375d2 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -1,5 +1,6 @@ import { Client } from "./client" import { Consumer, ConsumerFunc } from "./consumer" +import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy" import { Offset } from "./requests/subscribe_request" export class SuperStreamConsumer { @@ -9,6 +10,7 @@ export class SuperStreamConsumer { private locator: Client private partitions: string[] private offset: Offset + private creditPolicy: ConsumerCreditPolicy private constructor( readonly handle: ConsumerFunc, @@ -18,6 +20,7 @@ export class SuperStreamConsumer { partitions: string[] consumerRef: string offset: Offset + creditPolicy?: ConsumerCreditPolicy } ) { this.superStream = params.superStream @@ -25,13 +28,20 @@ export class SuperStreamConsumer { this.locator = params.locator this.partitions = params.partitions this.offset = params.offset + this.creditPolicy = params.creditPolicy || defaultCreditPolicy } async start(): Promise { await Promise.all( this.partitions.map(async (p) => { const partitionConsumer = await this.locator.declareConsumer( - { stream: p, consumerRef: this.consumerRef, offset: this.offset, singleActive: true }, + { + stream: p, + consumerRef: this.consumerRef, + offset: this.offset, + singleActive: true, + creditPolicy: this.creditPolicy, + }, this.handle, this ) @@ -49,6 +59,7 @@ export class SuperStreamConsumer { partitions: string[] consumerRef: string offset: Offset + creditPolicy?: ConsumerCreditPolicy } ): Promise { const superStreamConsumer = new SuperStreamConsumer(handle, params) diff --git a/test/e2e/superstream_consumer.test.ts b/test/e2e/superstream_consumer.test.ts index bac8bf4f..806fe8ce 100644 --- a/test/e2e/superstream_consumer.test.ts +++ b/test/e2e/superstream_consumer.test.ts @@ -4,8 +4,9 @@ import { Message, MessageOptions } from "../../src/publisher" import { range } from "../../src/util" import { createClient, createStreamName } from "../support/fake_data" import { Rabbit } from "../support/rabbit" -import { eventually, password, username } from "../support/util" +import { eventually, password, username, waitSleeping } from "../support/util" import { randomUUID } from "crypto" +import { creditsOnChunkCompleted } from "../../src/consumer_credit_policy" describe("super stream consumer", () => { let superStreamName: string @@ -52,6 +53,22 @@ describe("super stream consumer", () => { }) }) + it("declaring an async super stream consumer on an existing super stream - no error is thrown", async () => { + await client.declareSuperStreamConsumer({ superStream: superStreamName }, async (_message: Message) => { + await waitSleeping(10) + return + }) + }) + + it("declaring a super stream consumer with a custom credit policy - no error is thrown", async () => { + await client.declareSuperStreamConsumer( + { superStream: superStreamName, creditPolicy: creditsOnChunkCompleted(2, 1) }, + (_message: Message) => { + return + } + ) + }) + it("declaring a super stream consumer on an existing super stream - read a message", async () => { await sender(1) const messages: Message[] = []