Skip to content

Commit

Permalink
Execute empty MULTI (#2423)
Browse files Browse the repository at this point in the history
* Fix multi.exec with empty queue and previous watch

When calling exec on a multi instance which you did not use, no command is sent currently.

This is a problem for watched keys, because no EXEC means no unwatch, which might cause hard-to-debug problems.

Proposed Fix: Sending UNWATCH

* execute empty multi command (instead of skipping)

* Update index.ts

* Update index.ts

* Update multi-command.ts

* Update multi-command.ts

* Update multi-command.ts

* Update multi-command.ts

* short circuit empty pipelines

* Update index.ts

---------

Co-authored-by: Leibale <me@leibale.com>
  • Loading branch information
Mik13 and leibale authored Feb 24, 2023
1 parent 1be8422 commit 0f28dad
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 45 deletions.
25 changes: 17 additions & 8 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -518,14 +518,23 @@ describe('Client', () => {
);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('execAsPipeline', async client => {
assert.deepEqual(
await client.multi()
.ping()
.exec(true),
['PONG']
);
}, GLOBAL.SERVERS.OPEN);
describe('execAsPipeline', () => {
testUtils.testWithClient('exec(true)', async client => {
assert.deepEqual(
await client.multi()
.ping()
.exec(true),
['PONG']
);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('empty execAsPipeline', async client => {
assert.deepEqual(
await client.multi().execAsPipeline(),
[]
);
}, GLOBAL.SERVERS.OPEN);
});

testUtils.testWithClient('should remember selected db', async client => {
await client.multi()
Expand Down
21 changes: 15 additions & 6 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ export default class RedisClient<
);
} else if (!this.#socket.isReady && this.#options?.disableOfflineQueue) {
return Promise.reject(new ClientOfflineError());
}
}

const promise = this.#queue.addCommand<T>(args, options);
this.#tick();
Expand Down Expand Up @@ -725,11 +725,14 @@ export default class RedisClient<
return Promise.reject(new ClientClosedError());
}

const promise = Promise.all(
commands.map(({ args }) => {
return this.#queue.addCommand(args, { chainId });
})
);
const promise = chainId ?
// if `chainId` has a value, it's a `MULTI` (and not "pipeline") - need to add the `MULTI` and `EXEC` commands
Promise.all([
this.#queue.addCommand(['MULTI'], { chainId }),
this.#addMultiCommands(commands, chainId),
this.#queue.addCommand(['EXEC'], { chainId })
]) :
this.#addMultiCommands(commands);

this.#tick();

Expand All @@ -742,6 +745,12 @@ export default class RedisClient<
return results;
}

#addMultiCommands(commands: Array<RedisMultiQueuedCommand>, chainId?: symbol) {
return Promise.all(
commands.map(({ args }) => this.#queue.addCommand(args, { chainId }))
);
}

async* scanIterator(options?: ScanCommandOptions): AsyncIterable<string> {
let cursor = 0;
do {
Expand Down
7 changes: 3 additions & 4 deletions packages/client/lib/client/multi-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,9 @@ export default class RedisClientMultiCommand {
return this.execAsPipeline();
}

const commands = this.#multi.exec();
if (!commands) return [];

return this.#multi.handleExecReplies(
await this.#executor(
commands,
this.#multi.queue,
this.#selectedDB,
RedisMultiCommand.generateChainId()
)
Expand All @@ -185,6 +182,8 @@ export default class RedisClientMultiCommand {
EXEC = this.exec;

async execAsPipeline(): Promise<Array<RedisCommandRawReply>> {
if (this.#multi.queue.length === 0) return [];

return this.#multi.transformReplies(
await this.#executor(
this.#multi.queue,
Expand Down
5 changes: 1 addition & 4 deletions packages/client/lib/cluster/multi-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,8 @@ export default class RedisClusterMultiCommand {
return this.execAsPipeline();
}

const commands = this.#multi.exec();
if (!commands) return [];

return this.#multi.handleExecReplies(
await this.#executor(commands, this.#firstKey, RedisMultiCommand.generateChainId())
await this.#executor(this.#multi.queue, this.#firstKey, RedisMultiCommand.generateChainId())
);
}

Expand Down
21 changes: 10 additions & 11 deletions packages/client/lib/multi-command.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,23 @@ describe('Multi Command', () => {
});

describe('exec', () => {
it('undefined', () => {
assert.equal(
new RedisMultiCommand().exec(),
undefined
it('without commands', () => {
assert.deepEqual(
new RedisMultiCommand().queue,
[]
);
});

it('Array', () => {
it('with commands', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING']);

assert.deepEqual(
multi.exec(),
[
{ args: ['MULTI'] },
{ args: ['PING'], transformReply: undefined },
{ args: ['EXEC'] }
]
multi.queue,
[{
args: ['PING'],
transformReply: undefined
}]
);
});
});
Expand Down
12 changes: 0 additions & 12 deletions packages/client/lib/multi-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,6 @@ export default class RedisMultiCommand {
return transformedArguments;
}

exec(): undefined | Array<RedisMultiQueuedCommand> {
if (!this.queue.length) {
return;
}

return [
{ args: ['MULTI'] },
...this.queue,
{ args: ['EXEC'] }
];
}

handleExecReplies(rawReplies: Array<RedisCommandRawReply>): Array<RedisCommandRawReply> {
const execReply = rawReplies[rawReplies.length - 1] as (null | Array<RedisCommandRawReply>);
if (execReply === null) {
Expand Down

0 comments on commit 0f28dad

Please sign in to comment.