Skip to content

Commit 7702220

Browse files
committed
fix returnBuffers, add some tests
1 parent 74daee3 commit 7702220

File tree

11 files changed

+149
-25
lines changed

11 files changed

+149
-25
lines changed

packages/bloom/lib/commands/bloom/LOADCHUNK.spec.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { strict as assert } from 'assert';
2+
import testUtils, { GLOBAL } from '../../test-utils';
23
import { transformArguments } from './LOADCHUNK';
34

45
describe('BF LOADCHUNK', () => {
@@ -8,4 +9,20 @@ describe('BF LOADCHUNK', () => {
89
['BF.LOADCHUNK', 'key', '0', '']
910
);
1011
});
12+
13+
testUtils.testWithClient('client.bf.loadChunk', async client => {
14+
const [, { iterator, chunk }] = await Promise.all([
15+
client.bf.reserve('source', 0.01, 100),
16+
client.bf.scanDump(
17+
client.commandOptions({ returnBuffers: true }),
18+
'source',
19+
0
20+
)
21+
]);
22+
23+
assert.equal(
24+
await client.bf.loadChunk('destination', iterator, chunk),
25+
'OK'
26+
);
27+
}, GLOBAL.SERVERS.OPEN);
1128
});

packages/bloom/lib/commands/bloom/SCANDUMP.spec.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { strict as assert } from 'assert';
2+
import testUtils, { GLOBAL } from '../../test-utils';
23
import { transformArguments } from './SCANDUMP';
34

45
describe('BF SCANDUMP', () => {
@@ -8,4 +9,14 @@ describe('BF SCANDUMP', () => {
89
['BF.SCANDUMP', 'key', '0']
910
);
1011
});
12+
13+
testUtils.testWithClient('client.bf.scanDump', async client => {
14+
const [, dump] = await Promise.all([
15+
client.bf.reserve('key', 0.01, 100),
16+
client.bf.scanDump('key', 0)
17+
]);
18+
assert.equal(typeof dump, 'object');
19+
assert.equal(typeof dump.iterator, 'number');
20+
assert.equal(typeof dump.chunk, 'string');
21+
}, GLOBAL.SERVERS.OPEN);
1122
});

packages/bloom/lib/commands/cuckoo/LOADCHUNK.spec.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { strict as assert } from 'assert';
2+
import testUtils, { GLOBAL } from '../../test-utils';
23
import { transformArguments } from './LOADCHUNK';
34

45
describe('CF LOADCHUNK', () => {
@@ -8,4 +9,23 @@ describe('CF LOADCHUNK', () => {
89
['CF.LOADCHUNK', 'item', '0', '']
910
);
1011
});
12+
13+
testUtils.testWithClient('client.cf.loadChunk', async client => {
14+
const [,, { iterator, chunk }] = await Promise.all([
15+
client.cf.reserve('source', 4),
16+
client.cf.add('source', 'item'),
17+
client.cf.scanDump(
18+
client.commandOptions({ returnBuffers: true }),
19+
'source',
20+
0
21+
)
22+
]);
23+
24+
assert.ok(Buffer.isBuffer(chunk));
25+
26+
assert.equal(
27+
await client.cf.loadChunk('destination', iterator, chunk),
28+
'OK'
29+
);
30+
}, GLOBAL.SERVERS.OPEN);
1131
});

packages/bloom/lib/commands/cuckoo/LOADCHUNK.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1+
import { RedisCommandArgument, RedisCommandArguments } from '@node-redis/client/dist/lib/commands';
2+
13
export const FIRST_KEY_INDEX = 1;
24

3-
export function transformArguments(key: string, iterator: number, chunk: string): Array<string> {
5+
export function transformArguments(
6+
key: string,
7+
iterator: number,
8+
chunk: RedisCommandArgument
9+
): RedisCommandArguments {
410
return ['CF.LOADCHUNK', key, iterator.toString(), chunk];
511
}
612

packages/bloom/lib/commands/cuckoo/SCANDUMP.spec.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { strict as assert } from 'assert';
2+
import testUtils, { GLOBAL } from '../../test-utils';
23
import { transformArguments } from './SCANDUMP';
34

45
describe('CF SCANDUMP', () => {
@@ -8,4 +9,15 @@ describe('CF SCANDUMP', () => {
89
['CF.SCANDUMP', 'key', '0']
910
);
1011
});
12+
13+
testUtils.testWithClient('client.cf.scanDump', async client => {
14+
await client.cf.reserve('key', 4);
15+
assert.deepEqual(
16+
await client.cf.scanDump('key', 0),
17+
{
18+
iterator: 0,
19+
chunk: null
20+
}
21+
);
22+
}, GLOBAL.SERVERS.OPEN);
1123
});

packages/bloom/lib/commands/cuckoo/SCANDUMP.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ export function transformArguments(key: string, iterator: number): Array<string>
66

77
type ScanDumpRawReply = [
88
iterator: number,
9-
chunk: string
9+
chunk: string | null
1010
];
1111

1212
interface ScanDumpReply {
1313
iterator: number;
14-
chunk: string;
14+
chunk: string | null;
1515
}
1616

