-
Notifications
You must be signed in to change notification settings - Fork 0
/
trading-service.js
154 lines (130 loc) · 6.08 KB
/
trading-service.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
const Queue = require("bull");
require('dotenv').config();
const {initLogger, captureConsoleLog} = require("./src/utils/logger");
const { balanceWorker } = require('./src/workers/balance-worker');
const { orderWorker } = require('./src/workers/order-worker');
const { tradeWorker } = require('./src/workers/trade-worker');
const Redis = require("ioredis");
const Redlock = require("redlock");
const { orderSenderWorker } = require("./src/workers/order-sender-worker");
const { startStopProcessPromise } = require("./src/workers/start-stop-worker");
const { recoverOrdersWorkerPromise } = require("./src/workers/recover-orders-worker");
const { broadcastWorkerPromise } = require("./src/workers/broadcast-worker");
const {NotificationEventService} = require('./src/services/NotificationEventService');
const OrderSenderEventService = require('./src/services/OrderSenderEventService');
const OrderEventService = require("./src/services/OrderEventService");
const TradeEventService = require("./src/services/TradeEventService");
const GridNoFundsEventService = require("./src/services/GridNoFundsEventService");
const GridDirtyEventService = require("./src/services/GridDirtyEventService");
const CheckAccountDepositEventService = require("./src/services/CheckAccountDepositEventService");
const StopGridEventService = require('./src/services/StopGridEventService');
const LockService = require("./src/services/LockService");
const { checkDepositWorker } = require("./src/workers/deposit-worker");
const { gridNoFundsWorker } = require("./src/workers/gridnofunds-worker");
const { gridDirtyWorker } = require("./src/workers/gridDirtyWorker");
const { tradingServiceBootstrap } = require("./src/bootstrap");
const { recoverTradesWorkerPromise } = require("./src/workers/recover-trades-worker");
const { stopGridWorker } = require("./src/workers/stop-grid-worker");
// Set the default listener cap applied to EventEmitters in this process to 20.
// NOTE(review): presumably needed because the Redis connections shared by all
// queues below collect more listeners than the stock limit allows — confirm.
require('events').defaultMaxListeners = 20;
/** @typedef {import('./src/services/TradeEventService').TradeDataEvent} TradeDataEvent */
/** @typedef {import('./src/services/OrderEventService').OrderDataEvent} OrderDataEvent */
/** @typedef {import('./src/services/BalanceEventService').BalanceDataEvent} BalanceDataEvent */
// Resolve the log destinations first: the environment wins, otherwise fall
// back to the files under logs/.
const serviceAllLogFile = process.env.LOGGER_SERVICE_ALL_FILE || 'logs/service-all.log';
const serviceErrorLogFile = process.env.LOGGER_SERVICE_ERROR_FILE || 'logs/service-error.log';

// Set up the logger with both files, then hook console output through
// captureConsoleLog().
initLogger(serviceAllLogFile, serviceErrorLogFile);
captureConsoleLog();
// Connection settings shared by every Redis client created in this file.
// Host and port fall back to a local instance when the environment does not
// provide them; the password is passed through as-is (possibly undefined).
const { REDIS_SERVER, REDIS_PORT, REDIS_PASSWORD } = process.env;
const redisConnOpts = {
  // NOTE(review): these two flags are presumably what Bull expects of
  // connections it reuses — confirm against the Bull/ioredis docs.
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
  host: REDIS_SERVER || "127.0.0.1",
  port: REDIS_PORT || 6379,
  password: REDIS_PASSWORD,
};

// The two connections handed back by the queue connection factory for the
// "client" and "subscriber" roles.
const client = new Redis(redisConnOpts);
const subscriber = new Redis(redisConnOpts);
// Queue options passed to every Bull queue created in this file.
const opts = {
  /**
   * Connection factory: hands out the Redis connection for the requested role.
   *
   * The "client" and "subscriber" roles return the two module-level
   * connections, so every queue built with these options shares them. The
   * "bclient" role gets a fresh connection on each call.
   *
   * @param {string} type - Requested role: "client", "subscriber" or "bclient".
   * @param {object} redisOpts - Per-queue Redis options; will contain at least
   *   a `connectionName` property which identifies the queue based on its name.
   * @returns {Redis} The connection to use for that role.
   * @throws {Error} If `type` is not one of the three known roles; the message
   *   names the offending type.
   */
  createClient(type, redisOpts) {
    switch (type) {
      case "client":
        return client;
      case "subscriber":
        return subscriber;
      case "bclient":
        return new Redis(redisConnOpts, redisOpts);
      default:
        // The type must be part of the message itself: a second argument to
        // Error() is treated as an options bag, not appended to the text.
        throw new Error(`Unexpected connection type: ${type}`);
    }
  },
};
// Separate Redis connection that is handed to Redlock below.
const redisLock = new Redis(redisConnOpts);

