Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buck2_events: refactor scribe to remote sink #686

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/buck2/bin/buck2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn init_logging(_fb: FacebookInit) -> anyhow::Result<Arc<dyn LogConfigurationRel
#[cfg(fbcode_build)]
{
use buck2_event_log::should_upload_log;
use buck2_events::sink::scribe;
use buck2_events::sink::remote;
use gflags::GflagValue;

// There are two sources of log spew when building buck2 with Buck and linking against fbcode:
Expand All @@ -73,7 +73,7 @@ fn init_logging(_fb: FacebookInit) -> anyhow::Result<Arc<dyn LogConfigurationRel
gflags::set_gflag_value(_fb, "stderrthreshold", GflagValue::U32(5))?;

if !should_upload_log()? {
scribe::disable();
remote::disable();
}
}

Expand Down
8 changes: 4 additions & 4 deletions app/buck2/src/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod imp {
use buck2_core::error::StructuredErrorOptions;
use buck2_data::Location;
use buck2_events::metadata;
use buck2_events::sink::scribe::new_thrift_scribe_sink_if_enabled;
use buck2_events::sink::remote::new_remote_event_sink_if_enabled;
use buck2_events::BuckEvent;
use buck2_util::threads::thread_spawn;
use fbinit::FacebookInit;
Expand Down Expand Up @@ -206,15 +206,15 @@ mod imp {

use buck2_core::facebook_only;
use buck2_data::InstantEvent;
use buck2_events::sink::scribe;
use buck2_events::sink::remote;
use buck2_wrapper_common::invocation_id::TraceId;

facebook_only();
if !scribe::is_enabled() {
if !remote::is_enabled() {
return;
}

let sink = match new_thrift_scribe_sink_if_enabled(
let sink = match new_remote_event_sink_if_enabled(
fb,
/* buffer size */ 100,
/* retry_backoff */ Duration::from_millis(500),
Expand Down
10 changes: 5 additions & 5 deletions app/buck2_client/src/commands/debug/persist_event_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use buck2_data::instant_event::Data;
use buck2_data::InstantEvent;
use buck2_data::PersistEventLogSubprocess;
use buck2_event_log::ttl::manifold_event_log_ttl;
use buck2_events::sink::scribe::new_thrift_scribe_sink_if_enabled;
use buck2_events::sink::scribe::ThriftScribeSink;
use buck2_events::sink::remote::new_remote_event_sink_if_enabled;
use buck2_events::sink::remote::RemoteEventSink;
use buck2_events::BuckEvent;
use buck2_wrapper_common::invocation_id::TraceId;
use tokio::fs::File;
Expand Down Expand Up @@ -334,7 +334,7 @@ fn categorize_error(err: &anyhow::Error) -> &'static str {
}

async fn dispatch_event_to_scribe(
sink: Option<&ThriftScribeSink>,
sink: Option<&RemoteEventSink>,
invocation_id: &TraceId,
result: PersistEventLogSubprocess,
) {
Expand All @@ -354,8 +354,8 @@ async fn dispatch_event_to_scribe(
};
}

fn create_scribe_sink(ctx: &ClientCommandContext) -> anyhow::Result<Option<ThriftScribeSink>> {
new_thrift_scribe_sink_if_enabled(
fn create_scribe_sink(ctx: &ClientCommandContext) -> anyhow::Result<Option<RemoteEventSink>> {
new_remote_event_sink_if_enabled(
ctx.fbinit(),
/* buffer size */ 100,
/* retry_backoff */ Duration::from_millis(500),
Expand Down
14 changes: 7 additions & 7 deletions app/buck2_client/src/commands/rage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use buck2_event_log::file_names::do_find_log_by_trace_id;
use buck2_event_log::file_names::get_local_logs;
use buck2_event_log::read::EventLogPathBuf;
use buck2_event_log::read::EventLogSummary;
use buck2_events::sink::scribe::new_thrift_scribe_sink_if_enabled;
use buck2_events::sink::scribe::ThriftScribeSink;
use buck2_events::sink::remote::new_remote_event_sink_if_enabled;
use buck2_events::sink::remote::RemoteEventSink;
use buck2_events::BuckEvent;
use buck2_util::process::async_background_command;
use buck2_wrapper_common::invocation_id::TraceId;
Expand Down Expand Up @@ -271,7 +271,7 @@ impl RageCommand {

async fn send_to_scuba(
&self,
sink: Option<ThriftScribeSink>,
sink: Option<RemoteEventSink>,
invocation_id: Option<TraceId>,
system_info: RageSection<system_info::SystemInfo>,
daemon_stderr_dump: RageSection<String>,
Expand Down Expand Up @@ -521,7 +521,7 @@ async fn upload_re_logs_impl(
}

async fn dispatch_result_event(
sink: Option<&ThriftScribeSink>,
sink: Option<&RemoteEventSink>,
rage_id: &TraceId,
result: RageResult,
) -> anyhow::Result<()> {
Expand All @@ -531,7 +531,7 @@ async fn dispatch_result_event(
}

async fn dispatch_event_to_scribe(
sink: Option<&ThriftScribeSink>,
sink: Option<&RemoteEventSink>,
trace_id: &TraceId,
event: InstantEvent,
) -> anyhow::Result<()> {
Expand All @@ -554,10 +554,10 @@ async fn dispatch_event_to_scribe(
}

#[allow(unused_variables)] // Conditional compilation
fn create_scribe_sink(ctx: &ClientCommandContext) -> anyhow::Result<Option<ThriftScribeSink>> {
fn create_scribe_sink(ctx: &ClientCommandContext) -> anyhow::Result<Option<RemoteEventSink>> {
// TODO(swgiillespie) scribe_logging is likely the right feature for this, but we should be able to inject a sink
// without using configurations at the call site
new_thrift_scribe_sink_if_enabled(
new_remote_event_sink_if_enabled(
ctx.fbinit(),
/* buffer size */ 100,
/* retry_backoff */ Duration::from_millis(500),
Expand Down
4 changes: 2 additions & 2 deletions app/buck2_client_ctx/src/subscribers/build_graph_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::time::SystemTime;

use async_trait::async_trait;
use buck2_cli_proto::command_result;
use buck2_events::sink::scribe::new_thrift_scribe_sink_if_enabled;
use buck2_events::sink::remote::new_remote_event_sink_if_enabled;
use buck2_wrapper_common::invocation_id::TraceId;
use dupe::Dupe;
use fbinit::FacebookInit;
Expand Down Expand Up @@ -76,7 +76,7 @@ impl BuildGraphStats {

async fn send_events(&self, events: Vec<buck2_events::BuckEvent>) {
if let Ok(Some(sink)) =
new_thrift_scribe_sink_if_enabled(self.fb, 1, Duration::from_millis(100), 2, None)
new_remote_event_sink_if_enabled(self.fb, 1, Duration::from_millis(100), 2, None)
{
tracing::info!("Sending events to Scribe: {:?}", &events);
sink.send_messages_now(events).await;
Expand Down
4 changes: 2 additions & 2 deletions app/buck2_client_ctx/src/subscribers/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use buck2_event_observer::cache_hit_rate::total_cache_hit_rate;
use buck2_event_observer::last_command_execution_kind;
use buck2_event_observer::last_command_execution_kind::LastCommandExecutionKind;
use buck2_events::errors::create_error_report;
use buck2_events::sink::scribe::new_thrift_scribe_sink_if_enabled;
use buck2_events::sink::remote::new_remote_event_sink_if_enabled;
use buck2_events::BuckEvent;
use buck2_util::cleanup_ctx::AsyncCleanupContext;
use buck2_util::sliding_window::SlidingWindow;
Expand Down Expand Up @@ -536,7 +536,7 @@ impl<'a> InvocationRecorder<'a> {
}

if let Ok(Some(scribe_sink)) =
new_thrift_scribe_sink_if_enabled(self.fb, 1, Duration::from_millis(500), 5, None)
new_remote_event_sink_if_enabled(self.fb, 1, Duration::from_millis(500), 5, None)
{
tracing::info!("Recording invocation to Scribe: {:?}", &event);
Some(async move {
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_events/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
//! sink during normal operation.
pub(crate) mod channel;
pub(crate) mod null;
pub mod scribe;
pub mod remote;
pub(crate) mod smart_truncate_event;
pub mod tee;
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
* of this source tree.
*/

//! A Sink for forwarding events directly to Scribe.
//! A Sink for forwarding events directly to Remote service.
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;

use buck2_core::buck2_env;
use fbinit::FacebookInit;

#[cfg(fbcode_build)]
Expand All @@ -21,6 +20,7 @@ mod fbcode {
use std::time::Duration;
use std::time::SystemTime;

use buck2_core::buck2_env;
use buck2_data::InstantEvent;
use buck2_data::Location;
use buck2_data::StructuredError;
Expand All @@ -42,30 +42,30 @@ mod fbcode {
// 50k characters
static TRUNCATED_SCRIBE_MESSAGE_SIZE: usize = 50000;

/// ThriftScribeSink is a ScribeSink backed by the Thrift-based client in the `buck2_scribe_client` crate.
pub struct ThriftScribeSink {
/// RemoteEventSink is a ScribeSink backed by the Thrift-based client in the `buck2_scribe_client` crate.
pub struct RemoteEventSink {
category: String,
client: scribe_client::ScribeClient,
}

impl ThriftScribeSink {
/// Creates a new ThriftScribeSink that forwards messages onto the Thrift-backed Scribe client.
impl RemoteEventSink {
/// Creates a new RemoteEventSink that forwards messages onto the Thrift-backed Scribe client.
pub fn new(
fb: FacebookInit,
category: String,
buffer_size: usize,
retry_backoff: Duration,
retry_attempts: usize,
message_batch_size: Option<usize>,
) -> anyhow::Result<ThriftScribeSink> {
) -> anyhow::Result<RemoteEventSink> {
let client = scribe_client::ScribeClient::new(
fb,
buffer_size,
retry_backoff,
retry_attempts,
message_batch_size,
)?;
Ok(ThriftScribeSink { category, client })
Ok(RemoteEventSink { category, client })
}

// Send this event now, bypassing internal message queue.
Expand Down Expand Up @@ -157,7 +157,7 @@ mod fbcode {
}
}

impl EventSink for ThriftScribeSink {
impl EventSink for RemoteEventSink {
fn send(&self, event: Event) {
match event {
Event::Buck(event) => {
Expand All @@ -171,7 +171,7 @@ mod fbcode {
}
}

impl EventSinkWithStats for ThriftScribeSink {
impl EventSinkWithStats for RemoteEventSink {
fn to_event_sync(self: Arc<Self>) -> Arc<dyn EventSink> {
self as _
}
Expand Down Expand Up @@ -257,6 +257,17 @@ mod fbcode {
}
}
}

pub fn scribe_category() -> anyhow::Result<String> {
const DEFAULT_SCRIBE_CATEGORY: &str = "buck2_events";
// Note that both daemon and client are emitting events, and that changing this variable has
// no effect on the daemon until buckd is restarted but has effect on the client.
Ok(
buck2_env!("BUCK2_SCRIBE_CATEGORY", applicability = internal)?
.unwrap_or(DEFAULT_SCRIBE_CATEGORY)
.to_owned(),
)
}
}

#[cfg(not(fbcode_build))]
Expand All @@ -269,18 +280,18 @@ mod fbcode {
use crate::EventSinkStats;
use crate::EventSinkWithStats;

pub enum ThriftScribeSink {}
pub enum RemoteEventSink {}

impl ThriftScribeSink {
impl RemoteEventSink {
pub async fn send_now(&self, _event: BuckEvent) {}
pub async fn send_messages_now(&self, _events: Vec<BuckEvent>) {}
}

impl EventSink for ThriftScribeSink {
impl EventSink for RemoteEventSink {
fn send(&self, _event: Event) {}
}

impl EventSinkWithStats for ThriftScribeSink {
impl EventSinkWithStats for RemoteEventSink {
fn to_event_sync(self: Arc<Self>) -> Arc<dyn EventSink> {
self as _
}
Expand All @@ -293,16 +304,16 @@ mod fbcode {

pub use fbcode::*;

fn new_thrift_scribe_sink_if_fbcode(
fn new_remote_event_sink_if_fbcode(
fb: FacebookInit,
buffer_size: usize,
retry_backoff: Duration,
retry_attempts: usize,
message_batch_size: Option<usize>,
) -> anyhow::Result<Option<ThriftScribeSink>> {
) -> anyhow::Result<Option<RemoteEventSink>> {
#[cfg(fbcode_build)]
{
Ok(Some(ThriftScribeSink::new(
Ok(Some(RemoteEventSink::new(
fb,
scribe_category()?,
buffer_size,
Expand All @@ -324,15 +335,15 @@ fn new_thrift_scribe_sink_if_fbcode(
}
}

pub fn new_thrift_scribe_sink_if_enabled(
pub fn new_remote_event_sink_if_enabled(
fb: FacebookInit,
buffer_size: usize,
retry_backoff: Duration,
retry_attempts: usize,
message_batch_size: Option<usize>,
) -> anyhow::Result<Option<ThriftScribeSink>> {
) -> anyhow::Result<Option<RemoteEventSink>> {
if is_enabled() {
new_thrift_scribe_sink_if_fbcode(
new_remote_event_sink_if_fbcode(
fb,
buffer_size,
retry_backoff,
Expand All @@ -344,28 +355,17 @@ pub fn new_thrift_scribe_sink_if_enabled(
}
}

/// Whether or not Scribe logging is enabled for this process. It must be explicitly disabled via `disable()`.
static SCRIBE_ENABLED: AtomicBool = AtomicBool::new(true);
/// Whether or not remote event logging is enabled for this process. It must be explicitly disabled via `disable()`.
static REMOTE_EVENT_SINK_ENABLED: AtomicBool = AtomicBool::new(true);

/// Returns whether this process should actually write to Scribe, even if it is fully supported by the platform and
/// Returns whether this process should actually write to remote sink, even if it is fully supported by the platform and
/// binary.
pub fn is_enabled() -> bool {
SCRIBE_ENABLED.load(Ordering::Relaxed)
REMOTE_EVENT_SINK_ENABLED.load(Ordering::Relaxed)
}

/// Disables Scribe logging for this process. Scribe logging must be disabled explicitly on startup, otherwise it is
/// Disables remote event logging for this process. Remote event logging must be disabled explicitly on startup, otherwise it is
/// on by default.
pub fn disable() {
SCRIBE_ENABLED.store(false, Ordering::Relaxed);
}

pub fn scribe_category() -> anyhow::Result<String> {
const DEFAULT_SCRIBE_CATEGORY: &str = "buck2_events";
// Note that both daemon and client are emitting events, and that changing this variable has
// no effect on the daemon until buckd is restarted but has effect on the client.
Ok(
buck2_env!("BUCK2_SCRIBE_CATEGORY", applicability = internal)?
.unwrap_or(DEFAULT_SCRIBE_CATEGORY)
.to_owned(),
)
REMOTE_EVENT_SINK_ENABLED.store(false, Ordering::Relaxed);
}
4 changes: 2 additions & 2 deletions app/buck2_server/src/daemon/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use buck2_core::is_open_source;
use buck2_core::rollout_percentage::RolloutPercentage;
use buck2_core::tag_result;
use buck2_events::dispatch::EventDispatcher;
use buck2_events::sink::scribe;
use buck2_events::sink::remote;
use buck2_events::sink::tee::TeeSink;
use buck2_events::source::ChannelEventSource;
use buck2_events::EventSinkWithStats;
Expand Down Expand Up @@ -652,7 +652,7 @@ impl DaemonState {
message_batch_size: Option<usize>,
) -> anyhow::Result<Option<Arc<dyn EventSinkWithStats>>> {
facebook_only();
scribe::new_thrift_scribe_sink_if_enabled(
remote::new_remote_event_sink_if_enabled(
fb,
buffer_size,
retry_backoff,
Expand Down
Loading