-
Notifications
You must be signed in to change notification settings - Fork 4
/
json-cache.js
113 lines (100 loc) · 2.75 KB
/
json-cache.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
* Module to asynchronously download JSON (and text such as JSONata) documents, then cache them in memory and on disk.
* Handles concurrency (when multiple tasks are asking the same document at the same time).
*/
// Location of disk copy of retrieved JSON documents
const cachePath = process.env.SCHEMAS_CACHE_PATH || '/tmp/'; // TODO: use better location for cache
const crypto = require('crypto');
const fetch = require('node-fetch');
const fs = require('fs');
const util = require('util');
const readFileAsync = util.promisify(fs.readFile);
const writeFileAsync = util.promisify(fs.writeFile);
// Memory copy of retrieved JSON documents
const ramCaches = {};
// Queue of concurrent tasks waiting for the same JSON documents
const mutexQueue = {};
module.exports = node => {
'use strict';
node.debug('Disk cache path: ' + cachePath);
/**
* Returns memory copy of desired JSON document if it exists, false otherwise.
*/
function ramCache(url, parse) {
const ramCache = ramCaches[url];
if (ramCache) {
if (parse) {
if (ramCache.json) {
return ramCache.json;
}
} else {
if (ramCache.text) {
return ramCache.text;
}
}
}
return false;
}
async function loadAsync(url, parse = true) {
let result = ramCache(url, parse);
if (result) {
return result;
}
if (mutexQueue[url]) {
// Wait for another task to be done loading the same URL, so that we can use its cache
await new Promise((resolve, reject) => mutexQueue[url].push(resolve));
result = ramCache(url, parse);
if (result) {
return result;
}
} else {
mutexQueue[url] = [];
ramCaches[url] = {};
}
let text, json, hash;
if (cachePath) {
hash = cachePath + 'schema.' + crypto.createHash('sha256').update(url).digest('hex') + '.' + node.id + '.tmp.js';
try {
text = await readFileAsync(hash, 'utf8');
node.debug('Load JSON from disk cache: ' + url);
if (parse) {
json = JSON.parse(text);
ramCaches[url].json = json;
} else {
ramCaches[url].text = text;
}
} catch (ex) {
text = false;
json = false;
}
}
if (!text) {
node.log('Load JSON from Web: ' + url);
const res = await fetch(url);
if (parse) {
json = await res.json();
ramCaches[url].json = json;
} else {
text = await res.text();
ramCaches[url].text = text;
}
if (cachePath) {
try {
await writeFileAsync(hash, parse ? JSON.stringify(json) : text, 'utf8');
} catch (ex) {
node.warn('Error saving disk cache of "%s" in "%s"', url, cachePath);
}
}
}
// Resume tasks waiting for the same URL
let next;
while ((next = mutexQueue[url].shift()) != undefined) {
next(); // Resolve promise
}
delete mutexQueue[url];
return parse ? json : text;
}
return {
loadAsync,
};
};