Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OriginEventPublisher #1497

Merged
merged 1 commit into from
Dec 11, 2024
Merged

Add OriginEventPublisher #1497

merged 1 commit into from
Dec 11, 2024

Conversation

allada
Copy link
Member

@allada allada commented Nov 24, 2024

Adds ability for nativelink to publish events to a store/database that can be used by another service to aggregate and/or visualize what is happening related to requests, users or other related metadata. By default bazel's metadata will be published with every event if present giving the ability to combine with BEP data.


This change is Reviewable

Copy link
Member Author

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+@aaronmondal

Reviewable status: 0 of 1 LGTMs obtained, and 0 of 41 files reviewed, and pending CI: Analyze (javascript-typescript), Analyze (python), Bazel Dev / macos-13, Bazel Dev / ubuntu-24.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Coverage, Installation / macos-13, Installation / macos-14, Installation / ubuntu-22.04, Local / ubuntu-22.04, NativeLink.com Cloud / Remote Cache / macos-14, NativeLink.com Cloud / Remote Cache / ubuntu-24.04, Publish image, Publish nativelink-worker-init, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, Web Platform Deployment / macos-14, Web Platform Deployment / ubuntu-24.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (22.04), integration-tests (22.04), macos-13, pre-commit-checks, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, vale, windows-2022 / stable (waiting on @aaronmondal)

Copy link
Member

@aaronmondal aaronmondal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lmao you wouldn't believe the rabbithole I went into with this one 😆

:lgtm:

Reviewed 41 of 41 files at r1, all commit messages.
Reviewable status: 1 of 1 LGTMs obtained, and all files reviewed, and 8 discussions need to be resolved


nativelink-proto/genproto/blaze.invocation_policy.pb.rs line 1 at r1 (raw file):

// Copyright 2022 The NativeLink Authors. All rights reserved.

nit: Copyright years are wrong and in various other files.


nativelink-util/src/origin_event.rs line 590 at r1 (raw file):

        })
    }
}

nit: Soo I played around with this way too long and I believe something like this could work as well (assuming the proto changes from previously):

use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll};

use futures::Stream;
use nativelink_proto::build::bazel::remote::execution::v2::{
    ActionResult, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse,
    GetActionResultRequest, GetCapabilitiesRequest, ServerCapabilities,
};
use nativelink_proto::com::github::trace_machina::nativelink::events::operation_event::{
    Data, Phase, Type,
};
use nativelink_proto::com::github::trace_machina::nativelink::events::{
    BatchOperation, OperationEvent, StreamOperation,
};
use nativelink_proto::google::longrunning::Operation;
use nativelink_proto::google::rpc::Status;
use rand::RngCore;
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::make_symbol;
use crate::origin_context::ActiveOriginContext;
use crate::origin_event_middleware::BAZEL_REQUEST_METDATA;

static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();

pub fn get_node_id() -> &'static [u8; 6] {
    NODE_ID.get_or_init(|| {
        let mut rng = rand::thread_rng();
        let mut out = [0; 6];
        rng.fill_bytes(&mut out);
        out
    })
}

type OriginEventCollector = mpsc::Sender<OperationEvent>;
make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector);

pub trait IntoEventData {
    fn into_event_data(self) -> Data;
}

macro_rules! impl_into_event_data {
    ($($ty:ident, $variant:ident);* $(;)?) => {
        $(
            impl IntoEventData for $ty {
                fn into_event_data(self) -> Data {
                    Data::$variant(self)
                }
            }
        )*
    }
}

impl_into_event_data! {
    GetCapabilitiesRequest, CapabilitiesRequest;
    ServerCapabilities, CapabilitiesResponse;
    GetActionResultRequest, ActionResultRequest;
    ActionResult, ActionResult;
    FindMissingBlobsRequest, FindMissingRequest;
    FindMissingBlobsResponse, FindMissingResponse;
    BatchOperation, BatchOperation;
    StreamOperation, StreamOperation;
    ExecuteRequest, ExecuteRequest;
    Operation, OperationResponse;
}

