Skip to content

Commit

Permalink
feat(error code when shutdown fails): set exit flag to non-zero when …
Browse files Browse the repository at this point in the history
…shutdown times out (#17676)

<!--
**Your PR title must conform to the conventional commit spec!**

  <type>(<scope>)!: <description>

  * `type` = chore, enhancement, feat, fix, docs
  * `!` = OPTIONAL: signals a breaking change
  * `scope` = Optional when `type` is "chore" or "docs"; for the available scopes see
    https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20
  * `description` = short description of the change

Examples:

  * enhancement(file source): Add `sort` option to sort discovered files
  * feat(new source): Initial `statsd` source
  * fix(file source): Fix a bug discovering new files
  * chore(external docs): Clarify `batch_size` option
-->

Issue: [Exit non-zero when Vector fails to gracefully shut down](#13731)
  • Loading branch information
DominicBurkart authored Jun 21, 2023
1 parent 12bc4a7 commit e6e18c8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 14 deletions.
43 changes: 37 additions & 6 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(missing_docs)]
use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf, time::Duration};
use std::{
collections::HashMap, num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration,
};

use exitcode::ExitCode;
use futures::StreamExt;
Expand Down Expand Up @@ -32,6 +34,11 @@ use crate::{
trace,
};

#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::process::ExitStatusExt;

pub static WORKER_THREADS: OnceNonZeroUsize = OnceNonZeroUsize::new();

use crate::internal_events::{VectorQuit, VectorStarted, VectorStopped};
Expand Down Expand Up @@ -145,10 +152,10 @@ impl ApplicationConfig {
}

