-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththrottler.ts
111 lines (91 loc) · 3.28 KB
/
throttler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import { PlatformManager, platformManager } from './platformManager';
import { job } from './job';
import { workerManager, worker } from './workerManager';
/** Rate limit expressed as a request budget over a rolling time window. */
interface RateLimit {
  /** Number of requests allowed per rolling window. */
  requestCount: number;
  /** Length of the rolling window, in seconds. */
  windowSeconds: number;
}
/** Describes the connection on whose behalf a throttled call is made. */
interface Connection {
  /** A unique platform identifier. */
  platform: string;
  /** A unique connection identifier. */
  connection: string;
  /** Associated workflow priority; 0 is the highest priority. */
  niceness: number;
  /** The rate limit for the connection. */
  rateLimit: RateLimit;
}
/**
 * Queues `fn` as a job on the platform manager for `connection.platform` and
 * returns the job's promise.
 *
 * The first call for a given platform creates that platform's manager from the
 * calling connection's rate limit and caches it in `platformManagers`. Later
 * calls for the same platform reuse the cached manager, so the `rateLimit` of
 * those later connections is not applied to it.
 *
 * @param connection    Supplies the platform identifier, the niceness
 *                      (priority) of the job and the rate limit used when the
 *                      platform manager has to be created.
 * @param fn            The asynchronous work to run; it is handed to `job`
 *                      together with `arg`.
 * @param arg           Optional argument forwarded to `job` for `fn`.
 * @param expectedCalls Forwarded to `job`; presumably the number of
 *                      rate-limited requests `fn` is expected to make
 *                      (confirm against `./job`). Defaults to 1.
 * @returns The `promise` exposed by the created job.
 */
export function throttle<T, U>(
  connection: Connection,
  fn: (arg?: T) => Promise<U>,
  arg?: T,
  expectedCalls: number = 1
): Promise<U> {
  const { platform, niceness, rateLimit } = connection;

  // Only an own entry counts as a cached manager. A plain truthiness test on
  // `platformManagers[platform]` would also match members inherited from
  // Object.prototype (e.g. a platform named "constructor" or "toString"),
  // skip creation, and then fail below when `add` is called on a non-manager.
  let pm: PlatformManager | undefined = Object.prototype.hasOwnProperty.call(platformManagers, platform)
    ? platformManagers[platform]
    : undefined;

  if (!pm) {
    pm = platformManager(platform, rateLimit.requestCount, rateLimit.windowSeconds);
    // Equivalent to a plain assignment for ordinary keys, but also stores a
    // "__proto__" key as an own entry instead of replacing the registry's
    // prototype.
    Object.defineProperty(platformManagers, platform, {
      value: pm,
      writable: true,
      enumerable: true,
      configurable: true,
    });
  }

  const j = job(fn, niceness, expectedCalls, pm.requestTracker, arg);
  const promise = j.promise;
  pm.add(j);
  wm.assign(); // See if there are any workers available to do the job.
  return promise;
}
/** Platform managers created so far, keyed by platform identifier. */
const platformManagers: Record<string, PlatformManager> = {};

/** Module-wide worker manager; it is constructed with the platform manager registry. */
const wm = workerManager(platformManagers);
// Tests
// NOTE(review): these statements execute whenever this module is loaded;
// consider moving them into a separate script so importing `throttle` has no
// side effects.

/** Resolves after `ms` milliseconds. */
const pause = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

/** Builds a demo job that waits one second and then logs `message`. */
const demoJob = (message: string) => async (): Promise<void> => {
  await pause(1000);
  console.log(message);
};

/** Logs a rejected demo job so the rejection does not go unhandled. */
const reportFailure = (error: unknown): void => {
  console.error("Throttled job failed:", error);
};

// Create all four workers first, then register them, in that order.
const workers = ["worker1", "worker2", "worker3", "worker4"].map(name => worker(name, wm));
for (const w of workers) {
  wm.add(w);
}
wm.check();

const connection: Connection = {
  platform: "platform1",
  connection: "connection1",
  niceness: 0,
  rateLimit: { requestCount: 5, windowSeconds: 10 },
};
const connection2: Connection = {
  platform: "platform2",
  connection: "connection2",
  niceness: 0,
  rateLimit: { requestCount: 5, windowSeconds: 10 },
};
const connection3: Connection = {
  platform: "platform3",
  connection: "connection3",
  niceness: 1,
  rateLimit: { requestCount: 5, windowSeconds: 10 },
};

throttle(connection, demoJob("Hello, world1!"), null, 1).catch(reportFailure);
throttle(connection, demoJob("Hello, world2!"), null, 1).catch(reportFailure);
throttle(connection2, demoJob("Hello, world3!"), null, 3).catch(reportFailure);
// Cannot complete the whole activity without exceeding the rate limit, so it will be queued.
throttle(connection2, demoJob("Hello, world4!"), null, 3).catch(reportFailure);
throttle(connection3, demoJob("Hello, world5!"), null, 1).catch(reportFailure);

// Exceeds the worker capacity, so it will be queued.
const ret = throttle(connection3, async () => {
  await pause(1000);
  console.log("Hello, world6!");
  return "Hello, I am returned!";
}, null, 1);

ret
  .then((value) => {
    console.log(`This was returned: ${value}`);
  })
  .catch(reportFailure);