pub trait OperationType {
    const TYPE: Type;
}

macro_rules! declare_operation {
    ($($op:ident, $ty:ident);* $(;)?) => {
        $(
            pub struct $op;
            impl OperationType for $op {
                const TYPE: Type = Type::$ty;
            }
        )*
    }
}

declare_operation! {
    UnknownOp, Unknown;
    GetCapabilitiesOp, GetCapabilities;
    GetActionResultOp, GetActionResult;
    UpdateActionResultOp, UpdateActionResult;
    FindMissingBlobsOp, FindMissingBlobs;
    BatchReadBlobsOp, BatchReadBlobs;
    BatchUpdateBlobsOp, BatchUpdateBlobs;
    GetTreeOp, GetTree;
    ReadOp, Read;
    WriteOp, Write;
    QueryWriteStatusOp, QueryWriteStatus;
    ExecuteOp, Execute;
    WaitExecutionOp, WaitExecution;
}

pub struct OriginEventContext<T: OperationType> {
    inner: Option<Arc<OriginEventContextImpl>>,
    _phantom: PhantomData<T>,
}

impl<T: OperationType> Clone for OriginEventContext<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            _phantom: PhantomData,
        }
    }
}

#[derive(Clone)]
struct OriginEventContextImpl {
    collector: Arc<mpsc::Sender<OperationEvent>>,
    parent_event_id: Uuid,
}

impl<T: OperationType> OriginEventContext<T> {
    pub async fn new<D: IntoEventData>(request: D) -> Self {
        let collector = ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
            .ok()
            .flatten();

        let inner = if let Some(collector) = collector {
            let event = OperationEvent {
                version: 0,
                event_id: Uuid::now_v6(get_node_id()).to_string(),
                parent_event_id: String::new(),
                bazel_request_metadata: ActiveOriginContext::get_value(&BAZEL_REQUEST_METDATA)
                    .ok()
                    .flatten()
                    .map(|v| v.as_ref().clone()),
                status: None,
                r#type: T::TYPE as i32,
                phase: Phase::Request as i32,
                data: Some(request.into_event_data()),
            };
            let parent_event_id = publish_event(&collector, event).await;
            Some(Arc::new(OriginEventContextImpl {
                collector,
                parent_event_id,
            }))
        } else {
            None
        };

        Self {
            inner,
            _phantom: PhantomData,
        }
    }

    pub async fn emit<D: IntoEventData>(&self, data: D, phase: Phase) {
        if let Some(ctx) = &self.inner {
            let event = OperationEvent {
                version: 0,
                event_id: Uuid::now_v6(get_node_id()).to_string(),
                parent_event_id: ctx.parent_event_id.to_string(),
                bazel_request_metadata: None,
                status: None,
                r#type: T::TYPE as i32,
                phase: phase as i32,
                data: Some(data.into_event_data()),
            };
            publish_event(&ctx.collector, event).await;
        }
    }

    pub fn wrap_stream<S, D>(&self, stream: S) -> impl Stream<Item = Result<D, Status>>
    where
        S: Stream<Item = Result<D, Status>>,
        D: IntoEventData + Clone,
    {
        StreamWrapper {
            stream,
            ctx: Arc::new((*self).clone()),
            _phantom: PhantomData,
        }
    }
}

#[pin_project::pin_project]
struct StreamWrapper<S, D, T: OperationType> {
    #[pin]
    stream: S,
    ctx: Arc<OriginEventContext<T>>,
    _phantom: PhantomData<D>,
}

