Skip to content

Commit

Permalink
feat(core): introduce the hidden column (#863)
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal authored Oct 30, 2024
1 parent 8835456 commit 5641edb
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 11 deletions.
7 changes: 7 additions & 0 deletions wren-core/core/src/mdl/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl ColumnBuilder {
r#type: r#type.to_string(),
relationship: None,
is_calculated: false,
is_hidden: false,
not_null: false,
expression: None,
},
Expand Down Expand Up @@ -171,6 +172,11 @@ impl ColumnBuilder {
self
}

pub fn hidden(mut self, is_hidden: bool) -> Self {
self.column.is_hidden = is_hidden;
self
}

pub fn build(self) -> Arc<Column> {
Arc::new(self.column)
}
Expand Down Expand Up @@ -332,6 +338,7 @@ mod test {
.relationship("test")
.calculated(true)
.not_null(true)
.hidden(true)
.expression("test")
.build();

Expand Down
23 changes: 17 additions & 6 deletions wren-core/core/src/mdl/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,56 @@ use std::sync::Arc;

impl Model {
/// Physical columns are columns that can be selected from the model.
/// All physical columns are visible columns, but not all visible columns are physical columns
/// e.g. columns that are not a relationship column
pub fn get_physical_columns(&self) -> Vec<Arc<Column>> {
self.columns
.iter()
self.get_visible_columns()
.filter(|c| c.relationship.is_none())
.map(Arc::clone)
.map(|c| Arc::clone(&c))
.collect()
}

/// Return the name of the model
pub fn name(&self) -> &str {
&self.name
}

/// Return the iterator of all visible columns
pub fn get_visible_columns(&self) -> impl Iterator<Item = Arc<Column>> + '_ {
self.columns.iter().filter(|f| !f.is_hidden).map(Arc::clone)
}

/// Get the specified visible column by name
pub fn get_column(&self, column_name: &str) -> Option<Arc<Column>> {
self.columns
.iter()
self.get_visible_columns()
.find(|c| c.name == column_name)
.map(Arc::clone)
.map(|c| Arc::clone(&c))
}

/// Return the primary key of the model
pub fn primary_key(&self) -> Option<&str> {
self.primary_key.as_deref()
}
}

impl Column {
/// Return the name of the column
pub fn name(&self) -> &str {
&self.name
}

/// Return the expression of the column
pub fn expression(&self) -> Option<&str> {
self.expression.as_deref()
}

/// Transform the column to a datafusion field
pub fn to_field(&self) -> Field {
let data_type = map_data_type(&self.r#type);
Field::new(&self.name, data_type, self.not_null)
}

/// Transform the column to a datafusion field for a remote table
pub fn to_remote_field(&self, session_state: SessionStateRef) -> Result<Vec<Field>> {
if self.expression().is_some() {
let session_state = session_state.read();
Expand Down
6 changes: 3 additions & 3 deletions wren-core/core/src/mdl/lineage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Lineage {
let mut source_columns_map = HashMap::new();

for model in mdl.manifest.models.iter() {
for column in model.columns.iter() {
for column in model.get_visible_columns() {
if column.is_calculated {
let expr: &String = match column.expression {
Some(ref exp) => exp,
Expand Down Expand Up @@ -79,13 +79,13 @@ impl Lineage {
}
fn collect_required_fields(
mdl: &WrenMDL,
source_colums_map: &HashMap<Column, HashSet<Column>>,
source_columns_map: &HashMap<Column, HashSet<Column>>,
) -> Result<RequiredInfo> {
let mut required_fields_map: HashMap<Column, HashSet<Column>> = HashMap::new();
let mut required_dataset_topo: HashMap<Column, Graph<Dataset, DatasetLink>> =
HashMap::new();
let mut pending_fields = Vec::new();
for (column, source_columns) in source_colums_map.iter() {
for (column, source_columns) in source_columns_map.iter() {
let Some(relation) = column.clone().relation else {
return internal_err!("relation not found: {}", column);
};
Expand Down
2 changes: 2 additions & 0 deletions wren-core/core/src/mdl/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ pub struct Column {
#[serde_as(as = "NoneAsEmptyString")]
#[serde(default)]
pub expression: Option<String>,
#[serde(default, with = "bool_from_int")]
pub is_hidden: bool,
}

#[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq)]
Expand Down
64 changes: 62 additions & 2 deletions wren-core/core/src/mdl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl WrenMDL {
pub fn new(manifest: Manifest) -> Self {
let mut qualifed_references = HashMap::new();
manifest.models.iter().for_each(|model| {
model.columns.iter().for_each(|column| {
model.get_visible_columns().for_each(|column| {
qualifed_references.insert(
from_qualified_name_str(
&manifest.catalog,
Expand All @@ -121,7 +121,7 @@ impl WrenMDL {
),
ColumnReference::new(
Dataset::Model(Arc::clone(model)),
Arc::clone(column),
Arc::clone(&column),
),
);
});
Expand Down Expand Up @@ -750,6 +750,66 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_query_hidden_column() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_batch("artist", artist())?;
let manifest = ManifestBuilder::new()
.catalog("wren")
.schema("test")
.model(
ModelBuilder::new("artist")
.table_reference("artist")
.column(ColumnBuilder::new("名字", "string").hidden(true).build())
.column(
ColumnBuilder::new("串接名字", "string")
.expression(r#""名字" || "名字""#)
.build(),
)
.build(),
)
.build();

let analyzed_mdl = Arc::new(AnalyzedWrenMDL::analyze(manifest)?);
let sql = r#"select "串接名字" from wren.test.artist"#;
let actual = transform_sql_with_ctx(
&SessionContext::new(),
Arc::clone(&analyzed_mdl),
&[],
sql,
)
.await?;
assert_eq!(actual,
"SELECT artist.\"串接名字\" FROM (SELECT artist.\"串接名字\" FROM \
(SELECT artist.\"名字\" || artist.\"名字\" AS \"串接名字\" FROM artist) AS artist) AS artist");

let sql = r#"select * from wren.test.artist"#;
let actual = transform_sql_with_ctx(
&SessionContext::new(),
Arc::clone(&analyzed_mdl),
&[],
sql,
)
.await?;
assert_eq!(actual,
"SELECT artist.\"串接名字\" FROM (SELECT artist.\"名字\" || artist.\"名字\" AS \"串接名字\" FROM artist) AS artist");

let sql = r#"select "名字" from wren.test.artist"#;
let _ = transform_sql_with_ctx(
&SessionContext::new(),
Arc::clone(&analyzed_mdl),
&[],
sql,
)
.await.map_err(|e| {
assert_eq!(
e.to_string(),
"Schema error: No field named \"名字\". Valid fields are wren.test.artist.\"串接名字\"."
)
});
Ok(())
}

async fn assert_sql_valid_executable(sql: &str) -> Result<()> {
let ctx = SessionContext::new();
// To roundtrip testing, we should register the mock table for the planned sql.
Expand Down

0 comments on commit 5641edb

Please sign in to comment.