Skip to content

Commit 1f0c726

Browse files
remove redundancy
1 parent 0d924dd commit 1f0c726

File tree

2 files changed

+9
-21
lines changed

2 files changed

+9
-21
lines changed

src/handlers/http/ingest.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,12 @@ pub async fn ingest(
118118

119119
//if stream exists, fetch the stream log source
120120
//return error if the stream log source is otel traces or otel metrics
121-
let stream = validate_stream_for_ingestion(&stream_name)?;
121+
validate_stream_for_ingestion(&stream_name)?;
122122

123123
PARSEABLE
124124
.add_update_log_source(&stream_name, log_source_entry)
125125
.await?;
126126

127-
if stream.get_time_partition().is_some() {
128-
return Err(PostError::CustomError(
129-
"Ingestion is not allowed to stream with time partition".to_string(),
130-
));
131-
}
132-
133127
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
134128

135129
Ok(HttpResponse::Ok().finish())
@@ -397,13 +391,7 @@ pub async fn post_event(
397391

398392
//if stream exists, fetch the stream log source
399393
//return error if the stream log source is otel traces or otel metrics
400-
let stream = validate_stream_for_ingestion(&stream_name)?;
401-
402-
if stream.get_time_partition().is_some() {
403-
return Err(PostError::Invalid(anyhow::anyhow!(
404-
"Ingestion is not allowed to stream with time partition. Please upgrade to Parseable enterprise to enable this feature"
405-
)));
406-
}
394+
validate_stream_for_ingestion(&stream_name)?;
407395

408396
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
409397

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use opentelemetry_proto::tonic::{
2323
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
2424
};
2525
use serde_json::Value;
26-
use std::{collections::HashMap, sync::Arc};
26+
use std::collections::HashMap;
2727
use tracing::warn;
2828

2929
use crate::{
@@ -39,7 +39,7 @@ use crate::{
3939
},
4040
},
4141
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
42-
parseable::{PARSEABLE, Stream},
42+
parseable::PARSEABLE,
4343
storage::StreamType,
4444
utils::json::{convert_array_to_object, flatten::convert_to_array},
4545
};
@@ -268,7 +268,7 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
268268
Ok(())
269269
}
270270

271-
pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<Arc<Stream>, PostError> {
271+
pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<(), PostError> {
272272
let stream = PARSEABLE.get_stream(stream_name)?;
273273

274274
// Validate that the stream's log source is compatible
@@ -283,12 +283,12 @@ pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<Arc<Stream>, P
283283

284284
// Check for time partition
285285
if stream.get_time_partition().is_some() {
286-
return Err(PostError::CustomError(
287-
"Ingestion is not allowed to stream with time partition".to_string(),
288-
));
286+
return Err(PostError::Invalid(anyhow::anyhow!(
287+
"Ingestion with time partition is not supported in Parseable OSS"
288+
)));
289289
}
290290

291-
Ok(stream)
291+
Ok(())
292292
}
293293

294294
#[cfg(test)]

0 commit comments

Comments
 (0)