Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ export class Client {
}

public async declareSuperStreamConsumer(
{ superStream, offset, consumerRef }: DeclareSuperStreamConsumerParams,
{ superStream, offset, consumerRef, creditPolicy }: DeclareSuperStreamConsumerParams,
handle: ConsumerFunc
): Promise<SuperStreamConsumer> {
const partitions = await this.queryPartitions({ superStream })
Expand All @@ -254,6 +254,7 @@ export class Client {
consumerRef: consumerRef || `${superStream}-${randomUUID()}`,
offset: offset || Offset.first(),
partitions,
creditPolicy,
})
}

Expand Down Expand Up @@ -774,6 +775,7 @@ export interface DeclareSuperStreamConsumerParams {
superStream: string
consumerRef?: string
offset?: Offset
creditPolicy?: ConsumerCreditPolicy
}

export interface SubscribeParams {
Expand Down
13 changes: 12 additions & 1 deletion src/super_stream_consumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Client } from "./client"
import { Consumer, ConsumerFunc } from "./consumer"
import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy"
import { Offset } from "./requests/subscribe_request"

export class SuperStreamConsumer {
Expand All @@ -9,6 +10,7 @@ export class SuperStreamConsumer {
private locator: Client
private partitions: string[]
private offset: Offset
private creditPolicy: ConsumerCreditPolicy

private constructor(
readonly handle: ConsumerFunc,
Expand All @@ -18,20 +20,28 @@ export class SuperStreamConsumer {
partitions: string[]
consumerRef: string
offset: Offset
creditPolicy?: ConsumerCreditPolicy
}
) {
this.superStream = params.superStream
this.consumerRef = params.consumerRef
this.locator = params.locator
this.partitions = params.partitions
this.offset = params.offset
this.creditPolicy = params.creditPolicy || defaultCreditPolicy
}

async start(): Promise<void> {
await Promise.all(
this.partitions.map(async (p) => {
const partitionConsumer = await this.locator.declareConsumer(
{ stream: p, consumerRef: this.consumerRef, offset: this.offset, singleActive: true },
{
stream: p,
consumerRef: this.consumerRef,
offset: this.offset,
singleActive: true,
creditPolicy: this.creditPolicy,
},
this.handle,
this
)
Expand All @@ -49,6 +59,7 @@ export class SuperStreamConsumer {
partitions: string[]
consumerRef: string
offset: Offset
creditPolicy?: ConsumerCreditPolicy
}
): Promise<SuperStreamConsumer> {
const superStreamConsumer = new SuperStreamConsumer(handle, params)
Expand Down
19 changes: 18 additions & 1 deletion test/e2e/superstream_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { Message, MessageOptions } from "../../src/publisher"
import { range } from "../../src/util"
import { createClient, createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { eventually, password, username } from "../support/util"
import { eventually, password, username, waitSleeping } from "../support/util"
import { randomUUID } from "crypto"
import { creditsOnChunkCompleted } from "../../src/consumer_credit_policy"

describe("super stream consumer", () => {
let superStreamName: string
Expand Down Expand Up @@ -52,6 +53,22 @@ describe("super stream consumer", () => {
})
})

it("declaring an async super stream consumer on an existing super stream - no error is thrown", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, async (_message: Message) => {
await waitSleeping(10)
return
})
})

it("declaring a super stream consumer with a custom credit policy - no error is thrown", async () => {
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, creditPolicy: creditsOnChunkCompleted(2, 1) },
(_message: Message) => {
return
}
)
})

it("declaring a super stream consumer on an existing super stream - read a message", async () => {
await sender(1)
const messages: Message[] = []
Expand Down
Loading