impl<S, D, T> Stream for StreamWrapper<S, D, T>
where
    S: Stream<Item = Result<D, Status>>,
    D: IntoEventData + Clone,
    T: OperationType,
{
    type Item = Result<D, Status>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.stream.poll_next(cx) {
            Poll::Ready(Some(result)) => {
                if let Some(ctx) = &this.ctx.inner {
                    let event = OperationEvent {
                        version: 0,
                        event_id: Uuid::now_v6(get_node_id()).to_string(),
                        parent_event_id: ctx.parent_event_id.to_string(),
                        bazel_request_metadata: None,
                        status: result.as_ref().err().cloned(),
                        r#type: T::TYPE as i32,
                        phase: Phase::Stream as i32,
                        data: result.as_ref().ok().map(|d| d.clone().into_event_data()),
                    };
                    let _ = ctx.collector.try_send(event);
                }
                Poll::Ready(Some(result))
            }
            other => other,
        }
    }
}

async fn publish_event(collector: &mpsc::Sender<OperationEvent>, event: OperationEvent) -> Uuid {
    let event_id = Uuid::parse_str(&event.event_id).expect("valid UUID");
    let _ = collector.send(event).await;
    event_id
}

nativelink-proto/com/github/trace_machina/nativelink/remote_execution/events.proto line 182 at r1 (raw file):

        build.bazel.remote.execution.v2.WaitExecutionRequest wait_execution_request = 52;
        google.protobuf.Empty wait_execution_response = 53;
        google.rpc.Status wait_execute_error = 54;

nit: I wonder whether we actually need separate rpc calls for all of these. For instance, all the different error types seem to require a bunch of duplicate impls for impl OriginEventSource<T> for TonicStatus. If we're handling all those errors in the same way anyways it might be possible to just have a single error type and have a single impl.

Roughly something like this maybe?

syntax = "proto3";

package com.github.trace_machina.nativelink.events;

import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/bytestream/bytestream.proto";
import "google/longrunning/operations.proto";
import "google/rpc/status.proto";

message OperationEvent {
    uint32 version = 1;
    string event_id = 2;
    string parent_event_id = 3;
    build.bazel.remote.execution.v2.RequestMetadata bazel_request_metadata = 4;
    google.rpc.Status status = 5;

    enum Type {
        UNKNOWN = 0;
        GET_CAPABILITIES = 1;
        GET_ACTION_RESULT = 2;
        UPDATE_ACTION_RESULT = 3;
        FIND_MISSING_BLOBS = 4;
        BATCH_READ_BLOBS = 5;
        BATCH_UPDATE_BLOBS = 6;
        GET_TREE = 7;
        READ = 8;
        WRITE = 9;
        QUERY_WRITE_STATUS = 10;
        EXECUTE = 11;
        WAIT_EXECUTION = 12;
    }
    Type type = 6;

    enum Phase {
        PHASE_UNKNOWN = 0;
        REQUEST = 1;
        RESPONSE = 2;
        STREAM = 3;
    }
    Phase phase = 7;

    oneof data {
        build.bazel.remote.execution.v2.GetCapabilitiesRequest capabilities_request = 10;
        build.bazel.remote.execution.v2.ServerCapabilities capabilities_response = 11;
        build.bazel.remote.execution.v2.GetActionResultRequest action_result_request = 12;
        build.bazel.remote.execution.v2.ActionResult action_result = 13;
        build.bazel.remote.execution.v2.FindMissingBlobsRequest find_missing_request = 14;
        build.bazel.remote.execution.v2.FindMissingBlobsResponse find_missing_response = 15;
        BatchOperation batch_operation = 16;
        StreamOperation stream_operation = 17;
        build.bazel.remote.execution.v2.ExecuteRequest execute_request = 18;
        google.longrunning.Operation operation_response = 19;
    }
}

message BatchOperation {
    string instance_name = 1;
    build.bazel.remote.execution.v2.DigestFunction.Value digest_function = 2;
    
    message BlobInfo {
        build.bazel.remote.execution.v2.Digest digest = 1;
        build.bazel.remote.execution.v2.Compressor.Value compressor = 2;
        uint64 data_len = 3;
        google.rpc.Status status = 4; // Per-blob status
    }
    repeated BlobInfo blobs = 3;
}

message StreamOperation {
    string resource_name = 1;
    int64 offset = 2;
    bool finish = 3;
    uint64 data_len = 4;
    uint64 processed_bytes = 5; // For tracking progress
}

