Skip to content

Commit

Permalink
enhancement(vector source): comply with component spec (#11059)
Browse files Browse the repository at this point in the history
* fix EventsSent byte_size value

Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com>

* add stream closed error

Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com>

* update event for v1

Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>

* update documentation

Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>

* fix: instrument well vector v2 source

Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
  • Loading branch information
jdrouet authored Feb 3, 2022
1 parent fce3b5f commit d0f782d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
25 changes: 23 additions & 2 deletions src/internal_events/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ pub struct VectorEventReceived {

impl InternalEvent for VectorEventReceived {
fn emit_logs(&self) {
trace!(message = "Received one event.");
trace!(
message = "Events received.",
count = 1,
byte_size = self.byte_size
);
}

fn emit_metrics(&self) {
counter!("component_received_events_total", 1);
counter!(
"component_received_event_bytes_total",
self.byte_size as u64
);
// deprecated
counter!("events_in_total", 1);
counter!("processed_bytes_total", self.byte_size as u64);
}
Expand All @@ -28,10 +37,22 @@ pub struct VectorProtoDecodeError<'a> {

impl<'a> InternalEvent for VectorProtoDecodeError<'a> {
fn emit_logs(&self) {
error!(message = "Failed to decode protobuf message.", error = ?self.error, internal_log_rate_secs = 10);
error!(
message = "Failed to decode protobuf message.",
error = ?self.error,
error_type = "parser_failed",
stage = "processing",
);
}

fn emit_metrics(&self) {
counter!(
"component_errors_total", 1,
"error" => self.error.to_string(),
"error_type" => "parser_failed",
"stage" => "processing",
);
// decoding
counter!("protobuf_decode_errors_total", 1);
}
}
3 changes: 2 additions & 1 deletion src/sinks/vector/v2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl tower::Service<VectorRequest> for VectorService {
fn call(&mut self, list: VectorRequest) -> Self::Future {
let mut service = self.clone();
let events_count = list.events.len();
let events_byte_size = list.events_byte_size;

let request = proto_vector::PushEventsRequest {
events: list.events,
Expand All @@ -118,7 +119,7 @@ impl tower::Service<VectorRequest> for VectorService {
});
VectorResponse {
events_count,
events_byte_size: 0,
events_byte_size,
}
})
.map_err(|source| VectorSinkError::Request { source }.into())
Expand Down
23 changes: 15 additions & 8 deletions src/sources/vector/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use tonic::{
transport::{server::Connected, Certificate, Server},
Request, Response, Status,
};
use tracing_futures::Instrument;
use vector_core::{
event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
ByteSizeOf,
};

use crate::{
config::{AcknowledgementsConfig, DataType, GenerateConfig, Output, Resource, SourceContext},
internal_events::{EventsReceived, TcpBytesReceived},
internal_events::{EventsReceived, StreamClosedError, TcpBytesReceived},
proto::vector as proto,
serde::bool_or_struct,
shutdown::ShutdownSignalToken,
Expand Down Expand Up @@ -42,17 +43,21 @@ impl proto::Service for Service {
.map(Event::from)
.collect();

emit!(&EventsReceived {
count: events.len(),
byte_size: events.size_of(),
});
let count = events.len();
let byte_size = events.size_of();

emit!(&EventsReceived { count, byte_size });

let receiver = BatchNotifier::maybe_apply_to_events(self.acknowledgements, &mut events);

self.pipeline
.clone()
.send_all(&mut futures::stream::iter(events))
.map_err(|err| Status::unavailable(err.to_string()))
.map_err(|error| {
let message = error.to_string();
emit!(&StreamClosedError { error, count });
Status::unavailable(message)
})
.and_then(|_| handle_batch_status(receiver))
.await?;

Expand Down Expand Up @@ -144,7 +149,7 @@ async fn run(
cx: SourceContext,
acknowledgements: AcknowledgementsConfig,
) -> crate::Result<()> {
let _span = crate::trace::current_span();
let span = crate::trace::current_span();

let service = proto::Server::new(Service {
pipeline: cx.out,
Expand All @@ -159,15 +164,17 @@ async fn run(
socket.after_read(move |byte_size| {
emit!(&TcpBytesReceived {
byte_size,
peer_addr
peer_addr,
})
})
})
});

Server::builder()
.trace_fn(move |_| span.clone())
.add_service(service)
.serve_with_incoming_shutdown(stream, cx.shutdown.map(|token| tx.send(token).unwrap()))
.in_current_span()
.await?;

drop(rx.await);
Expand Down
2 changes: 2 additions & 0 deletions website/cue/reference/components/sources/vector.cue
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ components: sources: vector: {
}

telemetry: metrics: {
component_discarded_events_total: components.sources.internal_metrics.output.metrics.component_discarded_events_total
component_errors_total: components.sources.internal_metrics.output.metrics.component_errors_total
component_received_bytes_total: components.sources.internal_metrics.output.metrics.component_received_bytes_total
component_received_events_total: components.sources.internal_metrics.output.metrics.component_received_events_total
component_received_event_bytes_total: components.sources.internal_metrics.output.metrics.component_received_event_bytes_total
Expand Down

0 comments on commit d0f782d

Please sign in to comment.