-
Notifications
You must be signed in to change notification settings - Fork 0
/
BucketClient.js
67 lines (53 loc) · 1.62 KB
/
BucketClient.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
const { Dealer } = require("zeromq");
/**
 * RPC client for the `DiscordAPI` protobuf service, carried over a ZeroMQ
 * Dealer socket that connects to the `discord-bucket-zmq-proxy` host.
 *
 * Every outgoing request is tagged with an id; the receive loop matches each
 * incoming `Response` to the pending request with the same id.
 */
class BucketSocket {
  /**
   * Creates the socket and starts the background receive loop. Call
   * {@link BucketSocket#start} before issuing requests.
   */
  constructor() {
    this.socket = new Dealer();
    this.service = null;
    this.proto = null;
    // Request id -> handler `(response, failure)` for requests awaiting a reply.
    this.waiting = new Map();
    // The loop runs for the lifetime of the socket. Its promise is never
    // awaited, so attach a rejection handler here to avoid an unhandled
    // rejection and to release callers still waiting for a reply.
    this.messageHandler().catch(error => {
      console.error("BucketSocket: receive loop stopped", error);
      this.failPending(error);
    });
  }

  /**
   * Binds the protobuf definitions and connects to the proxy.
   *
   * @param {object} proto - Loaded protobuf roots; `proto.discord` must
   *   resolve "DiscordAPI" and "discord.types.HTTPError", and `proto.rpc`
   *   must resolve "Request" and "Response".
   */
  start(proto) {
    this.proto = proto;
    this.service = proto.discord.lookup("DiscordAPI")
      .create(this.rpc.bind(this), false, false);
    this.socket.connect(`tcp://discord-bucket-zmq-proxy:${process.env.DISCORD_BUCKET_ZMQ_PROXY_SERVICE_PORT_ROUTER}`);
  }

  /**
   * Closes the socket and fails every request that is still waiting for a
   * response, so callers are not left hanging forever.
   */
  close() {
    this.socket.close();
    this.failPending(new Error("BucketSocket closed before a response was received"));
  }

  /**
   * Transport function handed to the protobuf service: encodes the call as a
   * `Request`, sends it, and invokes `callback` once the matching response
   * arrives or the request fails.
   *
   * @param {object} method - Service method descriptor (`name`, `requestType`).
   * @param {Uint8Array} data - Encoded request payload.
   * @param {function(?Error, ?Uint8Array): void} callback - Receives either an
   *   error or the raw response payload.
   */
  rpc(method, data, callback) {
    const id = this.createRequestId();
    const buffer = this.proto.rpc.lookup("Request").encode({
      id: id,
      name: method.name,
      requestType: method.requestType,
      data: data
    }).finish();

    // Register before sending so a reply can never arrive ahead of its handler.
    this.waiting.set(id, (response, failure) => {
      this.waiting.delete(id);
      if (failure) return callback(failure, null);
      if (response.responseType !== "discord.types.HTTPError") {
        return callback(null, response.data);
      }
      let error;
      try {
        const httpError = this.proto.discord.lookup("discord.types.HTTPError").decode(response.data);
        error = new Error(httpError.message);
        error.code = httpError.code;
        error.status = httpError.status;
      } catch (decodeError) {
        // An undecodable error payload is still a failure for the caller.
        error = decodeError;
      }
      return callback(error, null);
    });

    // A failed send means no response will ever come; report it right away.
    const failSend = error => {
      const handler = this.waiting.get(id);
      if (handler) handler(null, error);
    };
    try {
      Promise.resolve(this.socket.send(buffer)).catch(failSend);
    } catch (error) {
      failSend(error);
    }
  }

  /**
   * Calls a method of the `DiscordAPI` service.
   *
   * @param {string} type - Name of the service method to call.
   * @param {object} [data={}] - Request message fields.
   * @returns {Promise<*>} Whatever the service method resolves with.
   * @throws {Error} If `start()` has not been called yet.
   */
  async request(type, data = {}) {
    if (this.service === null) {
      throw new Error("BucketSocket.start() must be called before request()");
    }
    return await this.service[type](data);
  }

  /**
   * Receive loop: reads frames until the socket is closed and hands each
   * decoded `Response` to the handler registered under its id.
   *
   * @returns {Promise<void>} Resolves once the socket is closed; rejects if
   *   receiving fails while the socket is still open.
   */
  async messageHandler() {
    while (!this.socket.closed) {
      let message;
      try {
        [message] = await this.socket.receive();
      } catch (error) {
        // close() aborts the receive in flight; that is a normal shutdown.
        if (this.socket.closed) return;
        throw error;
      }
      try {
        const decoded = this.proto.rpc.lookup("Response").decode(message);
        const handler = this.waiting.get(decoded.id);
        if (handler) handler(decoded);
      } catch (error) {
        // One malformed frame or throwing callback must not stop the loop.
        console.error("BucketSocket: failed to handle response", error);
      }
    }
  }

  /**
   * Builds an id that no pending request is using. The base value is derived
   * from the wall clock and the high-resolution timer; a numeric suffix is
   * appended only if that value is already taken.
   *
   * @returns {string} Request id unique among pending requests.
   */
  createRequestId() {
    const base = (Date.now() + process.hrtime().reduce((a, b) => a + b, 0)).toString(36);
    let id = base;
    for (let attempt = 1; this.waiting.has(id); attempt += 1) {
      id = `${base}-${attempt}`;
    }
    return id;
  }

  /**
   * Fails every pending request with `error`.
   *
   * @param {Error} error - Reason reported to each waiting callback.
   */
  failPending(error) {
    // Snapshot first: each handler removes itself from the map.
    for (const handler of [...this.waiting.values()]) {
      handler(null, error);
    }
  }
}
// Expose the BucketSocket class as this CommonJS module's export.
module.exports = BucketSocket;