message OriginEvents {
    repeated OperationEvent events = 1;
}

nativelink-config/src/cas_server.rs line 215 at r1 (raw file):

#[derive(Deserialize, Debug)]
pub struct OriginEventsPublisherConfig {

nit: Since we're moving to a model where "options" are called Spec and Configs refer to {name, spec} pairs, it would be more future-proof to name this OriginEventsPublisherSpec and similarly OriginEventsSpec.


nativelink-config/src/cas_server.rs line 231 at r1 (raw file):

    /// Zero is default.
    ///
    /// Default: 65536

nit: Is the default 0 or 65536?


nativelink-proto/gen_lib_rs_tool.py line 41 at r1 (raw file):

#![allow(warnings)]
#![allow(clippy::all)]

nit: Instead of disabling everything, we could fix the ~5 instances that complain about indentation and disable the unfixable ones explicitly.

It's also worth noting that apparently one of the lints might warn against stack overflows: https://rust-lang.github.io/rust-clippy/master/index.html#large_futures


nativelink-service/tests/ac_server_test.rs line 118 at r1 (raw file):

    let err = raw_response.unwrap_err();
    assert_eq!(err.code(), Code::NotFound);
    assert!(err.message().is_empty());

nit: Is it correct that this is now empty?

@allada allada force-pushed the boobar5 branch 2 times, most recently from 7657c37 to 32437c5 Compare December 10, 2024 16:45
Copy link
Member Author

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dismissed @MarcusSorealheis from a discussion.
Reviewable status: 1 of 1 LGTMs obtained, and 25 of 43 files reviewed, and pending CI: Analyze (javascript-typescript), Analyze (python), Bazel Dev / macos-13, Bazel Dev / macos-14, Bazel Dev / ubuntu-24.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Coverage, Installation / macos-13, Installation / macos-14, Installation / ubuntu-22.04, Local / ubuntu-22.04, NativeLink.com Cloud / Remote Cache / macos-14, NativeLink.com Cloud / Remote Cache / ubuntu-24.04, Publish image, Publish nativelink-worker-init, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, Web Platform Deployment / macos-14, Web Platform Deployment / ubuntu-24.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (22.04), integration-tests (22.04), macos-13, pre-commit-checks, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, vale, windows-2022 / stable, and 1 discussions need to be resolved


nativelink-config/src/cas_server.rs line 215 at r1 (raw file):

Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…

nit: Since we're moving to a model where "options" are called Spec and Configs refer to {name, spec} pairs, it would be more future-proof to name this OriginEventsPublisherSpec and similarly OriginEventsSpec.

Done.


nativelink-config/src/cas_server.rs line 231 at r1 (raw file):

Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…

nit: Is the default 0 or 65536?

Revised comment.


nativelink-proto/gen_lib_rs_tool.py line 41 at r1 (raw file):

Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…

nit: Instead of disabling everything, we could fix the ~5 instances that complain about indentation and disable the unfixable ones explicitly.

It's also worth noting that apparently one of the lints might warn against stack overflows: https://rust-lang.github.io/rust-clippy/master/index.html#large_futures

Done.


nativelink-proto/com/github/trace_machina/nativelink/remote_execution/events.proto line 182 at r1 (raw file):

Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…

nit: I wonder whether we actually need separate rpc calls for all of these. For instance, all the different error types seem to require a bunch of duplicate impls for impl OriginEventSource<T> for TonicStatus. If we're handling all those errors in the same way anyways it might be possible to just have a single error type and have a single impl.

Roughly something like this maybe?

syntax = "proto3";

package com.github.trace_machina.nativelink.events;

import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/bytestream/bytestream.proto";
import "google/longrunning/operations.proto";
import "google/rpc/status.proto";

message OperationEvent {
    uint32 version = 1;
    string event_id = 2;
    string parent_event_id = 3;
    build.bazel.remote.execution.v2.RequestMetadata bazel_request_metadata = 4;
    google.rpc.Status status = 5;

    enum Type {
        UNKNOWN = 0;
        GET_CAPABILITIES = 1;
        GET_ACTION_RESULT = 2;
        UPDATE_ACTION_RESULT = 3;
        FIND_MISSING_BLOBS = 4;
        BATCH_READ_BLOBS = 5;
        BATCH_UPDATE_BLOBS = 6;
        GET_TREE = 7;
        READ = 8;
        WRITE = 9;
        QUERY_WRITE_STATUS = 10;
        EXECUTE = 11;
        WAIT_EXECUTION = 12;
    }
    Type type = 6;

    enum Phase {
        PHASE_UNKNOWN = 0;
        REQUEST = 1;
        RESPONSE = 2;
        STREAM = 3;
    }
    Phase phase = 7;

    oneof data {
        build.bazel.remote.execution.v2.GetCapabilitiesRequest capabilities_request = 10;
        build.bazel.remote.execution.v2.ServerCapabilities capabilities_response = 11;
        build.bazel.remote.execution.v2.GetActionResultRequest action_result_request = 12;
        build.bazel.remote.execution.v2.ActionResult action_result = 13;
        build.bazel.remote.execution.v2.FindMissingBlobsRequest find_missing_request = 14;
        build.bazel.remote.execution.v2.FindMissingBlobsResponse find_missing_response = 15;
        BatchOperation batch_operation = 16;
        StreamOperation stream_operation = 17;
        build.bazel.remote.execution.v2.ExecuteRequest execute_request = 18;
        google.longrunning.Operation operation_response = 19;
    }
}

message BatchOperation {
    string instance_name = 1;
    build.bazel.remote.execution.v2.DigestFunction.Value digest_function = 2;
    
    message BlobInfo {
        build.bazel.remote.execution.v2.Digest digest = 1;
        build.bazel.remote.execution.v2.Compressor.Value compressor = 2;
        uint64 data_len = 3;
        google.rpc.Status status = 4; // Per-blob status
    }
    repeated BlobInfo blobs = 3;
}

message StreamOperation {
    string resource_name = 1;
    int64 offset = 2;
    bool finish = 3;
    uint64 data_len = 4;
    uint64 processed_bytes = 5; // For tracking progress
}

message OriginEvents {
    repeated OperationEvent events = 1;
}

Done. Followed a similar pattern.


nativelink-proto/genproto/blaze.invocation_policy.pb.rs line 1 at r1 (raw file):

Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…

nit: Copyright years are wrong and in various other files.

Yeah, lets do this in another PR, since this change didn't affect it.


nativelink-service/tests/ac_server_test.rs line 118 at r1 (raw file):

Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…

nit: Is it correct that this is now empty?

Now it is, yes... We now remove the error messages in a NotFound, since NotFound is so common.


nativelink-util/src/origin_event.rs line 590 at r1 (raw file):

Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…

nit: Soo I played around with this way too long and I believe something like this could work as well (assuming the proto changes from previously):

use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll};

