From 8ab27091237fe6d4d27b5050abd5b76605bed3c6 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Mon, 21 Jul 2025 16:29:38 +0300 Subject: [PATCH 1/7] chore: update Redis version from 8.2-RC1-pre to 8.2-rc1 --- .github/workflows/tests.yml | 2 +- packages/bloom/lib/test-utils.ts | 2 +- packages/client/lib/sentinel/test-util.ts | 2 +- packages/client/lib/test-utils.ts | 2 +- packages/entraid/lib/test-utils.ts | 2 +- packages/json/lib/test-utils.ts | 2 +- packages/search/lib/test-utils.ts | 2 +- packages/time-series/lib/test-utils.ts | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 89efdb61114..df8cb1d1b6c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -22,7 +22,7 @@ jobs: fail-fast: false matrix: node-version: ["18", "20", "22"] - redis-version: ["rs-7.2.0-v13", "rs-7.4.0-v1", "8.0.2", "8.2-M01-pre"] + redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2-rc1"] steps: - uses: actions/checkout@v4 with: diff --git a/packages/bloom/lib/test-utils.ts b/packages/bloom/lib/test-utils.ts index 4396c94f726..268ebca8cb9 100644 --- a/packages/bloom/lib/test-utils.ts +++ b/packages/bloom/lib/test-utils.ts @@ -4,7 +4,7 @@ import RedisBloomModules from '.'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = { diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index c8efa47f41d..7c4752a8852 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -174,7 +174,7 @@ export class SentinelFramework extends DockerBase { this.#testUtils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); this.#nodeMap = new Map>>>(); this.#sentinelMap = new Map>>>(); diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index b9b906e943c..86b6ed294ac 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -9,7 +9,7 @@ import RedisBloomModules from '@redis/bloom'; const utils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export default utils; diff --git a/packages/entraid/lib/test-utils.ts b/packages/entraid/lib/test-utils.ts index 3c561d4ba44..e5d977a6b40 100644 --- a/packages/entraid/lib/test-utils.ts +++ b/packages/entraid/lib/test-utils.ts @@ -6,7 +6,7 @@ import { EntraidCredentialsProvider } from './entraid-credentials-provider'; export const testUtils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); const DEBUG_MODE_ARGS = testUtils.isVersionGreaterThan([7]) ? diff --git a/packages/json/lib/test-utils.ts b/packages/json/lib/test-utils.ts index 6b6859d61bf..cba2d95e737 100644 --- a/packages/json/lib/test-utils.ts +++ b/packages/json/lib/test-utils.ts @@ -4,7 +4,7 @@ import RedisJSON from '.'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = { diff --git a/packages/search/lib/test-utils.ts b/packages/search/lib/test-utils.ts index a2b9c816da1..d4d91307b99 100644 --- a/packages/search/lib/test-utils.ts +++ b/packages/search/lib/test-utils.ts @@ -5,7 +5,7 @@ import { RespVersions } from '@redis/client'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = { diff --git a/packages/time-series/lib/test-utils.ts b/packages/time-series/lib/test-utils.ts index 8a664ee8df2..9c59918e705 100644 --- a/packages/time-series/lib/test-utils.ts +++ b/packages/time-series/lib/test-utils.ts @@ -4,7 +4,7 @@ import TimeSeries from '.'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = { From d39148cbe8ed119c232481ed5b24aa84619cf112 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 24 Jul 2025 14:11:03 +0300 Subject: [PATCH 2/7] feat: implement XDELEX command for Redis 8.2 --- packages/client/lib/commands/XDELEX.spec.ts | 157 ++++++++++++++++++++ packages/client/lib/commands/XDELEX.ts | 59 ++++++++ packages/client/lib/commands/index.ts | 3 + 3 files changed, 219 insertions(+) create mode 100644 packages/client/lib/commands/XDELEX.spec.ts create mode 100644 packages/client/lib/commands/XDELEX.ts diff --git a/packages/client/lib/commands/XDELEX.spec.ts b/packages/client/lib/commands/XDELEX.spec.ts new file mode 100644 index 00000000000..5e83efca730 --- /dev/null +++ b/packages/client/lib/commands/XDELEX.spec.ts @@ -0,0 +1,157 @@ +import { strict as assert } from "node:assert"; +import XDELEX, { XDELEX_REPLY_CODES, XDelexPolicy } from "./XDELEX"; +import { parseArgs } from "./generic-transformers"; +import testUtils, { GLOBAL } from "../test-utils"; + +describe("XDELEX", () => { + describe("transformArguments", () => { + it("string - without policy", () => { + assert.deepEqual(parseArgs(XDELEX, "key", "0-0"), [ + "XDELEX", + "key", + "IDS", + "1", + "0-0", + ]); + }); + + it("string - with policy", () => { + assert.deepEqual(parseArgs(XDELEX, "key", "0-0", XDelexPolicy.KEEPREF), [ + "XDELEX", + "key", + "KEEPREF", + "IDS", + "1", + "0-0", + ]); + }); + + it("array - without policy", () => { + assert.deepEqual(parseArgs(XDELEX, "key", ["0-0", "1-0"]), [ + "XDELEX", + "key", + "IDS", + "2", + "0-0", + "1-0", + ]); + }); + + it("array - with policy", () => { + assert.deepEqual( + parseArgs(XDELEX, "key", ["0-0", "1-0"], XDelexPolicy.DELREF), + ["XDELEX", "key", "DELREF", "IDS", "2", "0-0", "1-0"] + ); + }); + }); + + testUtils.testAll( + `XDELEX non-existing key - without policy`, + async (client) => { + const reply = await client.xDelex("{tag}stream-key", "0-0"); + assert.deepEqual(reply, [XDELEX_REPLY_CODES.NOT_FOUND]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX existing key - without policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const messageId = await client.xAdd(streamKey, "*", { + field: "value", + }); + + const reply = await client.xDelex( + streamKey, + messageId, + XDelexPolicy.KEEPREF + ); + assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX existing key - with policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const messageId = await client.xAdd(streamKey, "*", { + field: "value", + }); + + const reply = await client.xDelex( + streamKey, + messageId, + XDelexPolicy.DELREF + ); + assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX acknowledge policy - with consumer group`, + async (client) => { + const streamKey = "{tag}stream-key"; + + // Add a message to the stream + const messageId = await client.xAdd(streamKey, "*", { + field: "value", + }); + + // Create consumer group + await client.xGroupCreate(streamKey, "testgroup", "0"); + + const reply = await client.xDelex( + streamKey, + messageId, + XDelexPolicy.ACKED + ); + assert.deepEqual(reply, [XDELEX_REPLY_CODES.DANGLING_REFS]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX multiple keys`, + async (client) => { + const streamKey = "{tag}stream-key"; + const messageIds = await Promise.all([ + client.xAdd(streamKey, "*", { + field: "value1", + }), + client.xAdd(streamKey, "*", { + field: "value2", + }), + ]); + + const reply = await client.xDelex( + streamKey, + [...messageIds, "0-0"], + XDelexPolicy.DELREF + ); + assert.deepEqual(reply, [ + XDELEX_REPLY_CODES.DELETED, + XDELEX_REPLY_CODES.DELETED, + XDELEX_REPLY_CODES.NOT_FOUND, + ]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); +}); diff --git a/packages/client/lib/commands/XDELEX.ts b/packages/client/lib/commands/XDELEX.ts new file mode 100644 index 00000000000..0c6d967da21 --- /dev/null +++ b/packages/client/lib/commands/XDELEX.ts @@ -0,0 +1,59 @@ +import { CommandParser } from "../client/parser"; +import { RedisArgument, ArrayReply, Command } from "../RESP/types"; +import { RedisVariadicArgument } from "./generic-transformers"; + +/** XDELEX deletion policies */ +export const XDelexPolicy = { + /** Preserve references (default) */ + KEEPREF: "KEEPREF", + /** Delete all references */ + DELREF: "DELREF", + /** Only acknowledged entries */ + ACKED: "ACKED", +} as const; + +/** XDELEX reply codes */ +export const XDELEX_REPLY_CODES = { + /** ID not found */ + NOT_FOUND: -1, + /** Entry deleted */ + DELETED: 1, + /** Dangling references */ + DANGLING_REFS: 2, +} as const; + +/** + * Deletes one or multiple entries from the stream + */ +export default { + IS_READ_ONLY: false, + /** + * Constructs the XDELEX command to delete one or multiple entries from the stream + * + * @param parser - The command parser + * @param key - The stream key + * @param id - One or more message IDs to delete + * @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF) + * @returns Array of integers: -1 (not found), 1 (deleted), 2 (dangling refs) + * @see https://redis.io/commands/xdelex/ + */ + parseCommand( + parser: CommandParser, + key: RedisArgument, + id: RedisVariadicArgument, + policy?: (typeof XDelexPolicy)[keyof typeof XDelexPolicy], + ) { + parser.push("XDELEX"); + parser.pushKey(key); + + if (policy) { + parser.push(policy); + } + + parser.push("IDS"); + parser.pushVariadicWithLength(id); + }, + transformReply: undefined as unknown as () => ArrayReply< + (typeof XDELEX_REPLY_CODES)[keyof typeof XDELEX_REPLY_CODES] + >, +} as const satisfies Command; diff --git a/packages/client/lib/commands/index.ts b/packages/client/lib/commands/index.ts index 87ab8d10b8f..803abc8388f 100644 --- a/packages/client/lib/commands/index.ts +++ b/packages/client/lib/commands/index.ts @@ -287,6 +287,7 @@ import XAUTOCLAIM from './XAUTOCLAIM'; import XCLAIM_JUSTID from './XCLAIM_JUSTID'; import XCLAIM from './XCLAIM'; import XDEL from './XDEL'; +import XDELEX from './XDELEX'; import XGROUP_CREATE from './XGROUP_CREATE'; import XGROUP_CREATECONSUMER from './XGROUP_CREATECONSUMER'; import XGROUP_DELCONSUMER from './XGROUP_DELCONSUMER'; @@ -938,6 +939,8 @@ export default { xClaim: XCLAIM, XDEL, xDel: XDEL, + XDELEX, + xDelex: XDELEX, XGROUP_CREATE, xGroupCreate: XGROUP_CREATE, XGROUP_CREATECONSUMER, From 1c1e1ca7cac7dfe3f91a0cf8e5efa92ea9c7094e Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 24 Jul 2025 14:51:16 +0300 Subject: [PATCH 3/7] feat: implement XACKDEL command for Redis 8.2 --- packages/client/lib/commands/XACKDEL.spec.ts | 186 +++++++++++++++++++ packages/client/lib/commands/XACKDEL.ts | 62 +++++++ packages/client/lib/commands/index.ts | 3 + 3 files changed, 251 insertions(+) create mode 100644 packages/client/lib/commands/XACKDEL.spec.ts create mode 100644 packages/client/lib/commands/XACKDEL.ts diff --git a/packages/client/lib/commands/XACKDEL.spec.ts b/packages/client/lib/commands/XACKDEL.spec.ts new file mode 100644 index 00000000000..ac99ee8fb79 --- /dev/null +++ b/packages/client/lib/commands/XACKDEL.spec.ts @@ -0,0 +1,186 @@ +import { strict as assert } from "node:assert"; +import XACKDEL, { XACKDEL_REPLY_CODES, XAckDelPolicy } from "./XACKDEL"; +import { parseArgs } from "./generic-transformers"; +import testUtils, { GLOBAL } from "../test-utils"; + +describe("XACKDEL", () => { + describe("transformArguments", () => { + it("string - without policy", () => { + assert.deepEqual(parseArgs(XACKDEL, "key", "group", "0-0"), [ + "XACKDEL", + "key", + "group", + "IDS", + "1", + "0-0", + ]); + }); + + it("string - with policy", () => { + assert.deepEqual( + parseArgs(XACKDEL, "key", "group", "0-0", XAckDelPolicy.KEEPREF), + ["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"] + ); + }); + + it("array - without policy", () => { + assert.deepEqual(parseArgs(XACKDEL, "key", "group", ["0-0", "1-0"]), [ + "XACKDEL", + "key", + "group", + "IDS", + "2", + "0-0", + "1-0", + ]); + }); + + it("array - with policy", () => { + assert.deepEqual( + parseArgs( + XACKDEL, + "key", + "group", + ["0-0", "1-0"], + XAckDelPolicy.DELREF + ), + ["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"] + ); + }); + }); + + testUtils.testAll( + `XACKDEL non-existing key - without policy`, + async (client) => { + const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0"); + assert.deepEqual(reply, [XACKDEL_REPLY_CODES.NOT_FOUND]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL existing key - without policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer group, stream and message + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + const messageId = await client.xAdd(streamKey, "*", { field: "value" }); + + // read message + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel(streamKey, groupName, messageId); + assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL existing key - with policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer group, stream and message + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + const messageId = await client.xAdd(streamKey, "*", { field: "value" }); + + // read message + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel( + streamKey, + groupName, + messageId, + XAckDelPolicy.DELREF + ); + assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL acknowledge policy - with consumer group`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer groups, stream and message + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + await client.xGroupCreate(streamKey, "some-other-group", "0"); + const messageId = await client.xAdd(streamKey, "*", { field: "value" }); + + // read message + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel( + streamKey, + groupName, + messageId, + XAckDelPolicy.ACKED + ); + assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_DANGLING_REFS]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL multiple keys`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer groups, stream and add messages + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + const messageIds = await Promise.all([ + client.xAdd(streamKey, "*", { field: "value1" }), + client.xAdd(streamKey, "*", { field: "value2" }), + ]); + + // read messages + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel( + streamKey, + groupName, + [...messageIds, "0-0"], + XAckDelPolicy.DELREF + ); + assert.deepEqual(reply, [ + XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED, + XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED, + XACKDEL_REPLY_CODES.NOT_FOUND, + ]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); +}); diff --git a/packages/client/lib/commands/XACKDEL.ts b/packages/client/lib/commands/XACKDEL.ts new file mode 100644 index 00000000000..fbb57a468c9 --- /dev/null +++ b/packages/client/lib/commands/XACKDEL.ts @@ -0,0 +1,62 @@ +import { CommandParser } from "../client/parser"; +import { RedisArgument, ArrayReply, Command } from "../RESP/types"; +import { RedisVariadicArgument } from "./generic-transformers"; + +/** XACKDEL deletion policies */ +export const XAckDelPolicy = { + /** Preserve references (default) */ + KEEPREF: "KEEPREF", + /** Delete all references */ + DELREF: "DELREF", + /** Only acknowledged entries */ + ACKED: "ACKED", +} as const; + +/** XACKDEL reply codes */ +export const XACKDEL_REPLY_CODES = { + /** ID not found */ + NOT_FOUND: -1, + /** Entry acknowledged and deleted */ + ACKNOWLEDGED_AND_DELETED: 1, + /** Entry acknowledged but dangling references remain */ + ACKNOWLEDGED_DANGLING_REFS: 2, +} as const; + +/** + * Acknowledges and deletes one or multiple messages for a stream consumer group + */ +export default { + IS_READ_ONLY: false, + /** + * Constructs the XACKDEL command to acknowledge and delete one or multiple messages for a stream consumer group + * + * @param parser - The command parser + * @param key - The stream key + * @param group - The consumer group name + * @param id - One or more message IDs to acknowledge and delete + * @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF) + * @returns Array of integers: -1 (not found), 1 (acknowledged and deleted), 2 (acknowledged with dangling refs) + * @see https://redis.io/commands/xackdel/ + */ + parseCommand( + parser: CommandParser, + key: RedisArgument, + group: RedisArgument, + id: RedisVariadicArgument, + policy?: (typeof XAckDelPolicy)[keyof typeof XAckDelPolicy], + ) { + parser.push("XACKDEL"); + parser.pushKey(key); + parser.push(group); + + if (policy) { + parser.push(policy); + } + + parser.push("IDS"); + parser.pushVariadicWithLength(id); + }, + transformReply: undefined as unknown as () => ArrayReply< + (typeof XACKDEL_REPLY_CODES)[keyof typeof XACKDEL_REPLY_CODES] + >, +} as const satisfies Command; diff --git a/packages/client/lib/commands/index.ts b/packages/client/lib/commands/index.ts index 803abc8388f..b3e1d3a3cfa 100644 --- a/packages/client/lib/commands/index.ts +++ b/packages/client/lib/commands/index.ts @@ -280,6 +280,7 @@ import TYPE from './TYPE'; import UNLINK from './UNLINK'; import WAIT from './WAIT'; import XACK from './XACK'; +import XACKDEL from './XACKDEL'; import XADD_NOMKSTREAM from './XADD_NOMKSTREAM'; import XADD from './XADD'; import XAUTOCLAIM_JUSTID from './XAUTOCLAIM_JUSTID'; @@ -925,6 +926,8 @@ export default { wait: WAIT, XACK, xAck: XACK, + XACKDEL, + xAckDel: XACKDEL, XADD_NOMKSTREAM, xAddNoMkStream: XADD_NOMKSTREAM, XADD, From 3aec14b986e3b1f2f3f8f8b806995bd20726f6a9 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 24 Jul 2025 16:30:59 +0300 Subject: [PATCH 4/7] refactor: create shared stream deletion types for Redis 8.2 commands --- packages/client/lib/commands/XACKDEL.spec.ts | 36 ++++++++++------ packages/client/lib/commands/XACKDEL.ts | 31 ++++---------- packages/client/lib/commands/XDELEX.spec.ts | 42 +++++++++---------- packages/client/lib/commands/XDELEX.ts | 31 ++++---------- .../lib/commands/common-stream.types.ts | 28 +++++++++++++ 5 files changed, 86 insertions(+), 82 deletions(-) create mode 100644 packages/client/lib/commands/common-stream.types.ts diff --git a/packages/client/lib/commands/XACKDEL.spec.ts b/packages/client/lib/commands/XACKDEL.spec.ts index ac99ee8fb79..9d7bad15a24 100644 --- a/packages/client/lib/commands/XACKDEL.spec.ts +++ b/packages/client/lib/commands/XACKDEL.spec.ts @@ -1,7 +1,11 @@ import { strict as assert } from "node:assert"; -import XACKDEL, { XACKDEL_REPLY_CODES, XAckDelPolicy } from "./XACKDEL"; +import XACKDEL from "./XACKDEL"; import { parseArgs } from "./generic-transformers"; import testUtils, { GLOBAL } from "../test-utils"; +import { + STREAM_DELETION_POLICY, + STREAM_DELETION_REPLY_CODES, +} from "./common-stream.types"; describe("XACKDEL", () => { describe("transformArguments", () => { @@ -18,7 +22,13 @@ describe("XACKDEL", () => { it("string - with policy", () => { assert.deepEqual( - parseArgs(XACKDEL, "key", "group", "0-0", XAckDelPolicy.KEEPREF), + parseArgs( + XACKDEL, + "key", + "group", + "0-0", + STREAM_DELETION_POLICY.KEEPREF + ), ["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"] ); }); @@ -42,7 +52,7 @@ describe("XACKDEL", () => { "key", "group", ["0-0", "1-0"], - XAckDelPolicy.DELREF + STREAM_DELETION_POLICY.DELREF ), ["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"] ); @@ -53,7 +63,7 @@ describe("XACKDEL", () => { `XACKDEL non-existing key - without policy`, async (client) => { const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0"); - assert.deepEqual(reply, [XACKDEL_REPLY_CODES.NOT_FOUND]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -78,7 +88,7 @@ describe("XACKDEL", () => { }); const reply = await client.xAckDel(streamKey, groupName, messageId); - assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -106,9 +116,9 @@ describe("XACKDEL", () => { streamKey, groupName, messageId, - XAckDelPolicy.DELREF + STREAM_DELETION_POLICY.DELREF ); - assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -137,9 +147,9 @@ describe("XACKDEL", () => { streamKey, groupName, messageId, - XAckDelPolicy.ACKED + STREAM_DELETION_POLICY.ACKED ); - assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_DANGLING_REFS]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -170,12 +180,12 @@ describe("XACKDEL", () => { streamKey, groupName, [...messageIds, "0-0"], - XAckDelPolicy.DELREF + STREAM_DELETION_POLICY.DELREF ); assert.deepEqual(reply, [ - XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED, - XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED, - XACKDEL_REPLY_CODES.NOT_FOUND, + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.NOT_FOUND, ]); }, { diff --git a/packages/client/lib/commands/XACKDEL.ts b/packages/client/lib/commands/XACKDEL.ts index fbb57a468c9..6e209879e49 100644 --- a/packages/client/lib/commands/XACKDEL.ts +++ b/packages/client/lib/commands/XACKDEL.ts @@ -1,27 +1,11 @@ import { CommandParser } from "../client/parser"; import { RedisArgument, ArrayReply, Command } from "../RESP/types"; +import { + StreamDeletionReplyCode, + StreamDeletionPolicy, +} from "./common-stream.types"; import { RedisVariadicArgument } from "./generic-transformers"; -/** XACKDEL deletion policies */ -export const XAckDelPolicy = { - /** Preserve references (default) */ - KEEPREF: "KEEPREF", - /** Delete all references */ - DELREF: "DELREF", - /** Only acknowledged entries */ - ACKED: "ACKED", -} as const; - -/** XACKDEL reply codes */ -export const XACKDEL_REPLY_CODES = { - /** ID not found */ - NOT_FOUND: -1, - /** Entry acknowledged and deleted */ - ACKNOWLEDGED_AND_DELETED: 1, - /** Entry acknowledged but dangling references remain */ - ACKNOWLEDGED_DANGLING_REFS: 2, -} as const; - /** * Acknowledges and deletes one or multiple messages for a stream consumer group */ @@ -43,7 +27,7 @@ export default { key: RedisArgument, group: RedisArgument, id: RedisVariadicArgument, - policy?: (typeof XAckDelPolicy)[keyof typeof XAckDelPolicy], + policy?: StreamDeletionPolicy ) { parser.push("XACKDEL"); parser.pushKey(key); @@ -56,7 +40,6 @@ export default { parser.push("IDS"); parser.pushVariadicWithLength(id); }, - transformReply: undefined as unknown as () => ArrayReply< - (typeof XACKDEL_REPLY_CODES)[keyof typeof XACKDEL_REPLY_CODES] - >, + transformReply: + undefined as unknown as () => ArrayReply, } as const satisfies Command; diff --git a/packages/client/lib/commands/XDELEX.spec.ts b/packages/client/lib/commands/XDELEX.spec.ts index 5e83efca730..192f7f0c4cf 100644 --- a/packages/client/lib/commands/XDELEX.spec.ts +++ b/packages/client/lib/commands/XDELEX.spec.ts @@ -1,7 +1,11 @@ import { strict as assert } from "node:assert"; -import XDELEX, { XDELEX_REPLY_CODES, XDelexPolicy } from "./XDELEX"; +import XDELEX from "./XDELEX"; import { parseArgs } from "./generic-transformers"; import testUtils, { GLOBAL } from "../test-utils"; +import { + STREAM_DELETION_POLICY, + STREAM_DELETION_REPLY_CODES, +} from "./common-stream.types"; describe("XDELEX", () => { describe("transformArguments", () => { @@ -16,14 +20,10 @@ describe("XDELEX", () => { }); it("string - with policy", () => { - assert.deepEqual(parseArgs(XDELEX, "key", "0-0", XDelexPolicy.KEEPREF), [ - "XDELEX", - "key", - "KEEPREF", - "IDS", - "1", - "0-0", - ]); + assert.deepEqual( + parseArgs(XDELEX, "key", "0-0", STREAM_DELETION_POLICY.KEEPREF), + ["XDELEX", "key", "KEEPREF", "IDS", "1", "0-0"] + ); }); it("array - without policy", () => { @@ -39,7 +39,7 @@ describe("XDELEX", () => { it("array - with policy", () => { assert.deepEqual( - parseArgs(XDELEX, "key", ["0-0", "1-0"], XDelexPolicy.DELREF), + parseArgs(XDELEX, "key", ["0-0", "1-0"], STREAM_DELETION_POLICY.DELREF), ["XDELEX", "key", "DELREF", "IDS", "2", "0-0", "1-0"] ); }); @@ -49,7 +49,7 @@ describe("XDELEX", () => { `XDELEX non-existing key - without policy`, async (client) => { const reply = await client.xDelex("{tag}stream-key", "0-0"); - assert.deepEqual(reply, [XDELEX_REPLY_CODES.NOT_FOUND]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -68,9 +68,9 @@ describe("XDELEX", () => { const reply = await client.xDelex( streamKey, messageId, - XDelexPolicy.KEEPREF + STREAM_DELETION_POLICY.KEEPREF ); - assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -89,9 +89,9 @@ describe("XDELEX", () => { const reply = await client.xDelex( streamKey, messageId, - XDelexPolicy.DELREF + STREAM_DELETION_POLICY.DELREF ); - assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -115,9 +115,9 @@ describe("XDELEX", () => { const reply = await client.xDelex( streamKey, messageId, - XDelexPolicy.ACKED + STREAM_DELETION_POLICY.ACKED ); - assert.deepEqual(reply, [XDELEX_REPLY_CODES.DANGLING_REFS]); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]); }, { client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, @@ -141,12 +141,12 @@ describe("XDELEX", () => { const reply = await client.xDelex( streamKey, [...messageIds, "0-0"], - XDelexPolicy.DELREF + STREAM_DELETION_POLICY.DELREF ); assert.deepEqual(reply, [ - XDELEX_REPLY_CODES.DELETED, - XDELEX_REPLY_CODES.DELETED, - XDELEX_REPLY_CODES.NOT_FOUND, + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.NOT_FOUND, ]); }, { diff --git a/packages/client/lib/commands/XDELEX.ts b/packages/client/lib/commands/XDELEX.ts index 0c6d967da21..021dd0a9e13 100644 --- a/packages/client/lib/commands/XDELEX.ts +++ b/packages/client/lib/commands/XDELEX.ts @@ -1,27 +1,11 @@ import { CommandParser } from "../client/parser"; import { RedisArgument, ArrayReply, Command } from "../RESP/types"; +import { + StreamDeletionPolicy, + StreamDeletionReplyCode, +} from "./common-stream.types"; import { RedisVariadicArgument } from "./generic-transformers"; -/** XDELEX deletion policies */ -export const XDelexPolicy = { - /** Preserve references (default) */ - KEEPREF: "KEEPREF", - /** Delete all references */ - DELREF: "DELREF", - /** Only acknowledged entries */ - ACKED: "ACKED", -} as const; - -/** XDELEX reply codes */ -export const XDELEX_REPLY_CODES = { - /** ID not found */ - NOT_FOUND: -1, - /** Entry deleted */ - DELETED: 1, - /** Dangling references */ - DANGLING_REFS: 2, -} as const; - /** * Deletes one or multiple entries from the stream */ @@ -41,7 +25,7 @@ export default { parser: CommandParser, key: RedisArgument, id: RedisVariadicArgument, - policy?: (typeof XDelexPolicy)[keyof typeof XDelexPolicy], + policy?: StreamDeletionPolicy ) { parser.push("XDELEX"); parser.pushKey(key); @@ -53,7 +37,6 @@ export default { parser.push("IDS"); parser.pushVariadicWithLength(id); }, - transformReply: undefined as unknown as () => ArrayReply< - (typeof XDELEX_REPLY_CODES)[keyof typeof XDELEX_REPLY_CODES] - >, + transformReply: + undefined as unknown as () => ArrayReply, } as const satisfies Command; diff --git a/packages/client/lib/commands/common-stream.types.ts b/packages/client/lib/commands/common-stream.types.ts new file mode 100644 index 00000000000..60955b6e3c3 --- /dev/null +++ b/packages/client/lib/commands/common-stream.types.ts @@ -0,0 +1,28 @@ +/** Common stream deletion policies + * + * Added in Redis 8.2 + */ +export const STREAM_DELETION_POLICY = { + /** Preserve references (default) */ + KEEPREF: "KEEPREF", + /** Delete all references */ + DELREF: "DELREF", + /** Only acknowledged entries */ + ACKED: "ACKED", +} as const; + +export type StreamDeletionPolicy = + (typeof STREAM_DELETION_POLICY)[keyof typeof STREAM_DELETION_POLICY]; + +/** Common reply codes for stream deletion operations */ +export const STREAM_DELETION_REPLY_CODES = { + /** ID not found */ + NOT_FOUND: -1, + /** Entry deleted */ + DELETED: 1, + /** Dangling references */ + DANGLING_REFS: 2, +} as const; + +export type StreamDeletionReplyCode = + (typeof STREAM_DELETION_REPLY_CODES)[keyof typeof STREAM_DELETION_REPLY_CODES]; From b54588b54f7a09790e8ac22959df0765973bec2a Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 24 Jul 2025 17:03:34 +0300 Subject: [PATCH 5/7] feat: add Redis 8.2 deletion policies to XTRIM command --- packages/client/lib/commands/XTRIM.spec.ts | 95 +++++++++++++++++++++- packages/client/lib/commands/XTRIM.ts | 8 ++ 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/packages/client/lib/commands/XTRIM.spec.ts b/packages/client/lib/commands/XTRIM.spec.ts index 2c31f0fef92..b88cf84676e 100644 --- a/packages/client/lib/commands/XTRIM.spec.ts +++ b/packages/client/lib/commands/XTRIM.spec.ts @@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL } from '../test-utils'; import XTRIM from './XTRIM'; import { parseArgs } from './generic-transformers'; +import { STREAM_DELETION_POLICY } from './common-stream.types'; describe('XTRIM', () => { describe('transformArguments', () => { @@ -12,6 +13,13 @@ describe('XTRIM', () => { ); }); + it('simple - MINID', () => { + assert.deepEqual( + parseArgs(XTRIM, 'key', 'MINID', 123), + ['XTRIM', 'key', 'MINID', '123'] + ); + }); + it('with strategyModifier', () => { assert.deepEqual( parseArgs(XTRIM, 'key', 'MAXLEN', 1, { @@ -39,15 +47,96 @@ describe('XTRIM', () => { ['XTRIM', 'key', 'MAXLEN', '=', '1', 'LIMIT', '1'] ); }); + + it('with policy', () => { + assert.deepEqual( + parseArgs(XTRIM, 'key', 'MAXLEN', 1, { + policy: STREAM_DELETION_POLICY.DELREF + }), + ['XTRIM', 'key', 'MAXLEN', '1', 'DELREF'] + ); + }); + + it('with all options', () => { + assert.deepEqual( + parseArgs(XTRIM, 'key', 'MAXLEN', 1, { + strategyModifier: '~', + LIMIT: 100, + policy: STREAM_DELETION_POLICY.ACKED + }), + ['XTRIM', 'key', 'MAXLEN', '~', '1', 'LIMIT', '100', 'ACKED'] + ); + }); + }); + + testUtils.testAll('xTrim with MAXLEN', async client => { + assert.equal( + typeof await client.xTrim('key', 'MAXLEN', 1), + 'number' + ); + }, { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, }); - testUtils.testAll('xTrim', async client => { + testUtils.testAll('xTrim with MINID', async client => { assert.equal( - await client.xTrim('key', 'MAXLEN', 1), - 0 + typeof await client.xTrim('key', 'MINID', 1), + 'number' ); }, { client: GLOBAL.SERVERS.OPEN, cluster: GLOBAL.CLUSTERS.OPEN, }); + + testUtils.testAll( + 'xTrim with LIMIT', + async (client) => { + assert.equal( + typeof await client.xTrim('{tag}key', 'MAXLEN', 1000, { + strategyModifier: '~', + LIMIT: 10 + }), + 'number' + ); + }, + { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + } + ); + + testUtils.testAll( + 'xTrim with policy', + async (client) => { + assert.equal( + typeof await client.xTrim('{tag}key', 'MAXLEN', 0, { + policy: STREAM_DELETION_POLICY.DELREF + }), + 'number' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + 'xTrim with all options', + async (client) => { + assert.equal( + typeof await client.xTrim('{tag}key', 'MINID', 0, { + strategyModifier: '~', + LIMIT: 10, + policy: STREAM_DELETION_POLICY.KEEPREF + }), + 'number' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); }); diff --git a/packages/client/lib/commands/XTRIM.ts b/packages/client/lib/commands/XTRIM.ts index 6125720111a..34171d4611e 100644 --- a/packages/client/lib/commands/XTRIM.ts +++ b/packages/client/lib/commands/XTRIM.ts @@ -1,16 +1,20 @@ import { CommandParser } from '../client/parser'; import { NumberReply, Command, RedisArgument } from '../RESP/types'; +import { StreamDeletionPolicy } from './common-stream.types'; /** * Options for the XTRIM command * * @property strategyModifier - Exact ('=') or approximate ('~') trimming * @property LIMIT - Maximum number of entries to trim in one call (Redis 6.2+) + * @property policy - Policy to apply when deleting entries (optional, defaults to KEEPREF) */ export interface XTrimOptions { strategyModifier?: '=' | '~'; /** added in 6.2 */ LIMIT?: number; + /** added in 8.2 */ + policy?: StreamDeletionPolicy; } /** @@ -49,6 +53,10 @@ export default { if (options?.LIMIT) { parser.push('LIMIT', options.LIMIT.toString()); } + + if (options?.policy) { + parser.push(options.policy); + } }, transformReply: undefined as unknown as () => NumberReply } as const satisfies Command; From 51176de3780f65da4bf9848ce4c083424003cd27 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 24 Jul 2025 17:21:09 +0300 Subject: [PATCH 6/7] feat: add Redis 8.2 deletion policies to XADD commands --- packages/client/lib/commands/XADD.spec.ts | 80 +++++++++++++++++ packages/client/lib/commands/XADD.ts | 8 ++ .../lib/commands/XADD_NOMKSTREAM.spec.ts | 88 ++++++++++++++++--- 3 files changed, 165 insertions(+), 11 deletions(-) diff --git a/packages/client/lib/commands/XADD.spec.ts b/packages/client/lib/commands/XADD.spec.ts index 321581d0865..a41e8682751 100644 --- a/packages/client/lib/commands/XADD.spec.ts +++ b/packages/client/lib/commands/XADD.spec.ts @@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL } from '../test-utils'; import XADD from './XADD'; import { parseArgs } from './generic-transformers'; +import { STREAM_DELETION_POLICY } from './common-stream.types'; describe('XADD', () => { describe('transformArguments', () => { @@ -78,6 +79,37 @@ describe('XADD', () => { ['XADD', 'key', '1000', 'LIMIT', '1', '*', 'field', 'value'] ); }); + + it('with TRIM.policy', () => { + assert.deepEqual( + parseArgs(XADD, 'key', '*', { + field: 'value' + }, { + TRIM: { + threshold: 1000, + policy: STREAM_DELETION_POLICY.DELREF + } + }), + ['XADD', 'key', '1000', 'DELREF', '*', 'field', 'value'] + ); + }); + + it('with all TRIM options', () => { + assert.deepEqual( + parseArgs(XADD, 'key', '*', { + field: 'value' + }, { + TRIM: { + strategy: 'MAXLEN', + strategyModifier: '~', + threshold: 1000, + limit: 100, + policy: STREAM_DELETION_POLICY.ACKED + } + }), + ['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value'] + ); + }); }); testUtils.testAll('xAdd', async client => { @@ -91,4 +123,52 @@ describe('XADD', () => { client: GLOBAL.SERVERS.OPEN, cluster: GLOBAL.CLUSTERS.OPEN }); + + testUtils.testAll( + 'xAdd with TRIM policy', + async (client) => { + assert.equal( + typeof await client.xAdd('{tag}key', '*', + { field: 'value' }, + { + TRIM: { + strategy: 'MAXLEN', + threshold: 1000, + policy: STREAM_DELETION_POLICY.KEEPREF + } + } + ), + 'string' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + 'xAdd with all TRIM options', + async (client) => { + assert.equal( + typeof await client.xAdd('{tag}key2', '*', + { field: 'value' }, + { + TRIM: { + strategy: 'MAXLEN', + strategyModifier: '~', + threshold: 1000, + limit: 10, + policy: STREAM_DELETION_POLICY.DELREF + } + } + ), + 'string' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); }); diff --git a/packages/client/lib/commands/XADD.ts b/packages/client/lib/commands/XADD.ts index b0c50b1bfdb..f2509a9fa7b 100644 --- a/packages/client/lib/commands/XADD.ts +++ b/packages/client/lib/commands/XADD.ts @@ -1,5 +1,6 @@ import { CommandParser } from '../client/parser'; import { RedisArgument, BlobStringReply, Command } from '../RESP/types'; +import { StreamDeletionPolicy } from './common-stream.types'; import { Tail } from './generic-transformers'; /** @@ -10,6 +11,7 @@ import { Tail } from './generic-transformers'; * @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming * @property TRIM.threshold - Maximum stream length or minimum ID to retain * @property TRIM.limit - Maximum number of entries to trim in one call + * @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF) */ export interface XAddOptions { TRIM?: { @@ -17,6 +19,8 @@ export interface XAddOptions { strategyModifier?: '=' | '~'; threshold: number; limit?: number; + /** added in 8.2 */ + policy?: StreamDeletionPolicy; }; } @@ -58,6 +62,10 @@ export function parseXAddArguments( if (options.TRIM.limit) { parser.push('LIMIT', options.TRIM.limit.toString()); } + + if (options.TRIM.policy) { + parser.push(options.TRIM.policy); + } } parser.push(id); diff --git a/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts b/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts index 97927f212ff..a957d0f06c1 100644 --- a/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts +++ b/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts @@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL } from '../test-utils'; import XADD_NOMKSTREAM from './XADD_NOMKSTREAM'; import { parseArgs } from './generic-transformers'; +import { STREAM_DELETION_POLICY } from './common-stream.types'; describe('XADD NOMKSTREAM', () => { testUtils.isVersionGreaterThanHook([6, 2]); @@ -80,17 +81,82 @@ describe('XADD NOMKSTREAM', () => { ['XADD', 'key', 'NOMKSTREAM', '1000', 'LIMIT', '1', '*', 'field', 'value'] ); }); - }); - testUtils.testAll('xAddNoMkStream', async client => { - assert.equal( - await client.xAddNoMkStream('key', '*', { - field: 'value' - }), - null - ); - }, { - client: GLOBAL.SERVERS.OPEN, - cluster: GLOBAL.CLUSTERS.OPEN + it('with TRIM.policy', () => { + assert.deepEqual( + parseArgs(XADD_NOMKSTREAM, 'key', '*', { + field: 'value' + }, { + TRIM: { + threshold: 1000, + policy: STREAM_DELETION_POLICY.DELREF + } + }), + ['XADD', 'key', 'NOMKSTREAM', '1000', 'DELREF', '*', 'field', 'value'] + ); + }); + + it('with all TRIM options', () => { + assert.deepEqual( + parseArgs(XADD_NOMKSTREAM, 'key', '*', { + field: 'value' + }, { + TRIM: { + strategy: 'MAXLEN', + strategyModifier: '~', + threshold: 1000, + limit: 100, + policy: STREAM_DELETION_POLICY.ACKED + } + }), + ['XADD', 'key', 'NOMKSTREAM', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value'] + ); + }); }); + + testUtils.testAll( + 'xAddNoMkStream - null when stream does not exist', + async (client) => { + assert.equal( + await client.xAddNoMkStream('{tag}nonexistent-stream', '*', { + field: 'value' + }), + null + ); + }, + { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + } + ); + + testUtils.testAll( + 'xAddNoMkStream - with all TRIM options', + async (client) => { + const streamKey = '{tag}stream'; + + // Create stream and add some messages + await client.xAdd(streamKey, '*', { field: 'value1' }); + + // Use NOMKSTREAM with all TRIM options + const messageId = await client.xAddNoMkStream(streamKey, '*', + { field: 'value2' }, + { + TRIM: { + strategyModifier: '~', + limit: 1, + strategy: 'MAXLEN', + threshold: 2, + policy: STREAM_DELETION_POLICY.DELREF + } + } + ); + + assert.equal(typeof messageId, 'string'); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); }); From decb40e20abc785e95924e020df97163e6813ad2 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Fri, 25 Jul 2025 13:50:22 +0300 Subject: [PATCH 7/7] fix: correct XDELEX command method name and test parameter --- packages/client/lib/commands/XDELEX.spec.ts | 11 +++++------ packages/client/lib/commands/index.ts | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/client/lib/commands/XDELEX.spec.ts b/packages/client/lib/commands/XDELEX.spec.ts index 192f7f0c4cf..8c421503256 100644 --- a/packages/client/lib/commands/XDELEX.spec.ts +++ b/packages/client/lib/commands/XDELEX.spec.ts @@ -48,7 +48,7 @@ describe("XDELEX", () => { testUtils.testAll( `XDELEX non-existing key - without policy`, async (client) => { - const reply = await client.xDelex("{tag}stream-key", "0-0"); + const reply = await client.xDelEx("{tag}stream-key", "0-0"); assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]); }, { @@ -65,10 +65,9 @@ describe("XDELEX", () => { field: "value", }); - const reply = await client.xDelex( + const reply = await client.xDelEx( streamKey, messageId, - STREAM_DELETION_POLICY.KEEPREF ); assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); }, @@ -86,7 +85,7 @@ describe("XDELEX", () => { field: "value", }); - const reply = await client.xDelex( + const reply = await client.xDelEx( streamKey, messageId, STREAM_DELETION_POLICY.DELREF @@ -112,7 +111,7 @@ describe("XDELEX", () => { // Create consumer group await client.xGroupCreate(streamKey, "testgroup", "0"); - const reply = await client.xDelex( + const reply = await client.xDelEx( streamKey, messageId, STREAM_DELETION_POLICY.ACKED @@ -138,7 +137,7 @@ describe("XDELEX", () => { }), ]); - const reply = await client.xDelex( + const reply = await client.xDelEx( streamKey, [...messageIds, "0-0"], STREAM_DELETION_POLICY.DELREF diff --git a/packages/client/lib/commands/index.ts b/packages/client/lib/commands/index.ts index b3e1d3a3cfa..4614c8b282b 100644 --- a/packages/client/lib/commands/index.ts +++ b/packages/client/lib/commands/index.ts @@ -943,7 +943,7 @@ export default { XDEL, xDel: XDEL, XDELEX, - xDelex: XDELEX, + xDelEx: XDELEX, XGROUP_CREATE, xGroupCreate: XGROUP_CREATE, XGROUP_CREATECONSUMER,