Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
- name: Restart RabbitMQ
run: docker restart ${{ job.services.rabbitmq.id }}
- name: Wait for rabbit instance restart
run: sleep 5
run: sleep 10
- name: Create SuperStream
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-streams add_super_stream super-stream-test --partitions 2
- run: npm ci
Expand Down
3 changes: 2 additions & 1 deletion example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node index.js",
"cluster-example": "node cluster_example.js"
"cluster-example": "node cluster_example.js",
"rebuild-source": "cd .. && npm run build && cd - && npm install --force"
},
"author": "",
"license": "ISC",
Expand Down
73 changes: 46 additions & 27 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
import { randomUUID } from "crypto"
import { coerce, lt } from "semver"
import { inspect } from "util"
import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression"
import { Connection, ConnectionInfo, errorMessageOf } from "./connection"
import { ConnectionPool } from "./connection_pool"
import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer"
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
import { Logger, NullLogger } from "./logger"
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
import { ConsumerUpdateResponse } from "./requests/consumer_update_response"
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
import { CreateSuperStreamRequest } from "./requests/create_super_stream_request"
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
import { DeclarePublisherRequest } from "./requests/declare_publisher_request"
import { DeletePublisherRequest } from "./requests/delete_publisher_request"
import { DeleteStreamRequest } from "./requests/delete_stream_request"
import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request"
import { MetadataRequest } from "./requests/metadata_request"
import { PartitionsQuery } from "./requests/partitions_query"
import { BufferSizeSettings, Request } from "./requests/request"
import { BufferSizeSettings } from "./requests/request"
import { RouteQuery } from "./requests/route_query"
import { StreamStatsRequest } from "./requests/stream_stats_request"
import { Offset, SubscribeRequest } from "./requests/subscribe_request"
import { UnsubscribeRequest } from "./requests/unsubscribe_request"
import { MetadataUpdateListener, PublishConfirmListener, PublishErrorListener } from "./response_decoder"
import { ConsumerUpdateQuery } from "./responses/consumer_update_query"
import { CreateStreamResponse } from "./responses/create_stream_response"
import { CreateSuperStreamResponse } from "./responses/create_super_stream_response"
import { DeclarePublisherResponse } from "./responses/declare_publisher_response"
import { DeletePublisherResponse } from "./responses/delete_publisher_response"
import { DeleteStreamResponse } from "./responses/delete_stream_response"
import { DeleteSuperStreamResponse } from "./responses/delete_super_stream_response"
import { DeliverResponse } from "./responses/deliver_response"
import { DeliverResponseV2 } from "./responses/deliver_response_v2"
import { Broker, MetadataResponse, StreamMetadata } from "./responses/metadata_response"
import { PartitionsResponse } from "./responses/partitions_response"
import { RouteResponse } from "./responses/route_response"
Expand All @@ -34,14 +42,6 @@ import { UnsubscribeResponse } from "./responses/unsubscribe_response"
import { SuperStreamConsumer } from "./super_stream_consumer"
import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher"
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, sample } from "./util"
import { CreateSuperStreamRequest } from "./requests/create_super_stream_request"
import { CreateSuperStreamResponse } from "./responses/create_super_stream_response"
import { DeleteSuperStreamResponse } from "./responses/delete_super_stream_response"
import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request"
import { lt, coerce } from "semver"
import { ConnectionInfo, Connection, errorMessageOf } from "./connection"
import { ConnectionPool } from "./connection_pool"
import { DeliverResponseV2 } from "./responses/deliver_response_v2"

export type ConnectionClosedListener = (hadError: boolean) => void

Expand Down Expand Up @@ -148,6 +148,12 @@ export class Client {
logger: this.logger,
}
const publisher = new StreamPublisher(streamPublisherParams, filter)
connection.on("metadata_update", async (metadata) => {
if (metadata.metadataInfo.stream === publisher.streamName) {
await publisher.close(false)
this.publishers.delete(publisherId)
}
})
this.publishers.set(publisherId, { publisher, connection, params, filter })
this.logger.info(
`New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}`
Expand Down Expand Up @@ -181,27 +187,35 @@ export class Client {
{ connection, stream: params.stream, consumerId, consumerRef: params.consumerRef, offset: params.offset },
params.filter
)
connection.on("metadata_update", async (metadata) => {
if (metadata.metadataInfo.stream === consumer.streamName) {
if (params.connectionClosedListener) {
params.connectionClosedListener(false)
}
await this.closeConsumer(consumerId)
}
})
this.consumers.set(consumerId, { connection, consumer, params })
await this.declareConsumerOnConnection(params, consumerId, this.connection)
await this.declareConsumerOnConnection(params, consumerId, connection)
this.logger.info(
`New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}`
)
return consumer
}

