Skip to content

Commit ee5c42f

Browse files
committed
feature: configurable policies for consumer credit flow
1 parent 16d26ff commit ee5c42f

File tree

7 files changed

+302
-16
lines changed

7 files changed

+302
-16
lines changed

src/client.ts

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
1111
import { ConsumerUpdateResponse } from "./requests/consumer_update_response"
1212
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
1313
import { CreateSuperStreamRequest } from "./requests/create_super_stream_request"
14-
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
14+
import { CreditRequest } from "./requests/credit_request"
1515
import { DeclarePublisherRequest } from "./requests/declare_publisher_request"
1616
import { DeletePublisherRequest } from "./requests/delete_publisher_request"
1717
import { DeleteStreamRequest } from "./requests/delete_stream_request"
@@ -42,6 +42,7 @@ import { UnsubscribeResponse } from "./responses/unsubscribe_response"
4242
import { SuperStreamConsumer } from "./super_stream_consumer"
4343
import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher"
4444
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, sample } from "./util"
45+
import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy"
4546

4647
export type ConnectionClosedListener = (hadError: boolean) => void
4748

@@ -184,7 +185,14 @@ export class Client {
184185

185186
const consumer = new StreamConsumer(
186187
handle,
187-
{ connection, stream: params.stream, consumerId, consumerRef: params.consumerRef, offset: params.offset },
188+
{
189+
connection,
190+
stream: params.stream,
191+
consumerId,
192+
consumerRef: params.consumerRef,
193+
offset: params.offset,
194+
creditPolicy: params.creditPolicy,
195+
},
188196
params.filter
189197
)
190198
connection.on("metadata_update", async (metadata) => {
@@ -461,8 +469,15 @@ export class Client {
461469
properties["match-unfiltered"] = `${params.filter.matchUnfiltered}`
462470
}
463471

472+
const creditPolicy = params.creditPolicy || defaultCreditPolicy
473+
464474
const res = await connection.sendAndWait<SubscribeResponse>(
465-
new SubscribeRequest({ ...params, subscriptionId: consumerId, credit: 10, properties: properties })
475+
new SubscribeRequest({
476+
...params,
477+
subscriptionId: consumerId,
478+
credit: creditPolicy.onSubscription(),
479+
properties: properties,
480+
})
466481
)
467482

468483
if (!res.ok) {
@@ -471,8 +486,10 @@ export class Client {
471486
}
472487
}
473488

474-
private askForCredit(params: CreditRequestParams, connection: Connection): Promise<void> {
475-
return connection.send(new CreditRequest({ ...params }))
489+
private askForCredit(subscriptionId: number, connection: Connection): CreditRequestWrapper {
490+
return async (howMany: number) => {
491+
return connection.send(new CreditRequest({ subscriptionId: subscriptionId, credit: howMany }))
492+
}
476493
}
477494

478495
private incPublisherId() {
@@ -499,8 +516,14 @@ export class Client {
499516
}
500517
this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`)
501518
this.logger.debug(`response.messages.length: ${response.messages.length}`)
502-
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection)
503-
response.messages.map((x) => consumer.handle(x))
519+
const creditRequestWrapper = this.askForCredit(response.subscriptionId, connection)
520+
await consumer.creditPolicy.onChunkReceived(creditRequestWrapper)
521+
const chunkLength = response.messages.length
522+
for (const [idx, message] of response.messages.entries()) {
523+
consumer.handle(message)
524+
await consumer.creditPolicy.onChunkProgress(idx + 1, chunkLength, creditRequestWrapper)
525+
}
526+
await consumer.creditPolicy.onChunkCompleted(creditRequestWrapper)
504527
}
505528
}
506529

@@ -516,12 +539,20 @@ export class Client {
516539
}
517540
this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`)
518541
this.logger.debug(`response.messages.length: ${response.messages.length}`)
519-
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection)
542+
543+
const creditRequestWrapper = this.askForCredit(response.subscriptionId, connection)
544+
await consumer.creditPolicy.onChunkReceived(creditRequestWrapper)
545+
const chunkLength = response.messages.length
520546
if (consumer.filter) {
521547
response.messages.filter((x) => consumer.filter?.postFilterFunc(x)).map((x) => consumer.handle(x))
522548
return
523549
}
524-
response.messages.map((x) => consumer.handle(x))
550+
551+
for (const [idx, message] of response.messages.entries()) {
552+
consumer.handle(message)
553+
await consumer.creditPolicy.onChunkProgress(idx + 1, chunkLength, creditRequestWrapper)
554+
}
555+
await consumer.creditPolicy.onChunkCompleted(creditRequestWrapper)
525556
}
526557
}
527558

@@ -707,6 +738,7 @@ export interface DeclareConsumerParams {
707738
connectionClosedListener?: ConnectionClosedListener
708739
singleActive?: boolean
709740
filter?: ConsumerFilter
741+
creditPolicy?: ConsumerCreditPolicy
710742
}
711743

712744
export interface DeclareSuperStreamConsumerParams {

src/consumer.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ConsumerFilter } from "./client"
22
import { ConnectionInfo, Connection } from "./connection"
33
import { ConnectionPool } from "./connection_pool"
4+
import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy"
45
import { Message } from "./publisher"
56
import { Offset } from "./requests/subscribe_request"
67

@@ -22,6 +23,7 @@ export class StreamConsumer implements Consumer {
2223
public consumerRef?: string
2324
public offset: Offset
2425
private clientLocalOffset: Offset
26+
private creditsHandler: ConsumerCreditPolicy
2527
readonly handle: ConsumerFunc
2628

2729
constructor(
@@ -32,6 +34,7 @@ export class StreamConsumer implements Consumer {
3234
consumerId: number
3335
consumerRef?: string
3436
offset: Offset
37+
creditPolicy?: ConsumerCreditPolicy
3538
},
3639
readonly filter?: ConsumerFilter
3740
) {
@@ -42,6 +45,7 @@ export class StreamConsumer implements Consumer {
4245
this.offset = params.offset
4346
this.clientLocalOffset = this.offset.clone()
4447
this.connection.incrRefCount()
48+
this.creditsHandler = params.creditPolicy || defaultCreditPolicy
4549
this.handle = this.wrapHandle(handle, params.offset)
4650
}
4751

@@ -101,4 +105,8 @@ export class StreamConsumer implements Consumer {
101105
public get streamName(): string {
102106
return this.stream
103107
}
108+
109+
public get creditPolicy() {
110+
return this.creditsHandler
111+
}
104112
}

src/consumer_credit_policy.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
export type CreditRequestWrapper = (howMany: number) => Promise<void>
2+
3+
export abstract class ConsumerCreditPolicy {
4+
constructor(protected readonly onStartup: number) {}
5+
6+
abstract onChunkReceived(_requestWrapper: CreditRequestWrapper): Promise<void>
7+
abstract onChunkProgress(_idx: number, _total: number, _requestWrapper: CreditRequestWrapper): Promise<void>
8+
abstract onChunkCompleted(_requestWrapper: CreditRequestWrapper): Promise<void>
9+
10+
public onSubscription() {
11+
return this.onStartup
12+
}
13+
}
14+
15+
class NewCreditsOnChunkReceived extends ConsumerCreditPolicy {
16+
constructor(onStartup: number = 1, private readonly onRenewal: number = 1) {
17+
super(onStartup)
18+
}
19+
20+
public async onChunkCompleted(_requestWrapper: CreditRequestWrapper) {}
21+
public async onChunkProgress(_current: number, _total: number, _requestWrapper: CreditRequestWrapper) {}
22+
23+
public async onChunkReceived(requestWrapper: CreditRequestWrapper) {
24+
await requestWrapper(this.onRenewal)
25+
}
26+
27+
public onSubscription(): number {
28+
return this.onStartup
29+
}
30+
}
31+
32+
class NewCreditsOnChunkProgress extends ConsumerCreditPolicy {
33+
constructor(onStartup: number = 1, private readonly ratio: number = 0.5, private readonly howMany: number = 1) {
34+
super(onStartup)
35+
}
36+
37+
public async onChunkCompleted(_requestWrapper: CreditRequestWrapper) {}
38+
public async onChunkReceived(_requestWrapper: CreditRequestWrapper) {}
39+
40+
public async onChunkProgress(current: number, total: number, requestWrapper: CreditRequestWrapper) {
41+
const threshold = Math.max(1, Math.ceil(this.ratio * total))
42+
43+
if (current === threshold) {
44+
await requestWrapper(this.howMany)
45+
}
46+
}
47+
}
48+
49+
class NewCreditsOnChunkCompleted extends ConsumerCreditPolicy {
50+
constructor(onStartup: number = 1, private readonly howMany: number = 1) {
51+
super(onStartup)
52+
}
53+
54+
public async onChunkReceived(_requestWrapper: CreditRequestWrapper) {}
55+
public async onChunkProgress(_current: number, _total: number, _requestWrapper: CreditRequestWrapper) {}
56+
57+
public async onChunkCompleted(requestWrapper: CreditRequestWrapper) {
58+
await requestWrapper(this.howMany)
59+
}
60+
}
61+
62+
export const creditsOnChunkReceived = (onStartup: number, onRenewal: number) =>
63+
new NewCreditsOnChunkReceived(onStartup, onRenewal)
64+
export const creditsOnChunkProgress = (onStartup: number, ratio: number, onRenewal: number) =>
65+
new NewCreditsOnChunkProgress(onStartup, ratio, onRenewal)
66+
export const creditsOnChunkCompleted = (onStartup: number, onRenewal: number) =>
67+
new NewCreditsOnChunkCompleted(onStartup, onRenewal)
68+
export const defaultCreditPolicy = creditsOnChunkReceived(2, 1)
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { use as chaiUse, expect, spy } from "chai"
2+
import { Client, Publisher } from "../../src"
3+
import { Message } from "../../src/publisher"
4+
import { Offset } from "../../src/requests/subscribe_request"
5+
import { createClient, createConsumerRef, createPublisher, createStreamName } from "../support/fake_data"
6+
import { Rabbit } from "../support/rabbit"
7+
import { always, eventually, password, username } from "../support/util"
8+
import {
9+
creditsOnChunkCompleted,
10+
creditsOnChunkProgress,
11+
creditsOnChunkReceived,
12+
} from "../../src/consumer_credit_policy"
13+
import spies from "chai-spies"
14+
chaiUse(spies)
15+
16+
const send = async (publisher: Publisher, chunks: Message[][]) => {
17+
for (const chunk of chunks) {
18+
for (const msg of chunk) {
19+
await publisher.send(msg.content)
20+
}
21+
await publisher.flush()
22+
}
23+
}
24+
25+
describe("consumer credit flow policies", () => {
26+
let streamName: string
27+
const rabbit = new Rabbit(username, password)
28+
let client: Client
29+
let publisher: Publisher
30+
const previousMaxSharedClientInstances = process.env.MAX_SHARED_CLIENT_INSTANCES
31+
const sandbox = spy.sandbox()
32+
const chunk: Message[] = [
33+
{ content: Buffer.from("hello") },
34+
{ content: Buffer.from("there") },
35+
{ content: Buffer.from("brave") },
36+
{ content: Buffer.from("new") },
37+
{ content: Buffer.from("world") },
38+
]
39+
const chunks = [chunk, chunk]
40+
const nMessages = chunk.length * chunks.length
41+
42+
before(() => {
43+
process.env.MAX_SHARED_CLIENT_INSTANCES = "10"
44+
})
45+
46+
after(() => {
47+
if (previousMaxSharedClientInstances !== undefined) {
48+
process.env.MAX_SHARED_CLIENT_INSTANCES = previousMaxSharedClientInstances
49+
return
50+
}
51+
delete process.env.MAX_SHARED_CLIENT_INSTANCES
52+
})
53+
54+
beforeEach(async () => {
55+
client = await createClient(username, password)
56+
streamName = createStreamName()
57+
await rabbit.createStream(streamName)
58+
publisher = await createPublisher(streamName, client)
59+
})
60+
61+
afterEach(async () => {
62+
try {
63+
sandbox.restore()
64+
await client.close()
65+
await rabbit.deleteStream(streamName)
66+
await rabbit.closeAllConnections()
67+
await rabbit.deleteAllQueues({ match: /my-stream-/ })
68+
} catch (_e) {}
69+
})
70+
71+
it("NewCreditOnChunkReceived policy requests new credit when chunk is completely received", async () => {
72+
const consumerRef = createConsumerRef()
73+
const policy = creditsOnChunkReceived(1, 1)
74+
sandbox.on(policy, "onChunkReceived")
75+
const received: Message[] = []
76+
await client.declareConsumer(
77+
{
78+
stream: streamName,
79+
offset: Offset.first(),
80+
singleActive: true,
81+
consumerRef: consumerRef,
82+
creditPolicy: policy,
83+
},
84+
(message: Message) => {
85+
received.push(message)
86+
}
87+
)
88+
89+
await send(publisher, chunks)
90+
91+
await eventually(() => expect(received.length).eql(nMessages))
92+
await eventually(() => expect(policy.onChunkReceived).called.exactly(chunks.length))
93+
await always(() => expect(policy.onChunkReceived).called.below(chunks.length + 1), 5000)
94+
}).timeout(10000)
95+
96+
it("NewCreditOnChunkProgress policy requests new credit when chunk handled at 50% progress", async () => {
97+
const consumerRef = createConsumerRef()
98+
const policy = creditsOnChunkProgress(1, 0.5, 1)
99+
sandbox.on(policy, "onChunkProgress")
100+
const received: Message[] = []
101+
await client.declareConsumer(
102+
{
103+
stream: streamName,
104+
offset: Offset.first(),
105+
singleActive: true,
106+
consumerRef: consumerRef,
107+
creditPolicy: policy,
108+
},
109+
(message: Message) => {
110+
received.push(message)
111+
}
112+
)
113+
114+
await send(publisher, chunks)
115+
116+
await eventually(() => expect(received.length).eql(nMessages))
117+
await eventually(() => expect(policy.onChunkProgress).called.exactly(nMessages))
118+
await always(() => expect(policy.onChunkProgress).called.below(nMessages + 1), 5000)
119+
}).timeout(10000)
120+
121+
it("NewCreditOnChunkProgress policy requests new credit when chunk handled at 100% progress", async () => {
122+
const consumerRef = createConsumerRef()
123+
const policy = creditsOnChunkProgress(1, 1, 1)
124+
sandbox.on(policy, "onChunkProgress")
125+
const received: Message[] = []
126+
await client.declareConsumer(
127+
{
128+
stream: streamName,
129+
offset: Offset.first(),
130+
singleActive: true,
131+
consumerRef: consumerRef,
132+
creditPolicy: policy,
133+
},
134+
(message: Message) => {
135+
received.push(message)
136+
}
137+
)
138+
139+
await send(publisher, chunks)
140+
141+
await eventually(() => expect(received.length).eql(nMessages))
142+
await eventually(() => expect(policy.onChunkProgress).called.exactly(nMessages))
143+
await always(() => expect(policy.onChunkProgress).called.below(nMessages + 1), 5000)
144+
}).timeout(10000)
145+
146+
it("NewCreditOnChunkCompleted policy requests new credit when chunk completely handled", async () => {
147+
const consumerRef = createConsumerRef()
148+
const policy = creditsOnChunkCompleted(1, 1)
149+
sandbox.on(policy, "onChunkCompleted")
150+
const received: Message[] = []
151+
await client.declareConsumer(
152+
{
153+
stream: streamName,
154+
offset: Offset.first(),
155+
singleActive: true,
156+
consumerRef: consumerRef,
157+
creditPolicy: policy,
158+
},
159+
async (message: Message) => {
160+
received.push(message)
161+
}
162+
)
163+
164+
await send(publisher, chunks)
165+
166+
await eventually(() => expect(received.length).eql(nMessages))
167+
await eventually(() => expect(policy.onChunkCompleted).called.exactly(chunks.length))
168+
await always(() => expect(policy.onChunkCompleted).called.below(chunks.length + 1), 5000)
169+
}).timeout(10000)
170+
})

0 commit comments

Comments
 (0)