-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy paththrottled-queue.js
129 lines (120 loc) · 4.13 KB
/
throttled-queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
* Helper function to sleep for a specified number of milliseconds
*/
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms, 'slept'));
}
/**
* Helper function that returns the "origin" of a URL, defined in a loose way
* as the part of the true origin that identifies the server that's going to
* serve the resource.
*
* For example "github.io" for all specs under github.io, "whatwg.org" for
* all WHATWG specs, "csswg.org" for CSS specs at large (including Houdini
* and FXTF specs since they are served by the same server).
*/
function getOrigin(url) {
if (!url) {
return '';
}
const origin = (new URL(url)).origin;
if (origin.endsWith('.whatwg.org')) {
return 'https://whatwg.org';
}
else if (origin.endsWith('.github.io')) {
return 'https://github.io';
}
else if (origin.endsWith('.csswg.org') ||
origin.endsWith('.css-houdini.org') ||
origin.endsWith('.fxtf.org')) {
return 'https://csswg.org';
}
else {
return origin;
}
}
/**
* The ThrottledQueue class can be used to run a series of tasks that send
* network requests to an origin server in parallel, up to a certain limit,
* while guaranteeing that only one request will be sent to a given origin
* server at a time.
*/
export default class ThrottledQueue {
originQueue = {};
maxParallel = 4;
sleepInterval = 2000;
ongoing = 0;
pending = [];
constructor(options = { maxParallel: 4, sleepInterval: 2000 }) {
if (options.maxParallel >= 0) {
this.maxParallel = options.maxParallel;
}
if (options.sleepInterval) {
this.sleepInterval = options.sleepInterval;
}
}
/**
* Run the given processing function with the given parameters, immediately
* if possible or as soon as possible when too many tasks are already running
* in parallel.
*
* Note this function has no notion of origin. Users may call the function
* directly if they don't need any throttling per origin.
*/
async runThrottled(processFunction, ...params) {
if (this.ongoing >= this.maxParallel) {
return new Promise((resolve, reject) => {
this.pending.push({ params, resolve, reject });
});
}
else {
this.ongoing += 1;
const result = await processFunction.call(null, ...params);
this.ongoing -= 1;
// Done with current task, trigger next pending task in the background
setTimeout(_ => {
if (this.pending.length && this.ongoing < this.maxParallel) {
const next = this.pending.shift();
this.runThrottled(processFunction, ...next.params)
.then(result => next.resolve(result))
.catch(err => next.reject(err));
}
}, 0);
return result;
}
}
/**
* Run the given processing function with the given parameters, immediately
* if possible or as soon as possible when too many tasks are already running
* in parallel, or when there's already a task being run against the same
* origin as that of the provided URL.
*
* Said differently, the function serializes tasks per origin, and calls
* "runThrottled" to restrict the number of tasks that run in parallel to the
* requested maximum.
*
* Additionally, the function forces a 2 second sleep after processing to
* keep a low network profile (sleeping time can be adjusted per origin
* depending if the sleepInterval parameter that was passed to the
* constructor is a function.
*/
async runThrottledPerOrigin(url, processFunction, ...params) {
const origin = getOrigin(url);
if (!this.originQueue[origin]) {
this.originQueue[origin] = Promise.resolve(true);
}
return new Promise((resolve, reject) => {
this.originQueue[origin] = this.originQueue[origin]
.then(async _ => this.runThrottled(processFunction, ...params))
.then(async result => {
const interval = (typeof this.sleepInterval === 'function') ?
this.sleepInterval(origin) :
this.sleepInterval;
await sleep(interval);
return result;
})
.then(resolve)
.catch(reject);
});
}
}