Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using per-key basis queue #5420

Merged
Merged
105 changes: 105 additions & 0 deletions spec/RedisCacheAdapter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,48 @@ describe_only(() => {
.then(done);
});

it('should not store value for ttl=0', done => {
const cache = new RedisCacheAdapter(null, 5);

cache
.put(KEY, VALUE, 0)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(null))
.then(done);
});

it('should not expire when ttl=Infinity', done => {
const cache = new RedisCacheAdapter(null, 1);

cache
.put(KEY, VALUE, Infinity)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then(wait.bind(null, 1))
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then(done);
});

it('should fallback to default ttl', done => {
const cache = new RedisCacheAdapter(null, 1);
let promise = Promise.resolve();

[-100, null, undefined, 'not number', true].forEach(ttl => {
promise = promise.then(() =>
cache
.put(KEY, VALUE, ttl)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then(wait.bind(null, 2))
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(null))
);
});

promise.then(done);
});

it('should find un-expired records', done => {
const cache = new RedisCacheAdapter(null, 5);

Expand All @@ -58,3 +100,66 @@ describe_only(() => {
.then(done);
});
});

describe_only(() => {
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
})('RedisCacheAdapter/KeyPromiseQueue', function() {
const KEY1 = 'key1';
const KEY2 = 'key2';
const VALUE = 'hello';

// number of chained ops on a single key
function getQueueCountForKey(cache, key) {
return cache.queue.queue[key][0];
}

// total number of queued keys
function getQueueCount(cache) {
return Object.keys(cache.queue.queue).length;
}

it('it should clear completed operations from queue', done => {
const cache = new RedisCacheAdapter({ ttl: NaN });

// execute a bunch of operations in sequence
let promise = Promise.resolve();
for (let index = 1; index < 100; index++) {
promise = promise.then(() => {
const key = `${index}`;
return cache
.put(key, VALUE)
.then(() => expect(getQueueCount(cache)).toEqual(0))
.then(() => cache.get(key))
.then(() => expect(getQueueCount(cache)).toEqual(0))
.then(() => cache.clear())
.then(() => expect(getQueueCount(cache)).toEqual(0));
});
}

// at the end the queue should be empty
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
});

it('it should count per key chained operations correctly', done => {
const cache = new RedisCacheAdapter({ ttl: NaN });

let key1Promise = Promise.resolve();
let key2Promise = Promise.resolve();
for (let index = 1; index < 100; index++) {
key1Promise = cache.put(KEY1, VALUE);
key2Promise = cache.put(KEY2, VALUE);
// per key chain should be equal to index, which is the
// total number of operations on that key
expect(getQueueCountForKey(cache, KEY1)).toEqual(index);
expect(getQueueCountForKey(cache, KEY2)).toEqual(index);
// the total keys counts should be equal to the different keys
// we have currently being processed.
expect(getQueueCount(cache)).toEqual(2);
}

// at the end the queue should be empty
Promise.all([key1Promise, key2Promise])
.then(() => expect(getQueueCount(cache)).toEqual(0))
.then(done);
});
});
83 changes: 0 additions & 83 deletions src/Adapters/Cache/RedisCacheAdapter.js

This file was deleted.

43 changes: 43 additions & 0 deletions src/Adapters/Cache/RedisCacheAdapter/KeyPromiseQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// KeyPromiseQueue is a simple promise queue
// used to queue operations per key basis.
// Once the tail promise in the key-queue fulfills,
// the chain on that key will be cleared.
export class KeyPromiseQueue {
constructor() {
this.queue = {};
}

enqueue(key, operation) {
const tuple = this.beforeOp(key);
const toAwait = tuple[1];
const nextOperation = toAwait.then(operation);
const wrappedOperation = nextOperation.then(result => {
this.afterOp(key);
return result;
});
tuple[1] = wrappedOperation;
return wrappedOperation;
}

beforeOp(key) {
let tuple = this.queue[key];
if (!tuple) {
tuple = [0, Promise.resolve()];
this.queue[key] = tuple;
}
tuple[0]++;
return tuple;
}

afterOp(key) {
const tuple = this.queue[key];
if (!tuple) {
return;
}
tuple[0]--;
if (tuple[0] <= 0) {
delete this.queue[key];
return;
}
}
}
101 changes: 101 additions & 0 deletions src/Adapters/Cache/RedisCacheAdapter/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import redis from 'redis';
import logger from '../../../logger';
import { KeyPromiseQueue } from './KeyPromiseQueue';

const DEFAULT_REDIS_TTL = 30 * 1000; // 30 seconds in milliseconds
const FLUSH_DB_KEY = '__flush_db__';

function debug() {
logger.debug.apply(logger, ['RedisCacheAdapter', ...arguments]);
}

const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;

export class RedisCacheAdapter {
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL;
this.client = redis.createClient(redisCtx);
this.queue = new KeyPromiseQueue();
}

get(key) {
debug('get', key);
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.get(key, function(err, res) {
debug('-> get', key, res);
if (!res) {
return resolve(null);
}
resolve(JSON.parse(res));
});
})
);
}

put(key, value, ttl = this.ttl) {
value = JSON.stringify(value);
debug('put', key, value, ttl);

if (ttl === 0) {
// ttl of zero is a logical no-op, but redis cannot set expire time of zero
return this.queue.enqueue(key, () => Promise.resolve());
}

if (ttl === Infinity) {
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.set(key, value, function() {
resolve();
});
})
);
}

if (!isValidTTL(ttl)) {
ttl = this.ttl;
}

return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.psetex(key, ttl, value, function() {
resolve();
});
})
);
}

del(key) {
debug('del', key);
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.del(key, function() {
resolve();
});
})
);
}

clear() {
debug('clear');
return this.queue.enqueue(
FLUSH_DB_KEY,
() =>
new Promise(resolve => {
this.client.flushdb(function() {
resolve();
});
})
);
}
}

export default RedisCacheAdapter;