Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): Add shared context among Redis instances #1110

Merged
merged 2 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ module.exports = {
testMatch: ['**/test/**/*.js'],
coverageDirectory: 'coverage',
setupFiles: ['<rootDir>/testSetupBabel.js'],
setupFilesAfterEnv: ['<rootDir>/testSetupAfterEnv.js'],
};
18 changes: 17 additions & 1 deletion src/commands/flushall.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
import contextMap, { createContext } from '../context';
import { createData } from '../data';
import { createExpires } from '../expires';

export function flushall() {
this.data.clear();
const oldContext = contextMap.get(this.keyData);
const newContext = createContext(oldContext.keyPrefix);

contextMap.set(this.keyData, newContext);

this.expires = createExpires(newContext.expires, newContext.keyPrefix);
this.data = createData(
newContext.data,
this.expires,
{},
newContext.keyPrefix
);

return 'OK';
}
2 changes: 1 addition & 1 deletion src/commands/flushdb.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export function flushdb() {
this.data.clear();
this.flushall();
return 'OK';
}
19 changes: 19 additions & 0 deletions src/context.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { EventEmitter } from 'events';
import { createSharedData } from './data';
import { createSharedExpires } from './expires';

const contextMap = new Map();

export default contextMap;

export function createContext(keyPrefix) {
const expires = createSharedExpires();

return {
channels: new EventEmitter(),
expires,
data: createSharedData(expires),
patternChannels: new EventEmitter(),
keyPrefix,
};
}
144 changes: 80 additions & 64 deletions src/data.js
Original file line number Diff line number Diff line change
@@ -1,79 +1,95 @@
import { assign } from 'lodash';

