From c072ef82f32ca37ce0b96059a7cf1b63d0e2fcfe Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 8 Aug 2024 16:22:54 -0700 Subject: [PATCH] Fix race condition that could stall scheduling if operator panicked during setup (#712) --- Cargo.lock | 17 ++++++++-------- crates/arroyo-connectors/Cargo.toml | 2 +- crates/arroyo-connectors/src/redis/mod.rs | 5 ++--- .../src/redis/operator/sink.rs | 10 +++------- crates/arroyo-controller/src/lib.rs | 10 +++++++++- .../src/states/scheduling.rs | 7 ++++++- crates/arroyo-operator/src/context.rs | 20 +++++++++++++++++++ crates/arroyo-operator/src/operator.rs | 19 ++++++++++++++++++ crates/arroyo-worker/src/engine.rs | 8 -------- 9 files changed, 69 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d85f9e432..61746ef5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7093,9 +7093,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.24.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" dependencies = [ "arc-swap", "async-trait", @@ -7106,19 +7106,20 @@ dependencies = [ "futures-util", "itoa", "log", + "num-bigint", "percent-encoding", "pin-project-lite", "rand 0.8.5", - "rustls 0.21.12", - "rustls-native-certs 0.6.3", - "rustls-pemfile 1.0.4", - "rustls-webpki 0.101.7", + "rustls 0.23.12", + "rustls-native-certs 0.7.1", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "ryu", "sha1_smol", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tokio-retry", - "tokio-rustls 0.24.1", + "tokio-rustls 0.26.0", "tokio-util", "url", ] diff --git a/crates/arroyo-connectors/Cargo.toml b/crates/arroyo-connectors/Cargo.toml index 960832d46..e022ee4dc 100644 --- a/crates/arroyo-connectors/Cargo.toml +++ b/crates/arroyo-connectors/Cargo.toml @@ -61,7 +61,7 @@ tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] } reqwest = { version = "0.11.20", features = ["stream"] } # Redis -redis = { version = "0.24.0", features = ["default", "tokio-rustls-comp", "cluster-async", "connection-manager"] } +redis = { version = "0.26.0", features = ["default", "tokio-rustls-comp", "cluster-async", "connection-manager"] } # Fluvio fluvio = {version = "0.23", features = ["openssl"]} diff --git a/crates/arroyo-connectors/src/redis/mod.rs b/crates/arroyo-connectors/src/redis/mod.rs index 310d5ecf3..44b333759 100644 --- a/crates/arroyo-connectors/src/redis/mod.rs +++ b/crates/arroyo-connectors/src/redis/mod.rs @@ -1,7 +1,5 @@ mod operator; -use std::collections::HashMap; - use anyhow::{anyhow, bail}; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::connector::{Connection, Connector}; @@ -11,6 +9,7 @@ use redis::aio::ConnectionManager; use redis::cluster::ClusterClient; use redis::{Client, ConnectionInfo, IntoConnectionInfo}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use tokio::sync::oneshot::Receiver; use typify::import_types; @@ -113,7 +112,7 @@ async fn test_inner( match &client { RedisClient::Standard(client) => { let mut connection = client - .get_async_connection() + .get_multiplexed_async_connection() .await .map_err(|e| anyhow!("Failed to connect to to Redis Cluster: {:?}", e))?; tx.send(TestSourceMessage::info( diff --git a/crates/arroyo-connectors/src/redis/operator/sink.rs b/crates/arroyo-connectors/src/redis/operator/sink.rs index 4736b95e5..5fe10989a 100644 --- a/crates/arroyo-connectors/src/redis/operator/sink.rs +++ b/crates/arroyo-connectors/src/redis/operator/sink.rs @@ -197,11 +197,7 @@ impl RedisWriter { } while attempts < 20 { - match self - .pipeline - .query_async::<_, ()>(&mut self.connection) - .await - { + match self.pipeline.query_async::<()>(&mut self.connection).await { Ok(_) => { self.pipeline.clear(); self.size_estimate = 0; @@ -219,7 +215,7 @@ impl RedisWriter { } tokio::time::sleep(Duration::from_millis((50 * (1 << attempts)).min(5_000))).await; - attempts -= 1; + attempts += 1; } panic!("Exhausted retries writing to Redis"); @@ -319,7 +315,7 @@ impl ArrowOperator for RedisSinkFunc { } tokio::time::sleep(Duration::from_millis((50 * (1 << attempts)).min(5_000))).await; - attempts -= 1; + attempts += 1; } panic!("Failed to establish connection to redis after 20 retries"); diff --git a/crates/arroyo-controller/src/lib.rs b/crates/arroyo-controller/src/lib.rs index 80a3d28a1..e8c709846 100644 --- a/crates/arroyo-controller/src/lib.rs +++ b/crates/arroyo-controller/src/lib.rs @@ -438,8 +438,16 @@ impl ControllerGrpc for ControllerServer { &self, request: Request, ) -> Result, Status> { - info!("Got worker error."); let req = request.into_inner(); + + info!( + job_id = req.job_id, + operator_id = req.operator_id, + message = "operator error", + error_message = req.message, + error_details = req.details + ); + let client = self.db.client().await.unwrap(); match queries::controller_queries::execute_create_job_log_message( &client, diff --git a/crates/arroyo-controller/src/states/scheduling.rs b/crates/arroyo-controller/src/states/scheduling.rs index 4c784a8ab..5b7727a8b 100644 --- a/crates/arroyo-controller/src/states/scheduling.rs +++ b/crates/arroyo-controller/src/states/scheduling.rs @@ -24,7 +24,8 @@ use arroyo_state::{ use crate::job_controller::job_metrics::JobMetrics; use crate::{ - job_controller::JobController, queries::controller_queries, states::stop_if_desired_non_running, + job_controller::JobController, queries::controller_queries, + states::stop_if_desired_non_running, RunningMessage, }; use crate::{schedulers::SchedulerError, JobMessage}; use crate::{ @@ -521,6 +522,10 @@ impl State for Scheduling { }) => { started_tasks.insert((operator_id, operator_subtask)); } + Some(JobMessage::RunningMessage(RunningMessage::TaskFailed {worker_id, operator_id, subtask_index, reason})) => { + return Err(ctx.retryable(self, "task failed on startup", + anyhow!("task failed on job startup on {:?}: {}:{}: {}", worker_id, operator_id, subtask_index, reason), 10)); + } Some(JobMessage::ConfigUpdate(c)) => { stop_if_desired_non_running!(self, &c); } diff --git a/crates/arroyo-operator/src/context.rs b/crates/arroyo-operator/src/context.rs index baf242467..4a57107e8 100644 --- a/crates/arroyo-operator/src/context.rs +++ b/crates/arroyo-operator/src/context.rs @@ -884,4 +884,24 @@ mod tests { assert_eq!(tx.capacity(), 8); } + + #[tokio::test] + async fn test_panic_propagation() { + let (tx, mut rx) = batch_bounded(8); + + let msg = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)])), + vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4]))], + ) + .unwrap(); + + tokio::task::spawn(async move { + let f = rx.recv(); + panic!("at the disco"); + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + + assert!(tx.send(ArrowMessage::Data(msg)).await.is_err()); + } } diff --git a/crates/arroyo-operator/src/operator.rs b/crates/arroyo-operator/src/operator.rs index 39316a623..179041abf 100644 --- a/crates/arroyo-operator/src/operator.rs +++ b/crates/arroyo-operator/src/operator.rs @@ -89,6 +89,16 @@ impl OperatorNode { "Running source {}-{}", ctx.task_info.operator_name, ctx.task_info.task_index ); + + ctx.control_tx + .send(ControlResp::TaskStarted { + operator_id: ctx.task_info.operator_id.clone(), + task_index: ctx.task_info.task_index, + start_time: SystemTime::now(), + }) + .await + .unwrap(); + let result = s.run(ctx).await; s.on_close(ctx).await; @@ -194,6 +204,15 @@ async fn operator_run_behavior( ctx.task_info.operator_name, ctx.task_info.task_index ); + ctx.control_tx + .send(ControlResp::TaskStarted { + operator_id: ctx.task_info.operator_id.clone(), + task_index: ctx.task_info.task_index, + start_time: SystemTime::now(), + }) + .await + .unwrap(); + let task_info = ctx.task_info.clone(); let name = this.name(); let mut counter = CheckpointCounter::new(in_qs.len()); diff --git a/crates/arroyo-worker/src/engine.rs b/crates/arroyo-worker/src/engine.rs index 8ff67ad44..99a86e4e3 100644 --- a/crates/arroyo-worker/src/engine.rs +++ b/crates/arroyo-worker/src/engine.rs @@ -738,14 +738,6 @@ impl Engine { let send_copy = control_tx.clone(); tokio::spawn(async move { - send_copy - .send(ControlResp::TaskStarted { - operator_id: operator_id.clone(), - task_index, - start_time: SystemTime::now(), - }) - .await - .unwrap(); if let Err(error) = join_task.await { send_copy .send(ControlResp::TaskFailed {