Skip to content

Commit

Permalink
Added fill_null and join. Improved converting non-boolean express…
Browse files Browse the repository at this point in the history
…ions into boolean ones where necessary. Minor syntax expansion. Brought query success rate up to 74% and command success rate up to 92%.
  • Loading branch information
scnerd committed Oct 4, 2024
1 parent 9226b61 commit dc6504d
Show file tree
Hide file tree
Showing 29 changed files with 481 additions and 117 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ repos:
- repo: https://github.com/doublify/pre-commit-rust
rev: v1.0
hooks:
- id: fmt
- id: cargo-check
- id: clippy
args: [ --fix, --allow-dirty, --allow-staged ]
- id: fmt
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pyo3-built = "*"
float-derive = "*"
anyhow = "*"
log = "*"
regex = "*"
regex-syntax = "*"

inventory = "*"
Expand Down
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,23 @@ Support status can be one of the following:
|-----------------------|---------|--------|
| **High Priority** | | |
| `bin` (`bucket`) | Partial | Yes |
| `convert` | Partial | Yes |
| `convert` | Yes | Yes |
| `dedup` | Parser | Yes |
| `eval` | Partial | Yes |
| `eventstats` | Partial | Yes |
| `fields` | Partial | Yes |
| `fillnull` | Parser | Yes |
| `fields` | Yes | Yes |
| `fillnull` | Partial | Yes |
| `head` | Partial | Yes |
| `inputlookup` | Parser | Yes |
| `iplocation` | None | Yes |
| `join` | Parser | Yes |
| `lookup` | Parser | Yes |
| `join` | Partial | Yes |
| `lookup` | Partial | Yes |
| `mstats` | None | Yes |
| `multisearch` | Partial | Yes |
| `mvexpand` | Parser | Yes |
| `outputlookup` | None | Yes |
| `rare` | Partial | Yes |
| `regex` | Partial | Yes |
| `rare` | Yes | Yes |
| `regex` | Yes | Yes |
| `rename` | Partial | Yes |
| `rex` | Partial | Yes |
| `search` | Partial | Yes |
Expand All @@ -142,7 +142,7 @@ Support status can be one of the following:
| `streamstats` | Parser | Yes |
| `table` | Partial | Yes |
| `tail` | Yes | Yes |
| `top` | Partial | Yes |
| `top` | Yes | Yes |
| `tstats` | Partial | Yes |
| `where` | Partial | Yes |
| **Planned/Supported** | | |
Expand Down Expand Up @@ -428,10 +428,11 @@ package, some may be provided by Databricks Sirens.
- [x] Support macro syntax (separate pre-processing function?)
- [ ] Incorporate [standard macros that come with CIM](https://docs.splunk.com/Documentation/CIM/5.3.2/User/UsetheCIMFiltersmacrostoexcludedata)
- [ ] Support custom Python UDFs (in `spl_transpiler` for now)
- [ ] Use sample queries to create prioritized list of remaining commands
- [x] Use sample queries to create prioritized list of remaining commands
- [ ] Support re-using intermediate results (saving off as tables or variables, `.cache()`)
- [ ] Support Scala UDFs
- [ ] Support SQL output
- [ ] Support `{}` and `@` in field names

# Acknowledgements

Expand Down
3 changes: 2 additions & 1 deletion src/commands/cmd_add_totals/pyspark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ impl PipelineTransformer for super::spl::AddTotalsCommand {
})
.collect();

let total: ColumnLike = join_as_binaries("+", cast_columns, column_like!(lit(0.0)));
let total: ColumnLike =
join_as_binaries("+", cast_columns).unwrap_or(column_like!(lit(0.0)));

