Skip to content

Commit

Permalink
Merge pull request #266 from splitio/sdks-7658
Browse files Browse the repository at this point in the history
[SDKS-7658] Handle timeouts and queueing of Redis pipeline operations
  • Loading branch information
EmilianoSanchez authored Dec 1, 2023
2 parents b06e1a2 + b49ae03 commit 67d0e61
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 23 deletions.
2 changes: 1 addition & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
1.12.0 (December XX, 2023)
- Added support for Flag Sets in "consumer" and "partial consumer" modes for Pluggable and Redis storages.
- Updated evaluation flow to log a warning when using flag sets that don't contain cached feature flags.
- Updated Redis adapter to handle timeouts and queueing of some missing commands: 'hincrby' and 'popNRaw'.
- Updated Redis adapter to handle timeouts and queueing of some missing commands: 'hincrby', 'popNRaw', and 'pipeline.exec'.
- Bugfixing - Fixed manager methods in consumer modes to return results in a promise when the SDK is not operational (not ready or destroyed).

1.11.0 (November 3, 2023)
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@splitsoftware/splitio-commons",
"version": "1.12.1-rc.2",
"version": "1.12.1-rc.4",
"description": "Split Javascript SDK common components",
"main": "cjs/index.js",
"module": "esm/index.js",
Expand Down
40 changes: 31 additions & 9 deletions src/storages/inRedis/RedisAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import ioredis from 'ioredis';
import ioredis, { Pipeline } from 'ioredis';
import { ILogger } from '../../logger/types';
import { merge, isString } from '../../utils/lang';
import { _Set, setToArray, ISet } from '../../utils/lang/sets';
Expand All @@ -9,6 +9,7 @@ const LOG_PREFIX = 'storage:redis-adapter: ';

// If we ever decide to fully wrap every method, there's a Commander.getBuiltinCommands from ioredis.
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset', 'hincrby', 'popNRaw'];
const METHODS_TO_PROMISE_WRAP_EXEC = ['pipeline'];

