Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1682638 - Multiple fixes: Don't let the queue run, handle empty databases #1398

Merged
merged 10 commits into from
Dec 17, 2020
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

[Full changelog](https://github.com/mozilla/glean/compare/v33.9.0...main)

* Rust
* BUGFIX: Don't panic on shutdown and avoid running tasks if uninitialized ([#1398](https://github.com/mozilla/glean/pull/1398)).
* BUGFIX: Don't fail on empty database files ([#1398](https://github.com/mozilla/glean/pull/1398)).

# v33.9.0 (2020-12-15)

[Full changelog](https://github.com/mozilla/glean/compare/v33.8.0...v33.9.0)
Expand Down
1 change: 0 additions & 1 deletion glean-core/rlb/src/common_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub(crate) fn new_glean(
},
};

crate::shutdown();
crate::test_reset_glean(cfg, ClientInfoMetrics::unknown(), clear_stores);
dir
}
27 changes: 20 additions & 7 deletions glean-core/rlb/src/dispatcher/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,7 @@ pub fn flush_init() -> Result<(), DispatchError> {
guard().flush_init()
}

/// Shuts down the dispatch queue.
///
/// This will initiate a shutdown of the worker thread
/// and no new tasks will be processed after this.
pub fn shutdown() -> Result<(), DispatchError> {
guard().shutdown()?;

fn join_dispatcher_thread() -> Result<(), DispatchError> {
// After we issue the shutdown command, make sure to wait for the
// worker thread to join.
let mut lock = GLOBAL_DISPATCHER.write().unwrap();
Expand All @@ -78,6 +72,25 @@ pub fn shutdown() -> Result<(), DispatchError> {
Ok(())
}

/// Kill the blocked dispatcher without processing the queue.
///
/// This will immediately shutdown the worker thread
/// and no other tasks will be processed.
/// This only has an effect when the queue is still blocked.
pub fn kill() -> Result<(), DispatchError> {
guard().kill()?;
join_dispatcher_thread()
}

/// Shuts down the dispatch queue.
///
/// This will initiate a shutdown of the worker thread
/// and no new tasks will be processed after this.
pub fn shutdown() -> Result<(), DispatchError> {
guard().shutdown()?;
join_dispatcher_thread()
}

/// TEST ONLY FUNCTION.
/// Resets the Glean state and triggers init again.
pub(crate) fn reset_dispatcher() {
Expand Down
45 changes: 37 additions & 8 deletions glean-core/rlb/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ pub use global::*;

mod global;

/// Command received while blocked from further work.
enum Blocked {
/// Shutdown immediately without processing the queue.
Shutdown,
/// Unblock and continue with work as normal.
Continue,
}

/// The command a worker should execute.
enum Command {
/// A task is a user-defined function to run.
Expand Down Expand Up @@ -100,7 +108,7 @@ struct DispatchGuard {
queue_preinit: Arc<AtomicBool>,

/// Used to unblock the worker thread initially.
block_sender: Sender<()>,
block_sender: Sender<Blocked>,

/// Sender for the preinit queue.
preinit_sender: Sender<Command>,
Expand Down Expand Up @@ -146,6 +154,18 @@ impl DispatchGuard {
.expect("Failed to receive message on single-use channel");
}

fn kill(&mut self) -> Result<(), DispatchError> {
// We immediately stop queueing in the pre-init buffer.
let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
if !old_val {
return Err(DispatchError::AlreadyFlushed);
}

// Unblock the worker thread exactly once.
self.block_sender.send(Blocked::Shutdown)?;
Ok(())
}

fn flush_init(&mut self) -> Result<(), DispatchError> {
// We immediately stop queueing in the pre-init buffer.
let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
Expand All @@ -154,7 +174,7 @@ impl DispatchGuard {
}

// Unblock the worker thread exactly once.
self.block_sender.send(())?;
self.block_sender.send(Blocked::Continue)?;

// Single-use channel to communicate with the worker thread.
let (swap_sender, swap_receiver) = bounded(0);
Expand Down Expand Up @@ -193,7 +213,7 @@ impl Dispatcher {
///
/// [`flush_init`]: #method.flush_init
pub fn new(max_queue_size: usize) -> Self {
let (block_sender, block_receiver) = bounded(0);
let (block_sender, block_receiver) = bounded(1);
let (preinit_sender, preinit_receiver) = bounded(max_queue_size);
let (sender, mut unbounded_receiver) = unbounded();

Expand All @@ -202,11 +222,20 @@ impl Dispatcher {
let worker = thread::Builder::new()
.name("glean.dispatcher".into())
.spawn(move || {
if block_receiver.recv().is_err() {
// The other side was disconnected.
// There's nothing the worker thread can do.
log::error!("The task producer was disconnected. Worker thread will exit.");
return;
match block_receiver.recv() {
Err(_) => {
// The other side was disconnected.
// There's nothing the worker thread can do.
log::error!("The task producer was disconnected. Worker thread will exit.");
return;
}
Ok(Blocked::Shutdown) => {
// The other side wants us to stop immediately
return;
}
Ok(Blocked::Continue) => {
// Queue is unblocked, processing continues as normal.
}
}

let mut receiver = preinit_receiver;
Expand Down
6 changes: 5 additions & 1 deletion glean-core/rlb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,12 @@ pub fn initialize(cfg: Configuration, client_info: ClientInfoMetrics) {
/// This currently only attempts to shut down the
/// internal dispatcher.
pub fn shutdown() {
if !was_initialize_called() {
if global_glean().is_none() {
log::warn!("Shutdown called before Glean is initialized");
if let Err(e) = dispatcher::kill() {
log::error!("Can't kill dispatcher thread: {:?}", e);
}

return;
}

Expand Down
50 changes: 50 additions & 0 deletions glean-core/rlb/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

// #[allow(dead_code)] is required on this module as a workaround for
// https://github.com/rust-lang/rust/issues/46379
#![allow(dead_code)]

use std::{panic, process};

use glean::{ClientInfoMetrics, Configuration};

/// Initialize the env logger for a test environment.
///
/// When testing we want all logs to go to stdout/stderr by default.
pub fn enable_test_logging() {
badboy marked this conversation as resolved.
Show resolved Hide resolved
let _ = env_logger::builder().is_test(true).try_init();
}

/// Install a panic handler that exits the whole process when a panic occurs.
///
/// This causes the process to exit even if a thread panics.
/// This is similar to the `panic=abort` configuration, but works in the default configuration
/// (as used by `cargo test`).
fn install_panic_handler() {
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
process::exit(1);
}));
}

/// Create a new instance of Glean.
pub fn initialize(cfg: Configuration) {
// Ensure panics in threads, such as the init thread or the dispatcher, cause the process to
// exit.
//
// Otherwise in case of a panic in a thread the integration test will just hang.
// CI will terminate it after a timeout, but why stick around if we know nothing is happening?
install_panic_handler();

// Use some default values to make our life easier a bit.
let client_info = ClientInfoMetrics {
app_build: "1.0.0".to_string(),
app_display_version: "1.0.0".to_string(),
};

glean::initialize(cfg, client_info);
}
84 changes: 84 additions & 0 deletions glean-core/rlb/tests/init_fails.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! This integration test should model how the RLB is used when embedded in another Rust application
//! (e.g. FOG/Firefox Desktop).
//!
//! We write a single test scenario per file to avoid any state keeping across runs
//! (different files run as different processes).

mod common;

use std::{thread, time::Duration};

use glean::Configuration;

/// Some user metrics.
mod metrics {
use glean::private::*;
use glean::{Lifetime, TimeUnit};
use glean_core::CommonMetricData;
use once_cell::sync::Lazy;

#[allow(non_upper_case_globals)]
pub static initialization: Lazy<TimespanMetric> = Lazy::new(|| {
TimespanMetric::new(
CommonMetricData {
name: "initialization".into(),
category: "sample".into(),
send_in_pings: vec!["validation".into()],
lifetime: Lifetime::Ping,
disabled: false,
..Default::default()
},
TimeUnit::Nanosecond,
)
});
}

mod pings {
use glean::private::PingType;
use once_cell::sync::Lazy;

#[allow(non_upper_case_globals)]
pub static validation: Lazy<PingType> =
Lazy::new(|| glean::private::PingType::new("validation", true, true, vec![]));
}

/// Test scenario: Glean initialization fails.
///
/// App tries to initialize Glean, but that somehow fails.
#[test]
fn init_fails() {
common::enable_test_logging();

metrics::initialization.start();

// Create a custom configuration to use a validating uploader.
let dir = tempfile::tempdir().unwrap();
let tmpname = dir.path().display().to_string();

let cfg = Configuration {
data_path: tmpname,
application_id: "".into(), // An empty application ID is invalid.
upload_enabled: true,
max_events: None,
delay_ping_lifetime_io: false,
channel: Some("testing".into()),
server_endpoint: Some("invalid-test-host".into()),
uploader: None,
};
common::initialize(cfg);

metrics::initialization.stop();

pings::validation.submit(None);

// We don't test for data here, as that would block on the dispatcher.

// Give it a short amount of time to actually finish initialization.
thread::sleep(Duration::from_millis(500));

glean::shutdown();
}
66 changes: 66 additions & 0 deletions glean-core/rlb/tests/never_init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! This integration test should model how the RLB is used when embedded in another Rust application
//! (e.g. FOG/Firefox Desktop).
//!
//! We write a single test scenario per file to avoid any state keeping across runs
//! (different files run as different processes).

mod common;

/// Some user metrics.
mod metrics {
use glean::private::*;
use glean::{Lifetime, TimeUnit};
use glean_core::CommonMetricData;
use once_cell::sync::Lazy;

#[allow(non_upper_case_globals)]
pub static initialization: Lazy<TimespanMetric> = Lazy::new(|| {
TimespanMetric::new(
CommonMetricData {
name: "initialization".into(),
category: "sample".into(),
send_in_pings: vec!["validation".into()],
lifetime: Lifetime::Ping,
disabled: false,
..Default::default()
},
TimeUnit::Nanosecond,
)
});
}

mod pings {
use glean::private::PingType;
use once_cell::sync::Lazy;

#[allow(non_upper_case_globals)]
pub static validation: Lazy<PingType> =
Lazy::new(|| glean::private::PingType::new("validation", true, true, vec![]));
}

/// Test scenario: Glean is never initialized.
///
/// Glean is never initialized.
/// Some data is recorded early on.
/// And later the whole process is shutdown.
#[test]
fn never_initialize() {
common::enable_test_logging();

metrics::initialization.start();

// NOT calling `initialize` here.
// In apps this might happen for several reasons:
// 1. Process doesn't run long enough for Glean to be initialized.
// 2. Getting some early data used for initialize fails

pings::validation.submit(None);

// We can't test for data either, as that would panic because init was never called.

glean::shutdown();
}
Loading