Ok(PipelineTransformState {
df: state.df.with_column(self.field_name.clone(), total),
Expand Down
4 changes: 2 additions & 2 deletions src/commands/cmd_collect/spl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::commands::spl::{SplCommand, SplCommandOptions};
use crate::spl::ast::{Field, ParsedCommandOptions};
use crate::spl::parser::field_list;
use crate::spl::parser::field_list0;
use crate::spl::python::impl_pyclass;
use anyhow::anyhow;
use nom::combinator::map;
Expand Down Expand Up @@ -117,7 +117,7 @@ impl SplCommand<CollectCommand> for CollectParser {

fn parse_body(input: &str) -> IResult<&str, CollectCommand> {
map(
pair(Self::Options::match_options, field_list),
pair(Self::Options::match_options, field_list0),
|(
CollectCommandOptions {
index,
Expand Down
4 changes: 2 additions & 2 deletions src/commands/cmd_event_stats/spl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::commands::spl::{SplCommand, SplCommandOptions};
use crate::spl::ast::{Expr, Field, ParsedCommandOptions};
use crate::spl::parser::{field_list, stats_call, ws};
use crate::spl::parser::{field_list0, stats_call, ws};
use crate::spl::python::impl_pyclass;
use nom::bytes::complete::tag_no_case;
use nom::combinator::{map, opt};
Expand Down Expand Up @@ -57,7 +57,7 @@ impl SplCommand<EventStatsCommand> for EventStatsParser {
tuple((
Self::Options::match_options,
ws(stats_call),
opt(preceded(ws(tag_no_case("by")), field_list)),
opt(preceded(ws(tag_no_case("by")), field_list0)),
)),
|(options, funcs, by)| EventStatsCommand {
all_num: options.all_num,
Expand Down
7 changes: 2 additions & 5 deletions src/commands/cmd_fields/spl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::commands::spl::{SplCommand, SplCommandOptions};
use crate::spl::ast::{Field, ParsedCommandOptions};
use crate::spl::parser::{comma_separated_list1, field, ws};
use crate::spl::parser::{field_list1, ws};
use crate::spl::python::impl_pyclass;
use nom::branch::alt;
use nom::bytes::complete::tag;
Expand Down Expand Up @@ -52,10 +52,7 @@ impl SplCommand<FieldsCommand> for FieldsParser {

fn parse_body(input: &str) -> IResult<&str, FieldsCommand> {
map(
tuple((
opt(ws(alt((tag("+"), tag("-"))))),
comma_separated_list1(field),
)),
tuple((opt(ws(alt((tag("+"), tag("-"))))), field_list1)),
|(remove_fields_opt, fields)| FieldsCommand {
remove_fields: remove_fields_opt.unwrap_or("+") == "-",
fields,
Expand Down
20 changes: 18 additions & 2 deletions src/commands/cmd_fill_null/pyspark.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
use crate::commands::cmd_fill_null::spl::FillNullCommand;
use crate::pyspark::ast::*;
use crate::pyspark::transpiler::{PipelineTransformState, PipelineTransformer};
use anyhow::bail;

impl PipelineTransformer for FillNullCommand {
#[allow(unused_variables, unreachable_code)]
fn transform(&self, state: PipelineTransformState) -> anyhow::Result<PipelineTransformState> {
let df = state.df;

bail!("UNIMPLEMENTED");
let df = df.arbitrary_method(
"fillna",
vec![column_like!(py_lit(self.value.clone())).into()],
);

Ok(PipelineTransformState { df })
}
}

#[cfg(test)]
mod tests {
use crate::pyspark::utils::test::generates;

#[test]
fn test_fill_null_1() {
generates(
r#"fillnull value=NULL"#,
r#"spark.table('main').fillna("NULL")"#,
)
}
}
16 changes: 10 additions & 6 deletions src/commands/cmd_fill_null/spl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ use pyo3::prelude::*;
// def fillNull[_: P]: P[FillNullCommand] = ("fillnull" ~ ("value=" ~~ (doubleQuoted|token)).?
// ~ field.rep(1).?) map FillNullCommand.tupled

const DEFAULT_VALUE: &str = "0";

#[derive(Debug, PartialEq, Clone, Hash)]
#[pyclass(frozen, eq, hash)]
pub struct FillNullCommand {
#[pyo3(get)]
pub value: Option<String>,
pub value: String,
#[pyo3(get)]
pub fields: Option<Vec<Field>>,
}
impl_pyclass!(FillNullCommand {
value: Option<String>,
value: String,
fields: Option<Vec<Field>>
});

Expand Down Expand Up @@ -51,7 +53,9 @@ impl SplCommand<FillNullCommand> for FillNullParser {
opt(many1(into(ws(field)))),
)),
|(maybe_value, fields)| FillNullCommand {
value: maybe_value.map(|v| v.to_string()),
value: maybe_value
.map(|v| v.to_string())
.unwrap_or(DEFAULT_VALUE.into()),
fields,
},
)(input)
Expand All @@ -74,7 +78,7 @@ mod tests {
Ok((
"",
FillNullCommand {
value: None,
value: DEFAULT_VALUE.into(),
fields: None,
}
))
Expand All @@ -92,7 +96,7 @@ mod tests {
Ok((
"",
FillNullCommand {
value: Some("NA".into()),
value: "NA".into(),
fields: None,
}
))
Expand All @@ -115,7 +119,7 @@ mod tests {
Ok((
"",
FillNullCommand {
value: Some("NULL".into()),
value: "NULL".into(),
fields: Some(vec![ast::Field::from("host"), ast::Field::from("port"),]),
}
))
Expand Down
69 changes: 66 additions & 3 deletions src/commands/cmd_join/pyspark.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,77 @@
use crate::commands::cmd_join::spl::JoinCommand;
use crate::pyspark::ast::*;
use crate::pyspark::transpiler::utils::join_as_binaries;
use crate::pyspark::transpiler::{PipelineTransformState, PipelineTransformer};
use anyhow::bail;
use crate::spl::ast;
use anyhow::{anyhow, bail, ensure};

impl PipelineTransformer for JoinCommand {
#[allow(unused_variables, unreachable_code)]
fn transform(&self, state: PipelineTransformState) -> anyhow::Result<PipelineTransformState> {
let df = state.df;
let df = state.df.alias("LEFT");

bail!("UNIMPLEMENTED");
ensure!(
self.max == 1,
"UNIMPLEMENTED: Join with max != 1 not yet supported"
);

let right_df: TransformedPipeline = self.sub_search.clone().try_into()?;
let right_df = right_df
.dataframes
.first()
.ok_or(anyhow!("No dataframe found for sub_search"))?
.alias("RIGHT");

let join_type = match self.join_type.clone().as_str() {
"inner" => "inner",
"left" => "left",
"outer" => "outer",
_ => bail!("Unsupported join type: {}", self.join_type),
};

let condition = join_as_binaries(
"&",
self.fields
.clone()
.into_iter()
.map(|ast::Field(name)| {
column_like!(
[col(format!("LEFT.{}", name))] == [col(format!("RIGHT.{}", name))]
)
})
.collect(),
)
.unwrap();

let condition = match (self.use_time, self.earlier) {
(true, true) => {
column_like!([condition] & [[col("LEFT._time")] >= [col("RIGHT._time")]])
}
(true, false) => {
column_like!([condition] & [[col("LEFT._time")] <= [col("RIGHT._time")]])
}
(false, _) => condition,
};

let df = df.join(right_df, condition, join_type);

Ok(PipelineTransformState { df })
}
}

#[cfg(test)]
mod tests {
use crate::pyspark::utils::test::*;

#[test]
fn test_join_1() {
generates(
r#"join product_id [search vendors]"#,
r#"spark.table('main').alias("LEFT").join(
spark.table('main').where(F.col("_raw").ilike("%vendors%")).alias("RIGHT"),
(F.col("LEFT.product_id") == F.col("RIGHT.product_id")),
"inner"
)"#,
)
}
}
Loading

0 comments on commit dc6504d

Please sign in to comment.