Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ export class Client {
await connection.restart()
}
uniqueConnectionIds.add(connection.connectionId)
const consumerParams = { ...params, offset: consumer.localOffset }
const consumerParams = { ...params, offset: Offset.offset(consumer.getOffset()) }
await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, connection)
}

Expand Down
22 changes: 14 additions & 8 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ export interface Consumer {
/**
* Store the stream offset on the server
*
* @param {bigint} offsetValue - The value of the offset to save
* @param {bigint} offsetValue - The value of the offset to save, if not specified the local offset is used
*/
storeOffset(offsetValue: bigint): Promise<void>
storeOffset(offsetValue?: bigint): Promise<void>

/**
* Get the saved offset on the server
Expand All @@ -34,6 +34,11 @@ export interface Consumer {
*/
queryOffset(): Promise<bigint>

/**
* Get the stream local offset
*/
getOffset(): bigint

/**
* Gets the infos of the publisher's connection
*
Expand Down Expand Up @@ -102,25 +107,26 @@ export class StreamConsumer implements Consumer {
await this.pool.releaseConnection(this.connection)
}

public storeOffset(offsetValue: bigint): Promise<void> {
public storeOffset(offsetValue?: bigint): Promise<void> {
if (!this.consumerRef) throw new Error("ConsumerReference must be defined in order to use this!")
return this.connection.storeOffset({ stream: this.stream, reference: this.consumerRef, offsetValue })
const offset = offsetValue ? offsetValue : this.clientLocalOffset.value ?? 0n
return this.connection.storeOffset({ stream: this.stream, reference: this.consumerRef, offsetValue: offset })
}

public queryOffset(): Promise<bigint> {
if (!this.consumerRef) throw new Error("ConsumerReference must be defined in order to use this!")
return this.connection.queryOffset({ stream: this.stream, reference: this.consumerRef })
}

getOffset(): bigint {
return this.clientLocalOffset.value ?? 0n
}

public getConnectionInfo(): ConnectionInfo {
const { host, port, id, readable, localPort, ready, vhost } = this.connection.getConnectionInfo()
return { host, port, id, readable, localPort, ready, vhost }
}

public get localOffset() {
return this.clientLocalOffset.clone()
}

public async handle(message: Message) {
if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return
await this.consumerHandle(message)
Expand Down
78 changes: 78 additions & 0 deletions test/e2e/consumer_offset.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { expect } from "chai"
import { Offset } from "../../src"
import { Message } from "../../src/publisher"
import { createClient, createStreamName } from "../support/fake_data"
import { eventually, password, username } from "../support/util"

describe("Consumer Offset", () => {
it("test start and stop", async () => {
const client = await createClient(username, password)

const streamName = createStreamName()
await client.createStream({ stream: streamName, arguments: {} })

let onIncomingMessageCalls = 0
const onIncomingMessage = async (msg: Message) => {
console.log(msg.content.toString("utf-8"))
console.log(msg.offset)
onIncomingMessageCalls++
return
}
const referenceName = "ref"
const consumer = await client.declareConsumer(
{
stream: streamName,
offset: Offset.offset(0n),
consumerRef: referenceName,
},
onIncomingMessage
)
const publisher = await client.declarePublisher({ stream: streamName })
await publisher.send(Buffer.from("Hello1"))
await eventually(async () => {
expect(onIncomingMessageCalls).to.eql(1)
})

const localOffset = consumer.getOffset()
if (localOffset === undefined) {
throw new Error("localOffset is undefined")
}

// Perhaps there may be an option to upload the offset to the server directly from the consumer's internal store? Instead of having to fetch for it and then retrieve it
await consumer.storeOffset(localOffset)
await consumer.close(false)

await publisher.send(Buffer.from("Hello2"))
await publisher.send(Buffer.from("Hello3"))

const lastMessageOffset = await client.queryOffset({
stream: streamName,
reference: referenceName,
})
expect(lastMessageOffset).to.eql(0n)

let resumedOnIncomingMessageCalls = 0
let offset: bigint | undefined = 0n
const resumedOnIncomingMessage = async (msg: Message) => {
console.log("Resumed ", msg.content.toString("utf-8"))
offset = msg.offset
resumedOnIncomingMessageCalls++
return
}

const resumedConsumer = await client.declareConsumer(
{
stream: streamName,
offset: Offset.offset(lastMessageOffset + 1n),
consumerRef: referenceName,
},
resumedOnIncomingMessage
)
await eventually(async () => {
expect(resumedOnIncomingMessageCalls).to.eql(2)
})
expect(resumedConsumer.getOffset()).to.eql(offset)

await publisher.close(false)
})
})
2 changes: 1 addition & 1 deletion test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ describe("declare consumer", () => {
async (message: Message) => {
messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
if (messagesFromFirstConsumer.length === 50) {
await consumer1.storeOffset(message.offset!)
await consumer1.storeOffset(message.offset)
}
}
)
Expand Down
24 changes: 22 additions & 2 deletions test/e2e/offset.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ describe("offset", () => {
const consumer = await client.declareConsumer(
{ stream: testStreamName, consumerRef: "my consumer", offset: Offset.next() },
async (message: Message) => {
await consumer.storeOffset(message.offset!)
await consumer.storeOffset(message.offset)
offset = message.offset!
}
)
Expand All @@ -194,6 +194,26 @@ describe("offset", () => {
})
}).timeout(10000)

it("saving the offset of a stream correctly without specifying it ", async () => {
let offset: bigint = 0n
const consumer = await client.declareConsumer(
{ stream: testStreamName, consumerRef: "my consumer", offset: Offset.next() },
async (message: Message) => {
offset = message.offset!
}
)
const publisher = await client.declarePublisher({ stream: testStreamName })

await publisher.send(Buffer.from("hello"))
await publisher.send(Buffer.from("world"))

await eventually(async () => {
await consumer.storeOffset()
const result = await consumer.queryOffset()
expect(result).eql(offset)
})
}).timeout(10000)

it("declaring a consumer without consumerRef and saving the store offset should rise an error", async () => {
const consumer = await client.declareConsumer(
{ stream: testStreamName, offset: Offset.first() },
Expand Down Expand Up @@ -224,7 +244,7 @@ describe("offset", () => {
async (message: Message) => {
consumerOneMessages.push(message)
if (message.content.toString() === "marker") {
await consumer.storeOffset(message.offset!)
await consumer.storeOffset(message.offset)
}
}
)
Expand Down