diff --git a/src/client.ts b/src/client.ts index 33c491e4..897f6f4f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -41,7 +41,7 @@ import { SubscribeResponse } from "./responses/subscribe_response" import { UnsubscribeResponse } from "./responses/unsubscribe_response" import { SuperStreamConsumer } from "./super_stream_consumer" import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher" -import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, sample } from "./util" +import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util" import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy" export type ConnectionClosedListener = (hadError: boolean) => void @@ -163,7 +163,7 @@ export class Client { }) } const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter) - connection.onPublisherClosed(publisher.extendedId, params.stream, async () => { + connection.registerForClosePublisher(publisher.extendedId, params.stream, async () => { await publisher.close(false) this.publishers.delete(publisher.extendedId) }) @@ -215,7 +215,7 @@ export class Client { }, params.filter ) - connection.onConsumerClosed(consumer.extendedId, params.stream, async () => { + connection.registerForCloseConsumer(consumer.extendedId, params.stream, async () => { if (params.connectionClosedListener) { params.connectionClosedListener(false) } @@ -230,24 +230,21 @@ export class Client { } public async closeConsumer(extendedConsumerId: string) { - const { consumer, connection } = this.consumers.get(extendedConsumerId) ?? { - consumer: undefined, - connection: undefined, - } - const consumerId = extractConsumerId(extendedConsumerId) - - if (!consumer) { + const activeConsumer = this.consumers.get(extendedConsumerId) + if (!activeConsumer) { this.logger.error("Consumer does not exist") throw new Error(`Consumer with id: ${extendedConsumerId} does not exist`) } - const res = await connection.sendAndWait(new UnsubscribeRequest(consumerId)) - await consumer.close(true) - this.consumers.delete(extendedConsumerId) - if (!res.ok) { - throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) + + const consumerId = extractConsumerId(extendedConsumerId) + const { streamInfos } = await this.connection.sendAndWait( + new MetadataRequest({ streams: [activeConsumer.consumer.streamName] }) + ) + if (streamInfos.length > 0 && streamExists(streamInfos[0])) { + await this.unsubscribe(activeConsumer.connection, consumerId) } - this.logger.info(`Closed consumer with id: ${extendedConsumerId}`) - return res.ok + await this.closing(activeConsumer.consumer, extendedConsumerId) + return true } public async declareSuperStreamConsumer( @@ -406,11 +403,7 @@ export class Client { const uniqueConnectionIds = new Set() uniqueConnectionIds.add(this.connection.connectionId) - await new Promise(async (res) => { - setTimeout(() => { - res(true) - }, 5000) - }) + await wait(5000) await this.connection.restart() for (const { consumer, connection, params } of this.consumers.values()) { @@ -703,6 +696,20 @@ export class Client { return Connection.connect({ ...connectionParams, hostname: chosenNode.host, port: chosenNode.port }, this.logger) } + private async unsubscribe(connection: Connection, consumerId: number) { + const res = await connection.sendAndWait(new UnsubscribeRequest(consumerId)) + if (!res.ok) { + throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) + } + return res + } + + private async closing(consumer: StreamConsumer, extendedConsumerId: string) { + await consumer.close(true) + this.consumers.delete(extendedConsumerId) + this.logger.info(`Closed consumer with id: ${extendedConsumerId}`) + } + static async connect(params: ClientParams, logger?: Logger): Promise { return new Client(logger ?? new NullLogger(), { ...params, @@ -842,3 +849,10 @@ const extractPublisherId = (extendedPublisherId: string) => { } const getVhostOrDefault = (vhost: string) => vhost ?? "/" + +const streamExists = (streamInfo: StreamMetadata): boolean => { + return ( + streamInfo.responseCode !== ResponseCode.StreamDoesNotExist && + streamInfo.responseCode !== ResponseCode.SubscriptionIdDoesNotExist + ) +} diff --git a/src/connection.ts b/src/connection.ts index 4b8e3246..ef1457ef 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -265,12 +265,20 @@ export class Connection { ) } - public onPublisherClosed(publisherExtendedId: string, streamName: string, callback: () => void | Promise) { + public registerForClosePublisher( + publisherExtendedId: string, + streamName: string, + callback: () => void | Promise + ) { this.publisherListeners.push({ extendedId: publisherExtendedId, stream: streamName }) this.closeEventsEmitter.once(`close_publisher_${publisherExtendedId}`, callback) } - public onConsumerClosed(consumerExtendedId: string, streamName: string, callback: () => void | Promise) { + public registerForCloseConsumer( + consumerExtendedId: string, + streamName: string, + callback: () => void | Promise + ) { this.consumerListeners.push({ extendedId: consumerExtendedId, stream: streamName }) this.closeEventsEmitter.once(`close_consumer_${consumerExtendedId}`, callback) } diff --git a/src/util.ts b/src/util.ts index f5e98d76..438957da 100644 --- a/src/util.ts +++ b/src/util.ts @@ -39,3 +39,14 @@ export const bigIntMax = (n: bigint[]): bigint | undefined => { if (!n.length) return undefined return n.reduce((acc, i) => (i > acc ? i : acc), n[0]) } + +export const wait = async (ms: number) => { + return new Promise((res) => { + setTimeout(() => res(true), ms) + }) +} + +export const ResponseCode = { + StreamDoesNotExist: 2, + SubscriptionIdDoesNotExist: 4, +} as const diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index f835f74a..6073f971 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -84,6 +84,23 @@ describe("declare consumer", () => { await eventually(() => expect(messages).eql([Buffer.from("hello")])) }).timeout(10000) + it("closing a consumer on an existing stream - raises connectionClosedListener", async () => { + const messages: Buffer[] = [] + await publisher.send(Buffer.from("hello")) + let called = false + await client.declareConsumer( + { stream: streamName, offset: Offset.first(), connectionClosedListener: () => (called = true) }, + (message: Message) => { + messages.push(message.content) + } + ) + await eventually(() => expect(messages).eql([Buffer.from("hello")])) + + await client.close({ closingCode: 0, closingReason: "", manuallyClose: false }) + + await eventually(() => expect(called).true) + }).timeout(10000) + it("declaring a consumer on an existing stream with identifiers", async () => { const messages: Buffer[] = [] await publisher.send(Buffer.from("hello"))