use futures::Stream;
use nativelink_proto::build::bazel::remote::execution::v2::{
    ActionResult, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse,
    GetActionResultRequest, GetCapabilitiesRequest, ServerCapabilities,
};
use nativelink_proto::com::github::trace_machina::nativelink::events::operation_event::{
    Data, Phase, Type,
};
use nativelink_proto::com::github::trace_machina::nativelink::events::{
    BatchOperation, OperationEvent, StreamOperation,
};
use nativelink_proto::google::longrunning::Operation;
use nativelink_proto::google::rpc::Status;
use rand::RngCore;
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::make_symbol;
use crate::origin_context::ActiveOriginContext;
use crate::origin_event_middleware::BAZEL_REQUEST_METDATA;

static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();

pub fn get_node_id() -> &'static [u8; 6] {
    NODE_ID.get_or_init(|| {
        let mut rng = rand::thread_rng();
        let mut out = [0; 6];
        rng.fill_bytes(&mut out);
        out
    })
}

type OriginEventCollector = mpsc::Sender<OperationEvent>;
make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector);

pub trait IntoEventData {
    fn into_event_data(self) -> Data;
}

macro_rules! impl_into_event_data {
    ($($ty:ident, $variant:ident);* $(;)?) => {
        $(
            impl IntoEventData for $ty {
                fn into_event_data(self) -> Data {
                    Data::$variant(self)
                }
            }
        )*
    }
}

