From f6e8e8ccb79ac38039ee5609f7992b321def1685 Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 5 Jun 2024 19:54:42 +0200 Subject: [PATCH 01/12] feat: metadata update triggers closing of relevant consumers and publishers --- src/client.ts | 18 +++++++- src/connection.ts | 2 + src/consumer.ts | 4 ++ src/publisher.ts | 22 ++++++--- test/e2e/declare_publisher.test.ts | 11 +++++ test/e2e/metadata_update.test.ts | 72 +++++++++++++++++++++--------- test/e2e/offset.test.ts | 2 +- test/e2e/publish_confirm.test.ts | 10 ----- test/support/fake_data.ts | 42 ++++++++++++----- test/support/rabbit.ts | 20 ++++++--- 10 files changed, 145 insertions(+), 58 deletions(-) diff --git a/src/client.ts b/src/client.ts index 44cf7558..5ba87cb2 100644 --- a/src/client.ts +++ b/src/client.ts @@ -148,6 +148,12 @@ export class Client { logger: this.logger, } const publisher = new StreamPublisher(streamPublisherParams, filter) + connection.on("metadata_update", async (metadata) => { + if (metadata.metadataInfo.stream === publisher.streamName) { + await publisher.close(false) + this.publishers.delete(publisherId) + } + }) this.publishers.set(publisherId, { publisher, connection, params, filter }) this.logger.info( `New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}` @@ -181,6 +187,14 @@ export class Client { { connection, stream: params.stream, consumerId, consumerRef: params.consumerRef, offset: params.offset }, params.filter ) + this.connection.on("metadata_update", async (metadata) => { + if (metadata.metadataInfo.stream === consumer.streamName) { + if (params.connectionClosedListener) { + params.connectionClosedListener(false) + } + await this.closeConsumer(consumerId) + } + }) this.consumers.set(consumerId, { connection, consumer, params }) await this.declareConsumerOnConnection(params, consumerId, this.connection) this.logger.info( @@ -197,11 +211,11 @@ export class Client { throw new Error(`Consumer with id: ${consumerId} does not exist`) } const res = await this.connection.sendAndWait(new UnsubscribeRequest(consumerId)) + await consumer.close(true) + this.consumers.delete(consumerId) if (!res.ok) { throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - await consumer.close(true) - this.consumers.delete(consumerId) this.logger.info(`Closed consumer with id: ${consumerId}`) return res.ok } diff --git a/src/connection.ts b/src/connection.ts index 0ce3f500..f640d101 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -510,6 +510,8 @@ export function errorMessageOf(code: number): string { switch (code) { case 0x02: return "Stream does not exist" + case 0x04: + return "Subscription ID does not exist" case 0x06: return "Stream not available" case 0x12: diff --git a/src/consumer.ts b/src/consumer.ts index 7974ab38..e2bfc821 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -97,4 +97,8 @@ export class StreamConsumer implements Consumer { } return handle } + + public get streamName(): string { + return this.stream + } } diff --git a/src/publisher.ts b/src/publisher.ts index a9672494..a5632466 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -1,17 +1,17 @@ import { inspect } from "util" import { messageSize } from "./amqp10/encoder" import { CompressionType } from "./compression" +import { Connection, ConnectionInfo } from "./connection" +import { ConnectionPool } from "./connection_pool" import { Logger } from "./logger" import { FrameSizeException } from "./requests/frame_size_exception" import { PublishRequest, PublishRequestMessage } from "./requests/publish_request" +import { PublishRequestV2 } from "./requests/publish_request_v2" import { SubEntryBatchPublishRequest } from "./requests/sub_entry_batch_publish_request" import { PublishConfirmResponse } from "./responses/publish_confirm_response" import { PublishErrorResponse } from "./responses/publish_error_response" import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util" import { MetadataUpdateListener } from "./response_decoder" -import { ConnectionInfo, Connection } from "./connection" -import { ConnectionPool } from "./connection_pool" -import { PublishRequestV2 } from "./requests/publish_request_v2" export type MessageApplicationProperties = Record export type MessageAnnotations = Record @@ -82,6 +82,7 @@ export interface Publisher { getLastPublishingId(): Promise getConnectionInfo(): ConnectionInfo close(manuallyClose: boolean): Promise + closed: boolean ref: string readonly publisherId: number } @@ -101,7 +102,7 @@ export class StreamPublisher implements Publisher { private scheduled: NodeJS.Immediate | null private logger: Logger private maxChunkLength: number - private closed = false + private _closed = false constructor( params: { @@ -130,7 +131,14 @@ export class StreamPublisher implements Publisher { this.connection.incrRefCount() } + public get closed(): boolean { + return this._closed + } + async send(message: Buffer, opts: MessageOptions = {}): Promise { + if (this._closed) { + throw new Error(`Publisher has been closed`) + } if (this.boot && this.publishingId === -1n) { this.publishingId = await this.getLastPublishingId() } @@ -203,7 +211,11 @@ export class StreamPublisher implements Publisher { await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) } } - this.closed = true + this._closed = true + } + + public get streamName(): string { + return this.stream } private async enqueue(publishRequestMessage: PublishRequestMessage) { diff --git a/test/e2e/declare_publisher.test.ts b/test/e2e/declare_publisher.test.ts index 53a3c079..11bea87d 100644 --- a/test/e2e/declare_publisher.test.ts +++ b/test/e2e/declare_publisher.test.ts @@ -69,6 +69,17 @@ describe("declare publisher", () => { ) }) + it("if the server deletes the stream, the publisher gets closed", async () => { + const publisher = await createPublisher(streamName, client) + await rabbit.deleteStream(streamName) + + await expectToThrowAsync( + () => publisher.send(Buffer.from(`test${randomUUID()}`)), + Error, + "Publisher has been closed" + ) + }) + it("publishers for the same stream should share the underlying connection", async () => { const publisher1 = await createPublisher(streamName, client) const publisher2 = await createPublisher(streamName, client) diff --git a/test/e2e/metadata_update.test.ts b/test/e2e/metadata_update.test.ts index 5de65b5b..585af5cb 100644 --- a/test/e2e/metadata_update.test.ts +++ b/test/e2e/metadata_update.test.ts @@ -1,22 +1,16 @@ import { expect } from "chai" -import { Client, ClientListenersParams } from "../../src" -import { MetadataUpdateResponse } from "../../src/responses/metadata_update_response" -import { createClient, createPublisher, createStreamName } from "../support/fake_data" +import { Client, Offset } from "../../src" +import { createClient, createStreamName } from "../support/fake_data" import { Rabbit } from "../support/rabbit" -import { eventually, username, password } from "../support/util" +import { eventually, password, username } from "../support/util" -describe("update the metadata from the server", () => { +describe("react to a metadata update message from the server", () => { const rabbit = new Rabbit(username, password) let client: Client let streamName: string - const metadataUpdateResponses: MetadataUpdateResponse[] = [] beforeEach(async () => { - metadataUpdateResponses.length = 0 - const listeners: ClientListenersParams = { - metadata_update: (data) => metadataUpdateResponses.push(data), - } - client = await createClient(username, password, listeners) + client = await createClient(username, password) streamName = createStreamName() await rabbit.createStream(streamName) }) @@ -27,22 +21,56 @@ describe("update the metadata from the server", () => { await rabbit.deleteStream(streamName) await rabbit.closeAllConnections() await rabbit.deleteAllQueues({ match: /my-stream-/ }) - } catch (e) {} + } catch (_e) {} }) - it("when delete stream we receive a metadataUpdate", async () => { - await createPublisher(streamName, client) + it("when we have a metadata update on a stream any consumer on that stream gets removed from the consumers list", async () => { + await client.declareConsumer({ offset: Offset.first(), stream: streamName }, () => { + return + }) + + await rabbit.deleteStream(streamName) + + await eventually(() => { + expect(client.consumerCounts()).to.eql(0) + }, 3000) + }) + + it("when we have a metadata update on a stream the connection closed callback of its consumers fires", async () => { + let cbCalled = 0 + await client.declareConsumer( + { offset: Offset.first(), stream: streamName, connectionClosedListener: (_) => cbCalled++ }, + () => { + return + } + ) + + await rabbit.deleteStream(streamName) + + await eventually(() => { + expect(cbCalled).to.eql(1) + }, 3000) + }) + + it("when we have a metadata update on a stream any publisher on that stream gets closed", async () => { + const publisher = await client.declarePublisher({ stream: streamName }) + await rabbit.deleteStream(streamName) - await eventually(async () => expect(metadataUpdateResponses.length).greaterThanOrEqual(1), 10000) - }).timeout(10000) - it("when delete stream we receive a metadataUpdate registering after creation", async () => { - let called = 0 - const publisher = await createPublisher(streamName, client) - publisher.on("metadata_update", (_data: MetadataUpdateResponse) => called++) + await eventually(() => { + expect(client.publisherCounts()).to.eql(0) + expect(publisher.closed).to.eql(true) + }, 3000) + }) + + it("when we have a metadata update on a stream the connection closed callback of its publishers fires", async () => { + let cbCalled = 0 + await client.declarePublisher({ stream: streamName, connectionClosedListener: (_) => cbCalled++ }) await rabbit.deleteStream(streamName) - await eventually(async () => expect(called).greaterThanOrEqual(1), 10000) - }).timeout(10000) + await eventually(() => { + expect(cbCalled).to.eql(1) + }, 3000) + }) }) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 15c02f4c..05ec8175 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -246,7 +246,7 @@ describe("offset", () => { } ) await rabbit.deleteStream(testStreamName) - await expectToThrowAsync(() => consumer.queryOffset(), Error, `Query offset command returned error with code 2`) + await expectToThrowAsync(() => consumer.queryOffset(), Error, `This socket has been ended by the other party`) }) }) }) diff --git a/test/e2e/publish_confirm.test.ts b/test/e2e/publish_confirm.test.ts index d3376644..2f5f9d86 100644 --- a/test/e2e/publish_confirm.test.ts +++ b/test/e2e/publish_confirm.test.ts @@ -42,14 +42,4 @@ describe("publish a message and get confirmation", () => { const lastPublishingId = await publisher.getLastPublishingId() expect(publishResponses).eql([{ error: null, ids: [lastPublishingId] }]) }).timeout(12000) - - it("after the server replies with an error, the error callback is invoked with an error", async () => { - const publisher = await client.declarePublisher({ stream, publisherRef }) - publisher.on("publish_confirm", (error, ids) => publishResponses.push({ error, ids })) - await rabbit.deleteStream(stream) - - await publisher.send(Buffer.from(`test${randomUUID()}`)) - - await eventually(() => expect(publishResponses).eql([{ error: 256, ids: [undefined] }])) - }).timeout(10000) }) diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index 23eabcd9..a638f763 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -5,6 +5,8 @@ import { BufferSizeSettings } from "../../src/requests/request" import { Offset } from "../../src/requests/subscribe_request" import { Consumer, Publisher } from "../../src" import { getTestNodesFromEnv } from "./util" +import { createLogger, format, transports } from "winston" +import { inspect } from "util" export function createProperties(): MessageProperties { return { @@ -61,16 +63,32 @@ export async function createClient( connectionName?: string ): Promise { const [firstNode] = getTestNodesFromEnv() - return connect({ - hostname: firstNode.host, - port: port ?? firstNode.port, - username, - password, - vhost: "/", - frameMax: frameMax ?? 0, - heartbeat: 0, - listeners: listeners, - bufferSizeSettings: bufferSizeSettings, - connectionName: connectionName, - }) + return connect( + { + hostname: firstNode.host, + port: port ?? firstNode.port, + username, + password, + vhost: "/", + frameMax: frameMax ?? 0, + heartbeat: 0, + listeners: listeners, + bufferSizeSettings: bufferSizeSettings, + connectionName: connectionName, + } + // testLogger + ) } + +export const testLogger = createLogger({ + level: "debug", + format: format.combine( + format.colorize(), + format.timestamp(), + format.align(), + format.splat(), + format.label(), + format.printf((info) => `${info.timestamp} ${info.level}: ${info.message} ${info.meta ? inspect(info.meta) : ""}`) + ), + transports: new transports.Console(), +}) diff --git a/test/support/rabbit.ts b/test/support/rabbit.ts index 270ec283..ff0fcd19 100644 --- a/test/support/rabbit.ts +++ b/test/support/rabbit.ts @@ -1,4 +1,4 @@ -import got from "got" +import got, { HTTPError } from "got" import { getTestNodesFromEnv } from "./util" import { range } from "../../src/util" import { CreateStreamArguments } from "../../src/requests/create_stream_request" @@ -123,11 +123,19 @@ export class Rabbit { }) } - deleteStream(streamName: string) { - return got.delete(`http://${this.firstNode.host}:${this.port}/api/queues/%2F/${streamName}`, { - username: this.username, - password: this.password, - }) + async deleteStream(streamName: string) { + try { + const res = await got.delete(`http://${this.firstNode.host}:${this.port}/api/queues/%2F/${streamName}`, { + username: this.username, + password: this.password, + }) + return res + } catch (e) { + if (e instanceof HTTPError) { + if (e.message === "Response code 404 (Not Found)") return "" + } + throw e + } } createExchange(exchangeName: string) { From 24a9ac4b8bc0afd69cc6540b026d1c7f2bc14609 Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 5 Jun 2024 20:05:09 +0200 Subject: [PATCH 02/12] chore: minor cleanup --- test/e2e/address_resolver.test.ts | 9 ++------- test/e2e/close_consumer.test.ts | 2 +- test/e2e/declare_consumer.test.ts | 5 +---- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/test/e2e/address_resolver.test.ts b/test/e2e/address_resolver.test.ts index e2774718..0a8e4a98 100644 --- a/test/e2e/address_resolver.test.ts +++ b/test/e2e/address_resolver.test.ts @@ -1,6 +1,5 @@ import { expect } from "chai" import { Client, connect } from "../../src" -import { Message } from "../../src/publisher" import { Offset } from "../../src/requests/subscribe_request" import { getAddressResolverFromEnv } from "../../src/util" import { createStreamName } from "../support/fake_data" @@ -41,15 +40,11 @@ describe("address resolver", () => { }) it("declaring a consumer - should not throw", async () => { - await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (_: Message) => { - console.log("Message received") - }) + await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => null) }) it("declaring a consumer - if multiple nodes are present the consumer should be connected to a replica", async () => { - const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (_: Message) => { - console.log("Message received") - }) + const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => null) const connectionInfo = consumer.getConnectionInfo() const queueInfo = await rabbit.getQueueInfo(streamName) diff --git a/test/e2e/close_consumer.test.ts b/test/e2e/close_consumer.test.ts index 736624cc..15a999f4 100644 --- a/test/e2e/close_consumer.test.ts +++ b/test/e2e/close_consumer.test.ts @@ -36,7 +36,7 @@ describe("close consumer", () => { it("closing a consumer in an existing stream", async () => { await client.declarePublisher({ stream: testStreamName }) - const consumer = await client.declareConsumer({ stream: testStreamName, offset: Offset.first() }, console.log) + const consumer = await client.declareConsumer({ stream: testStreamName, offset: Offset.first() }, () => null) const response = await client.closeConsumer(consumer.consumerId) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 675e768c..6671567f 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -192,10 +192,7 @@ describe("declare consumer", () => { it("declaring a consumer on a non-existing stream should raise an error", async () => { await expectToThrowAsync( - () => - client.declareConsumer({ stream: nonExistingStreamName, offset: Offset.first() }, (message: Message) => - console.log(message.content) - ), + () => client.declareConsumer({ stream: nonExistingStreamName, offset: Offset.first() }, () => null), Error, "Stream was not found on any node" ) From 0e8af8e748ef5c0c418c9b216ec2d91cb2051716 Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 5 Jun 2024 20:15:56 +0200 Subject: [PATCH 03/12] tests: increase timeout on slow test --- test/e2e/metadata_update.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/metadata_update.test.ts b/test/e2e/metadata_update.test.ts index 585af5cb..cf675bd3 100644 --- a/test/e2e/metadata_update.test.ts +++ b/test/e2e/metadata_update.test.ts @@ -50,7 +50,7 @@ describe("react to a metadata update message from the server", () => { await eventually(() => { expect(cbCalled).to.eql(1) }, 3000) - }) + }).timeout(5000) it("when we have a metadata update on a stream any publisher on that stream gets closed", async () => { const publisher = await client.declarePublisher({ stream: streamName }) From 3480bd988a5adb52ed1dbbe2f766ccfe2017f9db Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 09:20:40 +0200 Subject: [PATCH 04/12] chore: adjust pipeline --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e471a43b..a1a661ff 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,7 +46,7 @@ jobs: - name: Restart RabbitMQ run: docker restart ${{ job.services.rabbitmq.id }} - name: Wait for rabbit instance restart - run: sleep 5 + run: sleep 10 - name: Create SuperStream run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-streams add_super_stream super-stream-test --partitions 2 - run: npm ci From 7e48efb0e47595dc17cf93be775296f401770c64 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 12:41:39 +0200 Subject: [PATCH 05/12] chore: add rebuild from source script on example --- example/package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/example/package.json b/example/package.json index c8186a95..906e20d3 100644 --- a/example/package.json +++ b/example/package.json @@ -6,7 +6,8 @@ "scripts": { "test": "echo \"Error: no test specified\" && exit 1", "start": "node index.js", - "cluster-example": "node cluster_example.js" + "cluster-example": "node cluster_example.js", + "rebuild-source": "cd .. && npm run build && cd - && npm install --force" }, "author": "", "license": "ISC", From a97bf7292f0bcaf082a3d69685e920a08a2bb0f5 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 12:42:03 +0200 Subject: [PATCH 06/12] tests: fix typo --- test/e2e/declare_consumer.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 6671567f..2a200de9 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -100,7 +100,7 @@ describe("declare consumer", () => { await eventually(() => expect(messages).eql([Buffer.from("hello")])) }).timeout(10000) - it("declaring a single active consumer on an existing stream and a simple one - the active of the group and the simple should handle the message", async () => { + it("declaring a single active consumer on an existing stream and a simple one - the active of the group and the simple should handle the message", async () => { const messages: Buffer[] = [] const consumerRef = createConsumerRef() From d91849441203517e5de7de7cc0b6a4563670ce57 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 12:42:23 +0200 Subject: [PATCH 07/12] chore: minor cleanup --- src/connection.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connection.ts b/src/connection.ts index f640d101..70766982 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -171,7 +171,8 @@ export class Connection { } public static connect(params: ConnectionParams, logger: Logger): Promise { - return new Connection(params, logger).start() + const connection = Connection.create(params, logger) + return connection.start() } public static create(params: ConnectionParams, logger: Logger): Connection { From 4316ec06a8ae72b6058dd14571fd84c6f2f96136 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 12:42:39 +0200 Subject: [PATCH 08/12] fix: move all consumer chatter on the consumer connection --- src/client.ts | 57 ++++++++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/client.ts b/src/client.ts index 5ba87cb2..0b1f850f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,19 +1,24 @@ import { randomUUID } from "crypto" +import { coerce, lt } from "semver" import { inspect } from "util" import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression" +import { Connection, ConnectionInfo, errorMessageOf } from "./connection" +import { ConnectionPool } from "./connection_pool" import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer" import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes" import { Logger, NullLogger } from "./logger" import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher" import { ConsumerUpdateResponse } from "./requests/consumer_update_response" import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request" +import { CreateSuperStreamRequest } from "./requests/create_super_stream_request" import { CreditRequest, CreditRequestParams } from "./requests/credit_request" import { DeclarePublisherRequest } from "./requests/declare_publisher_request" import { DeletePublisherRequest } from "./requests/delete_publisher_request" import { DeleteStreamRequest } from "./requests/delete_stream_request" +import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request" import { MetadataRequest } from "./requests/metadata_request" import { PartitionsQuery } from "./requests/partitions_query" -import { BufferSizeSettings, Request } from "./requests/request" +import { BufferSizeSettings } from "./requests/request" import { RouteQuery } from "./requests/route_query" import { StreamStatsRequest } from "./requests/stream_stats_request" import { Offset, SubscribeRequest } from "./requests/subscribe_request" @@ -21,10 +26,13 @@ import { UnsubscribeRequest } from "./requests/unsubscribe_request" import { MetadataUpdateListener, PublishConfirmListener, PublishErrorListener } from "./response_decoder" import { ConsumerUpdateQuery } from "./responses/consumer_update_query" import { CreateStreamResponse } from "./responses/create_stream_response" +import { CreateSuperStreamResponse } from "./responses/create_super_stream_response" import { DeclarePublisherResponse } from "./responses/declare_publisher_response" import { DeletePublisherResponse } from "./responses/delete_publisher_response" import { DeleteStreamResponse } from "./responses/delete_stream_response" +import { DeleteSuperStreamResponse } from "./responses/delete_super_stream_response" import { DeliverResponse } from "./responses/deliver_response" +import { DeliverResponseV2 } from "./responses/deliver_response_v2" import { Broker, MetadataResponse, StreamMetadata } from "./responses/metadata_response" import { PartitionsResponse } from "./responses/partitions_response" import { RouteResponse } from "./responses/route_response" @@ -34,14 +42,6 @@ import { UnsubscribeResponse } from "./responses/unsubscribe_response" import { SuperStreamConsumer } from "./super_stream_consumer" import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher" import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, sample } from "./util" -import { CreateSuperStreamRequest } from "./requests/create_super_stream_request" -import { CreateSuperStreamResponse } from "./responses/create_super_stream_response" -import { DeleteSuperStreamResponse } from "./responses/delete_super_stream_response" -import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request" -import { lt, coerce } from "semver" -import { ConnectionInfo, Connection, errorMessageOf } from "./connection" -import { ConnectionPool } from "./connection_pool" -import { DeliverResponseV2 } from "./responses/deliver_response_v2" export type ConnectionClosedListener = (hadError: boolean) => void @@ -187,7 +187,7 @@ export class Client { { connection, stream: params.stream, consumerId, consumerRef: params.consumerRef, offset: params.offset }, params.filter ) - this.connection.on("metadata_update", async (metadata) => { + connection.on("metadata_update", async (metadata) => { if (metadata.metadataInfo.stream === consumer.streamName) { if (params.connectionClosedListener) { params.connectionClosedListener(false) @@ -196,7 +196,7 @@ export class Client { } }) this.consumers.set(consumerId, { connection, consumer, params }) - await this.declareConsumerOnConnection(params, consumerId, this.connection) + await this.declareConsumerOnConnection(params, consumerId, connection) this.logger.info( `New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}` ) @@ -204,13 +204,13 @@ export class Client { } public async closeConsumer(consumerId: number) { - const consumer = this.consumers.get(consumerId)?.consumer + const { consumer, connection } = this.consumers.get(consumerId) ?? { consumer: undefined, connection: undefined } if (!consumer) { this.logger.error("Consumer does not exist") throw new Error(`Consumer with id: ${consumerId} does not exist`) } - const res = await this.connection.sendAndWait(new UnsubscribeRequest(consumerId)) + const res = await connection.sendAndWait(new UnsubscribeRequest(consumerId)) await consumer.close(true) this.consumers.delete(consumerId) if (!res.ok) { @@ -268,10 +268,6 @@ export class Client { return Array.from(this.consumers.values()) } - public send(cmd: Request): Promise { - return this.connection.send(cmd) - } - public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise { this.logger.debug(`Create Stream...`) const res = await this.connection.sendAndWait(new CreateStreamRequest(params)) @@ -388,7 +384,7 @@ export class Client { } uniqueConnectionIds.add(connection.id) const consumerParams = { ...params, offset: consumer.localOffset } - await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, this.connection) + await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, connection) } for (const { publisher, connection, params, filter } of this.publishers.values()) { @@ -475,8 +471,8 @@ export class Client { } } - private askForCredit(params: CreditRequestParams): Promise { - return this.send(new CreditRequest({ ...params })) + private askForCredit(params: CreditRequestParams, connection: Connection): Promise { + return connection.send(new CreditRequest({ ...params })) } private incPublisherId() { @@ -493,28 +489,34 @@ export class Client { private getDeliverV1Callback() { return async (response: DeliverResponse) => { - const consumer = this.consumers.get(response.subscriptionId)?.consumer + const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? { + consumer: undefined, + connection: undefined, + } if (!consumer) { this.logger.error(`On deliverV1 no consumer found`) return } this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`) this.logger.debug(`response.messages.length: ${response.messages.length}`) - await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) + await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection) response.messages.map((x) => consumer.handle(x)) } } private getDeliverV2Callback() { return async (response: DeliverResponseV2) => { - const consumer = this.consumers.get(response.subscriptionId)?.consumer + const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? { + consumer: undefined, + connection: undefined, + } if (!consumer) { this.logger.error(`On deliverV2 no consumer found`) return } this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`) this.logger.debug(`response.messages.length: ${response.messages.length}`) - await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }) + await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection) if (consumer.filter) { response.messages.filter((x) => consumer.filter?.postFilterFunc(x)).map((x) => consumer.handle(x)) return @@ -525,13 +527,16 @@ export class Client { private getConsumerUpdateCallback() { return async (response: ConsumerUpdateQuery) => { - const consumer = this.consumers.get(response.subscriptionId)?.consumer + const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? { + consumer: undefined, + connection: undefined, + } if (!consumer) { this.logger.error(`On consumer_update_query no consumer found`) return } this.logger.debug(`on consumer_update_query -> ${consumer.consumerRef}`) - await this.send( + await connection.send( new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset: consumer.offset }) ) } From 0db5e5c1372fe02aaf259d5741be1c52bb078665 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 12:52:09 +0200 Subject: [PATCH 09/12] chore: added a log on the rabbit http client --- test/support/rabbit.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test/support/rabbit.ts b/test/support/rabbit.ts index ff0fcd19..8753d949 100644 --- a/test/support/rabbit.ts +++ b/test/support/rabbit.ts @@ -132,6 +132,7 @@ export class Rabbit { return res } catch (e) { if (e instanceof HTTPError) { + console.warn(`Attempted to delete a stream which had already been deleted`) if (e.message === "Response code 404 (Not Found)") return "" } throw e From 9c47cc3e941d46bb6a0485d3d5a89a077eb668b6 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 13:01:12 +0200 Subject: [PATCH 10/12] chore: revert adding console log in rabbit http client, it only dirties the output --- test/support/rabbit.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/support/rabbit.ts b/test/support/rabbit.ts index 8753d949..ff0fcd19 100644 --- a/test/support/rabbit.ts +++ b/test/support/rabbit.ts @@ -132,7 +132,6 @@ export class Rabbit { return res } catch (e) { if (e instanceof HTTPError) { - console.warn(`Attempted to delete a stream which had already been deleted`) if (e.message === "Response code 404 (Not Found)") return "" } throw e From db160fd93b28a0f3c3b130fc79f6d539021fb90b Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 14:23:15 +0200 Subject: [PATCH 11/12] tests: increase wait on single test --- test/e2e/offset.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 05ec8175..e71752d3 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -74,7 +74,7 @@ describe("offset", () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) - await wait(200) + await wait(500) const lastBatchMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) await client.declareConsumer( @@ -246,6 +246,7 @@ describe("offset", () => { } ) await rabbit.deleteStream(testStreamName) + await wait(200) await expectToThrowAsync(() => consumer.queryOffset(), Error, `This socket has been ended by the other party`) }) }) From 3e15de72a8209100a8639f15f3336b5ff8fd4c10 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 6 Jun 2024 14:39:06 +0200 Subject: [PATCH 12/12] tests: minor fixes for the pipeline which runs considerably slower --- test/e2e/declare_publisher.test.ts | 3 ++- test/e2e/metadata_update.test.ts | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/test/e2e/declare_publisher.test.ts b/test/e2e/declare_publisher.test.ts index 11bea87d..d0d7075c 100644 --- a/test/e2e/declare_publisher.test.ts +++ b/test/e2e/declare_publisher.test.ts @@ -2,7 +2,7 @@ import { expect } from "chai" import { Client } from "../../src" import { createClient, createPublisher, createStreamName } from "../support/fake_data" import { Rabbit, RabbitConnectionResponse } from "../support/rabbit" -import { eventually, expectToThrowAsync, username, password } from "../support/util" +import { eventually, expectToThrowAsync, username, password, wait } from "../support/util" import { getMaxSharedConnectionInstances } from "../../src/util" import { randomUUID } from "crypto" @@ -72,6 +72,7 @@ describe("declare publisher", () => { it("if the server deletes the stream, the publisher gets closed", async () => { const publisher = await createPublisher(streamName, client) await rabbit.deleteStream(streamName) + await wait(500) await expectToThrowAsync( () => publisher.send(Buffer.from(`test${randomUUID()}`)), diff --git a/test/e2e/metadata_update.test.ts b/test/e2e/metadata_update.test.ts index cf675bd3..d03042b2 100644 --- a/test/e2e/metadata_update.test.ts +++ b/test/e2e/metadata_update.test.ts @@ -15,8 +15,10 @@ describe("react to a metadata update message from the server", () => { await rabbit.createStream(streamName) }) - afterEach(async () => { + afterEach(async function () { try { + // eslint-disable-next-line no-invalid-this + this.timeout(5000) await client.close() await rabbit.deleteStream(streamName) await rabbit.closeAllConnections()