Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,21 @@ use arrow_array::RecordBatch;
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use opentelemetry_proto::tonic::logs::v1::LogsData;
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
use opentelemetry_proto::tonic::trace::v1::TracesData;
use serde_json::Value;

use crate::event::error::EventError;
use crate::event::format::{self, EventFormat, LogSource};
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
use crate::metadata::SchemaVersion;
use crate::option::Mode;
use crate::otel::logs::flatten_otel_logs;
use crate::otel::metrics::flatten_otel_metrics;
use crate::otel::traces::flatten_otel_traces;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::storage::{ObjectStorageError, StreamType};
use crate::utils::header_parsing::ParseHeaderError;
use crate::utils::json::flatten::JsonFlattenError;
use crate::{event, LOCK_EXPECT};

use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::modal::utils::ingest_utils::flatten_and_push_logs;
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;

Expand All @@ -70,6 +64,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
.get(LOG_SOURCE_KEY)
.and_then(|h| h.to_str().ok())
.map_or(LogSource::default(), LogSource::from);

if matches!(
log_source,
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
) {
return Err(PostError::OtelNotSupported);
}

flatten_and_push_logs(json, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -133,11 +135,7 @@ pub async fn handle_otel_logs_ingestion(
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
.await?;

//custom flattening required for otel logs
let logs: LogsData = serde_json::from_value(json)?;
for record in flatten_otel_logs(&logs) {
push_logs(&stream_name, record, &log_source).await?;
}
flatten_and_push_logs(json, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
}
Expand Down Expand Up @@ -168,11 +166,7 @@ pub async fn handle_otel_metrics_ingestion(
)
.await?;

//custom flattening required for otel metrics
let metrics: MetricsData = serde_json::from_value(json)?;
for record in flatten_otel_metrics(metrics) {
push_logs(&stream_name, record, &log_source).await?;
}
flatten_and_push_logs(json, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
}
Expand Down Expand Up @@ -200,11 +194,7 @@ pub async fn handle_otel_traces_ingestion(
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
.await?;

//custom flattening required for otel traces
let traces: TracesData = serde_json::from_value(json)?;
for record in flatten_otel_traces(&traces) {
push_logs(&stream_name, record, &log_source).await?;
}
flatten_and_push_logs(json, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
}
Expand Down Expand Up @@ -245,6 +235,14 @@ pub async fn post_event(
.get(LOG_SOURCE_KEY)
.and_then(|h| h.to_str().ok())
.map_or(LogSource::default(), LogSource::from);

if matches!(
log_source,
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
) {
return Err(PostError::OtelNotSupported);
}

flatten_and_push_logs(json, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
Expand Down
32 changes: 27 additions & 5 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
use arrow_schema::Field;
use chrono::{DateTime, NaiveDateTime, Utc};
use itertools::Itertools;
use opentelemetry_proto::tonic::{
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
};
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};

Expand All @@ -32,6 +35,7 @@ use crate::{
kinesis::{flatten_kinesis_logs, Message},
},
metadata::SchemaVersion,
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
parseable::{StreamNotFound, PARSEABLE},
storage::StreamType,
utils::json::{convert_array_to_object, flatten::convert_to_array},
Expand All @@ -45,14 +49,32 @@ pub async fn flatten_and_push_logs(
) -> Result<(), PostError> {
match log_source {
LogSource::Kinesis => {
//custom flattening required for Amazon Kinesis
let message: Message = serde_json::from_value(json)?;
let json = flatten_kinesis_logs(message);
for record in json {
for record in flatten_kinesis_logs(message) {
push_logs(stream_name, record, &LogSource::default()).await?;
}
}
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
return Err(PostError::OtelNotSupported);
LogSource::OtelLogs => {
//custom flattening required for otel logs
let logs: LogsData = serde_json::from_value(json)?;
for record in flatten_otel_logs(&logs) {
push_logs(stream_name, record, log_source).await?;
}
}
LogSource::OtelTraces => {
//custom flattening required for otel traces
let traces: TracesData = serde_json::from_value(json)?;
for record in flatten_otel_traces(&traces) {
push_logs(stream_name, record, log_source).await?;
}
}
LogSource::OtelMetrics => {
//custom flattening required for otel metrics
let metrics: MetricsData = serde_json::from_value(json)?;
for record in flatten_otel_metrics(metrics) {
push_logs(stream_name, record, log_source).await?;
}
}
_ => {
push_logs(stream_name, json, log_source).await?;
Expand All @@ -61,7 +83,7 @@ pub async fn flatten_and_push_logs(
Ok(())
}

pub async fn push_logs(
async fn push_logs(
stream_name: &str,
json: Value,
log_source: &LogSource,
Expand Down
Loading