impl_into_event_data! {
    GetCapabilitiesRequest, CapabilitiesRequest;
    ServerCapabilities, CapabilitiesResponse;
    GetActionResultRequest, ActionResultRequest;
    ActionResult, ActionResult;
    FindMissingBlobsRequest, FindMissingRequest;
    FindMissingBlobsResponse, FindMissingResponse;
    BatchOperation, BatchOperation;
    StreamOperation, StreamOperation;
    ExecuteRequest, ExecuteRequest;
    Operation, OperationResponse;
}

pub trait OperationType {
    const TYPE: Type;
}

macro_rules! declare_operation {
    ($($op:ident, $ty:ident);* $(;)?) => {
        $(
            pub struct $op;
            impl OperationType for $op {
                const TYPE: Type = Type::$ty;
            }
        )*
    }
}

declare_operation! {
    UnknownOp, Unknown;
    GetCapabilitiesOp, GetCapabilities;
    GetActionResultOp, GetActionResult;
    UpdateActionResultOp, UpdateActionResult;
    FindMissingBlobsOp, FindMissingBlobs;
    BatchReadBlobsOp, BatchReadBlobs;
    BatchUpdateBlobsOp, BatchUpdateBlobs;
    GetTreeOp, GetTree;
    ReadOp, Read;
    WriteOp, Write;
    QueryWriteStatusOp, QueryWriteStatus;
    ExecuteOp, Execute;
    WaitExecutionOp, WaitExecution;
}

pub struct OriginEventContext<T: OperationType> {
    inner: Option<Arc<OriginEventContextImpl>>,
    _phantom: PhantomData<T>,
}

impl<T: OperationType> Clone for OriginEventContext<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            _phantom: PhantomData,
        }
    }
}

#[derive(Clone)]
struct OriginEventContextImpl {
    collector: Arc<mpsc::Sender<OperationEvent>>,
    parent_event_id: Uuid,
}

impl<T: OperationType> OriginEventContext<T> {
    pub async fn new<D: IntoEventData>(request: D) -> Self {
        let collector = ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
            .ok()
            .flatten();

        let inner = if let Some(collector) = collector {
            let event = OperationEvent {
                version: 0,
                event_id: Uuid::now_v6(get_node_id()).to_string(),
                parent_event_id: String::new(),
                bazel_request_metadata: ActiveOriginContext::get_value(&BAZEL_REQUEST_METDATA)
                    .ok()
                    .flatten()
                    .map(|v| v.as_ref().clone()),
                status: None,
                r#type: T::TYPE as i32,
                phase: Phase::Request as i32,
                data: Some(request.into_event_data()),
            };
            let parent_event_id = publish_event(&collector, event).await;
            Some(Arc::new(OriginEventContextImpl {
                collector,
                parent_event_id,
            }))
        } else {
            None
        };

        Self {
            inner,
            _phantom: PhantomData,
        }
    }

    pub async fn emit<D: IntoEventData>(&self, data: D, phase: Phase) {
        if let Some(ctx) = &self.inner {
            let event = OperationEvent {
                version: 0,
                event_id: Uuid::now_v6(get_node_id()).to_string(),
                parent_event_id: ctx.parent_event_id.to_string(),
                bazel_request_metadata: None,
                status: None,
                r#type: T::TYPE as i32,
                phase: phase as i32,
                data: Some(data.into_event_data()),
            };
            publish_event(&ctx.collector, event).await;
        }
    }