public async closeConsumer(consumerId: number) {
const consumer = this.consumers.get(consumerId)?.consumer
const { consumer, connection } = this.consumers.get(consumerId) ?? { consumer: undefined, connection: undefined }

if (!consumer) {
this.logger.error("Consumer does not exist")
throw new Error(`Consumer with id: ${consumerId} does not exist`)
}
const res = await this.connection.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
const res = await connection.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
await consumer.close(true)
this.consumers.delete(consumerId)
if (!res.ok) {
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
await consumer.close(true)
this.consumers.delete(consumerId)
this.logger.info(`Closed consumer with id: ${consumerId}`)
return res.ok
}
Expand Down Expand Up @@ -254,10 +268,6 @@ export class Client {
return Array.from(this.consumers.values())
}

public send(cmd: Request): Promise<void> {
return this.connection.send(cmd)
}

public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise<true> {
this.logger.debug(`Create Stream...`)
const res = await this.connection.sendAndWait<CreateStreamResponse>(new CreateStreamRequest(params))
Expand Down Expand Up @@ -374,7 +384,7 @@ export class Client {
}
uniqueConnectionIds.add(connection.id)
const consumerParams = { ...params, offset: consumer.localOffset }
await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, this.connection)
await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, connection)
}

for (const { publisher, connection, params, filter } of this.publishers.values()) {
Expand Down Expand Up @@ -461,8 +471,8 @@ export class Client {
}
}

