-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.js
136 lines (120 loc) · 4.17 KB
/
run.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
'use strict';
const {BigQuery} = require('@google-cloud/bigquery');
const https = require('https');
const fs = require('fs');
var apiConfig = "config.json"
var datasetId = 'mev_relayer_data';
var tableId = 'blocks';
var dataQueryIntervalInMs = 10000; // 10 secs
var dataApis;
// Load API endpoints and latest processed slot number from config
loadApiConfig();
function loadApiConfig() {
try {
const apiConfigData = fs.readFileSync(apiConfig, 'utf8');
dataApis = JSON.parse(apiConfigData);
} catch (err) {
console.error(err);
}
}
function updateApiConfig() {
try {
fs.writeFileSync(apiConfig, JSON.stringify(dataApis));
} catch (err) {
console.error(err);
}
}
// Regularly queries each relayer data API and stores the data in BigQuery
async function main() {
const bigquery = new BigQuery();
setInterval(function () {
console.log(`${new Date().toISOString()} - Checking for relayer updates...`);
dataApis.forEach( async(dataApi) => {
var prevSlotNumber = dataApi.min_slot_number;
try {
var blocks = await getBlocksFromRelayer(dataApi);
if (blocks.length > 0) {
console.log(`Found ${blocks.length} new blocks from ${dataApi.relay_operator}-${dataApi.relay_name} relayer`);
await storeBlockRecord(bigquery, blocks);
// Successfully stored, update config with latest block
updateApiConfig();
}
}
catch(e) {
console.log(JSON.stringify(e));
// revert slot number
dataApi.min_slot_number = prevSlotNumber;
}
});
}, dataQueryIntervalInMs);
}
// In case of reorgs there could be multiple bids per slot.
async function getBlocksFromRelayer(dataApi) {
var dataPath = `/relay/v1/data/bidtraces/proposer_payload_delivered?limit=100`;
var url = dataApi.relay_url + dataPath;
return new Promise ((resolve, reject) => {
var jsonResult = '';
var req = https.get(url, function(res) {
res.setEncoding('utf8');
res.on('data', function (chunk) {
jsonResult += chunk;
});
res.on('end', function () {
try {
var parsedBlocks = JSON.parse(jsonResult);
var unstoredBlocks = [];
var greatest_slot = dataApi.min_slot_number;
parsedBlocks.forEach(block => {
block.relay_operator = dataApi.relay_operator;
block.relay_name = dataApi.relay_name;
block.timestamp = new Date().toISOString();
if (block.slot > dataApi.min_slot_number) {
unstoredBlocks.push(block);
if (Number.parseInt(block.slot) > greatest_slot) {
greatest_slot = Number.parseInt(block.slot);
}
}
})
dataApi.min_slot_number = greatest_slot;
resolve(unstoredBlocks);
}
catch(e) {
console.log(JSON.stringify(e));
console.log("Unable to parse response from server " + jsonResult);
reject(e)
}
});
});
req.on('error', function(e) {
console.log(e.message);
reject(e)
});
req.end();
});
}
async function storeBlockRecord(bigquery, blocks) {
// Insert data into a table
await bigquery
.dataset(datasetId)
.table(tableId)
.insert(blocks);
console.log(`Inserted ${blocks.length} rows`);
}
async function getLatestSlotFromBeacon() {
return new Promise ((resolve, reject) => {
var req = https.get(beaconApiServer + beaconHeaderApiPath, function(res) {
res.setEncoding('utf8');
res.on('data', function (chunk) {
var body = JSON.parse(chunk);
var latestSlot = Number.parseInt(body.data[0].header.message.slot);
resolve(latestSlot);
});
});
req.on('error', function(e) {
console.log(e.message);
reject(e)
});
req.end();
});
}
main();