Skip to content

Commit

Permalink
Handling pipeline method properly in RedisAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Zelaya authored and Nicolas Zelaya committed Nov 6, 2023
1 parent 5912dc8 commit 6268360
Showing 1 changed file with 48 additions and 23 deletions.
71 changes: 48 additions & 23 deletions src/storages/inRedis/RedisAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { timeout } from '../../utils/promise/timeout';
const LOG_PREFIX = 'storage:redis-adapter: ';

// If we ever decide to fully wrap every method, there's a Commander.getBuiltinCommands from ioredis.
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'pipeline', 'expire', 'mget', 'lrange', 'ltrim', 'hset'];
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset'];
const METHODS_TO_PROMISE_WRAP_EXEC = ['pipeline'];

// Not part of the settings since it'll vary on each storage. We should be removing storage specific logic from elsewhere.
const DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -56,6 +57,7 @@ export class RedisAdapter extends ioredis {
this.once('ready', () => {
const commandsCount = this._notReadyCommandsQueue ? this._notReadyCommandsQueue.length : 0;
this.log.info(LOG_PREFIX + `Redis connection established. Queued commands: ${commandsCount}.`);

this._notReadyCommandsQueue && this._notReadyCommandsQueue.forEach(queued => {
this.log.info(LOG_PREFIX + `Executing queued ${queued.name} command.`);
queued.command().then(queued.resolve).catch(queued.reject);
Expand All @@ -71,49 +73,72 @@ export class RedisAdapter extends ioredis {
_setTimeoutWrappers() {
const instance: Record<string, any> = this;

METHODS_TO_PROMISE_WRAP.forEach(method => {
const originalMethod = instance[method];
const wrapWithQueueOrExecute = (method: string, commandWrapper: Function) => {
if (instance._notReadyCommandsQueue) {
return new Promise((resolve, reject) => {
instance._notReadyCommandsQueue.unshift({
resolve,
reject,
command: commandWrapper,
name: method.toUpperCase()
});
});
} else {
return commandWrapper();
}
};

instance[method] = function () {
const wrapCommand = (originalMethod: Function, methodName: string) => {
return function () {
const params = arguments;

function commandWrapper() {
instance.log.debug(LOG_PREFIX + `Executing ${method}.`);
// Return original method
const result = originalMethod.apply(instance, params);
const commandWrapper = () => {
instance.log.debug(`${LOG_PREFIX} Executing ${methodName}.`);
const result = originalMethod.apply(this, params);

Check failure on line 97 in src/storages/inRedis/RedisAdapter.ts

View workflow job for this annotation

GitHub Actions / Build

'this' implicitly has type 'any' because it does not have a type annotation.

if (thenable(result)) {
// For handling pending commands on disconnect, add to the set and remove once finished.
// On sync commands there's no need, only thenables.
instance._runningCommands.add(result);
const cleanUpRunningCommandsCb = function () {
const cleanUpRunningCommandsCb = () => {
instance._runningCommands.delete(result);
};
// Both success and error remove from queue.
result.then(cleanUpRunningCommandsCb, cleanUpRunningCommandsCb);

return timeout(instance._options.operationTimeout, result).catch(err => {
instance.log.error(LOG_PREFIX + `${method} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`);
instance.log.error(`${LOG_PREFIX}${methodName} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`);
// Handling is not the adapter responsibility.
throw err;
});
}

return result;
}
};

if (instance._notReadyCommandsQueue) {
return new Promise((res, rej) => {
instance._notReadyCommandsQueue.unshift({
resolve: res,
reject: rej,
command: commandWrapper,
name: method.toUpperCase()
});
});
} else {
return commandWrapper();
}
return wrapWithQueueOrExecute(methodName, commandWrapper);
};
};

// Wrap regular async methods to track timeouts and queue when Redis is not yet executing commands.
METHODS_TO_PROMISE_WRAP.forEach(methodName => {
const originalFn = instance[methodName];
instance[methodName] = wrapCommand(originalFn, methodName);
});

// Special handling for pipeline~like methods. We need to wrap the async trigger, which is exec, but return the Pipeline right away.
METHODS_TO_PROMISE_WRAP_EXEC.forEach(methodName => {
const originalFn = instance[methodName];
// "First level wrapper" to handle the sync execution and wrap async, queueing later if applicable.
instance[methodName] = function () {
instance.log.debug(`${LOG_PREFIX} Creating ${methodName}.`);

const res = originalFn.apply(instance, arguments);
const originalExec = res.exec;

res.exec = wrapCommand(originalExec, methodName + '.exec').bind(res);

return res;
};
});
}
Expand Down

0 comments on commit 6268360

Please sign in to comment.