Skip to content

Commit

Permalink
Merge pull request #78 from moleculerjs/issue-74
Browse files Browse the repository at this point in the history
don't destroy consumer when there are pending messages
  • Loading branch information
icebob authored Apr 21, 2024
2 parents 263b853 + 8c3eb6e commit a7364bb
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 9 deletions.
58 changes: 58 additions & 0 deletions examples/redis-pending/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"use strict";

// Adapted from: https://github.com/moleculerjs/moleculer-channels/issues/74

const { ServiceBroker } = require("moleculer");
const ChannelsMiddleware = require("../../types").Middleware;
const broker = new ServiceBroker({
namespace: "test",
nodeID: "test1",
transporter: "TCP",
middlewares: [
ChannelsMiddleware({
adapter: "redis://127.0.0.1:6379"
})
]
});

const serviceSchema = {
name: "subscriber",
channels: {
"order.created": {
group: "mygroup",
redis: {
minIdleTime: 1000,
claimInterval: 1,
startID: "0"
},
maxRetries: 100,
handler(payload) {
this.logger.info("Received order.created event", payload);
throw new Error();
}
}
}
};
broker.createService(serviceSchema);

// Start the Moleculer broker
broker.start().then(async () => {
try {
broker.repl();

for (let i = 0; i < 10; i++) {
await broker.sendToChannel("order.created", { id: i, items: "test" });

await broker.Promise.delay(100);
}

await broker.destroyService("subscriber");

setTimeout(() => {
broker.logger.info("Recreate service");
broker.createService(serviceSchema);
}, 10000);
} catch (error) {
console.log(error);
}
});
35 changes: 26 additions & 9 deletions src/adapters/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -505,15 +505,32 @@ class RedisAdapter extends BaseAdapter {
})
.then(() => {
const pubClient = this.clients.get(this.pubName);
// 1. Delete consumer from the consumer group
// 2. Do NOT destroy the consumer group
// https://redis.io/commands/XGROUP
return pubClient.xgroup(
"DELCONSUMER",
chan.name, // Stream Name
chan.group, // Consumer Group
chan.id // Consumer ID
);
return pubClient
.xpending(
chan.name,
chan.group,
"-", // Start
"+", // End
10 // Max reported entries
)
.then(pending => {
if (pending.length !== 0) {
// Don't destroy the consumer group if there are pending messages
// It might come back online in the future and process the messages
// More info: https://github.com/moleculerjs/moleculer-channels/issues/74
return;
}

// 1. Delete consumer from the consumer group
// 2. Do NOT destroy the consumer group
// https://redis.io/commands/XGROUP
return pubClient.xgroup(
"DELCONSUMER",
chan.name, // Stream Name
chan.group, // Consumer Group
chan.id // Consumer ID
);
});
})
.then(() => resolve())
.catch(err => reject(err));
Expand Down

0 comments on commit a7364bb

Please sign in to comment.