// Not part of the settings since it'll vary on each storage. We should be removing storage specific logic from elsewhere.
const DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -72,15 +73,16 @@ export class RedisAdapter extends ioredis {
_setTimeoutWrappers() {
const instance: Record<string, any> = this;

METHODS_TO_PROMISE_WRAP.forEach(methodName => {
const originalMethod = instance[methodName];

instance[methodName] = function () {
const wrapCommand = (originalMethod: Function, methodName: string) => {
// The value of "this" in this function should be the instance actually executing the method. It might be the instance referred (the base one)
// or it can be the instance of a Pipeline object.
return function (this: RedisAdapter | Pipeline) {
const params = arguments;
const caller = this;

function commandWrapper() {
instance.log.debug(`${LOG_PREFIX}Executing ${methodName}.`);
const result = originalMethod.apply(instance, params);
const result = originalMethod.apply(caller, params);

if (thenable(result)) {
// For handling pending commands on disconnect, add to the set and remove once finished.
Expand All @@ -103,10 +105,10 @@ export class RedisAdapter extends ioredis {
}

if (instance._notReadyCommandsQueue) {
return new Promise((res, rej) => {
return new Promise((resolve, reject) => {
instance._notReadyCommandsQueue.unshift({
resolve: res,
reject: rej,
resolve,
reject,
command: commandWrapper,
name: methodName.toUpperCase()
});
Expand All @@ -115,6 +117,26 @@ export class RedisAdapter extends ioredis {
return commandWrapper();
}
};
};

// Wrap regular async methods to track timeouts and queue when Redis is not yet executing commands.
METHODS_TO_PROMISE_WRAP.forEach(methodName => {
const originalFn = instance[methodName];
instance[methodName] = wrapCommand(originalFn, methodName);
});

// Special handling for pipeline~like methods. We need to wrap the async trigger, which is exec, but return the Pipeline right away.
METHODS_TO_PROMISE_WRAP_EXEC.forEach(methodName => {
const originalFn = instance[methodName];
// "First level wrapper" to handle the sync execution and wrap async, queueing later if applicable.
instance[methodName] = function () {
const res = originalFn.apply(instance, arguments);
const originalExec = res.exec;

res.exec = wrapCommand(originalExec, methodName + '.exec').bind(res);

return res;
};
});
}

Expand Down
42 changes: 32 additions & 10 deletions src/storages/inRedis/__tests__/RedisAdapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,27 @@ const LOG_PREFIX = 'storage:redis-adapter: ';

// The list of methods we're wrapping on a promise (for timeout) on the adapter.
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset', 'hincrby', 'popNRaw'];
const METHODS_TO_PROMISE_WRAP_EXEC = ['pipeline'];

const pipelineExecMock = jest.fn(() => Promise.resolve('exec'));
const ioredisMock = reduce([...METHODS_TO_PROMISE_WRAP, 'disconnect'], (acc, methodName) => {
acc[methodName] = jest.fn(() => Promise.resolve(methodName));
return acc;
}, reduce(METHODS_TO_PROMISE_WRAP_EXEC, (acc, methodName) => {
acc[methodName] = jest.fn(() => {
const pipelineAlikeMock = Object.assign(reduce(METHODS_TO_PROMISE_WRAP, (acc, methodName) => {
acc[methodName] = jest.fn(() => pipelineAlikeMock);
return acc;
}, {}), {
exec: pipelineExecMock
});

return pipelineAlikeMock;
});
return acc;
}, {
once: jest.fn()
}) as { once: jest.Mock };
}) as { once: jest.Mock });

let constructorParams: any = false;

Expand Down Expand Up @@ -247,14 +261,16 @@ describe('STORAGE Redis Adapter', () => {
url: 'redis://localhost:6379/0'
});

forEach(METHODS_TO_PROMISE_WRAP, methodName => {
forEach([...METHODS_TO_PROMISE_WRAP, ...METHODS_TO_PROMISE_WRAP_EXEC], methodName => {
expect(instance[methodName]).not.toBe(ioredisMock[methodName]); // Method "${methodName}" from ioredis library should be wrapped.
expect(ioredisMock[methodName]).not.toBeCalled(); // Checking that the method was not called yet.

const startingQueueLength = instance._notReadyCommandsQueue.length;

// We do have the commands queue on this state, so a call for this methods will queue the command.
const wrapperResult = instance[methodName](methodName);
const wrapperResult = METHODS_TO_PROMISE_WRAP_EXEC.includes(methodName) ?
instance[methodName](methodName).exec() :
instance[methodName](methodName);
expect(wrapperResult instanceof Promise).toBe(true); // The result is a promise since we are queueing commands on this state.

expect(instance._notReadyCommandsQueue.length).toBe(startingQueueLength + 1); // The queue should have one more item.
Expand All @@ -263,19 +279,24 @@ describe('STORAGE Redis Adapter', () => {
expect(typeof queuedCommand.resolve).toBe('function'); // The queued item should have the correct form.
expect(typeof queuedCommand.reject).toBe('function'); // The queued item should have the correct form.
expect(typeof queuedCommand.command).toBe('function'); // The queued item should have the correct form.
expect(queuedCommand.name).toBe(methodName.toUpperCase()); // The queued item should have the correct form.
expect(queuedCommand.name).toBe((METHODS_TO_PROMISE_WRAP_EXEC.includes(methodName) ? methodName + '.exec' : methodName).toUpperCase()); // The queued item should have the correct form.
});

instance._notReadyCommandsQueue = false; // Remove the queue.
loggerMock.error.resetHistory;

forEach(METHODS_TO_PROMISE_WRAP, (methodName, index) => {
forEach([...METHODS_TO_PROMISE_WRAP, ...METHODS_TO_PROMISE_WRAP_EXEC], (methodName, index) => {
// We do NOT have the commands queue on this state, so a call for this methods will execute the command.
expect(ioredisMock[methodName]).not.toBeCalled(); // Control assertion - Original method (${methodName}) was not called yet
if (METHODS_TO_PROMISE_WRAP.includes(methodName)) expect(ioredisMock[methodName]).not.toBeCalled(); // Control assertion - Original method (${methodName}) was not called yet
else expect(pipelineExecMock).not.toBeCalled(); // Control assertion - Original Pipeline exec method was not called yet

const previousTimeoutCalls = timeout.mock.calls.length;
let previousRunningCommandsSize = instance._runningCommands.size;
instance[methodName](methodName).catch(() => { }); // Swallow exception so it's not spread to logs.

(METHODS_TO_PROMISE_WRAP_EXEC.includes(methodName) ?
instance[methodName](methodName).exec() :
instance[methodName](methodName)
).catch(() => { }); // Swallow exception so it's not spread to logs.
expect(ioredisMock[methodName]).toBeCalled(); // Original method (${methodName}) is called right away (through wrapper) when we are not queueing anymore.
expect(instance._runningCommands.size).toBe(previousRunningCommandsSize + 1); // If the result of the operation was a thenable it will add the item to the running commands queue.

Expand All @@ -290,7 +311,7 @@ describe('STORAGE Redis Adapter', () => {
commandTimeoutResolver.rej('test');
setTimeout(() => { // Allow the promises to tick.
expect(instance._runningCommands.has(commandTimeoutResolver.originalPromise)).toBe(false); // After a command finishes with error, it's promise is removed from the instance._runningCommands queue.
expect(loggerMock.error.mock.calls[index]).toEqual([`${LOG_PREFIX}${methodName} operation threw an error or exceeded configured timeout of 5000ms. Message: test`]); // The log error method should be called with the corresponding messages, depending on the method, error and operationTimeout.
expect(loggerMock.error.mock.calls[index]).toEqual([`${LOG_PREFIX}${METHODS_TO_PROMISE_WRAP_EXEC.includes(methodName) ? methodName + '.exec' : methodName} operation threw an error or exceeded configured timeout of 5000ms. Message: test`]); // The log error method should be called with the corresponding messages, depending on the method, error and operationTimeout.
}, 0);
});

Expand All @@ -306,9 +327,10 @@ describe('STORAGE Redis Adapter', () => {

instance._notReadyCommandsQueue = false; // Connection is "ready"

forEach(METHODS_TO_PROMISE_WRAP, methodName => {
forEach([...METHODS_TO_PROMISE_WRAP, ...METHODS_TO_PROMISE_WRAP_EXEC], methodName => {
// Just call the wrapped method, we don't care about all the paths tested on the previous case, just how it behaves when the command is resolved.
instance[methodName](methodName);
if (METHODS_TO_PROMISE_WRAP_EXEC.includes(methodName)) instance[methodName](methodName).exec();
else instance[methodName](methodName);
// Get the original promise (the one passed to timeout)
const commandTimeoutResolver = timeoutPromiseResolvers[0];

Expand Down

0 comments on commit 67d0e61

Please sign in to comment.