export default function createData(
expiresInstance,
initial = {},
keyPrefix = ''
) {
export function createSharedData(sharedExpires) {
let raw = {};

function createInstance(prefix, expires) {
return Object.freeze({
clear() {
raw = {};
},
delete(key) {
if (expires.has(key)) {
expires.delete(key);
}
delete raw[`${prefix}${key}`];
},
get(key) {
if (expires.has(key) && expires.isExpired(key)) {
this.delete(key);
}
return Object.freeze({
clear() {
raw = {};
},
delete(key) {
if (sharedExpires.has(key)) {
sharedExpires.delete(key);
}
delete raw[key];
},
get(key) {
if (sharedExpires.has(key) && sharedExpires.isExpired(key)) {
this.delete(key);
}

const value = raw[`${prefix}${key}`];
const value = raw[key];

if (Array.isArray(value)) {
return value.slice();
}
if (Array.isArray(value)) {
return value.slice();
}

if (Buffer.isBuffer(value)) {
return Buffer.from(value);
}
if (Buffer.isBuffer(value)) {
return Buffer.from(value);
}

if (value instanceof Set) {
return new Set(value);
}
if (value instanceof Set) {
return new Set(value);
}

if (value instanceof Map) {
return new Map(value);
}
if (value instanceof Map) {
return new Map(value);
}

if (typeof value === 'object' && value) {
return assign({}, value);
}
if (typeof value === 'object' && value) {
return assign({}, value);
}

return value;
},
has(key) {
if (expires.has(key) && expires.isExpired(key)) {
this.delete(key);
}
return value;
},
has(key) {
if (sharedExpires.has(key) && sharedExpires.isExpired(key)) {
this.delete(key);
}

return {}.hasOwnProperty.call(raw, `${prefix}${key}`);
},
keys() {
return Object.keys(raw);
},
set(key, val) {
let item = val;

if (Array.isArray(val)) {
item = val.slice();
} else if (Buffer.isBuffer(val)) {
item = Buffer.from(val);
} else if (val instanceof Set) {
item = new Set(val);
} else if (val instanceof Map) {
item = new Map(val);
} else if (typeof val === 'object' && val) {
item = assign({}, val);
}

raw[`${prefix}${key}`] = item;
},
return {}.hasOwnProperty.call(raw, key);
},
keys(prefix) {
const keys = Object.keys(raw);

if (!prefix) return keys;

return keys.filter((key) => key.startsWith(prefix));
},
set(key, val) {
let item = val;

if (Array.isArray(val)) {
item = val.slice();
} else if (Buffer.isBuffer(val)) {
item = Buffer.from(val);
} else if (val instanceof Set) {
item = new Set(val);
} else if (val instanceof Map) {
item = new Map(val);
} else if (typeof val === 'object' && val) {
item = assign({}, val);
}

raw[key] = item;
},
});
}

export function createData(
sharedData,
expiresInstance,
initial = {},
keyPrefix = ''
) {
function createInstance(prefix, expires) {
return Object.freeze({
clear: () => sharedData.clear(),
delete: (key) => sharedData.delete(`${prefix}${key}`),
get: (key) => sharedData.get(`${prefix}${key}`),
has: (key) => sharedData.has(`${prefix}${key}`),
keys: () => sharedData.keys(prefix),
set: (key, val) => sharedData.set(`${prefix}${key}`, val),
withKeyPrefix(newKeyPrefix) {
if (newKeyPrefix === prefix) return this;
return createInstance(
Expand Down
44 changes: 28 additions & 16 deletions src/expires.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
export default function createExpires(keyPrefix = '') {
export function createSharedExpires() {
const expires = {};

return Object.freeze({
get(key) {
return expires[key];
},
set(key, timestamp) {
expires[key] = +timestamp;
},
has(key) {
return {}.hasOwnProperty.call(expires, key);
},
isExpired(key) {
return expires[key] <= Date.now();
},
delete(key) {
delete expires[key];
},
});
}

export function createExpires(sharedExpires, keyPrefix = '') {
function createInstance(prefix) {
return {
get(key) {
return expires[`${prefix}${key}`];
},
set(key, timestamp) {
expires[`${prefix}${key}`] = +timestamp;
},
has(key) {
return {}.hasOwnProperty.call(expires, `${prefix}${key}`);
},
isExpired(key) {
return expires[`${prefix}${key}`] <= Date.now();
},
delete(key) {
delete expires[`${prefix}${key}`];
},
get: (key) => sharedExpires.get(`${prefix}${key}`),
set: (key, timestamp) => sharedExpires.set(`${prefix}${key}`, timestamp),
has: (key) => sharedExpires.has(`${prefix}${key}`),
isExpired: (key) => sharedExpires.isExpired(`${prefix}${key}`),
delete: (key) => sharedExpires.delete(`${prefix}${key}`),
withKeyPrefix(newPrefix) {
if (newPrefix === prefix) return this;
return createInstance(newPrefix);
},
};
}

return createInstance(keyPrefix);
}
54 changes: 48 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ import redisCommands from 'redis-commands';
import * as commands from './commands';
import * as commandsStream from './commands-stream';
import createCommand, { Command } from './command';
import createData from './data';
import createExpires from './expires';
import emitConnectEvent from './commands-utils/emitConnectEvent';
import Pipeline from './pipeline';
import promiseContainer from './promise-container';
import parseKeyspaceEvents from './keyspace-notifications';
import contextMap, { createContext } from './context';
import { createExpires } from './expires';
import { createData } from './data';

const defaultOptions = {
data: {},
keyPrefix: '',
lazyConnect: false,
notifyKeyspaceEvents: '', // string pattern as specified in https://redis.io/topics/notifications#configuration e.g. 'gxK'
host: 'localhost',
port: '6379',
};

class RedisMock extends EventEmitter {
Expand All @@ -29,8 +32,7 @@ class RedisMock extends EventEmitter {

constructor(options = {}) {
super();
this.channels = new EventEmitter();
this.patternChannels = new EventEmitter();

this.batch = undefined;
this.connected = false;
this.subscriberMode = false;
Expand All @@ -41,9 +43,19 @@ class RedisMock extends EventEmitter {
// eslint-disable-next-line prefer-object-spread
const optionsWithDefault = Object.assign({}, defaultOptions, options);

this.expires = createExpires(optionsWithDefault.keyPrefix);
this.keyData = `${optionsWithDefault.host}:${optionsWithDefault.port}`;

if (!contextMap.get(this.keyData)) {
const context = createContext(optionsWithDefault.keyPrefix);

contextMap.set(this.keyData, context);
}

const context = contextMap.get(this.keyData);

this.expires = createExpires(context.expires, optionsWithDefault.keyPrefix);
this.data = createData(
context.data,
this.expires,
optionsWithDefault.data,
optionsWithDefault.keyPrefix
Expand All @@ -61,6 +73,36 @@ class RedisMock extends EventEmitter {
}
}

get channels() {
return contextMap.get(this.keyData).channels;
}

set channels(channels) {
const oldContext = contextMap.get(this.keyData);

const newContext = {
...oldContext,
channels,
};

contextMap.set(this.keyData, newContext);
}

get patternChannels() {
return contextMap.get(this.keyData).patternChannels;
}

set patternChannels(patternChannels) {
const oldContext = contextMap.get(this.keyData);

const newContext = {
...oldContext,
patternChannels,
};

contextMap.set(this.keyData, newContext);
}

multi(batch = []) {
this.batch = new Pipeline(this);
// eslint-disable-next-line no-underscore-dangle
Expand Down Expand Up @@ -106,7 +148,7 @@ class RedisMock extends EventEmitter {
}

duplicate() {
return this.createConnectedClient()
return this.createConnectedClient();
}

// eslint-disable-next-line class-methods-use-this
Expand Down
12 changes: 0 additions & 12 deletions test/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@ import _ from 'lodash';
import Redis from 'ioredis';
import command from '../src/command';

// Ensure that we're getting the correct instance of Command when running in test:jest.js, as jest.js isn't designed to test code directly imported private functions like src/command
jest.mock('ioredis', () => {
const { Command } = jest.requireActual('ioredis');
const RedisMock = jest.requireActual('../src/index');

return {
__esModule: true,
Command,
default: RedisMock,
};
});

describe('basic command', () => {
const stub = command((...args) => args, 'testCommandName', {
Command: { transformers: { argument: {}, reply: {} } },
Expand Down
Loading