From 6268360ab044e250665b89bead81b2ca58d0819c Mon Sep 17 00:00:00 2001 From: Nicolas Zelaya Date: Mon, 6 Nov 2023 09:19:35 -0300 Subject: [PATCH] Handling pipeline method properly in RedisAdapter --- src/storages/inRedis/RedisAdapter.ts | 71 +++++++++++++++++++--------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/src/storages/inRedis/RedisAdapter.ts b/src/storages/inRedis/RedisAdapter.ts index 7beb2988..423566f5 100644 --- a/src/storages/inRedis/RedisAdapter.ts +++ b/src/storages/inRedis/RedisAdapter.ts @@ -8,7 +8,8 @@ import { timeout } from '../../utils/promise/timeout'; const LOG_PREFIX = 'storage:redis-adapter: '; // If we ever decide to fully wrap every method, there's a Commander.getBuiltinCommands from ioredis. -const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'pipeline', 'expire', 'mget', 'lrange', 'ltrim', 'hset']; +const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset']; +const METHODS_TO_PROMISE_WRAP_EXEC = ['pipeline']; // Not part of the settings since it'll vary on each storage. We should be removing storage specific logic from elsewhere. const DEFAULT_OPTIONS = { @@ -56,6 +57,7 @@ export class RedisAdapter extends ioredis { this.once('ready', () => { const commandsCount = this._notReadyCommandsQueue ? this._notReadyCommandsQueue.length : 0; this.log.info(LOG_PREFIX + `Redis connection established. Queued commands: ${commandsCount}.`); + this._notReadyCommandsQueue && this._notReadyCommandsQueue.forEach(queued => { this.log.info(LOG_PREFIX + `Executing queued ${queued.name} command.`); queued.command().then(queued.resolve).catch(queued.reject); @@ -71,49 +73,72 @@ export class RedisAdapter extends ioredis { _setTimeoutWrappers() { const instance: Record = this; - METHODS_TO_PROMISE_WRAP.forEach(method => { - const originalMethod = instance[method]; + const wrapWithQueueOrExecute = (method: string, commandWrapper: Function) => { + if (instance._notReadyCommandsQueue) { + return new Promise((resolve, reject) => { + instance._notReadyCommandsQueue.unshift({ + resolve, + reject, + command: commandWrapper, + name: method.toUpperCase() + }); + }); + } else { + return commandWrapper(); + } + }; - instance[method] = function () { + const wrapCommand = (originalMethod: Function, methodName: string) => { + return function () { const params = arguments; - function commandWrapper() { - instance.log.debug(LOG_PREFIX + `Executing ${method}.`); - // Return original method - const result = originalMethod.apply(instance, params); + const commandWrapper = () => { + instance.log.debug(`${LOG_PREFIX} Executing ${methodName}.`); + const result = originalMethod.apply(this, params); if (thenable(result)) { // For handling pending commands on disconnect, add to the set and remove once finished. // On sync commands there's no need, only thenables. instance._runningCommands.add(result); - const cleanUpRunningCommandsCb = function () { + const cleanUpRunningCommandsCb = () => { instance._runningCommands.delete(result); }; // Both success and error remove from queue. result.then(cleanUpRunningCommandsCb, cleanUpRunningCommandsCb); return timeout(instance._options.operationTimeout, result).catch(err => { - instance.log.error(LOG_PREFIX + `${method} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`); + instance.log.error(`${LOG_PREFIX}${methodName} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`); // Handling is not the adapter responsibility. throw err; }); } return result; - } + }; - if (instance._notReadyCommandsQueue) { - return new Promise((res, rej) => { - instance._notReadyCommandsQueue.unshift({ - resolve: res, - reject: rej, - command: commandWrapper, - name: method.toUpperCase() - }); - }); - } else { - return commandWrapper(); - } + return wrapWithQueueOrExecute(methodName, commandWrapper); + }; + }; + + // Wrap regular async methods to track timeouts and queue when Redis is not yet executing commands. + METHODS_TO_PROMISE_WRAP.forEach(methodName => { + const originalFn = instance[methodName]; + instance[methodName] = wrapCommand(originalFn, methodName); + }); + + // Special handling for pipeline~like methods. We need to wrap the async trigger, which is exec, but return the Pipeline right away. + METHODS_TO_PROMISE_WRAP_EXEC.forEach(methodName => { + const originalFn = instance[methodName]; + // "First level wrapper" to handle the sync execution and wrap async, queueing later if applicable. + instance[methodName] = function () { + instance.log.debug(`${LOG_PREFIX} Creating ${methodName}.`); + + const res = originalFn.apply(instance, arguments); + const originalExec = res.exec; + + res.exec = wrapCommand(originalExec, methodName + '.exec').bind(res); + + return res; }; }); }