Skip to content

Commit

Permalink
final touches
Browse files Browse the repository at this point in the history
  • Loading branch information
robertohuertasm committed Oct 30, 2024
1 parent 1b32b6c commit c5dc666
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 55 deletions.
5 changes: 4 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
"kind": "bin"
}
},
"args": ["-p", "49159", "-e", "-l"],
"args": ["-p", "49159", "-e"],
"env": {
"RUST_LOG": "trace"
},
"cwd": "${workspaceFolder}"
},
{
Expand Down
146 changes: 99 additions & 47 deletions crates/bins/src/bin/datadog_static_analyzer_server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::time::Duration;
use std::{env, process, thread};
use thiserror::Error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::RollingFileAppender;
use tracing_subscriber::EnvFilter;

use super::state::ServerState;
Expand Down Expand Up @@ -37,20 +38,35 @@ fn get_opts() -> Options {
opts.optflag("e", "enable-shutdown", "enables the shutdown endpoint");
opts.optflag("h", "help", "print this help");
opts.optflag("v", "version", "shows the tool version");
opts.optflag(
opts.optopt(
"l",
"logs",
"Enables logs to a file. Usually /tmp/static-analyzer-server/logs",
"[minutely, hourly, daily]",
);

// TODO (JF): Remove this when releasing 0.3.8
opts.optflag("", "ddsa-runtime", "(deprecated)");
opts
}

fn get_log_dir() -> PathBuf {
let mut log_dir = env::temp_dir();
log_dir.push("static-analysis-server/logs");
log_dir
let path = "static-analysis-server/logs";
#[cfg(unix)]
{
PathBuf::from(format!("/tmp/{path}"))
}
#[cfg(windows)]
{
let mut log_dir = std::env::var("TEMP")
.map(PathBuf::from)
.unwrap_or_else(|_| {
// Fallback in case TEMP is not set
PathBuf::from("C:\\Temp")
});
log_dir.push("static-analysis-server/logs");
log_dir
}
}

#[derive(Debug, Error)]
Expand All @@ -61,11 +77,29 @@ pub enum CliError {
InvalidPort(String),
#[error("Invalid address argument {0:?}.")]
InvalidAddress(String),
#[error("Invalid log argument: {0}. It must be 'minutely', 'hourly' or 'daily'.")]
InvalidLogRolling(String),
}

fn try_to_file_appender(
value: String,
log_dir: PathBuf,
file_name_prefix: String,
) -> Result<RollingFileAppender, CliError> {
match value.to_lowercase().as_ref() {
"minutely" => Ok(tracing_appender::rolling::minutely(
log_dir,
file_name_prefix,
)),
"hourly" => Ok(tracing_appender::rolling::hourly(log_dir, file_name_prefix)),
"daily" => Ok(tracing_appender::rolling::daily(log_dir, file_name_prefix)),
_ => Err(CliError::InvalidLogRolling(value)),
}
}

pub enum RocketPreparation {
ServerInfo {
rocket: Rocket<Build>,
rocket: Box<Rocket<Build>>,
state: ServerState,
tx_shutdown: Sender<Shutdown>,
guard: Option<Arc<WorkerGuard>>,
Expand All @@ -77,11 +111,13 @@ pub enum RocketPreparation {
///
/// # Panics
///
/// This function can panic or end the process in case keep-alive is on and graceful exit doesn't work.
/// It should not happen frequently, but it's one of the possibilities.
/// This function can panic or end the process in the case keep-alive is on and:
/// - graceful exit doesn't work (abort)
/// - the keep-alive channel is disconnected (exit code 70)
/// Although it should not happen frequently, it's certainly a possibility.

Check failure on line 117 in crates/bins/src/bin/datadog_static_analyzer_server/cli.rs

View workflow job for this annotation

GitHub Actions / Clippy

doc list item missing indentation
pub fn prepare_rocket() -> Result<RocketPreparation, CliError> {
let args: Vec<String> = env::args().collect();
let program = args[0].clone();
let program = &args[0];
let opts = get_opts();

let matches = opts.parse(&args[1..])?;
Expand All @@ -92,23 +128,23 @@ pub fn prepare_rocket() -> Result<RocketPreparation, CliError> {
}

if matches.opt_present("h") {
print_usage(&program, &opts);
print_usage(program, &opts);
return Ok(RocketPreparation::NoServerInteraction);
}

// initialize the tracing subscriber here
// we're only interested in the server logs
let guard = if matches.opt_present("l") {
// initialize the tracing subscriber here as we're only interested in the server logs not the other CLI instructions
let guard = if let Some(log_rolling) = matches.opt_str("l") {
// tracing with logs
let log_dir = get_log_dir();
let pid = std::process::id();
let file_appender = tracing_appender::rolling::daily(log_dir, format!("server.{pid}.log"));
let file_appender = try_to_file_appender(
log_rolling,
log_dir,
format!("server.{}.log", std::process::id()),
)?;
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);

// check levels compatibility with from default env
tracing_subscriber::fmt()
// .with_env_filter(EnvFilter::from_default_env())
.with_max_level(tracing::Level::TRACE)
.with_env_filter(EnvFilter::from_default_env())
.json()
.with_writer(non_blocking)
.init();
Expand All @@ -122,27 +158,38 @@ pub fn prepare_rocket() -> Result<RocketPreparation, CliError> {
};

// server state
tracing::debug!("Preparing the server state and rocket configuration");
let mut server_state = ServerState::new(matches.opt_str("s"), matches.opt_present("e"));
let mut rocket_configuration = rocket::config::Config::default();

// Set up the port in rocket configuration if --port is passed
// disable rocket colors and emojis if we're logging to a file as we will be using json format
rocket_configuration.cli_colors = !matches.opt_present("l");

Check failure on line 166 in crates/bins/src/bin/datadog_static_analyzer_server/cli.rs

View workflow job for this annotation

GitHub Actions / Clippy

field assignment outside of initializer for an instance created with Default::default()

// set up the port if present
if let Some(port_str) = matches.opt_str("p") {
rocket_configuration.port = match port_str.parse::<u16>() {
Ok(port) => port,
Err(_e) => return Err(CliError::InvalidPort(port_str)),
}
Err(e) => {
tracing::error!(error=?e, port=port_str, "Error trying to parse the address");
return Err(CliError::InvalidPort(port_str));
}
};
tracing::debug!("Port set to {port_str}");
}

if matches.opt_present("a") {
let addr_opt = matches.opt_str("a");
if let Some(addr) = addr_opt {
rocket_configuration.address = match addr.parse() {
Ok(parsed_addr) => parsed_addr,
Err(_) => return Err(CliError::InvalidAddress(addr)),
};
}
// set up the address if present
if let Some(addr) = matches.opt_str("a") {
rocket_configuration.address = match addr.parse() {
Ok(parsed_addr) => parsed_addr,
Err(e) => {
tracing::error!(error=?e, address=addr, "Error trying to parse the address");
return Err(CliError::InvalidAddress(addr));
}
};
tracing::debug!("Address set to {addr}");
}

// TODO: should this be removed already?
if matches.opt_present("ddsa-runtime") {
println!("[WARNING] the --ddsa-runtime flag is deprecated and will be removed in the next version");
}
Expand All @@ -154,44 +201,49 @@ pub fn prepare_rocket() -> Result<RocketPreparation, CliError> {
// 1. Get the timeout value as parameter
// 2. Start the thread that checks every 5 seconds if we should exit the server
if matches.opt_present("k") {
// TODO: (ROB) review this
// let timeout_sec = match keepalive_timeout.parse::<u128>() {
// Ok(sec) => sec,
// Err(_) => {
// eprintln!("Invalid keep-alive timeout value");
// process::exit(1);
// }
// };

let keepalive_timeout_sec = matches.opt_str("k");

if let Some(keepalive_timeout) = keepalive_timeout_sec {
let timeout_sec = keepalive_timeout.parse::<u128>().unwrap();
let timeout_ms = timeout_sec * 1000;
if let Some(timeout_sec) = matches.opt_str("k").and_then(|k| k.parse::<u128>().ok()) {
tracing::info!("Keep alive is set to {timeout_sec} seconds");
// this will ensure that the keep-alive fairing is added, which is going to be the one
// updating the `last_ping_request_timestamp_ms`.
server_state.is_keepalive_enabled = true;

let timeout_ms = timeout_sec * 1000;
let last_ping_request_timestamp_ms =
server_state.last_ping_request_timestamp_ms.clone();

// TODO: (ROB) handle the case where the thread can die/error
let cloned_guard = guard.clone();

// thread that periodically checks if we should exit the server
thread::spawn(move || {
let shutdown_handle: Shutdown = rx.recv().unwrap();
let Ok(shutdown_handle): Result<Shutdown, _> = rx.recv() else {
tracing::error!("CRITICAL: The channel has disconnected");
// if the channel dies we're going to exit the process with a custom error code.
process::exit(100);
};

tracing::info!("Starting the keep alive loop");

loop {
// get the latest request timestamp and the current one
// remember that the `last_ping_request_timestamp_ms` state will be written by a fairing on every successful request
let latest_timestamp = last_ping_request_timestamp_ms
.try_read()
.map(|x| *x)
.unwrap_or_default();

let current_timestamp = get_current_timestamp_ms();

if latest_timestamp > 0 && current_timestamp > latest_timestamp + timeout_ms {
eprintln!("exiting because of timeout, trying to exit gracefully");
eprintln!("Exiting because of timeout. Trying to exit gracefully");
tracing::info!("Exiting because of timeout. Trying to exit gracefully");
shutdown_handle.notify();
// we give 10 seconds for the process to terminate
// if it does not, we abort
thread::sleep(Duration::from_secs(10));
eprintln!("no graceful exit, aborting the process");
eprintln!("No graceful exit, aborting the process");
tracing::error!("No graceful exit, aborting the process");
// dropping the guard here to flush the pending tracing messages before exiting abruptly
drop(cloned_guard);
process::abort();
}
thread::sleep(Duration::from_secs(5));
Expand All @@ -204,7 +256,7 @@ pub fn prepare_rocket() -> Result<RocketPreparation, CliError> {
let rocket = rocket::custom(rocket_configuration).manage(state);

Ok(RocketPreparation::ServerInfo {
rocket,
rocket: Box::new(rocket),
state: server_state,
tx_shutdown: tx,
guard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl Fairing for KeepAlive {
if state.is_keepalive_enabled {
// mutate the keep alive ms
if let Ok(mut x) = state.last_ping_request_timestamp_ms.try_write() {
tracing::trace!("Updating the last_ping_request_timestamp_ms");
*x = get_current_timestamp_ms();
}
}
Expand Down
12 changes: 5 additions & 7 deletions crates/bins/src/bin/datadog_static_analyzer_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ mod utils;
///
/// This function will exit the process and panic when it finds an error.
pub async fn start() {
println!("Starting!!");
// prepare the rocket based on the cli args.
// NOTE: shared state is already managed by the rocket. No need to use `manage` again.
// we get the state back just in case we want to add a particular fairing based on it.
Expand All @@ -24,9 +23,8 @@ pub async fn start() {
Err(e) => {
eprintln!("Error found: {e}");
match e {
CliError::Parsing(_f) => process::exit(22), // invalid argument code
CliError::InvalidPort(_port) => process::exit(1), // generic error code
CliError::InvalidAddress(_address) => process::exit(1), // generic error code
CliError::Parsing(_) => process::exit(22), // invalid argument code
_ => process::exit(1), // generic error code
}
}
Ok(RocketPreparation::NoServerInteraction) => {
Expand All @@ -39,15 +37,15 @@ pub async fn start() {
guard,
}) => {
// set fairings
rocket = rocket
*rocket = rocket
.attach(fairings::Cors)
.attach(fairings::CustomHeaders)
.attach(fairings::TracingFairing);
if state.is_keepalive_enabled {
rocket = rocket.attach(fairings::KeepAlive);
*rocket = rocket.attach(fairings::KeepAlive);
}
// launch the rocket
if let Err(e) = endpoints::launch_rocket_with_endpoints(rocket, tx_shutdown).await {
if let Err(e) = endpoints::launch_rocket_with_endpoints(*rocket, tx_shutdown).await {
tracing::error!("Something went wrong while trying to ignite the rocket {e:?}");
// flushing the pending logs by dropping the guard before panic
drop(guard);
Expand Down

0 comments on commit c5dc666

Please sign in to comment.