// Tuning knobs for the lock manager (all durations are in milliseconds).
const redlockSettings = {
  // Expected clock drift, multiplied by the lock TTL to obtain the drift
  // time; see http://redis.io/topics/distlock for background.
  driftFactor: 0.01,
  // Maximum number of attempts to lock a resource before erroring.
  retryCount: 120,
  // Pause between two attempts.
  retryDelay: 1000,
  // Upper bound of the random delay added to each retry to improve
  // performance under high contention; see
  // https://www.awsarchitectureblog.com/2015/03/backoff.html
  retryJitter: 200,
  // Minimum remaining time on a lock before an extension is automatically
  // attempted with the `using` API.
  automaticExtensionThreshold: 500,
};

// Redlock takes one client per independent node or cluster; a single one is
// supplied here.
const redlock = new Redlock([redisLock], redlockSettings);
// Every queue is built with the same options object, so they all go through
// the same connection factory.
const createQueue = (queueName) => new Queue(queueName, opts);

const myTradesQueue = createQueue("myTrades");
const myOrdersQueue = createQueue("myOrders");
const myBalanceQueue = createQueue("myBalance");
const myOrderSenderQueue = createQueue("myOrderSender");
const myNotificationQueue = createQueue("myNotification");
const myGridNoFundsQueue = createQueue("myGridNoFunds");
const myGridDirtyQueue = createQueue("myGridDirty");
const myCheckAccountDepositQueue = createQueue("myCheckAccountDeposit");
const myStopGridQueue = createQueue("myStopGridQueue");
// Pair each service with the object passed to its init(): a queue for the
// event services, the Redlock instance for LockService. The list order is the
// initialisation order.
const serviceBindings = [
  [OrderEventService, myOrdersQueue],
  [TradeEventService, myTradesQueue],
  [OrderSenderEventService, myOrderSenderQueue],
  [NotificationEventService, myNotificationQueue],
  [GridNoFundsEventService, myGridNoFundsQueue],
  [GridDirtyEventService, myGridDirtyQueue],
  [CheckAccountDepositEventService, myCheckAccountDepositQueue],
  [StopGridEventService, myStopGridQueue],
  [LockService, redlock],
];

for (const [service, dependency] of serviceBindings) {
  service.init(dependency);
}
// Attach one worker to each queue so that jobs arriving from the Redis server
// (orders, trades, balances, ...) are handed to it. The list order is the
// registration order.
const queueWorkers = [
  [myOrdersQueue, orderWorker],
  [myTradesQueue, tradeWorker],
  [myOrderSenderQueue, orderSenderWorker],
  [myBalanceQueue, balanceWorker],
  [myCheckAccountDepositQueue, checkDepositWorker],
  [myGridNoFundsQueue, gridNoFundsWorker],
  [myGridDirtyQueue, gridDirtyWorker],
  [myStopGridQueue, stopGridWorker],
];

for (const [queue, worker] of queueWorkers) {
  queue.process(worker);
}
// Kick off the bootstrap without blocking the rest of the start-up; the
// outcome is only logged.
tradingServiceBootstrap()
  .then(() => console.log("bootstrap executed"))
  .catch((err) => console.error("Error in bootstrap", err));
// Long-running background processes. Each factory returns a handle exposing
// a `promise` to wait on and a `cancel()` to stop it.
const promises = [
  startStopProcessPromise(),
  recoverOrdersWorkerPromise(),
  recoverTradesWorkerPromise(),
  broadcastWorkerPromise(),
];

// As soon as any one process settles, whether it finishes or fails, cancel
// every process in the list.
Promise.race(promises.map((handle) => handle.promise))
  .then(() => {
    console.error("One process has finished, stopping all");
    for (const handle of promises) {
      handle.cancel();
    }
  })
  .catch((err) => {
    console.error("Error waiting processes: ", err);
    for (const handle of promises) {
      handle.cancel();
    }
  });