From b25c070bba44d7ca48366f8cd5007f7919b3faae Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 9 Feb 2025 22:23:52 +0530 Subject: [PATCH 1/5] refactor: specialized flatten together --- src/handlers/http/ingest.rs | 26 +++------------- src/handlers/http/modal/utils/ingest_utils.rs | 31 ++++++++++++++++--- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index b3da07761..a13a38cdd 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -17,7 +17,7 @@ */ use super::logstream::error::{CreateStreamError, StreamError}; -use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; +use super::modal::utils::ingest_utils::flatten_and_push_logs; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; use crate::event::format::LogSource; @@ -31,9 +31,6 @@ use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::{SchemaVersion, STREAM_INFO}; use crate::option::{Mode, CONFIG}; -use crate::otel::logs::flatten_otel_logs; -use crate::otel::metrics::flatten_otel_metrics; -use crate::otel::traces::flatten_otel_traces; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; @@ -44,9 +41,6 @@ use arrow_schema::Schema; use bytes::Bytes; use chrono::Utc; use http::StatusCode; -use opentelemetry_proto::tonic::logs::v1::LogsData; -use opentelemetry_proto::tonic::metrics::v1::MetricsData; -use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; @@ -130,11 +124,7 @@ pub async fn handle_otel_logs_ingestion( let stream_name = stream_name.to_str().unwrap().to_owned(); create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs).await?; - //custom flattening required for otel logs - let logs: LogsData = serde_json::from_value(json)?; - for record in flatten_otel_logs(&logs) { - push_logs(&stream_name, record, &log_source).await?; - } + flatten_and_push_logs(json, &stream_name, &log_source).await?; Ok(HttpResponse::Ok().finish()) } @@ -164,11 +154,7 @@ pub async fn handle_otel_metrics_ingestion( ) .await?; - //custom flattening required for otel metrics - let metrics: MetricsData = serde_json::from_value(json)?; - for record in flatten_otel_metrics(metrics) { - push_logs(&stream_name, record, &log_source).await?; - } + flatten_and_push_logs(json, &stream_name, &log_source).await?; Ok(HttpResponse::Ok().finish()) } @@ -195,11 +181,7 @@ pub async fn handle_otel_traces_ingestion( create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces) .await?; - //custom flattening required for otel traces - let traces: TracesData = serde_json::from_value(json)?; - for record in flatten_otel_traces(&traces) { - push_logs(&stream_name, record, &log_source).await?; - } + flatten_and_push_logs(json, &stream_name, &log_source).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 3a2b9c797..98f5fa7bc 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -19,6 +19,9 @@ use arrow_schema::Field; use chrono::{DateTime, NaiveDateTime, Utc}; use itertools::Itertools; +use opentelemetry_proto::tonic::{ + logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, +}; use serde_json::Value; use std::{collections::HashMap, sync::Arc}; @@ -32,6 +35,7 @@ use crate::{ kinesis::{flatten_kinesis_logs, Message}, }, metadata::{SchemaVersion, STREAM_INFO}, + otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, storage::StreamType, utils::json::{convert_array_to_object, flatten::convert_to_array}, }; @@ -44,13 +48,30 @@ pub async fn flatten_and_push_logs( match log_source { LogSource::Kinesis => { let message: Message = serde_json::from_value(json)?; - let json = flatten_kinesis_logs(message); - for record in json { + for record in flatten_kinesis_logs(message) { push_logs(stream_name, record, &LogSource::default()).await?; } } - LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { - return Err(PostError::OtelNotSupported); + LogSource::OtelLogs => { + //custom flattening required for otel logs + let logs: LogsData = serde_json::from_value(json)?; + for record in flatten_otel_logs(&logs) { + push_logs(&stream_name, record, &log_source).await?; + } + } + LogSource::OtelTraces => { + //custom flattening required for otel traces + let traces: TracesData = serde_json::from_value(json)?; + for record in flatten_otel_traces(&traces) { + push_logs(&stream_name, record, &log_source).await?; + } + } + LogSource::OtelMetrics => { + //custom flattening required for otel metrics + let metrics: MetricsData = serde_json::from_value(json)?; + for record in flatten_otel_metrics(metrics) { + push_logs(&stream_name, record, &log_source).await?; + } } _ => { push_logs(stream_name, json, log_source).await?; @@ -59,7 +80,7 @@ pub async fn flatten_and_push_logs( Ok(()) } -pub async fn push_logs( +async fn push_logs( stream_name: &str, json: Value, log_source: &LogSource, From b7bcb7a572ba590c7fa01324ac4d845e150264e7 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 10 Feb 2025 01:10:55 +0530 Subject: [PATCH 2/5] ci: clippy suggestions --- src/handlers/http/modal/utils/ingest_utils.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 98f5fa7bc..1544f0d51 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -56,21 +56,21 @@ pub async fn flatten_and_push_logs( //custom flattening required for otel logs let logs: LogsData = serde_json::from_value(json)?; for record in flatten_otel_logs(&logs) { - push_logs(&stream_name, record, &log_source).await?; + push_logs(stream_name, record, log_source).await?; } } LogSource::OtelTraces => { //custom flattening required for otel traces let traces: TracesData = serde_json::from_value(json)?; for record in flatten_otel_traces(&traces) { - push_logs(&stream_name, record, &log_source).await?; + push_logs(stream_name, record, log_source).await?; } } LogSource::OtelMetrics => { //custom flattening required for otel metrics let metrics: MetricsData = serde_json::from_value(json)?; for record in flatten_otel_metrics(metrics) { - push_logs(&stream_name, record, &log_source).await?; + push_logs(stream_name, record, log_source).await?; } } _ => { From a46e4db9cbe5e242a28ed32bb05d2ad886b59140 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Feb 2025 11:20:02 +0530 Subject: [PATCH 3/5] chore: fmt --- src/handlers/http/modal/utils/ingest_utils.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index baa2eb474..596271a65 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -29,10 +29,17 @@ use crate::{ event::{ format::{json, EventFormat, LogSource}, Event, - }, handlers::http::{ + }, + handlers::http::{ ingest::PostError, kinesis::{flatten_kinesis_logs, Message}, - }, metadata::SchemaVersion, otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::{StreamNotFound, PARSEABLE}, storage::StreamType, utils::json::{convert_array_to_object, flatten::convert_to_array}, LOCK_EXPECT + }, + metadata::SchemaVersion, + otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, + parseable::{StreamNotFound, PARSEABLE}, + storage::StreamType, + utils::json::{convert_array_to_object, flatten::convert_to_array}, + LOCK_EXPECT, }; pub async fn flatten_and_push_logs( From 4e2f51a93bc79b79ea616105f838046b3b79c70a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 15:06:56 +0530 Subject: [PATCH 4/5] fix: block otel ingestion through general path --- src/handlers/http/ingest.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 0d948a335..42de1cb4f 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -64,6 +64,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Date: Tue, 18 Feb 2025 15:07:11 +0530 Subject: [PATCH 5/5] doc: describe kinesis flattening --- src/handlers/http/modal/utils/ingest_utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 596271a65..37f5ee368 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -49,6 +49,7 @@ pub async fn flatten_and_push_logs( ) -> Result<(), PostError> { match log_source { LogSource::Kinesis => { + //custom flattening required for Amazon Kinesis let message: Message = serde_json::from_value(json)?; for record in flatten_kinesis_logs(message) { push_logs(stream_name, record, &LogSource::default()).await?;