Skip to content

Commit

Permalink
Merge pull request #70 from moleculerjs/xaddMaxLen
Browse files Browse the repository at this point in the history
Support Redis capped streams
  • Loading branch information
icebob authored Aug 5, 2023
2 parents 71a833a + 42837c1 commit feb2a9c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
{
"type": "node",
"request": "launch",
"name": "Launch selected demo",
"name": "Launch demo",
"program": "examples/index.js",
"cwd": "${workspaceRoot}",
"args": [
"context"
"simple"
],
"console": "integratedTerminal",
"env": {
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ Use the `broker.sendToChannel(channelName, payload, opts)` method to send a mess
| `acks` | `Number` | Kafka | Control the number of required acks. |
| `timeout` | `Number` | Kafka | The time to await a response in ms. Default: `30000` |
| `compression` | `any` | Kafka | Compression codec. Default: `CompressionTypes.None` |
| `xaddMaxLen` | `Number` or `String` | Redis | Define `MAXLEN` for `XADD` command |

## Middleware hooks

Expand Down Expand Up @@ -621,6 +622,23 @@ module.exports = {
};
```

#### Capped Streams

To support Redis ["capped streams"](https://redis.io/docs/data-types/streams-tutorial/#capped-streams), you can define the `MAXLEN` value in `sendToChannel` options as `xaddMaxLen`. It can be a number or a string with `~` prefix like `~1000`. It will be transformed to `...MAXLEN ~ 1000 ...`

**Example**

```js
broker.sendToChannel("order.created", {
id: 1234,
items: [
/*...*/
]
},{
xaddMaxLen: "~1000"
});
```

### AMQP (RabbitMQ)

The AMQP adapter uses the exchange-queue logic of RabbitMQ for creating consumer groups. It means the `sendToChannel` method sends the message to the exchange and not for a queue.
Expand Down
4 changes: 2 additions & 2 deletions examples/simple/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const broker = new ServiceBroker({
},
middlewares: [
ChannelsMiddleware({
adapter: process.env.ADAPTER || "Fake"
adapter: process.env.ADAPTER || "redis://localhost:6379"
})
],
replCommands: [
Expand All @@ -33,7 +33,7 @@ const broker = new ServiceBroker({
count: ++c,
pid: process.pid
},
{ key: "" + c, headers: { a: "something" } }
{ key: "" + c, headers: { a: "something" }, xaddMaxLen: "~10" }
);
}
},
Expand Down
22 changes: 17 additions & 5 deletions src/adapters/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,27 @@ class RedisAdapter extends BaseAdapter {

try {
let args = [
channelName, // Stream name
"*", // Auto ID
"payload", // Entry
opts.raw ? payload : this.serializer.serialize(payload) // Actual payload
channelName // Stream name
];

// Support Capped Streams. More info: https://redis.io/docs/data-types/streams-tutorial/#capped-streams
if (opts && opts.xaddMaxLen) {
const maxLen = "" + opts.xaddMaxLen;
if (maxLen.startsWith("~")) {
args.push("MAXLEN", "~", maxLen.substring(1));
} else {
args.push("MAXLEN", maxLen);
}
}

// Auto ID
args.push("*");
// Add payload
args.push("payload", opts.raw ? payload : this.serializer.serialize(payload));

// Add headers
if (opts.headers) {
args.push(...["headers", this.serializer.serialize(opts.headers)]);
args.push("headers", this.serializer.serialize(opts.headers));
}

// https://redis.io/commands/XADD
Expand Down

0 comments on commit feb2a9c

Please sign in to comment.