diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 8a37e4c51..02bab9e97 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -26,10 +26,10 @@ use chrono::Utc; use http::StatusCode; use serde_json::Value; -use crate::event; use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; +use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::SchemaVersion; use crate::option::Mode; @@ -124,7 +124,9 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< let size: usize = body.len(); let json: Value = serde_json::from_slice(&body)?; let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); - + let mut p_custom_fields = HashMap::new(); + p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); + p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string()); // For internal streams, use old schema format::json::Event::new(json) .into_event( @@ -136,7 +138,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< None, SchemaVersion::V0, StreamType::Internal, - &HashMap::new(), + &p_custom_fields, )? .process()?;