Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: configurable watchdog timeout #93

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import { ApiService } from "../utils/api";
*/
export async function run(options: RunOptions) {
const log = getLogger("commands:run");
const { rpc, deploymentBlock, oneShot, disableApi, apiPort } = options;
const {
rpc,
deploymentBlock,
oneShot,
disableApi,
apiPort,
watchdogTimeout,
} = options;

// Start the API server if it's not disabled
if (!disableApi) {
Expand Down Expand Up @@ -45,7 +52,7 @@ export async function run(options: RunOptions) {

// Run the block watcher after warm up for each chain
const runPromises = chainContexts.map(async (context) => {
return context.warmUp(oneShot);
return context.warmUp(watchdogTimeout, oneShot);
});

// Run all the chain contexts
Expand Down
17 changes: 10 additions & 7 deletions src/domain/chainContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { composableCowContract, DBService, getLogger } from "../utils";
import { MetricsService } from "../utils/metrics";

const WATCHDOG_FREQUENCY = 5 * 1000; // 5 seconds
const WATCHDOG_KILL_THRESHOLD = 30 * 1000; // 30 seconds

const MULTICALL3 = "0xcA11bde05977b3631167028862bE2a173976CA11";

Expand Down Expand Up @@ -92,10 +91,11 @@ export class ChainContext {
/**
* Warm up the chain watcher by fetching the latest block number and
* checking if the chain is in sync.
* @param watchdogTimeout the timeout for the watchdog, in seconds
* @param oneShot if true, only warm up the chain watcher and return
* @returns the run promises for what needs to be watched
*/
public async warmUp(oneShot?: boolean) {
public async warmUp(watchdogTimeout: number, oneShot?: boolean) {
const { provider, chainId } = this;
const log = getLogger(`chainContext:warmUp:${chainId}`);
const { lastProcessedBlock } = this.registry;
Expand Down Expand Up @@ -225,19 +225,20 @@ export class ChainContext {
}

// Otherwise, run the block watcher
return await this.runBlockWatcher();
return await this.runBlockWatcher(watchdogTimeout);
}

/**
* Run the block watcher for the chain. As new blocks come in:
* 1. Check if there are any `ConditionalOrderCreated` events, and index these.
* 2. Check if any orders want to create discrete orders.
*/
private async runBlockWatcher() {
private async runBlockWatcher(watchdogTimeout: number) {
const { provider, registry, chainId } = this;
const log = getLogger(`chainContext:runBlockWatcher:${chainId}`);
// Watch for new blocks
log.info("👀 Start block watcher");
log.info(`👀 Start block watcher`);
log.debug(`Watchdog timeout: ${watchdogTimeout} seconds`);
let lastBlockReceived = 0;
let timeLastBlockProcessed = new Date().getTime();
provider.on("block", async (blockNumber: number) => {
Expand Down Expand Up @@ -294,8 +295,10 @@ export class ChainContext {
log.debug(`Time since last block processed: ${timeElapsed}ms`);

// If we haven't processed a block within the configured watchdog timeout (given in seconds), exit so that the process manager can restart us
if (timeElapsed >= WATCHDOG_KILL_THRESHOLD) {
log.error(`Watchdog timeout`);
if (timeElapsed >= watchdogTimeout * 1000) {
log.error(
`Watchdog timeout (RPC failed, or chain is stuck / not issuing blocks)`
);
await registry.storage.close();
process.exit(1);
}
Expand Down
61 changes: 37 additions & 24 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import "dotenv/config";

import { program, Option } from "@commander-js/extra-typings";
import {
program,
Option,
InvalidArgumentError,
} from "@commander-js/extra-typings";
import { ReplayTxOptions } from "./types";
import { dumpDb, replayBlock, replayTx, run } from "./commands";
import { initLogging } from "./utils";
Expand All @@ -25,10 +29,10 @@ async function main() {
"--deployment-block <deploymentBlock...>",
"Block number at which the contracts were deployed"
)
.option(
"--page-size <pageSize>",
"Number of blocks to fetch per page",
"5000"
.addOption(
new Option("--page-size <pageSize>", "Number of blocks to fetch per page")
.default("5000")
.argParser(parseIntOption)
)
.option("--dry-run", "Do not publish orders to the OrderBook API", false)
.addOption(
Expand All @@ -37,45 +41,46 @@ async function main() {
.default(false)
)
.option("--disable-api", "Disable the REST API", false)
.option("--api-port <apiPort>", "Port for the REST API", "8080")
.addOption(
new Option("--api-port <apiPort>", "Port for the REST API")
.default("8080")
.argParser(parseIntOption)
)
.option("--slack-webhook <slackWebhook>", "Slack webhook URL")
.option("--one-shot", "Run the watchtower once and exit", false)
.addOption(
new Option(
"--watchdog-timeout <watchdogTimeout>",
"Watchdog timeout (in seconds)"
)
.default("30")
.argParser(parseIntOption)
)
.addOption(logLevelOption)
.action((options) => {
const { logLevel } = options;
const [pageSize, apiPort, watchdogTimeout] = [
options.pageSize,
options.apiPort,
options.watchdogTimeout,
].map((value) => Number(value));

initLogging({ logLevel });
const {
rpc,
deploymentBlock: deploymentBlockEnv,
pageSize: pageSizeEnv,
} = options;
const { rpc, deploymentBlock: deploymentBlockEnv } = options;

// Ensure that the deployment blocks are all numbers
const deploymentBlock = deploymentBlockEnv.map((block) => Number(block));
if (deploymentBlock.some((block) => isNaN(block))) {
throw new Error("Deployment blocks must be numbers");
}

// Ensure that pageSize is a number
const pageSize = Number(pageSizeEnv);
if (isNaN(pageSize)) {
throw new Error("Page size must be a number");
}

// Ensure that the port is a number
const apiPort = Number(options.apiPort);
if (isNaN(apiPort)) {
throw new Error("API port must be a number");
}

// Ensure that the RPCs and deployment blocks are the same length
if (rpc.length !== deploymentBlock.length) {
throw new Error("RPC and deployment blocks must be the same length");
}

// Run the watchtower
run({ ...options, deploymentBlock, pageSize, apiPort });
run({ ...options, deploymentBlock, pageSize, apiPort, watchdogTimeout });
});

program
Expand Down Expand Up @@ -134,6 +139,14 @@ async function main() {
await program.parseAsync();
}

/**
 * Validate a command-line option value that is expected to be an integer.
 *
 * The result is returned as a string rather than a number: the options that
 * use this parser declare string defaults, and the action handler converts
 * the values with `Number(...)` afterwards.
 *
 * @param option raw option value as supplied on the command line
 * @returns the decimal string form of the parsed integer
 * @throws InvalidArgumentError if the value is blank or is not a finite integer
 */
function parseIntOption(option: string): string {
  const parsed = Number(option);

  // `Number("")` and `Number("   ")` evaluate to 0, so blank input has to be
  // rejected explicitly. `Number.isInteger` is false for NaN, +/-Infinity and
  // fractional values, which covers every other malformed input.
  if (option.trim() === "" || !Number.isInteger(parsed)) {
    throw new InvalidArgumentError(`"${option}" must be an integer`);
  }

  return parsed.toString();
}

main().catch((error) => {
console.error(error);
process.exit(1);
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface RunOptions extends WatchtowerOptions {
oneShot: boolean;
disableApi: boolean;
apiPort: number;
watchdogTimeout: number;
}

export type SingularRunOptions = Omit<RunOptions, "rpc" | "deploymentBlock"> & {
Expand Down
Loading