impl Application {
/// Builds the runtime and application with `prepare_start`, then blocks the
/// current thread until the application's `run` future completes.
///
/// If `prepare_start` fails, the process exits immediately with the error
/// code it returned. In the `-> ExitStatus` form of the signature, the value
/// produced by `app.run()` is handed back to the caller.
pub fn run() {
pub fn run() -> ExitStatus {
let (runtime, app) = Self::prepare_start().unwrap_or_else(|code| std::process::exit(code));

runtime.block_on(app.run());
runtime.block_on(app.run())
}

pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
Expand Down Expand Up @@ -242,7 +249,7 @@ pub struct StartedApplication {
}

impl StartedApplication {
/// Awaits `main`, then awaits `shutdown` on the value `main` yields.
///
/// In the `-> ExitStatus` form of the signature, the result of `shutdown` is
/// returned to the caller.
pub async fn run(self) {
pub async fn run(self) -> ExitStatus {
self.main().await.shutdown().await
}

Expand Down Expand Up @@ -317,7 +324,7 @@ pub struct FinishedApplication {
}

impl FinishedApplication {
pub async fn shutdown(self) {
pub async fn shutdown(self) -> ExitStatus {
let FinishedApplication {
signal,
mut signal_rx,
Expand All @@ -335,18 +342,42 @@ impl FinishedApplication {
SignalTo::Shutdown => {
emit!(VectorStopped);
tokio::select! {
_ = topology_controller.stop() => (), // Graceful shutdown finished
_ = topology_controller.stop() => ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::OK as u32
}
#[cfg(unix)]
exitcode::OK
}), // Graceful shutdown finished
_ = signal_rx.recv() => {
    // Another message arrived on `signal_rx` before
    // `topology_controller.stop()` completed, so the graceful stop is
    // abandoned.
    emit!(VectorQuit);
    // Dropping the shutdown future will immediately shut the server down.
    // Report a failure on every platform: reaching this arm means the
    // graceful shutdown did not finish.
    ExitStatus::from_raw({
        #[cfg(windows)]
        {
            exitcode::UNAVAILABLE as u32
        }
        // On Unix `from_raw` expects a wait status, in which the exit code
        // occupies bits 8..16, so shift the code into place. The block is
        // required because attributes cannot precede a binary expression.
        #[cfg(unix)]
        {
            exitcode::UNAVAILABLE << 8
        }
    })
}

}
}
SignalTo::Quit => {
    // A quit request skips `topology_controller.stop()` entirely: the
    // controller is dropped without a graceful stop.
    emit!(VectorQuit);
    drop(topology_controller);
    // Report a failure on every platform, matching the Windows branch:
    // no graceful shutdown took place on this path.
    ExitStatus::from_raw({
        #[cfg(windows)]
        {
            exitcode::UNAVAILABLE as u32
        }
        // On Unix `from_raw` expects a wait status, in which the exit code
        // occupies bits 8..16, so shift the code into place. The block is
        // required because attributes cannot precede a binary expression.
        #[cfg(unix)]
        {
            exitcode::UNAVAILABLE << 8
        }
    })
}
_ => unreachable!(),
}
Expand Down
13 changes: 9 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
extern crate vector;
use vector::app::Application;

use std::process::ExitCode;

#[cfg(unix)]
fn main() {
fn main() -> ExitCode {
#[cfg(feature = "allocation-tracing")]
{
use crate::vector::internal_telemetry::allocations::{
Expand Down Expand Up @@ -35,14 +37,17 @@ fn main() {
}
}

Application::run();
let exit_code = Application::run().code().unwrap_or(exitcode::UNAVAILABLE) as u8;
ExitCode::from(exit_code)
}

#[cfg(windows)]
pub fn main() {
pub fn main() -> ExitCode {
// We need to be able to run vector in User Interactive mode. We first try
// to run vector as a service. If we fail, we consider that we are in
// interactive mode and then fallback to console mode. See
// https://docs.microsoft.com/en-us/dotnet/api/system.environment.userinteractive?redirectedfrom=MSDN&view=netcore-3.1#System_Environment_UserInteractive
vector::vector_windows::run().unwrap_or_else(|_| Application::run());
// When the service path returns an error, run in console mode and take the
// code from its exit status; a status without a code maps to
// `exitcode::UNAVAILABLE`.
let exit_code = vector::vector_windows::run()
.unwrap_or_else(|_| Application::run().code().unwrap_or(exitcode::UNAVAILABLE));
// `ExitCode::from` is fed a `u8` here, hence the narrowing `as` cast.
ExitCode::from(exit_code as u8)
}
17 changes: 13 additions & 4 deletions src/vector_windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const SERVICE_NAME: &str = "vector";
const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS;

// Win32 service exit code reported when the application's completion status
// is a success.
const NO_ERROR: u32 = 0;
// Win32 service exit code reported when the application's completion status
// is not a success (graceful shutdown did not finish within the grace period).
// NOTE(review): 121 looks like Win32 `ERROR_SEM_TIMEOUT` — confirm.
const ERROR: u32 = 121;

pub mod service_control {
use std::{ffi::OsString, fmt, fmt::Formatter, time::Duration};
Expand Down Expand Up @@ -361,8 +362,9 @@ fn win_main(arguments: Vec<OsString>) {
if let Err(_e) = run_service(arguments) {}
}

/// Calls `service_dispatcher::start` for `SERVICE_NAME`, passing
/// `ffi_service_main`.
///
/// In the `Result<i32>` form of the signature, a successful dispatcher call
/// is mapped to exit code `0`; a dispatcher error is returned as `Err`.
pub fn run() -> Result<()> {
service_dispatcher::start(SERVICE_NAME, ffi_service_main)
pub fn run() -> Result<i32> {
service_dispatcher::start(SERVICE_NAME, ffi_service_main).map(|()| 0_i32)
// The `Ok` value is always exit code 0: per the original note, errors are
// handled by the service dispatcher.
}

fn run_service(_arguments: Vec<OsString>) -> Result<()> {
Expand Down Expand Up @@ -398,14 +400,21 @@ fn run_service(_arguments: Vec<OsString>) -> Result<()> {
process_id: None,
})?;

runtime.block_on(app.run());
let program_completion_status = runtime.block_on(app.run());

// Tell the system that service has stopped.
status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::Stopped,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(NO_ERROR),
exit_code: {
if program_completion_status.success() {
ServiceExitCode::Win32(NO_ERROR)
} else {
// we didn't gracefully shutdown within grace period.
ServiceExitCode::Win32(ERROR)
}
},
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
Expand Down

0 comments on commit e6e18c8

Please sign in to comment.