1717
export function transformReply([iterator, chunk]: ScanDumpRawReply): ScanDumpReply {

packages/client/lib/client/commands-queue.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,20 +360,28 @@ export default class RedisCommandsQueue {
360360
return toSend?.args;
361361
}
362362

363-
parseResponse(data: Buffer): void {
363+
#setReturnBuffers() {
364364
this.#parser.setReturnBuffers(
365365
!!this.#waitingForReply.head?.value.returnBuffers ||
366366
!!this.#pubSubState?.subscribed
367367
);
368+
}
369+
370+
parseResponse(data: Buffer): void {
371+
this.#setReturnBuffers();
368372
this.#parser.execute(data);
369373
}
370374

371375
#shiftWaitingForReply(): CommandWaitingForReply {
372376
if (!this.#waitingForReply.length) {
373377
throw new Error('Got an unexpected reply from Redis');
374378
}
375-
return this.#waitingForReply.shift()!;
379+
380+
const waitingForReply = this.#waitingForReply.shift()!;
381+
this.#setReturnBuffers();
382+
return waitingForReply;
376383
}
384+
377385
flushWaitingForReply(err: Error): void {
378386
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
379387
if (!this.#chainInExecution) {
@@ -384,6 +392,7 @@ export default class RedisCommandsQueue {
384392
}
385393
this.#chainInExecution = undefined;
386394
}
395+
387396
flushAll(err: Error): void {
388397
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
389398
RedisCommandsQueue.#flushQueue(this.#waitingToBeSent, err);

packages/client/lib/client/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
8989
return commandOptions(options);
9090
}
9191

92+
commandOptions = RedisClient.commandOptions;
93+
9294
static extend<M extends RedisModules, S extends RedisScripts>(plugins?: RedisPlugins<M, S>): InstantiableRedisClient<M, S> {
9395
const Client = <any>extendWithModulesAndScripts({
9496
BaseClass: RedisClient,

packages/client/lib/commands/CLIENT_KILL.spec.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,18 @@ describe('CLIENT KILL', () => {
9393
);
9494
});
9595
});
96+
97+
it('TYPE & SKIP_ME', () => {
98+
assert.deepEqual(
99+
transformArguments([
100+
{
101+
filter: ClientKillFilters.TYPE,
102+
type: 'master'
103+
},
104+
ClientKillFilters.SKIP_ME
105+
]),
106+
['CLIENT', 'KILL', 'TYPE', 'master', 'SKIPME']
107+
);
108+
});
96109
});
97110
});

packages/client/lib/commands/XPENDING.spec.ts

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,53 @@ describe('XPENDING', () => {
1212
});
1313
});
1414

15-
testUtils.testWithClient('client.xPending', async client => {
16-
await client.xGroupCreate('key', 'group', '$', {
17-
MKSTREAM: true
18-
});
15+
describe('client.xPending', () => {
16+
testUtils.testWithClient('simple', async client => {
17+
await Promise.all([
18+
client.xGroupCreate('key', 'group', '$', {
19+
MKSTREAM: true
20+
}),
21+
client.xGroupCreateConsumer('key', 'group', 'consumer')
22+
]);
23+
24+
assert.deepEqual(
25+
await client.xPending('key', 'group'),
26+
{
27+
pending: 0,
28+
firstId: null,
29+
lastId: null,
30+
consumers: null
31+
}
32+
);
33+
}, GLOBAL.SERVERS.OPEN);
34+
35+
testUtils.testWithClient('with consumers', async client => {
36+
const [,, id] = await Promise.all([
37+
client.xGroupCreate('key', 'group', '$', {
38+
MKSTREAM: true
39+
}),
40+
client.xGroupCreateConsumer('key', 'group', 'consumer'),
41+
client.xAdd('key', '*', { field: 'value' }),
42+
client.xReadGroup('group', 'consumer', {
43+
key: 'key',
44+
id: '>'
45+
})
46+
]);
47+
48+
assert.deepEqual(
49+
await client.xPending('key', 'group'),
50+
{
51+
pending: 1,
52+
firstId: id,
53+
lastId: id,
54+
consumers: [{
55+
name: 'consumer',
56+
deliveriesCounter: 1
57+
}]
58+
}
59+
);
60+
}, GLOBAL.SERVERS.OPEN);
61+
});
62+
1963

20-
assert.deepEqual(
21-
await client.xPending('key', 'group'),
22-
{
23-
pending: 0,
24-
firstId: null,
25-
lastId: null,
26-
consumers: null
27-
}
28-
);
29-
}, GLOBAL.SERVERS.OPEN);
3064
});

packages/client/lib/commands/XPENDING.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@ type XPendingRawReply = [
1717
lastId: RedisCommandArgument | null,
1818
consumers: Array<[
1919
name: RedisCommandArgument,
20-
deliveriesCounter: number
20+
deliveriesCounter: RedisCommandArgument
2121
]> | null
22-
]
22+
];
2323

2424
interface XPendingReply {
2525
pending: number;
2626
firstId: RedisCommandArgument | null;
27-
lastId: RedisCommandArgument | null
27+
lastId: RedisCommandArgument | null;
2828
consumers: Array<{
29-
name: RedisCommandArgument,
30-
deliveriesCounter: number
29+
name: RedisCommandArgument;
30+
deliveriesCounter: number;
3131
}> | null;
3232
}
3333

@@ -38,7 +38,7 @@ export function transformReply(reply: XPendingRawReply): XPendingReply {
3838
lastId: reply[2],
3939
consumers: reply[3] === null ? null : reply[3].map(([name, deliveriesCounter]) => ({
4040
name,
41-
deliveriesCounter
41+
deliveriesCounter: Number(deliveriesCounter)
4242
}))
4343
};
4444
}

0 commit comments

Comments
 (0)