Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import { SubscribeResponse } from "./responses/subscribe_response"
import { UnsubscribeResponse } from "./responses/unsubscribe_response"
import { SuperStreamConsumer } from "./super_stream_consumer"
import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher"
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, sample } from "./util"
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util"
import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy"

export type ConnectionClosedListener = (hadError: boolean) => void
Expand Down Expand Up @@ -163,7 +163,7 @@ export class Client {
})
}
const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter)
connection.onPublisherClosed(publisher.extendedId, params.stream, async () => {
connection.registerForClosePublisher(publisher.extendedId, params.stream, async () => {
await publisher.close(false)
this.publishers.delete(publisher.extendedId)
})
Expand Down Expand Up @@ -215,7 +215,7 @@ export class Client {
},
params.filter
)
connection.onConsumerClosed(consumer.extendedId, params.stream, async () => {
connection.registerForCloseConsumer(consumer.extendedId, params.stream, async () => {
if (params.connectionClosedListener) {
params.connectionClosedListener(false)
}
Expand All @@ -230,24 +230,21 @@ export class Client {
}

public async closeConsumer(extendedConsumerId: string) {
const { consumer, connection } = this.consumers.get(extendedConsumerId) ?? {
consumer: undefined,
connection: undefined,
}
const consumerId = extractConsumerId(extendedConsumerId)

if (!consumer) {
const activeConsumer = this.consumers.get(extendedConsumerId)
if (!activeConsumer) {
this.logger.error("Consumer does not exist")
throw new Error(`Consumer with id: ${extendedConsumerId} does not exist`)
}
const res = await connection.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
await consumer.close(true)
this.consumers.delete(extendedConsumerId)
if (!res.ok) {
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)

const consumerId = extractConsumerId(extendedConsumerId)
const { streamInfos } = await this.connection.sendAndWait<MetadataResponse>(
new MetadataRequest({ streams: [activeConsumer.consumer.streamName] })
)
if (streamInfos.length > 0 && streamExists(streamInfos[0])) {
await this.unsubscribe(activeConsumer.connection, consumerId)
}
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
return res.ok
await this.closing(activeConsumer.consumer, extendedConsumerId)
return true
}

public async declareSuperStreamConsumer(
Expand Down Expand Up @@ -406,11 +403,7 @@ export class Client {
const uniqueConnectionIds = new Set<string>()
uniqueConnectionIds.add(this.connection.connectionId)

await new Promise(async (res) => {
setTimeout(() => {
res(true)
}, 5000)
})
await wait(5000)
await this.connection.restart()

for (const { consumer, connection, params } of this.consumers.values()) {
Expand Down Expand Up @@ -703,6 +696,20 @@ export class Client {
return Connection.connect({ ...connectionParams, hostname: chosenNode.host, port: chosenNode.port }, this.logger)
}

private async unsubscribe(connection: Connection, consumerId: number) {
const res = await connection.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
if (!res.ok) {
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
return res
}

private async closing(consumer: StreamConsumer, extendedConsumerId: string) {
await consumer.close(true)
this.consumers.delete(extendedConsumerId)
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
}

static async connect(params: ClientParams, logger?: Logger): Promise<Client> {
return new Client(logger ?? new NullLogger(), {
...params,
Expand Down Expand Up @@ -842,3 +849,10 @@ const extractPublisherId = (extendedPublisherId: string) => {
}

const getVhostOrDefault = (vhost: string) => vhost ?? "/"

const streamExists = (streamInfo: StreamMetadata): boolean => {
return (
streamInfo.responseCode !== ResponseCode.StreamDoesNotExist &&
streamInfo.responseCode !== ResponseCode.SubscriptionIdDoesNotExist
)
}
12 changes: 10 additions & 2 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,20 @@ export class Connection {
)
}

public onPublisherClosed(publisherExtendedId: string, streamName: string, callback: () => void | Promise<void>) {
public registerForClosePublisher(
publisherExtendedId: string,
streamName: string,
callback: () => void | Promise<void>
) {
this.publisherListeners.push({ extendedId: publisherExtendedId, stream: streamName })
this.closeEventsEmitter.once(`close_publisher_${publisherExtendedId}`, callback)
}

public onConsumerClosed(consumerExtendedId: string, streamName: string, callback: () => void | Promise<void>) {
public registerForCloseConsumer(
consumerExtendedId: string,
streamName: string,
callback: () => void | Promise<void>
) {
this.consumerListeners.push({ extendedId: consumerExtendedId, stream: streamName })
this.closeEventsEmitter.once(`close_consumer_${consumerExtendedId}`, callback)
}
Expand Down
11 changes: 11 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,14 @@ export const bigIntMax = (n: bigint[]): bigint | undefined => {
if (!n.length) return undefined
return n.reduce((acc, i) => (i > acc ? i : acc), n[0])
}

export const wait = async (ms: number) => {
return new Promise((res) => {
setTimeout(() => res(true), ms)
})
}

export const ResponseCode = {
StreamDoesNotExist: 2,
SubscriptionIdDoesNotExist: 4,
} as const
17 changes: 17 additions & 0 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,23 @@ describe("declare consumer", () => {
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
}).timeout(10000)

it("closing a consumer on an existing stream - raises connectionClosedListener", async () => {
const messages: Buffer[] = []
await publisher.send(Buffer.from("hello"))
let called = false
await client.declareConsumer(
{ stream: streamName, offset: Offset.first(), connectionClosedListener: () => (called = true) },
(message: Message) => {
messages.push(message.content)
}
)
await eventually(() => expect(messages).eql([Buffer.from("hello")]))

await client.close({ closingCode: 0, closingReason: "", manuallyClose: false })

await eventually(() => expect(called).true)
}).timeout(10000)

it("declaring a consumer on an existing stream with identifiers", async () => {
const messages: Buffer[] = []
await publisher.send(Buffer.from("hello"))
Expand Down
Loading