Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added retrying on indexer failures #636

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 56 additions & 10 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,54 @@ impl Cli {
}
}

/// Upper bound, in seconds, on the pause between two indexer restart attempts (2 minutes).
const MAX_INDEXER_RESTART_DELAY_SECS: u64 = 120;

/// Returns how long to wait before restarting the indexer after `restarts` previous restarts.
///
/// The delay doubles with every restart (1s, 2s, 4s, ...) and is clamped to
/// [`MAX_INDEXER_RESTART_DELAY_SECS`], so it never exceeds the cap and never decreases.
fn indexer_restart_delay(restarts: u32) -> std::time::Duration {
    // `checked_pow` yields `None` once 2^restarts no longer fits in a u64; treat that as "capped"
    // instead of overflowing.
    let secs = 2u64
        .checked_pow(restarts)
        .map_or(MAX_INDEXER_RESTART_DELAY_SECS, |secs| {
            secs.min(MAX_INDEXER_RESTART_DELAY_SECS)
        });
    std::time::Duration::from_secs(secs)
}

/// Spawns a dedicated OS thread that runs the indexer and restarts it whenever it fails.
///
/// All arguments are cloned up front so the spawned thread owns its own copies, and cloned again
/// on every attempt because each call to `indexer::run` consumes them.
///
/// After a failed run the error is logged and the thread sleeps for an exponentially growing
/// delay (capped at 2 minutes) before trying again. The loop only ends, and the returned
/// [`std::thread::JoinHandle`] only becomes joinable without blocking, once `indexer::run`
/// returns `Ok`.
fn spinup_indexer(
    options: &indexer::Options,
    mpc_contract_id: &AccountId,
    account_id: &AccountId,
    sign_queue: &Arc<RwLock<SignQueue>>,
    gcp: &GcpService,
) -> std::thread::JoinHandle<()> {
    let options = options.clone();
    let mpc_contract_id = mpc_contract_id.clone();
    let account_id = account_id.clone();
    let sign_queue = sign_queue.clone();
    let gcp = gcp.clone();
    std::thread::spawn(move || {
        // If indexer fails for whatever reason, let's spin it back up:
        let mut restarts: u32 = 0;
        loop {
            if restarts > 0 {
                tracing::info!("restarting indexer after failure: restart count={restarts}");
            }

            // Fresh copies per attempt: the originals must stay available for the next restart.
            let options = options.clone();
            let mpc_contract_id = mpc_contract_id.clone();
            let account_id = account_id.clone();
            let sign_queue = sign_queue.clone();
            let gcp = gcp.clone();

            // TODO/NOTE: currently the indexer does not have any interrupt handlers and will never
            // yield back as successful. We can add interrupt handlers in the future, but this is
            // not important right now since we manage nodes through integration tests that can
            // kill it, or through docker.
            let Err(err) = indexer::run(options, mpc_contract_id, account_id, sign_queue, gcp)
            else {
                break;
            };
            tracing::error!(%err, "indexer failed");

            std::thread::sleep(indexer_restart_delay(restarts));
            // Saturate rather than overflow; the delay is already pinned at the cap by then.
            restarts = restarts.saturating_add(1);
        }
    })
}

pub fn run(cmd: Cli) -> anyhow::Result<()> {
// Install global collector configured based on RUST_LOG env var.
let mut subscriber = tracing_subscriber::fmt()
Expand Down Expand Up @@ -201,15 +249,13 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
.block_on(async {
let (sender, receiver) = mpsc::channel(16384);
let gcp_service = GcpService::init(&account_id, &storage_options).await?;

let join_handle = std::thread::spawn({
let options = indexer_options.clone();
let mpc_id = mpc_contract_id.clone();
let account_id = account_id.clone();
let sign_queue = sign_queue.clone();
let gcp = gcp_service.clone();
move || indexer::run(options, mpc_id, account_id, sign_queue, gcp).unwrap()
});
let indexer_handle = spinup_indexer(
&indexer_options,
&mpc_contract_id,
&account_id,
&sign_queue,
&gcp_service,
);

let key_storage = storage::secret_storage::init(
Some(&gcp_service),
Expand Down Expand Up @@ -276,7 +322,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
web_handle.await??;
tracing::debug!("spinning down");

join_handle.join().unwrap();
indexer_handle.join().unwrap();
anyhow::Ok(())
})?;
}
Expand Down
Loading