diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 61b72efb470e..bca33a607f6e 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -23,11 +23,11 @@ pub mod value; use std::collections::BTreeMap; use std::sync::Arc; -use ahash::HashSet; -use common_telemetry::debug; -use error::{IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu}; +use error::{ + IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu, +}; use itertools::Itertools; -use processor::{Processor, ProcessorBuilder, Processors}; +use processor::{IntermediateStatus, Processor, Processors}; use snafu::{OptionExt, ResultExt}; use transform::{Transformer, Transforms}; use value::Value; @@ -56,6 +56,10 @@ where Content::Yaml(str) => { let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?; + if docs.len() != 1 { + return YamlParseSnafu.fail(); + } + let doc = &docs[0]; let description = doc[DESCRIPTION].as_str().map(|s| s.to_string()); @@ -144,6 +148,25 @@ impl PipelineExecOutput { } } +pub fn json_to_intermediate_state(val: serde_json::Value) -> Result { + match val { + serde_json::Value::Object(map) => { + let mut intermediate_state = BTreeMap::new(); + for (k, v) in map { + intermediate_state.insert(k, Value::try_from(v)?); + } + Ok(intermediate_state) + } + _ => PrepareValueMustBeObjectSnafu.fail(), + } +} + +pub fn json_array_to_intermediate_state( + val: Vec, +) -> Result> { + val.into_iter().map(json_to_intermediate_state).collect() +} + impl Pipeline where T: Transformer, @@ -152,27 +175,22 @@ where &self, val: &mut BTreeMap, ) -> Result> { - // for processor in self.processors.iter() { - // processor.exec_mut(val)?; - // } - - // let matched_rule = self - // .dispatcher - // .as_ref() - // .and_then(|dispatcher| dispatcher.exec(&self.intermediate_keys, val)); - - // match matched_rule { - // None => self - // .transformer - // .transform_mut(val) - // .map(PipelineExecOutput::Transformed), - // Some(rule) => Ok(PipelineExecOutput::DispatchedTo(rule.into())), - // } - todo!() - } + for processor in self.processors.iter() { + processor.exec_mut(val)?; + } - pub fn prepare(&self, val: serde_json::Value) -> Result> { - todo!() + let matched_rule = self + .dispatcher + .as_ref() + .and_then(|dispatcher| dispatcher.exec(val)); + + match matched_rule { + None => self + .transformer + .transform_mut(val) + .map(PipelineExecOutput::Transformed), + Some(rule) => Ok(PipelineExecOutput::DispatchedTo(rule.into())), + } } pub fn processors(&self) -> &processor::Processors { @@ -254,242 +272,242 @@ mod tests { use super::*; use crate::etl::transform::GreptimeTransformer; -// #[test] -// fn test_pipeline_prepare() { -// let input_value_str = r#" -// { -// "my_field": "1,2", -// "foo": "bar" -// } -// "#; -// let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); - -// let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat' -// processors: -// - csv: -// field: my_field -// target_fields: field1, field2 -// transform: -// - field: field1 -// type: uint32 -// - field: field2 -// type: uint32 -// "#; -// let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); -// let mut payload = pipeline.init_intermediate_state(); -// pipeline.prepare(input_value, &mut payload).unwrap(); -// assert_eq!(&["my_field"].to_vec(), pipeline.required_keys()); -// assert_eq!( -// payload, -// vec![Value::String("1,2".to_string()), Value::Null, Value::Null] -// ); -// let result = pipeline -// .exec_mut(&mut payload) -// .unwrap() -// .into_transformed() -// .unwrap(); - -// assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1))); -// assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2))); -// match &result.values[2].value_data { -// Some(ValueData::TimestampNanosecondValue(v)) => { -// assert_ne!(*v, 0); -// } -// _ => panic!("expect null value"), -// } -// } - -// #[test] -// fn test_dissect_pipeline() { -// let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string(); -// let pipeline_str = r#"processors: -// - dissect: -// fields: -// - message -// patterns: -// - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}" -// - timestamp: -// fields: -// - ts -// formats: -// - "%d/%b/%Y:%H:%M:%S %z" - -// transform: -// - fields: -// - ip -// - username -// - method -// - path -// - proto -// type: string -// - fields: -// - status -// type: uint16 -// - fields: -// - bytes -// type: uint32 -// - field: ts -// type: timestamp, ns -// index: time"#; -// let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap(); -// let mut payload = pipeline.init_intermediate_state(); -// pipeline -// .prepare(serde_json::Value::String(message), &mut payload) -// .unwrap(); -// let result = pipeline -// .exec_mut(&mut payload) -// .unwrap() -// .into_transformed() -// .unwrap(); -// let sechema = pipeline.schemas(); - -// assert_eq!(sechema.len(), result.values.len()); -// let test = vec![ -// ( -// ColumnDataType::String as i32, -// Some(ValueData::StringValue("129.37.245.88".into())), -// ), -// ( -// ColumnDataType::String as i32, -// Some(ValueData::StringValue("meln1ks".into())), -// ), -// ( -// ColumnDataType::String as i32, -// Some(ValueData::StringValue("PATCH".into())), -// ), -// ( -// ColumnDataType::String as i32, -// Some(ValueData::StringValue( -// "/observability/metrics/production".into(), -// )), -// ), -// ( -// ColumnDataType::String as i32, -// Some(ValueData::StringValue("HTTP/1.0".into())), -// ), -// ( -// ColumnDataType::Uint16 as i32, -// Some(ValueData::U16Value(501)), -// ), -// ( -// ColumnDataType::Uint32 as i32, -// Some(ValueData::U32Value(33085)), -// ), -// ( -// ColumnDataType::TimestampNanosecond as i32, -// Some(ValueData::TimestampNanosecondValue(1722493367000000000)), -// ), -// ]; -// for i in 0..sechema.len() { -// let schema = &sechema[i]; -// let value = &result.values[i]; -// assert_eq!(schema.datatype, test[i].0); -// assert_eq!(value.value_data, test[i].1); -// } -// } - -// #[test] -// fn test_csv_pipeline() { -// let input_value_str = r#" -// { -// "my_field": "1,2", -// "foo": "bar" -// } -// "#; -// let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); - -// let pipeline_yaml = r#" -// description: Pipeline for Apache Tomcat -// processors: -// - csv: -// field: my_field -// target_fields: field1, field2 -// transform: -// - field: field1 -// type: uint32 -// - field: field2 -// type: uint32 -// "#; - -// let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); -// let mut payload = pipeline.init_intermediate_state(); -// pipeline.prepare(input_value, &mut payload).unwrap(); -// assert_eq!(&["my_field"].to_vec(), pipeline.required_keys()); -// assert_eq!( -// payload, -// vec![Value::String("1,2".to_string()), Value::Null, Value::Null] -// ); -// let result = pipeline -// .exec_mut(&mut payload) -// .unwrap() -// .into_transformed() -// .unwrap(); -// assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1))); -// assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2))); -// match &result.values[2].value_data { -// Some(ValueData::TimestampNanosecondValue(v)) => { -// assert_ne!(*v, 0); -// } -// _ => panic!("expect null value"), -// } -// } - -// #[test] -// fn test_date_pipeline() { -// let input_value_str = r#" -// { -// "my_field": "1,2", -// "foo": "bar", -// "test_time": "2014-5-17T04:34:56+00:00" -// } -// "#; -// let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); - -// let pipeline_yaml = r#" -// --- -// description: Pipeline for Apache Tomcat - -// processors: -// - timestamp: -// field: test_time - -// transform: -// - field: test_time -// type: timestamp, ns -// index: time -// "#; - -// let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); -// let schema = pipeline.schemas().clone(); -// let mut result = pipeline.init_intermediate_state(); -// pipeline.prepare(input_value, &mut result).unwrap(); -// let row = pipeline -// .exec_mut(&mut result) -// .unwrap() -// .into_transformed() -// .unwrap(); -// let output = Rows { -// schema, -// rows: vec![row], -// }; -// let schemas = output.schema; - -// assert_eq!(schemas.len(), 1); -// let schema = schemas[0].clone(); -// assert_eq!("test_time", schema.column_name); -// assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype); -// assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type); - -// let row = output.rows[0].clone(); -// assert_eq!(1, row.values.len()); -// let value_data = row.values[0].clone().value_data; -// assert_eq!( -// Some(v1::value::ValueData::TimestampNanosecondValue( -// 1400301296000000000 -// )), -// value_data -// ); -// } + // #[test] + // fn test_pipeline_prepare() { + // let input_value_str = r#" + // { + // "my_field": "1,2", + // "foo": "bar" + // } + // "#; + // let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); + + // let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat' + // processors: + // - csv: + // field: my_field + // target_fields: field1, field2 + // transform: + // - field: field1 + // type: uint32 + // - field: field2 + // type: uint32 + // "#; + // let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + // let mut payload = pipeline.init_intermediate_state(); + // pipeline.prepare(input_value, &mut payload).unwrap(); + // assert_eq!(&["my_field"].to_vec(), pipeline.required_keys()); + // assert_eq!( + // payload, + // vec![Value::String("1,2".to_string()), Value::Null, Value::Null] + // ); + // let result = pipeline + // .exec_mut(&mut payload) + // .unwrap() + // .into_transformed() + // .unwrap(); + + // assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1))); + // assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2))); + // match &result.values[2].value_data { + // Some(ValueData::TimestampNanosecondValue(v)) => { + // assert_ne!(*v, 0); + // } + // _ => panic!("expect null value"), + // } + // } + + // #[test] + // fn test_dissect_pipeline() { + // let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string(); + // let pipeline_str = r#"processors: + // - dissect: + // fields: + // - message + // patterns: + // - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}" + // - timestamp: + // fields: + // - ts + // formats: + // - "%d/%b/%Y:%H:%M:%S %z" + + // transform: + // - fields: + // - ip + // - username + // - method + // - path + // - proto + // type: string + // - fields: + // - status + // type: uint16 + // - fields: + // - bytes + // type: uint32 + // - field: ts + // type: timestamp, ns + // index: time"#; + // let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap(); + // let mut payload = pipeline.init_intermediate_state(); + // pipeline + // .prepare(serde_json::Value::String(message), &mut payload) + // .unwrap(); + // let result = pipeline + // .exec_mut(&mut payload) + // .unwrap() + // .into_transformed() + // .unwrap(); + // let sechema = pipeline.schemas(); + + // assert_eq!(sechema.len(), result.values.len()); + // let test = vec![ + // ( + // ColumnDataType::String as i32, + // Some(ValueData::StringValue("129.37.245.88".into())), + // ), + // ( + // ColumnDataType::String as i32, + // Some(ValueData::StringValue("meln1ks".into())), + // ), + // ( + // ColumnDataType::String as i32, + // Some(ValueData::StringValue("PATCH".into())), + // ), + // ( + // ColumnDataType::String as i32, + // Some(ValueData::StringValue( + // "/observability/metrics/production".into(), + // )), + // ), + // ( + // ColumnDataType::String as i32, + // Some(ValueData::StringValue("HTTP/1.0".into())), + // ), + // ( + // ColumnDataType::Uint16 as i32, + // Some(ValueData::U16Value(501)), + // ), + // ( + // ColumnDataType::Uint32 as i32, + // Some(ValueData::U32Value(33085)), + // ), + // ( + // ColumnDataType::TimestampNanosecond as i32, + // Some(ValueData::TimestampNanosecondValue(1722493367000000000)), + // ), + // ]; + // for i in 0..sechema.len() { + // let schema = &sechema[i]; + // let value = &result.values[i]; + // assert_eq!(schema.datatype, test[i].0); + // assert_eq!(value.value_data, test[i].1); + // } + // } + + // #[test] + // fn test_csv_pipeline() { + // let input_value_str = r#" + // { + // "my_field": "1,2", + // "foo": "bar" + // } + // "#; + // let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); + + // let pipeline_yaml = r#" + // description: Pipeline for Apache Tomcat + // processors: + // - csv: + // field: my_field + // target_fields: field1, field2 + // transform: + // - field: field1 + // type: uint32 + // - field: field2 + // type: uint32 + // "#; + + // let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + // let mut payload = pipeline.init_intermediate_state(); + // pipeline.prepare(input_value, &mut payload).unwrap(); + // assert_eq!(&["my_field"].to_vec(), pipeline.required_keys()); + // assert_eq!( + // payload, + // vec![Value::String("1,2".to_string()), Value::Null, Value::Null] + // ); + // let result = pipeline + // .exec_mut(&mut payload) + // .unwrap() + // .into_transformed() + // .unwrap(); + // assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1))); + // assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2))); + // match &result.values[2].value_data { + // Some(ValueData::TimestampNanosecondValue(v)) => { + // assert_ne!(*v, 0); + // } + // _ => panic!("expect null value"), + // } + // } + + // #[test] + // fn test_date_pipeline() { + // let input_value_str = r#" + // { + // "my_field": "1,2", + // "foo": "bar", + // "test_time": "2014-5-17T04:34:56+00:00" + // } + // "#; + // let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); + + // let pipeline_yaml = r#" + // --- + // description: Pipeline for Apache Tomcat + + // processors: + // - timestamp: + // field: test_time + + // transform: + // - field: test_time + // type: timestamp, ns + // index: time + // "#; + + // let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); + // let schema = pipeline.schemas().clone(); + // let mut result = pipeline.init_intermediate_state(); + // pipeline.prepare(input_value, &mut result).unwrap(); + // let row = pipeline + // .exec_mut(&mut result) + // .unwrap() + // .into_transformed() + // .unwrap(); + // let output = Rows { + // schema, + // rows: vec![row], + // }; + // let schemas = output.schema; + + // assert_eq!(schemas.len(), 1); + // let schema = schemas[0].clone(); + // assert_eq!("test_time", schema.column_name); + // assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype); + // assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type); + + // let row = output.rows[0].clone(); + // assert_eq!(1, row.values.len()); + // let value_data = row.values[0].clone().value_data; + // assert_eq!( + // Some(v1::value::ValueData::TimestampNanosecondValue( + // 1400301296000000000 + // )), + // value_data + // ); + // } #[test] fn test_dispatcher() { diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs index d1e0b56e6e9d..51080c86eebf 100644 --- a/src/pipeline/src/etl/error.rs +++ b/src/pipeline/src/etl/error.rs @@ -543,6 +543,11 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Yaml parse error."))] + YamlParse { + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Prepare value must be an object"))] PrepareValueMustBeObject { #[snafu(implicit)] diff --git a/src/pipeline/src/etl/field.rs b/src/pipeline/src/etl/field.rs index 10fa681f236c..dd4835ec9279 100644 --- a/src/pipeline/src/etl/field.rs +++ b/src/pipeline/src/etl/field.rs @@ -19,133 +19,12 @@ use snafu::OptionExt; use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu}; use crate::etl::error::{Error, Result}; -use crate::etl::find_key_index; - -/// Information about the input field including the name and index in intermediate keys. -#[derive(Debug, Default, Clone)] -pub struct InputFieldInfo { - pub(crate) name: String, - pub(crate) index: usize, -} - -impl InputFieldInfo { - /// Create a new input field info with the given field name and index. - pub(crate) fn new(field: impl Into, index: usize) -> Self { - InputFieldInfo { - name: field.into(), - index, - } - } -} - -/// Information about a field that has one input and one output. -#[derive(Debug, Default, Clone)] -pub struct OneInputOneOutputField { - input: InputFieldInfo, - output: Option<(String, usize)>, -} - -impl OneInputOneOutputField { - /// Create a new field with the given input and output. - pub(crate) fn new(input: InputFieldInfo, output: (String, usize)) -> Self { - OneInputOneOutputField { - input, - output: Some(output), - } - } - - /// Build a new field with the given processor kind, intermediate keys, input field, and target field. - pub(crate) fn build( - processor_kind: &str, - intermediate_keys: &[String], - input_field: &str, - target_field: &str, - ) -> Result { - let input_index = find_key_index(intermediate_keys, input_field, processor_kind)?; - - let input_field_info = InputFieldInfo::new(input_field, input_index); - let output_index = find_key_index(intermediate_keys, target_field, processor_kind)?; - Ok(OneInputOneOutputField::new( - input_field_info, - (target_field.to_string(), output_index), - )) - } - - /// Get the input field information. - pub(crate) fn input(&self) -> &InputFieldInfo { - &self.input - } - - /// Get the index of the input field. - pub(crate) fn input_index(&self) -> usize { - self.input.index - } - - /// Get the name of the input field. - pub(crate) fn input_name(&self) -> &str { - &self.input.name - } - - /// Get the index of the output field. - pub(crate) fn output_index(&self) -> usize { - *self.output().1 - } - - /// Get the name of the output field. - pub(crate) fn output_name(&self) -> &str { - self.output().0 - } - - /// Get the output field information. - pub(crate) fn output(&self) -> (&String, &usize) { - if let Some((name, index)) = &self.output { - (name, index) - } else { - (&self.input.name, &self.input.index) - } - } -} - -/// Information about a field that has one input and multiple outputs. -#[derive(Debug, Default, Clone)] -pub struct OneInputMultiOutputField { - input: InputFieldInfo, - /// Typically, processors that output multiple keys need to be distinguished by splicing the keys together. - prefix: Option, -} - -impl OneInputMultiOutputField { - /// Create a new field with the given input and prefix. - pub(crate) fn new(input: InputFieldInfo, prefix: Option) -> Self { - OneInputMultiOutputField { input, prefix } - } - - /// Get the input field information. - pub(crate) fn input(&self) -> &InputFieldInfo { - &self.input - } - - /// Get the index of the input field. - pub(crate) fn input_index(&self) -> usize { - self.input.index - } - - /// Get the name of the input field. - pub(crate) fn input_name(&self) -> &str { - &self.input.name - } - - /// Get the prefix for the output fields. - pub(crate) fn target_prefix(&self) -> &str { - self.prefix.as_deref().unwrap_or(&self.input.name) - } -} /// Raw processor-defined inputs and outputs #[derive(Debug, Default, Clone)] pub struct Field { - pub(crate) input_field: String, - pub(crate) target_field: Option, + input_field: String, + target_field: Option, } impl FromStr for Field { @@ -194,6 +73,10 @@ impl Field { pub(crate) fn target_or_input_field(&self) -> &str { self.target_field.as_deref().unwrap_or(&self.input_field) } + + pub(crate) fn set_target_field(&mut self, target_field: Option) { + self.target_field = target_field; + } } /// A collection of fields. diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index b6df91204c39..63854ad552a7 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -12,49 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod cmcd; -pub mod csv; +// pub mod cmcd; +// pub mod csv; pub mod date; pub mod decolorize; pub mod digest; -pub mod dissect; +// pub mod dissect; pub mod epoch; pub mod gsub; pub mod join; pub mod json_path; pub mod letter; -pub mod regex; +// pub mod regex; pub mod timestamp; pub mod urlencoding; use std::collections::BTreeMap; -use ahash::{HashSet, HashSetExt}; -use cmcd::{CmcdProcessor, CmcdProcessorBuilder}; -use csv::{CsvProcessor, CsvProcessorBuilder}; -use date::{DateProcessor, DateProcessorBuilder}; -use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder}; -use digest::{DigestProcessor, DigestProcessorBuilder}; -use dissect::{DissectProcessor, DissectProcessorBuilder}; +// use cmcd::CmcdProcessor; +// use csv::CsvProcessor; +use date::DateProcessor; +use decolorize::DecolorizeProcessor; +use digest::DigestProcessor; +// use dissect::DissectProcessor; use enum_dispatch::enum_dispatch; -use epoch::{EpochProcessor, EpochProcessorBuilder}; -use gsub::{GsubProcessor, GsubProcessorBuilder}; -use itertools::Itertools; -use join::{JoinProcessor, JoinProcessorBuilder}; -use json_path::{JsonPathProcessor, JsonPathProcessorBuilder}; -use letter::{LetterProcessor, LetterProcessorBuilder}; -use regex::{RegexProcessor, RegexProcessorBuilder}; +use epoch::EpochProcessor; +use gsub::GsubProcessor; +use join::JoinProcessor; +use json_path::JsonPathProcessor; +use letter::LetterProcessor; +// use regex::RegexProcessor; use snafu::{OptionExt, ResultExt}; -use timestamp::{TimestampProcessor, TimestampProcessorBuilder}; -use urlencoding::{UrlEncodingProcessor, UrlEncodingProcessorBuilder}; +use timestamp::TimestampProcessor; +use urlencoding::UrlEncodingProcessor; use super::error::{ FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu, - ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, UnsupportedProcessorSnafu, + ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, }; use super::field::{Field, Fields}; use crate::etl::error::{Error, Result}; use crate::etl::value::Value; +use crate::etl_error::UnsupportedProcessorSnafu; const FIELD_NAME: &str = "field"; const FIELDS_NAME: &str = "fields"; @@ -67,6 +66,8 @@ const TARGET_FIELDS_NAME: &str = "target_fields"; const JSON_PATH_NAME: &str = "json_path"; const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index"; +pub type IntermediateStatus = BTreeMap; + /// Processor trait defines the interface for all processors. /// /// A processor is a transformation that can be applied to a field in a document @@ -82,19 +83,19 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static { fn ignore_missing(&self) -> bool; /// Execute the processor on a vector which be preprocessed by the pipeline - fn exec_mut(&self, val: &mut BTreeMap) -> Result<()>; + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()>; } #[derive(Debug)] #[enum_dispatch] pub enum ProcessorKind { - Cmcd(CmcdProcessor), - Csv(CsvProcessor), - Dissect(DissectProcessor), + // Cmcd(CmcdProcessor), + // Csv(CsvProcessor), + // Dissect(DissectProcessor), Gsub(GsubProcessor), Join(JoinProcessor), Letter(LetterProcessor), - Regex(RegexProcessor), + // Regex(RegexProcessor), Timestamp(TimestampProcessor), UrlEncoding(UrlEncodingProcessor), Epoch(EpochProcessor), @@ -104,18 +105,6 @@ pub enum ProcessorKind { Digest(DigestProcessor), } -/// ProcessorBuilder trait defines the interface for all processor builders -/// A processor builder is used to create a processor -#[enum_dispatch(ProcessorBuilders)] -pub trait ProcessorBuilder: std::fmt::Debug + Send + Sync + 'static { - /// Get the processor's output keys - fn output_keys(&self) -> HashSet<&str>; - /// Get the processor's input keys - fn input_keys(&self) -> HashSet<&str>; - /// Build the processor - fn build(self, intermediate_keys: &[String]) -> Result; -} - #[derive(Debug, Default)] pub struct Processors { /// A ordered list of processors @@ -166,7 +155,33 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?; - todo!() + let processor = match str_key { + // cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?), + // csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?), + // dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?), + epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?), + date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?), + gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?), + join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?), + letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?), + // regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?), + timestamp::PROCESSOR_TIMESTAMP => { + ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?) + } + urlencoding::PROCESSOR_URL_ENCODING => { + ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?) + } + json_path::PROCESSOR_JSON_PATH => { + ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?) + } + decolorize::PROCESSOR_DECOLORIZE => { + ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?) + } + digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?), + _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(), + }; + + Ok(processor) } pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result { diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index 086fe8f3d610..944487472691 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -27,7 +27,7 @@ use crate::etl::error::{ FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Field, Fields, InputFieldInfo, OneInputMultiOutputField}; +use crate::etl::field::{Field, Fields, InputField, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, Processor, ProcessorBuilder, ProcessorKind, @@ -35,6 +35,8 @@ use crate::etl::processor::{ }; use crate::etl::value::Value; +use super::IntermediateStatus; + pub(crate) const PROCESSOR_CMCD: &str = "cmcd"; const CMCD_KEY_BR: &str = "br"; // Encoded bitrate, Integer kbps @@ -135,7 +137,7 @@ impl CmcdProcessorBuilder { for field in self.fields.into_iter() { let input_index = find_key_index(intermediate_keys, field.input_field(), "cmcd")?; - let input_field_info = InputFieldInfo::new(field.input_field(), input_index); + let input_field_info = InputField::new(field.input_field(), input_index); let (_, cmcd_field_outputs) = Self::build_cmcd_outputs(&field, intermediate_keys)?; @@ -372,7 +374,7 @@ impl Processor for CmcdProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for (field_index, field) in self.fields.iter().enumerate() { let field_value_index = field.input_index(); match val.get(field_value_index) { diff --git a/src/pipeline/src/etl/processor/csv.rs b/src/pipeline/src/etl/processor/csv.rs index c9cb5f847db1..86f39fc89369 100644 --- a/src/pipeline/src/etl/processor/csv.rs +++ b/src/pipeline/src/etl/processor/csv.rs @@ -24,7 +24,7 @@ use crate::etl::error::{ CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField}; +use crate::etl::field::{Fields, InputField, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, @@ -64,7 +64,7 @@ impl CsvProcessorBuilder { for field in self.fields { let input_index = find_key_index(intermediate_keys, field.input_field(), "csv")?; - let input_field_info = InputFieldInfo::new(field.input_field(), input_index); + let input_field_info = InputField::new(field.input_field(), input_index); let real_field = OneInputMultiOutputField::new(input_field_info, None); real_fields.push(real_field); } diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs index fa202a0edff2..e080b795402c 100644 --- a/src/pipeline/src/etl/processor/date.rs +++ b/src/pipeline/src/etl/processor/date.rs @@ -14,21 +14,21 @@ use std::sync::Arc; -use ahash::HashSet; use chrono::{DateTime, NaiveDateTime}; use chrono_tz::Tz; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt}; +use super::IntermediateStatus; use crate::etl::error::{ DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu, DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, - ProcessorBuilder, ProcessorKind, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME, + FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::{Timestamp, Value}; @@ -88,55 +88,7 @@ impl std::ops::Deref for Formats { } } -#[derive(Debug, Default)] -pub struct DateProcessorBuilder { - fields: Fields, - formats: Formats, - timezone: Option>, - locale: Option>, - ignore_missing: bool, -} - -impl ProcessorBuilder for DateProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Date) - } -} - -impl DateProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "date", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - Ok(DateProcessor { - fields: real_fields, - formats: self.formats, - timezone: self.timezone, - locale: self.locale, - ignore_missing: self.ignore_missing, - }) - } -} - -impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor { type Error = Error; fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { @@ -181,7 +133,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessorBuilder { } } - let builder = DateProcessorBuilder { + let builder = DateProcessor { fields, formats, timezone, @@ -197,7 +149,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessorBuilder { /// Reserved for compatibility only #[derive(Debug, Default)] pub struct DateProcessor { - fields: Vec, + fields: Fields, formats: Formats, timezone: Option>, locale: Option>, // to support locale @@ -242,20 +194,20 @@ impl Processor for DateProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::String(s)) => { let timestamp = self.parse(s)?; - let output_index = field.output_index(); - val[output_index] = Value::Timestamp(timestamp); + let output_key = field.target_or_input_field(); + val.insert(output_key.to_string(), Value::Timestamp(timestamp)); } Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind().to_string(), - field: field.input_name().to_string(), + field: field.input_field().to_string(), } .fail(); } diff --git a/src/pipeline/src/etl/processor/decolorize.rs b/src/pipeline/src/etl/processor/decolorize.rs index e72bc28a1e66..2547b99d6824 100644 --- a/src/pipeline/src/etl/processor/decolorize.rs +++ b/src/pipeline/src/etl/processor/decolorize.rs @@ -18,18 +18,17 @@ //! from Grafana Loki and [`strip_ansi_escape_codes`](https://vector.dev/docs/reference/vrl/functions/#strip_ansi_escape_codes) //! from Vector VRL. -use ahash::HashSet; use once_cell::sync::Lazy; use regex::Regex; use snafu::OptionExt; +use super::IntermediateStatus; use crate::etl::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, ProcessorBuilder, ProcessorKind, FIELDS_NAME, - FIELD_NAME, IGNORE_MISSING_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::Value; @@ -37,52 +36,10 @@ pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize"; static RE: Lazy = Lazy::new(|| Regex::new(r"\x1b\[[0-9;]*m").unwrap()); -#[derive(Debug, Default)] -pub struct DecolorizeProcessorBuilder { - fields: Fields, - ignore_missing: bool, -} - -impl ProcessorBuilder for DecolorizeProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Decolorize) - } -} - -impl DecolorizeProcessorBuilder { - fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "decolorize", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - Ok(DecolorizeProcessor { - fields: real_fields, - ignore_missing: self.ignore_missing, - }) - } -} - /// Remove ANSI color control codes from the input text. #[derive(Debug, Default)] pub struct DecolorizeProcessor { - fields: Vec, + fields: Fields, ignore_missing: bool, } @@ -103,7 +60,7 @@ impl DecolorizeProcessor { } } -impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessor { type Error = Error; fn try_from(value: &yaml_rust::yaml::Hash) -> Result { @@ -129,7 +86,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessorBuilder { } } - Ok(DecolorizeProcessorBuilder { + Ok(DecolorizeProcessor { fields, ignore_missing, }) @@ -145,23 +102,23 @@ impl crate::etl::processor::Processor for DecolorizeProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } } Some(v) => { let result = self.process(v)?; - let output_index = field.output_index(); - val[output_index] = result; + let output_index = field.target_or_input_field(); + val.insert(output_index.to_string(), result); } } } @@ -176,7 +133,7 @@ mod tests { #[test] fn test_decolorize_processor() { let processor = DecolorizeProcessor { - fields: vec![], + fields: Fields::default(), ignore_missing: false, }; diff --git a/src/pipeline/src/etl/processor/digest.rs b/src/pipeline/src/etl/processor/digest.rs index 29054365ad03..64bb2a2f6d8a 100644 --- a/src/pipeline/src/etl/processor/digest.rs +++ b/src/pipeline/src/etl/processor/digest.rs @@ -21,17 +21,16 @@ use std::borrow::Cow; -use ahash::HashSet; use regex::Regex; use snafu::OptionExt; +use super::IntermediateStatus; use crate::etl::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, ProcessorBuilder, ProcessorKind, FIELDS_NAME, - FIELD_NAME, IGNORE_MISSING_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::Value; use crate::etl_error::DigestPatternInvalidSnafu; @@ -88,54 +87,10 @@ impl PresetPattern { } } -#[derive(Debug, Default)] -pub struct DigestProcessorBuilder { - fields: Fields, - patterns: Vec, - ignore_missing: bool, -} - -impl ProcessorBuilder for DigestProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Digest) - } -} - -impl DigestProcessorBuilder { - fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = Vec::with_capacity(self.fields.len()); - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "digest", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - Ok(DigestProcessor { - fields: real_fields, - ignore_missing: self.ignore_missing, - patterns: self.patterns, - }) - } -} - /// Computes a digest (hash) of the input string. #[derive(Debug, Default)] pub struct DigestProcessor { - fields: Vec, + fields: Fields, ignore_missing: bool, patterns: Vec, } @@ -169,7 +124,7 @@ impl DigestProcessor { } } -impl TryFrom<&yaml_rust::yaml::Hash> for DigestProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for DigestProcessor { type Error = Error; fn try_from(value: &yaml_rust::yaml::Hash) -> Result { @@ -226,10 +181,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DigestProcessorBuilder { } for field in fields.iter_mut() { - field.target_field = Some(format!("{}_digest", field.input_field())); + field.set_target_field(Some(format!("{}_digest", field.input_field()))); } - Ok(DigestProcessorBuilder { + Ok(DigestProcessor { fields, patterns, ignore_missing, @@ -246,23 +201,23 @@ impl crate::etl::processor::Processor for DigestProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } } Some(v) => { let result = self.process(v)?; - let output_index = field.output_index(); - val[output_index] = result; + let output_index = field.target_or_input_field(); + val.insert(output_index.to_string(), result); } } } @@ -278,7 +233,7 @@ mod tests { #[test] fn test_digest_processor_ip() { let processor = DigestProcessor { - fields: vec![], + fields: Fields::default(), ignore_missing: false, patterns: vec![PresetPattern::Ip.regex()], }; @@ -306,7 +261,7 @@ mod tests { #[test] fn test_digest_processor_uuid() { let processor = DigestProcessor { - fields: vec![], + fields: Fields::default(), ignore_missing: false, patterns: vec![PresetPattern::Uuid.regex()], }; @@ -339,7 +294,7 @@ mod tests { #[test] fn test_digest_processor_brackets() { let processor = DigestProcessor { - fields: vec![], + fields: Fields::default(), ignore_missing: false, patterns: vec![PresetPattern::Bracketed.regex()], }; @@ -389,7 +344,7 @@ mod tests { #[test] fn test_digest_processor_quotes() { let processor = DigestProcessor { - fields: vec![], + fields: Fields::default(), ignore_missing: false, patterns: vec![PresetPattern::Quoted.regex()], }; @@ -409,7 +364,7 @@ mod tests { #[test] fn test_digest_processor_custom_regex() { let processor = DigestProcessor { - fields: vec![], + fields: Fields::default(), ignore_missing: false, patterns: vec![Regex::new(r"\d+").unwrap()], }; diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index a9ccf5e8735e..13ad9175e7df 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -25,7 +25,7 @@ use crate::etl::error::{ DissectOrderOnlyAppendSnafu, DissectSplitExceedsInputSnafu, DissectSplitNotMatchInputSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField}; +use crate::etl::field::{Fields, InputField, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_parse_string, yaml_parse_strings, yaml_string, @@ -612,7 +612,7 @@ impl ProcessorBuilder for DissectProcessorBuilder { for field in self.fields.into_iter() { let input_index = find_key_index(intermediate_keys, field.input_field(), "dissect")?; - let input_field_info = InputFieldInfo::new(field.input_field(), input_index); + let input_field_info = InputField::new(field.input_field(), input_index); let real_field = OneInputMultiOutputField::new(input_field_info, field.target_field); real_fields.push(real_field); diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs index f2c03fd120de..29ad6bd3d97d 100644 --- a/src/pipeline/src/etl/processor/epoch.rs +++ b/src/pipeline/src/etl/processor/epoch.rs @@ -12,17 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::HashSet; use snafu::{OptionExt, ResultExt}; +use super::IntermediateStatus; use crate::etl::error::{ EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, - ProcessorKind, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME, + IGNORE_MISSING_NAME, }; use crate::etl::value::time::{ MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION, @@ -57,56 +57,12 @@ impl TryFrom<&str> for Resolution { } } -#[derive(Debug, Default)] -pub struct EpochProcessorBuilder { - fields: Fields, - resolution: Resolution, - ignore_missing: bool, -} - -impl ProcessorBuilder for EpochProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Epoch) - } -} - -impl EpochProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "epoch", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - Ok(EpochProcessor { - fields: real_fields, - resolution: self.resolution, - ignore_missing: self.ignore_missing, - }) - } -} - /// support string, integer, float, time, epoch /// deprecated it should be removed in the future /// Reserved for compatibility only #[derive(Debug, Default)] pub struct EpochProcessor { - fields: Vec, + fields: Fields, resolution: Resolution, ignore_missing: bool, // description @@ -157,7 +113,7 @@ impl EpochProcessor { } } -impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor { type Error = Error; fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { @@ -188,7 +144,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessorBuilder { _ => {} } } - let builder = EpochProcessorBuilder { + let builder = EpochProcessor { fields, resolution, ignore_missing, @@ -207,23 +163,23 @@ impl Processor for EpochProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } } Some(v) => { let timestamp = self.parse(v)?; - let output_index = field.output_index(); - val[output_index] = Value::Timestamp(timestamp); + let output_index = field.target_or_input_field(); + val.insert(output_index.to_string(), Value::Timestamp(timestamp)); } } } diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs index 54c8306ec4de..dbdb9c5c3047 100644 --- a/src/pipeline/src/etl/processor/gsub.rs +++ b/src/pipeline/src/etl/processor/gsub.rs @@ -12,18 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::HashSet; use regex::Regex; use snafu::{OptionExt, ResultExt}; +use super::IntermediateStatus; use crate::etl::error::{ Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, ProcessorBuilder, ProcessorKind, - FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, + IGNORE_MISSING_NAME, PATTERN_NAME, }; use crate::etl::value::Value; @@ -31,68 +31,10 @@ pub(crate) const PROCESSOR_GSUB: &str = "gsub"; const REPLACEMENT_NAME: &str = "replacement"; -#[derive(Debug, Default)] -pub struct GsubProcessorBuilder { - fields: Fields, - pattern: Option, - replacement: Option, - ignore_missing: bool, -} - -impl ProcessorBuilder for GsubProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Gsub) - } -} - -impl GsubProcessorBuilder { - fn check(self) -> Result { - if self.pattern.is_none() { - return GsubPatternRequiredSnafu.fail(); - } - - if self.replacement.is_none() { - return GsubReplacementRequiredSnafu.fail(); - } - - Ok(self) - } - - fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "gsub", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - Ok(GsubProcessor { - fields: real_fields, - pattern: self.pattern, - replacement: self.replacement, - ignore_missing: self.ignore_missing, - }) - } -} - /// A processor to replace all matches of a pattern in string by a replacement, only support string value, and array string value #[derive(Debug, Default)] pub struct GsubProcessor { - fields: Vec, + fields: Fields, pattern: Option, replacement: Option, ignore_missing: bool, @@ -136,7 +78,7 @@ impl GsubProcessor { } } -impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor { type Error = Error; fn try_from(value: &yaml_rust::yaml::Hash) -> Result { @@ -176,7 +118,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessorBuilder { } } - let builder = GsubProcessorBuilder { + let builder = GsubProcessor { fields, pattern, replacement, @@ -196,23 +138,23 @@ impl crate::etl::processor::Processor for GsubProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } } Some(v) => { let result = self.process(v)?; - let output_index = field.output_index(); - val[output_index] = result; + let output_index = field.target_or_input_field(); + val.insert(output_index.to_string(), result); } } } diff --git a/src/pipeline/src/etl/processor/join.rs b/src/pipeline/src/etl/processor/join.rs index ddbc086ab8da..6913a5428873 100644 --- a/src/pipeline/src/etl/processor/join.rs +++ b/src/pipeline/src/etl/processor/join.rs @@ -12,79 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::HashSet; use snafu::OptionExt; +use super::IntermediateStatus; use crate::etl::error::{ Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, - ProcessorKind, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, SEPARATOR_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME, + IGNORE_MISSING_NAME, SEPARATOR_NAME, }; use crate::etl::value::{Array, Value}; pub(crate) const PROCESSOR_JOIN: &str = "join"; -#[derive(Debug, Default)] -pub struct JoinProcessorBuilder { - fields: Fields, - separator: Option, - ignore_missing: bool, -} - -impl ProcessorBuilder for JoinProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Join) - } -} - -impl JoinProcessorBuilder { - fn check(self) -> Result { - if self.separator.is_none() { - return JoinSeparatorRequiredSnafu.fail(); - } - - Ok(self) - } - - pub fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "join", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - - Ok(JoinProcessor { - fields: real_fields, - separator: self.separator, - ignore_missing: self.ignore_missing, - }) - } -} - /// A processor to join each element of an array into a single string using a separator string between each element #[derive(Debug, Default)] pub struct JoinProcessor { - fields: Vec, + fields: Fields, separator: Option, ignore_missing: bool, } @@ -110,7 +57,7 @@ impl JoinProcessor { } } -impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor { type Error = Error; fn try_from(value: &yaml_rust::yaml::Hash) -> Result { @@ -140,7 +87,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessorBuilder { } } - let builder = JoinProcessorBuilder { + let builder = JoinProcessor { fields, separator, ignore_missing, @@ -158,20 +105,20 @@ impl Processor for JoinProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::Array(arr)) => { let result = self.process(arr)?; - let output_index = field.output_index(); - val[output_index] = result; + let output_index = field.target_or_input_field(); + val.insert(output_index.to_string(), result); } Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } diff --git a/src/pipeline/src/etl/processor/json_path.rs b/src/pipeline/src/etl/processor/json_path.rs index c09d338c637f..c7b4210e83f1 100644 --- a/src/pipeline/src/etl/processor/json_path.rs +++ b/src/pipeline/src/etl/processor/json_path.rs @@ -12,17 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::HashSet; use jsonpath_rust::JsonPath; use snafu::{OptionExt, ResultExt}; use super::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, IntermediateStatus, Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME, }; use crate::etl::error::{Error, Result}; -use crate::etl::field::{Fields, OneInputOneOutputField}; -use crate::etl::processor::ProcessorKind; +use crate::etl::field::Fields; use crate::etl_error::{ JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, @@ -31,54 +29,7 @@ use crate::Value; pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path"; -#[derive(Debug)] -pub struct JsonPathProcessorBuilder { - fields: Fields, - json_path: JsonPath, - ignore_missing: bool, - result_idex: Option, -} - -impl JsonPathProcessorBuilder { - fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - JSON_PATH_NAME, - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - - Ok(JsonPathProcessor { - fields: real_fields, - json_path: self.json_path, - ignore_missing: self.ignore_missing, - result_idex: self.result_idex, - }) - } -} - -impl ProcessorBuilder for JsonPathProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::JsonPath) - } -} - -impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessor { type Error = Error; fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result { @@ -117,7 +68,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessorBuilder { } } if let Some(json_path) = json_path { - let processor = JsonPathProcessorBuilder { + let processor = JsonPathProcessor { fields, json_path, ignore_missing, @@ -137,7 +88,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessorBuilder { #[derive(Debug)] pub struct JsonPathProcessor { - fields: Vec, + fields: Fields, json_path: JsonPath, ignore_missing: bool, result_idex: Option, @@ -146,7 +97,7 @@ pub struct JsonPathProcessor { impl Default for JsonPathProcessor { fn default() -> Self { JsonPathProcessor { - fields: vec![], + fields: Fields::default(), json_path: JsonPath::try_from("$").unwrap(), ignore_missing: false, result_idex: None, @@ -179,21 +130,20 @@ impl Processor for JsonPathProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(v) => { let processed = self.process_field(v)?; - - let output_index = field.output_index(); - val[output_index] = processed; + let output_index = field.target_or_input_field(); + val.insert(output_index.to_string(), processed); } None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } diff --git a/src/pipeline/src/etl/processor/letter.rs b/src/pipeline/src/etl/processor/letter.rs index 8eb939918104..960521853e48 100644 --- a/src/pipeline/src/etl/processor/letter.rs +++ b/src/pipeline/src/etl/processor/letter.rs @@ -12,17 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::HashSet; use snafu::OptionExt; +use super::IntermediateStatus; use crate::etl::error::{ Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder, - ProcessorKind, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, METHOD_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME, + IGNORE_MISSING_NAME, METHOD_NAME, }; use crate::etl::value::Value; @@ -59,55 +59,10 @@ impl std::str::FromStr for Method { } } -#[derive(Debug, Default)] -pub struct LetterProcessorBuilder { - fields: Fields, - method: Method, - ignore_missing: bool, -} - -impl ProcessorBuilder for LetterProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Letter) - } -} - -impl LetterProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "letter", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - - Ok(LetterProcessor { - fields: real_fields, - method: self.method, - ignore_missing: self.ignore_missing, - }) - } -} - /// only support string value #[derive(Debug, Default)] pub struct LetterProcessor { - fields: Vec, + fields: Fields, method: Method, ignore_missing: bool, } @@ -125,7 +80,7 @@ impl LetterProcessor { } } -impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessor { type Error = Error; fn try_from(value: &yaml_rust::yaml::Hash) -> Result { @@ -154,7 +109,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessorBuilder { } } - Ok(LetterProcessorBuilder { + Ok(LetterProcessor { fields, method, ignore_missing, @@ -171,20 +126,20 @@ impl Processor for LetterProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::String(s)) => { let result = self.process_field(s)?; - let (_, output_index) = field.output(); - val[*output_index] = result; + let output_key = field.target_or_input_field(); + val.insert(output_key.to_string(), result); } Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index de25195f99ab..a6ffa86d1689 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -28,7 +28,7 @@ use crate::etl::error::{ RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu, Result, }; -use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField}; +use crate::etl::field::{Fields, InputField, OneInputMultiOutputField}; use crate::etl::find_key_index; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, @@ -173,7 +173,7 @@ impl RegexProcessorBuilder { let mut real_fields = vec![]; for field in self.fields.into_iter() { let input_index = find_key_index(intermediate_keys, field.input_field(), "regex")?; - let input_field_info = InputFieldInfo::new(field.input_field(), input_index); + let input_field_info = InputField::new(field.input_field(), input_index); let input = OneInputMultiOutputField::new(input_field_info, field.target_field); real_fields.push(input); diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs index 18b6711c1d80..bf90e78f2165 100644 --- a/src/pipeline/src/etl/processor/timestamp.rs +++ b/src/pipeline/src/etl/processor/timestamp.rs @@ -14,22 +14,22 @@ use std::sync::Arc; -use ahash::HashSet; use chrono::{DateTime, NaiveDateTime}; use chrono_tz::Tz; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt}; +use super::IntermediateStatus; use crate::etl::error::{ DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu, DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error, KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, - ProcessorBuilder, ProcessorKind, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME, + FIELD_NAME, IGNORE_MISSING_NAME, }; use crate::etl::value::time::{ MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION, @@ -114,56 +114,10 @@ impl std::ops::Deref for Formats { } } -#[derive(Debug)] -pub struct TimestampProcessorBuilder { - fields: Fields, - formats: Formats, - resolution: Resolution, - ignore_missing: bool, -} - -impl ProcessorBuilder for TimestampProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys).map(ProcessorKind::Timestamp) - } -} - -impl TimestampProcessorBuilder { - pub fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "timestamp", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - Ok(TimestampProcessor { - fields: real_fields, - formats: self.formats, - resolution: self.resolution, - ignore_missing: self.ignore_missing, - }) - } -} - /// support string, integer, float, time, epoch #[derive(Debug, Default)] pub struct TimestampProcessor { - fields: Vec, + fields: Fields, formats: Formats, resolution: Resolution, ignore_missing: bool, @@ -289,7 +243,7 @@ fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result, Tz)>> } } -impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor { type Error = Error; fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { @@ -324,7 +278,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessorBuilder { } } - let processor_builder = TimestampProcessorBuilder { + let processor_builder = TimestampProcessor { fields, formats, resolution, @@ -344,23 +298,23 @@ impl Processor for TimestampProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> { for field in self.fields.iter() { - let index = field.input().index; + let index = field.input_field(); match val.get(index) { Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } } Some(v) => { let result = self.parse(v)?; - let (_, index) = field.output(); - val[*index] = Value::Timestamp(result); + let output_key = field.target_or_input_field(); + val.insert(output_key.to_string(), Value::Timestamp(result)); } } } @@ -372,18 +326,9 @@ impl Processor for TimestampProcessor { mod tests { use yaml_rust::YamlLoader; - use super::{TimestampProcessor, TimestampProcessorBuilder}; + use super::TimestampProcessor; use crate::etl::value::{Timestamp, Value}; - fn builder_to_native_processor(builder: TimestampProcessorBuilder) -> TimestampProcessor { - TimestampProcessor { - fields: vec![], - formats: builder.formats, - resolution: builder.resolution, - ignore_missing: builder.ignore_missing, - } - } - #[test] fn test_parse_epoch() { let processor_yaml_str = r#"fields: @@ -397,9 +342,7 @@ formats: "#; let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0]; let timestamp_yaml = yaml.as_hash().unwrap(); - let processor = builder_to_native_processor( - TimestampProcessorBuilder::try_from(timestamp_yaml).unwrap(), - ); + let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap(); let values = [ ( @@ -451,9 +394,7 @@ formats: "#; let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0]; let timestamp_yaml = yaml.as_hash().unwrap(); - let processor = builder_to_native_processor( - TimestampProcessorBuilder::try_from(timestamp_yaml).unwrap(), - ); + let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap(); let values: Vec<&str> = vec![ "2014-5-17T12:34:56", diff --git a/src/pipeline/src/etl/processor/urlencoding.rs b/src/pipeline/src/etl/processor/urlencoding.rs index ca42aae23677..c14c7d87b11f 100644 --- a/src/pipeline/src/etl/processor/urlencoding.rs +++ b/src/pipeline/src/etl/processor/urlencoding.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::HashSet; +use std::collections::BTreeMap; + use snafu::{OptionExt, ResultExt}; use urlencoding::{decode, encode}; @@ -20,10 +21,10 @@ use crate::etl::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu, }; -use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::field::Fields; use crate::etl::processor::{ - yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, ProcessorBuilder, ProcessorKind, - FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, METHOD_NAME, + yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, + IGNORE_MISSING_NAME, METHOD_NAME, }; use crate::etl::value::Value; @@ -57,55 +58,10 @@ impl std::str::FromStr for Method { } } -#[derive(Debug, Default)] -pub struct UrlEncodingProcessorBuilder { - fields: Fields, - method: Method, - ignore_missing: bool, -} - -impl ProcessorBuilder for UrlEncodingProcessorBuilder { - fn output_keys(&self) -> HashSet<&str> { - self.fields - .iter() - .map(|f| f.target_or_input_field()) - .collect() - } - - fn input_keys(&self) -> HashSet<&str> { - self.fields.iter().map(|f| f.input_field()).collect() - } - - fn build(self, intermediate_keys: &[String]) -> Result { - self.build(intermediate_keys) - .map(ProcessorKind::UrlEncoding) - } -} - -impl UrlEncodingProcessorBuilder { - fn build(self, intermediate_keys: &[String]) -> Result { - let mut real_fields = vec![]; - for field in self.fields.into_iter() { - let input = OneInputOneOutputField::build( - "urlencoding", - intermediate_keys, - field.input_field(), - field.target_or_input_field(), - )?; - real_fields.push(input); - } - Ok(UrlEncodingProcessor { - fields: real_fields, - method: self.method, - ignore_missing: self.ignore_missing, - }) - } -} - /// only support string value #[derive(Debug, Default)] pub struct UrlEncodingProcessor { - fields: Vec, + fields: Fields, method: Method, ignore_missing: bool, } @@ -120,7 +76,7 @@ impl UrlEncodingProcessor { } } -impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessorBuilder { +impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessor { type Error = Error; fn try_from(value: &yaml_rust::yaml::Hash) -> Result { @@ -152,7 +108,7 @@ impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessorBuilder { _ => {} } } - let processor = UrlEncodingProcessorBuilder { + let processor = UrlEncodingProcessor { fields, method, ignore_missing, @@ -171,20 +127,20 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { self.ignore_missing } - fn exec_mut(&self, val: &mut Vec) -> Result<()> { + fn exec_mut(&self, val: &mut BTreeMap) -> Result<()> { for field in self.fields.iter() { - let index = field.input_index(); + let index = field.input_field(); match val.get(index) { Some(Value::String(s)) => { let result = self.process_field(s)?; - let output_index = field.output_index(); - val[output_index] = result; + let output_index = field.target_or_input_field(); + val.insert(output_index.to_string(), result); } Some(Value::Null) | None => { if !self.ignore_missing { return ProcessorMissingFieldSnafu { processor: self.kind(), - field: field.input_name(), + field: field.input_field(), } .fail(); } @@ -205,6 +161,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor { #[cfg(test)] mod tests { + use crate::etl::field::Fields; use crate::etl::processor::urlencoding::UrlEncodingProcessor; use crate::etl::value::Value; @@ -220,7 +177,7 @@ mod tests { } { let processor = UrlEncodingProcessor { - fields: vec![], + fields: Fields::default(), method: super::Method::Encode, ignore_missing: false, }; diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs index 4daa3a4d8cf4..7191d272069c 100644 --- a/src/pipeline/src/etl/transform.rs +++ b/src/pipeline/src/etl/transform.rs @@ -17,11 +17,7 @@ pub mod transformer; use std::collections::BTreeMap; -use snafu::OptionExt; - use crate::etl::error::{Error, Result}; -use crate::etl::find_key_index; -use crate::etl::processor::yaml_string; use crate::etl::transform::index::Index; use crate::etl::value::Value; @@ -32,14 +28,15 @@ const TRANSFORM_INDEX: &str = "index"; const TRANSFORM_DEFAULT: &str = "default"; const TRANSFORM_ON_FAILURE: &str = "on_failure"; +use snafu::OptionExt; pub use transformer::greptime::GreptimeTransformer; use super::error::{ KeyMustBeStringSnafu, TransformElementMustBeMapSnafu, TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu, }; -use super::field::{Fields, InputFieldInfo, OneInputOneOutputField}; -use super::processor::{yaml_new_field, yaml_new_fields}; +use super::field::Fields; +use super::processor::{yaml_new_field, yaml_new_fields, yaml_string}; pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static { type Output; @@ -104,14 +101,43 @@ impl TryFrom<&Vec> for Transforms { type Error = Error; fn try_from(docs: &Vec) -> Result { - todo!() + let mut transforms = Vec::with_capacity(100); + let mut all_output_keys: Vec = Vec::with_capacity(100); + let mut all_required_keys = Vec::with_capacity(100); + for doc in docs { + let transform_builder: Transform = doc + .as_hash() + .context(TransformElementMustBeMapSnafu)? + .try_into()?; + let mut transform_output_keys = transform_builder + .fields + .iter() + .map(|f| f.target_or_input_field().to_string()) + .collect(); + all_output_keys.append(&mut transform_output_keys); + + let mut transform_required_keys = transform_builder + .fields + .iter() + .map(|f| f.input_field().to_string()) + .collect(); + all_required_keys.append(&mut transform_required_keys); + + transforms.push(transform_builder); + } + + all_required_keys.sort(); + + Ok(Transforms { + transforms: transforms, + }) } } /// only field is required #[derive(Debug, Clone)] pub struct Transform { - pub real_fields: Vec, + pub fields: Fields, pub type_: Value, @@ -125,7 +151,7 @@ pub struct Transform { impl Default for Transform { fn default() -> Self { Transform { - real_fields: Vec::new(), + fields: Fields::default(), type_: Value::Null, default: None, index: None, @@ -143,3 +169,78 @@ impl Transform { &self.type_ } } + +impl TryFrom<&yaml_rust::yaml::Hash> for Transform { + type Error = Error; + + fn try_from(hash: &yaml_rust::yaml::Hash) -> Result { + let mut fields = Fields::default(); + let mut type_ = Value::Null; + let mut default = None; + let mut index = None; + let mut on_failure = None; + + for (k, v) in hash { + let key = k + .as_str() + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; + match key { + TRANSFORM_FIELD => { + fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?); + } + + TRANSFORM_FIELDS => { + fields = yaml_new_fields(v, TRANSFORM_FIELDS)?; + } + + TRANSFORM_TYPE => { + let t = yaml_string(v, TRANSFORM_TYPE)?; + type_ = Value::parse_str_type(&t)?; + } + + TRANSFORM_INDEX => { + let index_str = yaml_string(v, TRANSFORM_INDEX)?; + index = Some(index_str.try_into()?); + } + + TRANSFORM_DEFAULT => { + default = Some(Value::try_from(v)?); + } + + TRANSFORM_ON_FAILURE => { + let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?; + on_failure = Some(on_failure_str.parse()?); + } + + _ => {} + } + } + let mut final_default = None; + + if let Some(default_value) = default { + match (&type_, &default_value) { + (Value::Null, _) => { + return TransformTypeMustBeSetSnafu { + fields: format!("{:?}", fields), + default: default_value.to_string(), + } + .fail(); + } + (_, Value::Null) => {} // if default is not set, then it will be regarded as default null + (_, _) => { + let target = type_.parse_str_value(default_value.to_str_value().as_str())?; + final_default = Some(target); + } + } + } + let builder = Transform { + fields, + type_, + default: final_default, + index, + on_failure, + }; + + Ok(builder) + } +} diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index f7e59904a313..eeff061f755c 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -22,7 +22,7 @@ use api::helper::proto_value_type; use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType}; -use coerce::coerce_columns; +use coerce::{coerce_columns, coerce_value}; use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; use serde_json::{Map, Number, Value as JsonValue}; @@ -33,6 +33,7 @@ use crate::etl::error::{ TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu, }; +use crate::etl::processor::IntermediateStatus; use crate::etl::transform::index::Index; use crate::etl::transform::{Transformer, Transforms}; use crate::etl::value::{Timestamp, Value}; @@ -142,9 +143,9 @@ impl Transformer for GreptimeTransformer { for transform in transforms.iter() { let target_fields_set = transform - .real_fields + .fields .iter() - .map(|f| f.output_name()) + .map(|f| f.target_or_input_field()) .collect::>(); let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect(); @@ -157,16 +158,17 @@ impl Transformer for GreptimeTransformer { if let Some(idx) = transform.index { if idx == Index::Time { - match transform.real_fields.len() { + match transform.fields.len() { //Safety unwrap is fine here because we have checked the length of real_fields - 1 => timestamp_columns - .push(transform.real_fields.first().unwrap().input_name()), + 1 => { + timestamp_columns.push(transform.fields.first().unwrap().input_field()) + } _ => { return TransformMultipleTimestampIndexSnafu { columns: transform - .real_fields + .fields .iter() - .map(|x| x.input_name()) + .map(|x| x.input_field()) .join(", "), } .fail(); @@ -195,31 +197,31 @@ impl Transformer for GreptimeTransformer { } } - fn transform_mut(&self, val: &mut BTreeMap) -> Result { - // let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()]; - // for transform in self.transforms.iter() { - // for field in transform.real_fields.iter() { - // let index = field.input_index(); - // let output_index = field.output_index(); - // match val.get(index) { - // Some(v) => { - // let value_data = coerce_value(v, transform)?; - // // every transform fields has only one output field - // values[output_index] = GreptimeValue { value_data }; - // } - // None => { - // let default = transform.get_default(); - // let value_data = match default { - // Some(default) => coerce_value(default, transform)?, - // None => None, - // }; - // values[output_index] = GreptimeValue { value_data }; - // } - // } - // } - // } - // Ok(Row { values }) - todo!() + fn transform_mut(&self, val: &mut IntermediateStatus) -> Result { + let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()]; + let mut output_index = 0; + for transform in self.transforms.iter() { + for field in transform.fields.iter() { + let index = field.input_field(); + match val.get(index) { + Some(v) => { + let value_data = coerce_value(v, transform)?; + // every transform fields has only one output field + values[output_index] = GreptimeValue { value_data }; + } + None => { + let default = transform.get_default(); + let value_data = match default { + Some(default) => coerce_value(default, transform)?, + None => None, + }; + values[output_index] = GreptimeValue { value_data }; + } + } + output_index += 1; + } + } + Ok(Row { values }) } fn transforms(&self) -> &Transforms { @@ -643,6 +645,7 @@ mod tests { use crate::etl::transform::transformer::greptime::{ flatten_json_object, identity_pipeline_inner, GreptimePipelineParams, }; + use crate::etl::{json_array_to_intermediate_state, json_to_intermediate_state}; use crate::{identity_pipeline, Pipeline}; #[test] @@ -668,7 +671,7 @@ mod tests { "gaga": "gaga" }), ]; - let array = Pipeline::prepare(array).unwrap(); + let array = json_array_to_intermediate_state(array).unwrap(); let rows = identity_pipeline(array, None, &GreptimePipelineParams::default()); assert!(rows.is_err()); assert_eq!( @@ -698,7 +701,7 @@ mod tests { }), ]; let rows = identity_pipeline( - Pipeline::prepare(array).unwrap(), + json_array_to_intermediate_state(array).unwrap(), None, &GreptimePipelineParams::default(), ); @@ -730,7 +733,7 @@ mod tests { }), ]; let rows = identity_pipeline( - Pipeline::prepare(array).unwrap(), + json_array_to_intermediate_state(array).unwrap(), None, &GreptimePipelineParams::default(), ); @@ -764,7 +767,7 @@ mod tests { ]; let tag_column_names = ["name".to_string(), "address".to_string()]; let rows = identity_pipeline_inner( - Pipeline::prepare(array).uwnrap(), + json_array_to_intermediate_state(array).unwrap(), Some(tag_column_names.iter()), &GreptimePipelineParams::default(), ); diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index 5f448b386cbd..da345b3bdeb3 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -71,12 +71,11 @@ impl TryFrom for ValueData { } } -// TODO(yuanbohan): add fulltext support in datatype_extension pub(crate) fn coerce_columns(transform: &Transform) -> Result> { let mut columns = Vec::new(); - for field in transform.real_fields.iter() { - let column_name = field.output_name().to_string(); + for field in transform.fields.iter() { + let column_name = field.target_or_input_field().to_string(); let (datatype, datatype_extension) = coerce_type(transform)?; @@ -477,12 +476,14 @@ fn coerce_json_value(v: &Value, transform: &Transform) -> Result