Skip to content

Commit

Permalink
Improve shutdown process and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jun 18, 2024
1 parent 58d39aa commit db42f30
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/arroyo-controller/src/job_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl RunningJobModel {
};
}
} else {
warn!(
debug!(
message = "Received checkpoint event but not checkpointing",
job_id = *self.job_id,
event = format!("{:?}", c)
Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-controller/src/schedulers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ impl Scheduler for ProcessScheduler {
}
let mut child = command
.arg("worker")
.env("RUST_LOG", "info")
.env("ARROYO__ADMIN__HTTP_PORT", "0")
.env("ARROYO__WORKER__TASK_SLOTS", format!("{}", slots_here))
.env("ARROYO__WORKER__ID", format!("{}", worker_id)) // start at 100 to make same length
Expand Down
9 changes: 7 additions & 2 deletions crates/arroyo-controller/src/states/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use cornucopia_async::DatabaseSource;
use crate::job_controller::JobController;
use crate::queries::controller_queries;
use crate::types::public::StopMode;
use crate::{schedulers::Scheduler, JobConfig, JobMessage, JobStatus};
use crate::{schedulers::Scheduler, JobConfig, JobMessage, JobStatus, RunningMessage};
use arroyo_datastream::logical::LogicalProgram;
use arroyo_rpc::config::config;
use arroyo_server_common::shutdown::ShutdownGuard;
Expand Down Expand Up @@ -367,7 +367,12 @@ pub struct JobContext<'a> {

impl<'a> JobContext<'a> {
pub fn handle(&mut self, msg: JobMessage) -> Result<(), StateError> {
warn!("unhandled job message {:?}", msg);
if !matches!(
msg,
JobMessage::RunningMessage(RunningMessage::WorkerHeartbeat { .. })
) {
warn!("unhandled job message {:?}", msg);
}
Ok(())
}

Expand Down
16 changes: 11 additions & 5 deletions crates/arroyo-server-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,21 @@ pub const VERSION: &str = "0.11.0-dev";

static CLUSTER_ID: OnceCell<String> = OnceCell::new();

pub fn init_logging(_name: &str) -> WorkerGuard {
/// Initializes logging using a filter derived from the environment.
///
/// The filter is built with `EnvFilter::builder()`, using `INFO` as the
/// default directive, and is then passed, together with `name`, to
/// [`init_logging_with_filter`]. The guard returned by that function is
/// returned unchanged to the caller.
pub fn init_logging(name: &str) -> WorkerGuard {
    let filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy();

    init_logging_with_filter(name, filter)
}

pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> WorkerGuard {
if let Err(e) = LogTracer::init() {
eprintln!("Failed to initialize log tracer {:?}", e);
}

let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy()
.add_directive("refinery_core=warn".parse().unwrap());
let filter = filter.add_directive("refinery_core=warn".parse().unwrap());

let (nonblocking, guard) = tracing_appender::non_blocking(std::io::stderr());

Expand Down
16 changes: 11 additions & 5 deletions crates/arroyo-server-common/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,19 +249,25 @@ impl Shutdown {

// call the handler if there is one
if let Some(handler) = self.handler {
info!("Running shutdown handler");
handler.shutdown().await;
tokio::spawn(async move {
info!("Running shutdown handler");
handler.shutdown().await;
info!("Finished shutdown handler");
self.guard.token.cancel();
drop(self.guard);
});
} else {
self.guard.token.cancel();
drop(self.guard)
}

self.guard.token.cancel();
}
_ = self.guard.token.cancelled() => {
// Or some part of the system shut down
info!("{} shutting down", self.name);
drop(self.guard)
}
}

drop(self.guard);
select! {
_ = self.rx.recv() => {
// everything has shutdown
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tokio = { version = "1", features = ["full"] }
serde = "1"
serde_json = "1"
tracing = "0.1"
tracing-subscriber = {version = "0.3", features = [ "env-filter", "json" ]}

postgres-types = { version = "*", features = ["derive"] }
tokio-postgres = { version = "*", features = ["with-serde_json-1", "with-time-0_3", "with-uuid-1"] }
Expand Down
47 changes: 35 additions & 12 deletions crates/arroyo/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,47 @@ use crate::{db_source, RunArgs};
use anyhow::bail;
use arroyo_openapi::types::{Pipeline, PipelinePatch, PipelinePost, StopType, ValidateQueryPost};
use arroyo_openapi::Client;
use arroyo_rpc::config;
use arroyo_rpc::config::{config, DatabaseType, DefaultSink, Scheduler};
use arroyo_rpc::{config, retry};
use arroyo_server_common::log_event;
use arroyo_server_common::shutdown::{Shutdown, ShutdownHandler, SignalBehavior};
use async_trait::async_trait;
use rand::random;
use serde_json::json;
use std::env;
use std::env::set_var;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::timeout;
use tracing::level_filters::LevelFilter;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;

/// Returns the `state` of the first job reported for `pipeline_id`.
///
/// The jobs request is issued through the `retry!` macro; the closure handed
/// to the macro logs a warning describing the API error.
/// NOTE(review): the arguments `10`, 100ms and 2s are presumably the attempt
/// limit and the backoff bounds -- confirm against the `retry!` definition.
///
/// # Panics
///
/// Panics if `retry!` ultimately yields an error, or if the response contains
/// no jobs.
async fn get_state(client: &Client, pipeline_id: &str) -> String {
    let response = retry!(
        client.get_pipeline_jobs().id(pipeline_id).send().await,
        10,
        Duration::from_millis(100),
        Duration::from_secs(2),
        |e| { warn!("Failed to get job state from API: {}", e) }
    )
    .unwrap();

    // Only the first job in the response is considered.
    let first_job = response.into_inner().data.into_iter().next().unwrap();
    first_job.state
}

async fn wait_for_state(
client: &Client,
pipeline_id: &str,
expected_states: &[&str],
) -> anyhow::Result<()> {
let mut last_state = "None".to_string();
let mut last_state: String = get_state(client, pipeline_id).await;
while !expected_states.contains(&last_state.as_str()) {
let jobs = client
.get_pipeline_jobs()
.id(pipeline_id)
.send()
.await
.unwrap();
let job = jobs.data.first().unwrap();

let state = job.state.clone();
let state = get_state(client, pipeline_id).await;
if last_state != state {
info!("Job transitioned to {}", state);
last_state = state;
Expand Down Expand Up @@ -195,7 +205,20 @@ async fn run_pipeline(
}

pub async fn run(args: RunArgs) {
let _guard = arroyo_server_common::init_logging("pipeline");
let _guard = arroyo_server_common::init_logging_with_filter(
"pipeline",
if !env::var("RUST_LOG").is_ok() {
set_var("RUST_LOG", "WARN");
EnvFilter::builder()
.with_default_directive(LevelFilter::WARN.into())
.from_env_lossy()
.add_directive("arroyo::run=info".parse().unwrap())
} else {
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy()
},
);

let query = match config().query.clone() {
Some(query) => query,
Expand Down

0 comments on commit db42f30

Please sign in to comment.