-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
163 lines (139 loc) · 5.51 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
const { Mutex } = require('async-mutex');
const util = require('util');
const EventEmitter = require('events').EventEmitter
const supportedMethods = 'all,destroy,clear,length,get,set,touch'.split(',');
const supportedLockingMethods = 'set,get,destroy,touch'.split(','); // supportedLockingMethods will always have key as first arg and wait as last
/**
* This is a generic implentation of a key-value store.
* It can be implemented using any 'session stores' that is compatible
* with expressjs session middleware https://github.com/expressjs/session#compatible-session-stores
*/
class GenericStore {
/**
*
* @param {string} storeType
* @param {Object} storeOptions
*/
constructor(storeType = '', storeOptions = {}) {
// super()
// advisory locking, at the object level & in memory only
this.locks = {} // {key: mutex} // { key: { mutex: {}, waitQueue: [] } }
this.storeType = storeType
this.storeOptions = storeOptions
try {
require.resolve(storeType)
} catch (e) {
throw new Error(`The module could not be found. Try installing it first: npm install ${storeType}`)
}
let _Store = require(storeType)({ Store})
this.store = new _Store(storeOptions)
Object.setPrototypeOf(GenericStore.prototype, this.store.__proto__)
let allMethods = getAllMethods(this.store)
for (let f of allMethods) {
if (supportedMethods.includes(f)) {
// console.log('f + Prom for - ', f)
// this[f + 'Prom'] = util.promisify(this.store[f]).bind(this.store)
if (supportedLockingMethods.includes(f)) {
let p = util.promisify(this.store[f]).bind(this.store)
this[f + 'Prom'] = async (...args) => {
let key = args[0]
let mx = this.locks[key]
let wait
let checkIfWait = args.slice(-1)[0]
if (typeof checkIfWait === 'object' && Object.keys(checkIfWait).includes('wait')) {
wait = checkIfWait.wait
args = args.slice(0, -1)
} else {
wait = true
}
if (mx && mx.isLocked()) {
// already locked
if (wait) {
// wait
await mx.acquire()
// release as we are not locking it
await mx.release()
return p(...args)
} else {
// error out
throw new Error(`the resource with key is already locked: ${key}`)
}
}
else {
return p(...args)
}
}
} else {
this[f + 'Prom'] = util.promisify(this.store[f]).bind(this.store)
}
}
}
// Placeholder for addnMethods if required to be added in the future
// for (let f of addnMethods) {
// if (allMethods.includes(f)) {
// this[f + 'Prom'] = util.promisify(this.store[f]).bind(this.store)
// }
// }
} // constructor
lockProm = async function (key, options = { wait: true }) {
let mx = this.locks[key]
let wait = options.wait
if (mx && mx.isLocked()) {
// already locked
if (wait) {
// wait here
await mx.acquire()
} else {
// error out
throw new Error(`the resource with key is already locked: ${key}`)
}
} else {
mx = new Mutex()
this.locks[key] = mx
await mx.acquire()
}
return new KV(this.store, key, mx, this.storeType, this.storeOptions)
}
}
class KV {
constructor(store, key, mutex, storeType, storeOptions) {
// super()
this.key = key
this.mutex = mutex
this.store = store
Object.setPrototypeOf(KV.prototype, this.store.__proto__)
let allMethods = getAllMethods(this.store)
for (let f of allMethods) {
if (supportedMethods.includes(f)) {
if (supportedLockingMethods.includes(f)) {
this[f + 'Prom'] = util.promisify(this.store[f]).bind(this.store, this.key)
} else {
this[f + 'Prom'] = util.promisify(this.store[f]).bind(this.store)
}
}
}
}
unlockProm = async function () {
// TODO: is releasr a sync method? if y, remove await
await this.mutex.release()
// delete this
}
}
function getAllMethods(origObj) {
var props = [];
var obj = origObj;
do {
props = props.concat(Object.getOwnPropertyNames(obj));
} while (obj = Object.getPrototypeOf(obj));
return props.sort().filter(function (e, i, arr) {
if (e != arr[i + 1] && typeof origObj[e] == 'function') return true;
});
}
/**
* This is the Store class that is passed into every store upon initialization by express
*/
function Store() {
EventEmitter.call(this)
}
util.inherits(Store, EventEmitter)
module.exports = GenericStore;