diff --git a/src/client.ts b/src/client.ts index 201251e5..d73c7570 100644 --- a/src/client.ts +++ b/src/client.ts @@ -407,7 +407,7 @@ export class Client { await connection.restart() } uniqueConnectionIds.add(connection.connectionId) - const consumerParams = { ...params, offset: consumer.localOffset } + const consumerParams = { ...params, offset: Offset.offset(consumer.getOffset()) } await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, connection) } diff --git a/src/consumer.ts b/src/consumer.ts index d6f09f56..42a61b49 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -23,9 +23,9 @@ export interface Consumer { /** * Store the stream offset on the server * - * @param {bigint} offsetValue - The value of the offset to save + * @param {bigint} offsetValue - The value of the offset to save, if not specified the local offset is used */ - storeOffset(offsetValue: bigint): Promise + storeOffset(offsetValue?: bigint): Promise /** * Get the saved offset on the server @@ -34,6 +34,11 @@ export interface Consumer { */ queryOffset(): Promise + /** + * Get the stream local offset + */ + getOffset(): bigint + /** * Gets the infos of the publisher's connection * @@ -102,9 +107,10 @@ export class StreamConsumer implements Consumer { await this.pool.releaseConnection(this.connection) } - public storeOffset(offsetValue: bigint): Promise { + public storeOffset(offsetValue?: bigint): Promise { if (!this.consumerRef) throw new Error("ConsumerReference must be defined in order to use this!") - return this.connection.storeOffset({ stream: this.stream, reference: this.consumerRef, offsetValue }) + const offset = offsetValue ? offsetValue : this.clientLocalOffset.value ?? 0n + return this.connection.storeOffset({ stream: this.stream, reference: this.consumerRef, offsetValue: offset }) } public queryOffset(): Promise { @@ -112,15 +118,15 @@ export class StreamConsumer implements Consumer { return this.connection.queryOffset({ stream: this.stream, reference: this.consumerRef }) } + getOffset(): bigint { + return this.clientLocalOffset.value ?? 0n + } + public getConnectionInfo(): ConnectionInfo { const { host, port, id, readable, localPort, ready, vhost } = this.connection.getConnectionInfo() return { host, port, id, readable, localPort, ready, vhost } } - public get localOffset() { - return this.clientLocalOffset.clone() - } - public async handle(message: Message) { if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return await this.consumerHandle(message) diff --git a/test/e2e/consumer_offset.test.ts b/test/e2e/consumer_offset.test.ts new file mode 100644 index 00000000..0346c5d2 --- /dev/null +++ b/test/e2e/consumer_offset.test.ts @@ -0,0 +1,78 @@ +import { expect } from "chai" +import { Offset } from "../../src" +import { Message } from "../../src/publisher" +import { createClient, createStreamName } from "../support/fake_data" +import { eventually, password, username } from "../support/util" + +describe("Consumer Offset", () => { + it("test start and stop", async () => { + const client = await createClient(username, password) + + const streamName = createStreamName() + await client.createStream({ stream: streamName, arguments: {} }) + + let onIncomingMessageCalls = 0 + const onIncomingMessage = async (msg: Message) => { + console.log(msg.content.toString("utf-8")) + console.log(msg.offset) + onIncomingMessageCalls++ + return + } + const referenceName = "ref" + const consumer = await client.declareConsumer( + { + stream: streamName, + offset: Offset.offset(0n), + consumerRef: referenceName, + }, + onIncomingMessage + ) + const publisher = await client.declarePublisher({ stream: streamName }) + await publisher.send(Buffer.from("Hello1")) + await eventually(async () => { + expect(onIncomingMessageCalls).to.eql(1) + }) + + const localOffset = consumer.getOffset() + if (localOffset === undefined) { + throw new Error("localOffset is undefined") + } + + // Perhaps there may be an option to upload the offset to the server directly from the consumer's internal store? Instead of having to fetch for it and then retrieve it + await consumer.storeOffset(localOffset) + await consumer.close(false) + + await publisher.send(Buffer.from("Hello2")) + await publisher.send(Buffer.from("Hello3")) + + const lastMessageOffset = await client.queryOffset({ + stream: streamName, + reference: referenceName, + }) + expect(lastMessageOffset).to.eql(0n) + + let resumedOnIncomingMessageCalls = 0 + let offset: bigint | undefined = 0n + const resumedOnIncomingMessage = async (msg: Message) => { + console.log("Resumed ", msg.content.toString("utf-8")) + offset = msg.offset + resumedOnIncomingMessageCalls++ + return + } + + const resumedConsumer = await client.declareConsumer( + { + stream: streamName, + offset: Offset.offset(lastMessageOffset + 1n), + consumerRef: referenceName, + }, + resumedOnIncomingMessage + ) + await eventually(async () => { + expect(resumedOnIncomingMessageCalls).to.eql(2) + }) + expect(resumedConsumer.getOffset()).to.eql(offset) + + await publisher.close(false) + }) +}) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 95bd2104..645dd904 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -239,7 +239,7 @@ describe("declare consumer", () => { async (message: Message) => { messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`) if (messagesFromFirstConsumer.length === 50) { - await consumer1.storeOffset(message.offset!) + await consumer1.storeOffset(message.offset) } } ) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 8a79db85..000b415f 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -179,7 +179,7 @@ describe("offset", () => { const consumer = await client.declareConsumer( { stream: testStreamName, consumerRef: "my consumer", offset: Offset.next() }, async (message: Message) => { - await consumer.storeOffset(message.offset!) + await consumer.storeOffset(message.offset) offset = message.offset! } ) @@ -194,6 +194,26 @@ describe("offset", () => { }) }).timeout(10000) + it("saving the offset of a stream correctly without specifying it ", async () => { + let offset: bigint = 0n + const consumer = await client.declareConsumer( + { stream: testStreamName, consumerRef: "my consumer", offset: Offset.next() }, + async (message: Message) => { + offset = message.offset! + } + ) + const publisher = await client.declarePublisher({ stream: testStreamName }) + + await publisher.send(Buffer.from("hello")) + await publisher.send(Buffer.from("world")) + + await eventually(async () => { + await consumer.storeOffset() + const result = await consumer.queryOffset() + expect(result).eql(offset) + }) + }).timeout(10000) + it("declaring a consumer without consumerRef and saving the store offset should rise an error", async () => { const consumer = await client.declareConsumer( { stream: testStreamName, offset: Offset.first() }, @@ -224,7 +244,7 @@ describe("offset", () => { async (message: Message) => { consumerOneMessages.push(message) if (message.content.toString() === "marker") { - await consumer.storeOffset(message.offset!) + await consumer.storeOffset(message.offset) } } )