chore: wip #3

Merged
538 changes: 278 additions & 260 deletions src/pipeline/src/etl.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/pipeline/src/etl/error.rs
@@ -543,6 +543,11 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Yaml parse error."))]
YamlParse {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Prepare value must be an object"))]
PrepareValueMustBeObject {
#[snafu(implicit)]
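
A minimal sketch (not part of this diff) of how the new `YamlParse` variant could be raised while loading a pipeline definition. `YamlParseSnafu` is the context selector snafu derives for the variant, following the `*Snafu` naming already used in this file; the helper name `load_pipeline_doc` is hypothetical.

use snafu::OptionExt;
use yaml_rust::YamlLoader;

use crate::etl::error::{Result, YamlParseSnafu};

fn load_pipeline_doc(input: &str) -> Result<yaml_rust::Yaml> {
    // The variant has no `source` field, so drop the scan error with `.ok()`
    // and attach the YamlParse context via OptionExt.
    let docs = YamlLoader::load_from_str(input)
        .ok()
        .context(YamlParseSnafu)?;
    // Take the first YAML document, failing with the same variant if empty.
    docs.into_iter().next().context(YamlParseSnafu)
}
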
129 changes: 6 additions & 123 deletions src/pipeline/src/etl/field.rs
@@ -19,133 +19,12 @@ use snafu::OptionExt;

use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu};
use crate::etl::error::{Error, Result};
use crate::etl::find_key_index;

/// Information about the input field including the name and index in intermediate keys.
#[derive(Debug, Default, Clone)]
pub struct InputFieldInfo {
pub(crate) name: String,
pub(crate) index: usize,
}

impl InputFieldInfo {
/// Create a new input field info with the given field name and index.
pub(crate) fn new(field: impl Into<String>, index: usize) -> Self {
InputFieldInfo {
name: field.into(),
index,
}
}
}

/// Information about a field that has one input and one output.
#[derive(Debug, Default, Clone)]
pub struct OneInputOneOutputField {
input: InputFieldInfo,
output: Option<(String, usize)>,
}

impl OneInputOneOutputField {
/// Create a new field with the given input and output.
pub(crate) fn new(input: InputFieldInfo, output: (String, usize)) -> Self {
OneInputOneOutputField {
input,
output: Some(output),
}
}

/// Build a new field with the given processor kind, intermediate keys, input field, and target field.
pub(crate) fn build(
processor_kind: &str,
intermediate_keys: &[String],
input_field: &str,
target_field: &str,
) -> Result<Self> {
let input_index = find_key_index(intermediate_keys, input_field, processor_kind)?;

let input_field_info = InputFieldInfo::new(input_field, input_index);
let output_index = find_key_index(intermediate_keys, target_field, processor_kind)?;
Ok(OneInputOneOutputField::new(
input_field_info,
(target_field.to_string(), output_index),
))
}

/// Get the input field information.
pub(crate) fn input(&self) -> &InputFieldInfo {
&self.input
}

/// Get the index of the input field.
pub(crate) fn input_index(&self) -> usize {
self.input.index
}

/// Get the name of the input field.
pub(crate) fn input_name(&self) -> &str {
&self.input.name
}

/// Get the index of the output field.
pub(crate) fn output_index(&self) -> usize {
*self.output().1
}

/// Get the name of the output field.
pub(crate) fn output_name(&self) -> &str {
self.output().0
}

/// Get the output field information.
pub(crate) fn output(&self) -> (&String, &usize) {
if let Some((name, index)) = &self.output {
(name, index)
} else {
(&self.input.name, &self.input.index)
}
}
}

/// Information about a field that has one input and multiple outputs.
#[derive(Debug, Default, Clone)]
pub struct OneInputMultiOutputField {
input: InputFieldInfo,
/// Processors that output multiple keys typically distinguish them by joining this prefix with each output key.
prefix: Option<String>,
}

impl OneInputMultiOutputField {
/// Create a new field with the given input and prefix.
pub(crate) fn new(input: InputFieldInfo, prefix: Option<String>) -> Self {
OneInputMultiOutputField { input, prefix }
}

/// Get the input field information.
pub(crate) fn input(&self) -> &InputFieldInfo {
&self.input
}

/// Get the index of the input field.
pub(crate) fn input_index(&self) -> usize {
self.input.index
}

/// Get the name of the input field.
pub(crate) fn input_name(&self) -> &str {
&self.input.name
}

/// Get the prefix for the output fields.
pub(crate) fn target_prefix(&self) -> &str {
self.prefix.as_deref().unwrap_or(&self.input.name)
}
}

/// Raw processor-defined inputs and outputs
#[derive(Debug, Default, Clone)]
pub struct Field {
pub(crate) input_field: String,
pub(crate) target_field: Option<String>,
input_field: String,
target_field: Option<String>,
}

impl FromStr for Field {
@@ -194,6 +73,10 @@ impl Field {
pub(crate) fn target_or_input_field(&self) -> &str {
self.target_field.as_deref().unwrap_or(&self.input_field)
}

pub(crate) fn set_target_field(&mut self, target_field: Option<String>) {
self.target_field = target_field;
}
}

/// A collection of fields.
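
A small sketch (hypothetical usage, not from this PR) of the slimmed-down `Field` now that the index bookkeeping has moved out of it; the "input,target" parse syntax is assumed from the existing `FromStr` impl.

use crate::etl::error::Result;
use crate::etl::field::Field;

fn field_usage() -> Result<()> {
    // Parse an input field with an explicit target name.
    let mut field: Field = "message,msg".parse()?;
    assert_eq!(field.input_field(), "message");
    assert_eq!(field.target_or_input_field(), "msg");

    // Clearing the target falls back to the input name.
    field.set_target_field(None);
    assert_eq!(field.target_or_input_field(), "message");
    Ok(())
}
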
93 changes: 54 additions & 39 deletions src/pipeline/src/etl/processor.rs
@@ -12,49 +12,48 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod cmcd;
pub mod csv;
// pub mod cmcd;
// pub mod csv;
pub mod date;
pub mod decolorize;
pub mod digest;
pub mod dissect;
// pub mod dissect;
pub mod epoch;
pub mod gsub;
pub mod join;
pub mod json_path;
pub mod letter;
pub mod regex;
// pub mod regex;
pub mod timestamp;
pub mod urlencoding;

use std::collections::BTreeMap;

use ahash::{HashSet, HashSetExt};
use cmcd::{CmcdProcessor, CmcdProcessorBuilder};
use csv::{CsvProcessor, CsvProcessorBuilder};
use date::{DateProcessor, DateProcessorBuilder};
use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder};
use digest::{DigestProcessor, DigestProcessorBuilder};
use dissect::{DissectProcessor, DissectProcessorBuilder};
// use cmcd::CmcdProcessor;
// use csv::CsvProcessor;
use date::DateProcessor;
use decolorize::DecolorizeProcessor;
use digest::DigestProcessor;
// use dissect::DissectProcessor;
use enum_dispatch::enum_dispatch;
use epoch::{EpochProcessor, EpochProcessorBuilder};
use gsub::{GsubProcessor, GsubProcessorBuilder};
use itertools::Itertools;
use join::{JoinProcessor, JoinProcessorBuilder};
use json_path::{JsonPathProcessor, JsonPathProcessorBuilder};
use letter::{LetterProcessor, LetterProcessorBuilder};
use regex::{RegexProcessor, RegexProcessorBuilder};
use epoch::EpochProcessor;
use gsub::GsubProcessor;
use join::JoinProcessor;
use json_path::JsonPathProcessor;
use letter::LetterProcessor;
// use regex::RegexProcessor;
use snafu::{OptionExt, ResultExt};
use timestamp::{TimestampProcessor, TimestampProcessorBuilder};
use urlencoding::{UrlEncodingProcessor, UrlEncodingProcessorBuilder};
use timestamp::TimestampProcessor;
use urlencoding::UrlEncodingProcessor;

use super::error::{
FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, UnsupportedProcessorSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
};
use super::field::{Field, Fields};
use crate::etl::error::{Error, Result};
use crate::etl::value::Value;
use crate::etl_error::UnsupportedProcessorSnafu;

const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
@@ -67,6 +66,8 @@ const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";

pub type IntermediateStatus = BTreeMap<String, Value>;

/// Processor trait defines the interface for all processors.
///
/// A processor is a transformation that can be applied to a field in a document
@@ -82,19 +83,19 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
fn ignore_missing(&self) -> bool;

/// Execute the processor on the intermediate state which has been preprocessed by the pipeline
fn exec_mut(&self, val: &mut BTreeMap<String, Value>) -> Result<()>;
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()>;
}

#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
Cmcd(CmcdProcessor),
Csv(CsvProcessor),
Dissect(DissectProcessor),
// Cmcd(CmcdProcessor),
// Csv(CsvProcessor),
// Dissect(DissectProcessor),
Gsub(GsubProcessor),
Join(JoinProcessor),
Letter(LetterProcessor),
Regex(RegexProcessor),
// Regex(RegexProcessor),
Timestamp(TimestampProcessor),
UrlEncoding(UrlEncodingProcessor),
Epoch(EpochProcessor),
@@ -104,18 +105,6 @@ pub enum ProcessorKind {
Digest(DigestProcessor),
}

/// ProcessorBuilder trait defines the interface for all processor builders
/// A processor builder is used to create a processor
#[enum_dispatch(ProcessorBuilders)]
pub trait ProcessorBuilder: std::fmt::Debug + Send + Sync + 'static {
/// Get the processor's output keys
fn output_keys(&self) -> HashSet<&str>;
/// Get the processor's input keys
fn input_keys(&self) -> HashSet<&str>;
/// Build the processor
fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind>;
}

#[derive(Debug, Default)]
pub struct Processors {
/// An ordered list of processors
@@ -166,7 +155,33 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {

let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;

todo!()
let processor = match str_key {
// cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
// csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
// dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
// regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
timestamp::PROCESSOR_TIMESTAMP => {
ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?)
}
urlencoding::PROCESSOR_URL_ENCODING => {
ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
}
json_path::PROCESSOR_JSON_PATH => {
ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
}
decolorize::PROCESSOR_DECOLORIZE => {
ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
}
digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

Ok(processor)
}

pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
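
A hedged sketch of what the new `exec_mut` contract amounts to: processors now read and write a name-keyed `IntermediateStatus` (a `BTreeMap<String, Value>`) instead of an index-addressed `Vec<Value>`. The helper below is illustrative only, and `Value::String` is assumed to be a variant of the etl `Value` enum.

use std::collections::BTreeMap;

use crate::etl::error::Result;
use crate::etl::value::Value;

fn uppercase_in_place(status: &mut BTreeMap<String, Value>, key: &str) -> Result<()> {
    // Look the field up by name rather than by a precomputed index.
    if let Some(Value::String(s)) = status.get(key) {
        let upper = s.to_uppercase();
        // Write the transformed value back under the same key.
        status.insert(key.to_string(), Value::String(upper));
    }
    Ok(())
}
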
8 changes: 5 additions & 3 deletions src/pipeline/src/etl/processor/cmcd.rs
@@ -27,14 +27,16 @@ use crate::etl::error::{
FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Field, Fields, InputFieldInfo, OneInputMultiOutputField};
use crate::etl::field::{Field, Fields, InputField, OneInputMultiOutputField};
use crate::etl::find_key_index;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, Processor, ProcessorBuilder, ProcessorKind,
FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;

use super::IntermediateStatus;

pub(crate) const PROCESSOR_CMCD: &str = "cmcd";

const CMCD_KEY_BR: &str = "br"; // Encoded bitrate, Integer kbps
@@ -135,7 +137,7 @@ impl CmcdProcessorBuilder {
for field in self.fields.into_iter() {
let input_index = find_key_index(intermediate_keys, field.input_field(), "cmcd")?;

let input_field_info = InputFieldInfo::new(field.input_field(), input_index);
let input_field_info = InputField::new(field.input_field(), input_index);

let (_, cmcd_field_outputs) = Self::build_cmcd_outputs(&field, intermediate_keys)?;

@@ -372,7 +374,7 @@ impl Processor for CmcdProcessor {
self.ignore_missing
}

fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
fn exec_mut(&self, val: &mut IntermediateStatus) -> Result<()> {
for (field_index, field) in self.fields.iter().enumerate() {
let field_value_index = field.input_index();
match val.get(field_value_index) {
4 changes: 2 additions & 2 deletions src/pipeline/src/etl/processor/csv.rs
@@ -24,7 +24,7 @@ use crate::etl::error::{
CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField};
use crate::etl::field::{Fields, InputField, OneInputMultiOutputField};
use crate::etl::find_key_index;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, ProcessorBuilder,
@@ -64,7 +64,7 @@ impl CsvProcessorBuilder {
for field in self.fields {
let input_index = find_key_index(intermediate_keys, field.input_field(), "csv")?;

let input_field_info = InputFieldInfo::new(field.input_field(), input_index);
let input_field_info = InputField::new(field.input_field(), input_index);
let real_field = OneInputMultiOutputField::new(input_field_info, None);
real_fields.push(real_field);
}