private askForCredit(params: CreditRequestParams): Promise<void> {
return this.send(new CreditRequest({ ...params }))
private askForCredit(params: CreditRequestParams, connection: Connection): Promise<void> {
return connection.send(new CreditRequest({ ...params }))
}

private incPublisherId() {
Expand All @@ -479,28 +489,34 @@ export class Client {

private getDeliverV1Callback() {
return async (response: DeliverResponse) => {
const consumer = this.consumers.get(response.subscriptionId)?.consumer
const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? {
consumer: undefined,
connection: undefined,
}
if (!consumer) {
this.logger.error(`On deliverV1 no consumer found`)
return
}
this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`)
this.logger.debug(`response.messages.length: ${response.messages.length}`)
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection)
response.messages.map((x) => consumer.handle(x))
}
}

private getDeliverV2Callback() {
return async (response: DeliverResponseV2) => {
const consumer = this.consumers.get(response.subscriptionId)?.consumer
const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? {
consumer: undefined,
connection: undefined,
}
if (!consumer) {
this.logger.error(`On deliverV2 no consumer found`)
return
}
this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`)
this.logger.debug(`response.messages.length: ${response.messages.length}`)
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection)
if (consumer.filter) {
response.messages.filter((x) => consumer.filter?.postFilterFunc(x)).map((x) => consumer.handle(x))
return
Expand All @@ -511,13 +527,16 @@ export class Client {

private getConsumerUpdateCallback() {
return async (response: ConsumerUpdateQuery) => {
const consumer = this.consumers.get(response.subscriptionId)?.consumer
const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? {
consumer: undefined,
connection: undefined,
}
if (!consumer) {
this.logger.error(`On consumer_update_query no consumer found`)
return
}
this.logger.debug(`on consumer_update_query -> ${consumer.consumerRef}`)
await this.send(
await connection.send(
new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset: consumer.offset })
)
}
Expand Down
5 changes: 4 additions & 1 deletion src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ export class Connection {
}

public static connect(params: ConnectionParams, logger: Logger): Promise<Connection> {
return new Connection(params, logger).start()
const connection = Connection.create(params, logger)
return connection.start()
}

public static create(params: ConnectionParams, logger: Logger): Connection {
Expand Down Expand Up @@ -510,6 +511,8 @@ export function errorMessageOf(code: number): string {
switch (code) {
case 0x02:
return "Stream does not exist"
case 0x04:
return "Subscription ID does not exist"
case 0x06:
return "Stream not available"
case 0x12:
Expand Down
4 changes: 4 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ export class StreamConsumer implements Consumer {
}
return handle
}

public get streamName(): string {
return this.stream
}
}
22 changes: 17 additions & 5 deletions src/publisher.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { inspect } from "util"
import { messageSize } from "./amqp10/encoder"
import { CompressionType } from "./compression"
import { Connection, ConnectionInfo } from "./connection"
import { ConnectionPool } from "./connection_pool"
import { Logger } from "./logger"
import { FrameSizeException } from "./requests/frame_size_exception"
import { PublishRequest, PublishRequestMessage } from "./requests/publish_request"
import { PublishRequestV2 } from "./requests/publish_request_v2"
import { SubEntryBatchPublishRequest } from "./requests/sub_entry_batch_publish_request"
import { PublishConfirmResponse } from "./responses/publish_confirm_response"
import { PublishErrorResponse } from "./responses/publish_error_response"
import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util"
import { MetadataUpdateListener } from "./response_decoder"
import { ConnectionInfo, Connection } from "./connection"
import { ConnectionPool } from "./connection_pool"
import { PublishRequestV2 } from "./requests/publish_request_v2"

export type MessageApplicationProperties = Record<string, string | number>
export type MessageAnnotations = Record<string, MessageAnnotationsValue>
Expand Down Expand Up @@ -82,6 +82,7 @@ export interface Publisher {
getLastPublishingId(): Promise<bigint>
getConnectionInfo(): ConnectionInfo
close(manuallyClose: boolean): Promise<void>
closed: boolean
ref: string
readonly publisherId: number
}
Expand All @@ -101,7 +102,7 @@ export class StreamPublisher implements Publisher {
private scheduled: NodeJS.Immediate | null
private logger: Logger
private maxChunkLength: number
private closed = false
private _closed = false

constructor(
params: {
Expand Down Expand Up @@ -130,7 +131,14 @@ export class StreamPublisher implements Publisher {
this.connection.incrRefCount()
}

public get closed(): boolean {
return this._closed
}

async send(message: Buffer, opts: MessageOptions = {}): Promise<SendResult> {
if (this._closed) {
throw new Error(`Publisher has been closed`)
}
if (this.boot && this.publishingId === -1n) {
this.publishingId = await this.getLastPublishingId()
}
Expand Down Expand Up @@ -203,7 +211,11 @@ export class StreamPublisher implements Publisher {
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
}
}
this.closed = true
this._closed = true
}

public get streamName(): string {
return this.stream
}

private async enqueue(publishRequestMessage: PublishRequestMessage) {
Expand Down
9 changes: 2 additions & 7 deletions test/e2e/address_resolver.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { expect } from "chai"
import { Client, connect } from "../../src"
import { Message } from "../../src/publisher"
import { Offset } from "../../src/requests/subscribe_request"
import { getAddressResolverFromEnv } from "../../src/util"
import { createStreamName } from "../support/fake_data"
Expand Down Expand Up @@ -41,15 +40,11 @@ describe("address resolver", () => {
})

it("declaring a consumer - should not throw", async () => {
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (_: Message) => {
console.log("Message received")
})
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => null)
})

it("declaring a consumer - if multiple nodes are present the consumer should be connected to a replica", async () => {
const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (_: Message) => {
console.log("Message received")
})
const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => null)

const connectionInfo = consumer.getConnectionInfo()
const queueInfo = await rabbit.getQueueInfo(streamName)
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/close_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe("close consumer", () => {

it("closing a consumer in an existing stream", async () => {
await client.declarePublisher({ stream: testStreamName })
const consumer = await client.declareConsumer({ stream: testStreamName, offset: Offset.first() }, console.log)
const consumer = await client.declareConsumer({ stream: testStreamName, offset: Offset.first() }, () => null)

const response = await client.closeConsumer(consumer.consumerId)

Expand Down
7 changes: 2 additions & 5 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ describe("declare consumer", () => {
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
}).timeout(10000)

it("declaring a single active consumer on an existing stream and a simple one - the active of the group and the simple should handle the message", async () => {
it("declaring a single active consumer on an existing stream and a simple one - the active of the group and the simple should handle the message", async () => {
const messages: Buffer[] = []
const consumerRef = createConsumerRef()

Expand Down Expand Up @@ -192,10 +192,7 @@ describe("declare consumer", () => {

it("declaring a consumer on a non-existing stream should raise an error", async () => {
await expectToThrowAsync(
() =>
client.declareConsumer({ stream: nonExistingStreamName, offset: Offset.first() }, (message: Message) =>
console.log(message.content)
),
() => client.declareConsumer({ stream: nonExistingStreamName, offset: Offset.first() }, () => null),
Error,
"Stream was not found on any node"
)
Expand Down
14 changes: 13 additions & 1 deletion test/e2e/declare_publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from "chai"
import { Client } from "../../src"
import { createClient, createPublisher, createStreamName } from "../support/fake_data"
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
import { eventually, expectToThrowAsync, username, password } from "../support/util"
import { eventually, expectToThrowAsync, username, password, wait } from "../support/util"
import { getMaxSharedConnectionInstances } from "../../src/util"
import { randomUUID } from "crypto"

Expand Down Expand Up @@ -69,6 +69,18 @@ describe("declare publisher", () => {
)
})

it("if the server deletes the stream, the publisher gets closed", async () => {
const publisher = await createPublisher(streamName, client)
await rabbit.deleteStream(streamName)
await wait(500)

await expectToThrowAsync(
() => publisher.send(Buffer.from(`test${randomUUID()}`)),
Error,
"Publisher has been closed"
)
})

it("publishers for the same stream should share the underlying connection", async () => {
const publisher1 = await createPublisher(streamName, client)
const publisher2 = await createPublisher(streamName, client)
Expand Down
Loading