From 1ae862b1a79a3ef0195cda9adca36bb35c11d1d9 Mon Sep 17 00:00:00 2001 From: magne Date: Thu, 18 Jul 2024 12:37:50 +0200 Subject: [PATCH 1/4] chore: add an offset tracking example --- example/src/offset_tracking_example.js | 68 ++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 example/src/offset_tracking_example.js diff --git a/example/src/offset_tracking_example.js b/example/src/offset_tracking_example.js new file mode 100644 index 00000000..2b60d103 --- /dev/null +++ b/example/src/offset_tracking_example.js @@ -0,0 +1,68 @@ +const rabbit = require("rabbitmq-stream-js-client") +const { randomUUID } = require("crypto") + +const rabbitUser = process.env.RABBITMQ_USER || "rabbit" +const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + +async function main() { + const streamName = `example-${randomUUID()}` + console.log(`Creating stream ${streamName}`) + + const client = await rabbit.connect({ + hostname: "localhost", + port: 5552, + username: rabbitUser, + password: rabbitPassword, + vhost: "/", + heartbeat: 0, + }) + await client.createStream({ stream: streamName, arguments: {} }) + const publisher = await client.declarePublisher({ stream: streamName }) + const totalMessages = 100 + + console.log(`Publishing ${totalMessages} messages`) + for (let i = 0; i < totalMessages; i++) { + const messageBody = i === totalMessages - 1 ? "marker" : `hello ${i}` + await publisher.send(Buffer.from(messageBody)) + } + + let initialOffset = rabbit.Offset.offset(0n) + let firstOffset = initialOffset.value + let lastOffset = initialOffset.value + let messageCount = 0 + const consumerRef = "offset-tracking-consumer" + const consumer = await client.declareConsumer( + { stream: streamName, offset: initialOffset, consumerRef }, + (message) => { + messageCount++ + if (message.offset === initialOffset.value) { + console.log("First message received") + } + if (messageCount % 10 === 0) { + console.log("Storing offset") + client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) + } + if (message.content.toString() === "marker") { + console.log("Marker found") + client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) + lastOffset = message.offset + } + } + ) + + console.log(`Start consuming...`) + await sleep(2000) + console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`) + const lastStoredOffset = await consumer.queryOffset() + console.log(`Last stored offset was ${lastStoredOffset}`) + + await client.close() +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("ERROR ", res) + process.exit(-1) + }) +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) From 7c94262baf37ae78ced1c4d3068fea5185d2a736 Mon Sep 17 00:00:00 2001 From: magne Date: Thu, 18 Jul 2024 15:07:43 +0200 Subject: [PATCH 2/4] chore: pr fixes --- example/src/offset_tracking_example.js | 38 ++++++++++++-------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/example/src/offset_tracking_example.js b/example/src/offset_tracking_example.js index 2b60d103..0a56a0b7 100644 --- a/example/src/offset_tracking_example.js +++ b/example/src/offset_tracking_example.js @@ -26,29 +26,27 @@ async function main() { await publisher.send(Buffer.from(messageBody)) } - let initialOffset = rabbit.Offset.offset(0n) - let firstOffset = initialOffset.value - let lastOffset = initialOffset.value + const startFrom = rabbit.Offset.offset(0n) + let firstOffset = startFrom.value + let lastOffset = startFrom.value let messageCount = 0 const consumerRef = "offset-tracking-consumer" - const consumer = await client.declareConsumer( - { stream: streamName, offset: initialOffset, consumerRef }, - (message) => { - messageCount++ - if (message.offset === initialOffset.value) { - console.log("First message received") - } - if (messageCount % 10 === 0) { - console.log("Storing offset") - client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) - } - if (message.content.toString() === "marker") { - console.log("Marker found") - client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) - lastOffset = message.offset - } + const consumer = await client.declareConsumer({ stream: streamName, offset: startFrom, consumerRef }, (message) => { + messageCount++ + if (message.offset === startFrom.value) { + console.log("First message received") + firstOffset = message.offset } - ) + if (messageCount % 10 === 0) { + console.log("Storing offset") + client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) + } + if (message.content.toString() === "marker") { + console.log("Marker found") + client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) + lastOffset = message.offset + } + }) console.log(`Start consuming...`) await sleep(2000) From 46b2d2da0441d5c0c3fee453352eb8944048f99d Mon Sep 17 00:00:00 2001 From: magne Date: Fri, 19 Jul 2024 16:24:26 +0200 Subject: [PATCH 3/4] chore: changing variables names --- example/src/offset_tracking_example.js | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/example/src/offset_tracking_example.js b/example/src/offset_tracking_example.js index 0a56a0b7..0d463cac 100644 --- a/example/src/offset_tracking_example.js +++ b/example/src/offset_tracking_example.js @@ -1,11 +1,10 @@ const rabbit = require("rabbitmq-stream-js-client") -const { randomUUID } = require("crypto") const rabbitUser = process.env.RABBITMQ_USER || "rabbit" const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" async function main() { - const streamName = `example-${randomUUID()}` + const streamName = `stream-offset-tracking-javascript` console.log(`Creating stream ${streamName}`) const client = await rabbit.connect({ @@ -18,26 +17,26 @@ async function main() { }) await client.createStream({ stream: streamName, arguments: {} }) const publisher = await client.declarePublisher({ stream: streamName }) - const totalMessages = 100 + const messageCount = 100 - console.log(`Publishing ${totalMessages} messages`) - for (let i = 0; i < totalMessages; i++) { - const messageBody = i === totalMessages - 1 ? "marker" : `hello ${i}` - await publisher.send(Buffer.from(messageBody)) + console.log(`Publishing ${messageCount} messages`) + for (let i = 0; i < messageCount; i++) { + const body = i === messageCount - 1 ? "marker" : `hello ${i}` + await publisher.send(Buffer.from(body)) } const startFrom = rabbit.Offset.offset(0n) let firstOffset = startFrom.value let lastOffset = startFrom.value - let messageCount = 0 - const consumerRef = "offset-tracking-consumer" + let messageReceivedCount = 0 + const consumerRef = "offset-tracking-tutorial" const consumer = await client.declareConsumer({ stream: streamName, offset: startFrom, consumerRef }, (message) => { - messageCount++ + messageReceivedCount++ if (message.offset === startFrom.value) { console.log("First message received") firstOffset = message.offset } - if (messageCount % 10 === 0) { + if (messageReceivedCount % 10 === 0) { console.log("Storing offset") client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) } From 948b3519eec78dec30b618990340bb1f43a9791a Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 19 Aug 2024 18:02:33 +0200 Subject: [PATCH 4/4] chore: fix offset tracking example --- example/src/offset_tracking_example.js | 61 ++++++++++++++------------ 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/example/src/offset_tracking_example.js b/example/src/offset_tracking_example.js index 0d463cac..943a1b21 100644 --- a/example/src/offset_tracking_example.js +++ b/example/src/offset_tracking_example.js @@ -17,43 +17,48 @@ async function main() { }) await client.createStream({ stream: streamName, arguments: {} }) const publisher = await client.declarePublisher({ stream: streamName }) - const messageCount = 100 + const toSend = 100 - console.log(`Publishing ${messageCount} messages`) - for (let i = 0; i < messageCount; i++) { - const body = i === messageCount - 1 ? "marker" : `hello ${i}` + console.log(`Publishing ${toSend} messages`) + for (let i = 0; i < toSend; i++) { + const body = i === toSend - 1 ? "marker" : `hello ${i}` await publisher.send(Buffer.from(body)) } - const startFrom = rabbit.Offset.offset(0n) - let firstOffset = startFrom.value - let lastOffset = startFrom.value - let messageReceivedCount = 0 const consumerRef = "offset-tracking-tutorial" - const consumer = await client.declareConsumer({ stream: streamName, offset: startFrom, consumerRef }, (message) => { - messageReceivedCount++ - if (message.offset === startFrom.value) { - console.log("First message received") - firstOffset = message.offset - } - if (messageReceivedCount % 10 === 0) { - console.log("Storing offset") - client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) - } - if (message.content.toString() === "marker") { - console.log("Marker found") - client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset }) - lastOffset = message.offset + let firstOffset = undefined + let offsetSpecification = rabbit.Offset.first() + try { + const offset = await client.queryOffset({ reference: consumerRef, stream: streamName }) + offsetSpecification = rabbit.Offset.offset(offset + 1n) + } catch (e) {} + + let lastOffset = offsetSpecification.value + let messageCount = 0 + const consumer = await client.declareConsumer( + { stream: streamName, offset: offsetSpecification, consumerRef }, + async (message) => { + messageCount++ + if (!firstOffset && messageCount === 1) { + firstOffset = message.offset + console.log("First message received") + } + if (messageCount % 10 === 0) { + await consumer.storeOffset(message.offset) + } + if (message.content.toString() === "marker") { + console.log("Marker found") + lastOffset = message.offset + await consumer.storeOffset(message.offset) + console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`) + await consumer.close(true) + process.exit(0) + } } - }) + ) console.log(`Start consuming...`) await sleep(2000) - console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`) - const lastStoredOffset = await consumer.queryOffset() - console.log(`Last stored offset was ${lastStoredOffset}`) - - await client.close() } main()