    pub fn wrap_stream<S, D>(&self, stream: S) -> impl Stream<Item = Result<D, Status>>
    where
        S: Stream<Item = Result<D, Status>>,
        D: IntoEventData + Clone,
    {
        StreamWrapper {
            stream,
            ctx: Arc::new((*self).clone()),
            _phantom: PhantomData,
        }
    }
}

#[pin_project::pin_project]
struct StreamWrapper<S, D, T: OperationType> {
    #[pin]
    stream: S,
    ctx: Arc<OriginEventContext<T>>,
    _phantom: PhantomData<D>,
}

impl<S, D, T> Stream for StreamWrapper<S, D, T>
where
    S: Stream<Item = Result<D, Status>>,
    D: IntoEventData + Clone,
    T: OperationType,
{
    type Item = Result<D, Status>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.stream.poll_next(cx) {
            Poll::Ready(Some(result)) => {
                if let Some(ctx) = &this.ctx.inner {
                    let event = OperationEvent {
                        version: 0,
                        event_id: Uuid::now_v6(get_node_id()).to_string(),
                        parent_event_id: ctx.parent_event_id.to_string(),
                        bazel_request_metadata: None,
                        status: result.as_ref().err().cloned(),
                        r#type: T::TYPE as i32,
                        phase: Phase::Stream as i32,
                        data: result.as_ref().ok().map(|d| d.clone().into_event_data()),
                    };
                    let _ = ctx.collector.try_send(event);
                }
                Poll::Ready(Some(result))
            }
            other => other,
        }
    }
}

async fn publish_event(collector: &mpsc::Sender<OperationEvent>, event: OperationEvent) -> Uuid {
    let event_id = Uuid::parse_str(&event.event_id).expect("valid UUID");
    let _ = collector.send(event).await;
    event_id
}

Done. Did a similar thing.

Adds ability for nativelink to publish events to a store/database
that can be used by another service to aggregate and/or visualize
what is happening related to requests, users or other related
metadata. By default bazel's metadata will be published with every
event if present giving the ability to combine with BEP data.
Copy link
Member

@aaronmondal aaronmondal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love the evil templating compression lol

:lgtm:

Reviewed 7 of 18 files at r2, 7 of 7 files at r3, all commit messages.
Reviewable status: 1 of 1 LGTMs obtained, and 39 of 43 files reviewed, and pending CI: Bazel Dev / macos-13, Bazel Dev / macos-14, Bazel Dev / ubuntu-24.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Coverage, Installation / macos-13, Installation / macos-14, Installation / ubuntu-22.04, Local / ubuntu-22.04, NativeLink.com Cloud / Remote Cache / macos-14, NativeLink.com Cloud / Remote Cache / ubuntu-24.04, Publish image, Publish nativelink-worker-init, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, Web Platform Deployment / macos-14, asan / ubuntu-22.04, docker-compose-compiles-nativelink (22.04), macos-13, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, windows-2022 / stable


nativelink-proto/com/github/trace_machina/nativelink/remote_execution/events.proto line 182 at r1 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

Done. Followed a similar pattern.

Oh this feels much more intuitive ❤️

Copy link
Member

@aaronmondal aaronmondal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 4 of 18 files at r2.
Reviewable status: 1 of 1 LGTMs obtained, and all files reviewed, and pending CI: Bazel Dev / macos-13, Bazel Dev / macos-14, Bazel Dev / ubuntu-24.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Coverage, Installation / macos-13, Installation / macos-14, Installation / ubuntu-22.04, Local / ubuntu-22.04, NativeLink.com Cloud / Remote Cache / macos-14, NativeLink.com Cloud / Remote Cache / ubuntu-24.04, Publish image, Publish nativelink-worker-init, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, Web Platform Deployment / macos-14, asan / ubuntu-22.04, docker-compose-compiles-nativelink (22.04), macos-13, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, windows-2022 / stable

@allada allada merged commit f280e71 into TraceMachina:main Dec 11, 2024
35 checks passed
@allada allada deleted the boobar5 branch December 11, 2024 19:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants