Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nour/swift filler #300

Merged
merged 14 commits into from
Dec 23, 2024
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ export interface GlobalConfig {
/// ws endpoint to use (inferred from endpoint using web3.js rules, only provide if you want to use a different one)
wsEndpoint?: string;
hermesEndpoint?: string;
lazerEndpoint?: string;
lazerToken?: string;
numNonActiveOraclesToPush?: number;

// Optional to specify markets loaded by drift client
Expand Down
111 changes: 77 additions & 34 deletions src/experimental-bots/entrypoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import { setGlobalDispatcher, Agent } from 'undici';
import { PythPriceFeedSubscriber } from '../pythPriceFeedSubscriber';
import { SwiftMaker } from './swift/makerExample';
import { SwiftTaker } from './swift/takerExample';
import * as net from 'net';
import { PythLazerClient } from '@pythnetwork/pyth-lazer-sdk';

setGlobalDispatcher(
new Agent({
Expand Down Expand Up @@ -307,6 +309,19 @@ const runBot = async () => {
throw new Error('fillerMultithreaded bot config not found');
}

let pythLazerClient: PythLazerClient | undefined;
if (config.global.driftEnv! === 'devnet') {
if (!config.global.lazerEndpoint || !config.global.lazerToken) {
throw new Error(
'Must set environment variables LAZER_ENDPOINT and LAZER_TOKEN'
);
}
pythLazerClient = new PythLazerClient(
config.global.lazerEndpoint,
config.global.lazerToken
);
}

// Ensure that there are no duplicate market indexes in the Array<number[]> marketIndexes config
const marketIndexes = new Set<number>();
for (const marketIndexList of config.botConfigs.fillerMultithreaded
Expand Down Expand Up @@ -335,6 +350,7 @@ const runBot = async () => {
},
bundleSender,
pythPriceSubscriber,
pythLazerClient,
[]
);
bots.push(fillerMultithreaded);
Expand Down Expand Up @@ -423,46 +439,50 @@ const runBot = async () => {

// start http server listening to /health endpoint using http package
const startupTime = Date.now();
http
.createServer(async (req, res) => {
if (req.url === '/health') {
if (config.global.testLiveness) {
if (Date.now() > startupTime + 60 * 1000) {
res.writeHead(500);
res.end('Testing liveness test fail');
return;
}
}

/* @ts-ignore */
if (!driftClient.connection._rpcWebSocketConnected) {
logger.error(`Connection rpc websocket disconnected`);
const createServerCallback = async (req: any, res: any) => {
if (req.url === '/health') {
if (config.global.testLiveness) {
if (Date.now() > startupTime + 60 * 1000) {
res.writeHead(500);
res.end(`Connection rpc websocket disconnected`);
res.end('Testing liveness test fail');
return;
}
}

// check all bots if they're live
for (const bot of bots) {
const healthCheck = await promiseTimeout(bot.healthCheck(), 1000);
if (!healthCheck) {
logger.error(`Health check failed for bot`);
res.writeHead(503);
res.end(`Bot is not healthy`);
return;
}
}
/* @ts-ignore */
if (!driftClient.connection._rpcWebSocketConnected) {
logger.error(`Connection rpc websocket disconnected`);
res.writeHead(500);
res.end(`Connection rpc websocket disconnected`);
return;
}

// liveness check passed
res.writeHead(200);
res.end('OK');
} else {
res.writeHead(404);
res.end('Not found');
// check all bots if they're live
for (const bot of bots) {
const healthCheck = await promiseTimeout(bot.healthCheck(), 1000);
if (!healthCheck) {
logger.error(`Health check failed for bot`);
res.writeHead(503);
res.end(`Bot is not healthy`);
return;
}
}
})
.listen(healthCheckPort);
logger.info(`Health check server listening on port ${healthCheckPort}`);

// liveness check passed
res.writeHead(200);
res.end('OK');
} else {
res.writeHead(404);
res.end('Not found');
}
};

let healthCheckPortToUse = Number(healthCheckPort);
while (await isPortInUse(healthCheckPortToUse)) {
healthCheckPortToUse++;
}
http.createServer(createServerCallback).listen(healthCheckPortToUse);
logger.info(`Server listening on port ${healthCheckPortToUse}`);
};

recursiveTryCatch(() => runBot());
Expand All @@ -476,3 +496,26 @@ async function recursiveTryCatch(f: () => void) {
await recursiveTryCatch(f);
}
}

function isPortInUse(port: number, host = '127.0.0.1'): Promise<boolean> {
return new Promise((resolve) => {
const server = net.createServer();

server.once('error', (err) => {
if (
err.name?.includes('EADDRINUSE') ||
err.message?.includes('EADDRINUSE')
) {
resolve(true);
} else {
resolve(false);
}
});

server.once('listening', () => {
server.close(() => resolve(false));
});

server.listen(port, host);
});
}
Loading