Skip to content

Commit

Permalink
refactor(engine): update parameter names and enhance CityGML processing
Browse files Browse the repository at this point in the history
  • Loading branch information
miseyu committed Oct 22, 2024
1 parent adffad5 commit d70301e
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 92 deletions.
12 changes: 12 additions & 0 deletions engine/docs/mdbook/src/action.md
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,12 @@ Filters features based on conditions
"dataset": {
"$ref": "#/definitions/Expr"
},
"flatten": {
"type": [
"boolean",
"null"
]
},
"format": {
"type": "string",
"enum": [
Expand Down Expand Up @@ -1332,6 +1338,12 @@ Reads features from a file
"dataset": {
"$ref": "#/definitions/Expr"
},
"flatten": {
"type": [
"boolean",
"null"
]
},
"format": {
"type": "string",
"enum": [
Expand Down
89 changes: 51 additions & 38 deletions engine/runtime/action-processor/src/feature/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,47 +75,53 @@ impl ProcessorFactory for FeatureReaderFactory {

let expr_engine = Arc::clone(&ctx.expr_engine);
match params {
FeatureReaderParam::CityGML { common_property } => {
let common_property = CompiledCommonPropertySchema {
FeatureReaderParam::CityGML {
common_param,
param,
} => {
let common_param = CompiledCommonReaderParam {
expr: expr_engine
.compile(common_property.dataset.as_ref())
.compile(common_param.dataset.as_ref())
.map_err(|e| FeatureProcessorError::FilterFactory(format!("{:?}", e)))?,
};
let process = FeatureReader {
params: CompiledFeatureReaderParam::CityGML { common_property },
params: CompiledFeatureReaderParam::CityGML {
common_param,
param,
},
};
Ok(Box::new(process))
}
FeatureReaderParam::Csv {
common_property,
property,
common_param,
param,
} => {
let common_property = CompiledCommonPropertySchema {
let common_param = CompiledCommonReaderParam {
expr: expr_engine
.compile(common_property.dataset.as_ref())
.compile(common_param.dataset.as_ref())
.map_err(|e| FeatureProcessorError::FilterFactory(format!("{:?}", e)))?,
};
let process = FeatureReader {
params: CompiledFeatureReaderParam::Csv {
common_property,
property,
common_param,
param,
},
};
Ok(Box::new(process))
}
FeatureReaderParam::Tsv {
common_property,
property,
common_param,
param,
} => {
let common_property = CompiledCommonPropertySchema {
let common_param = CompiledCommonReaderParam {
expr: expr_engine
.compile(common_property.dataset.as_ref())
.compile(common_param.dataset.as_ref())
.map_err(|e| FeatureProcessorError::FilterFactory(format!("{:?}", e)))?,
};
let process = FeatureReader {
params: CompiledFeatureReaderParam::Tsv {
common_property,
property,
common_param,
param,
},
};
Ok(Box::new(process))
Expand All @@ -131,7 +137,7 @@ pub struct FeatureReader {

#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CommonPropertySchema {
pub struct CommonReaderParam {
pub(super) dataset: Expr,
}

Expand All @@ -141,41 +147,44 @@ pub enum FeatureReaderParam {
#[serde(rename = "citygml")]
CityGML {
#[serde(flatten)]
common_property: CommonPropertySchema,
common_param: CommonReaderParam,
#[serde(flatten)]
param: citygml::CityGmlReaderParam,
},
#[serde(rename = "csv")]
Csv {
#[serde(flatten)]
common_property: CommonPropertySchema,
common_param: CommonReaderParam,
#[serde(flatten)]
property: csv::CsvPropertySchema,
param: csv::CsvReaderParam,
},
#[serde(rename = "tsv")]
Tsv {
#[serde(flatten)]
common_property: CommonPropertySchema,
common_param: CommonReaderParam,
#[serde(flatten)]
property: csv::CsvPropertySchema,
param: csv::CsvReaderParam,
},
}

#[derive(Debug, Clone)]
enum CompiledFeatureReaderParam {
CityGML {
common_property: CompiledCommonPropertySchema,
common_param: CompiledCommonReaderParam,
param: citygml::CityGmlReaderParam,
},
Csv {
common_property: CompiledCommonPropertySchema,
property: csv::CsvPropertySchema,
common_param: CompiledCommonReaderParam,
param: csv::CsvReaderParam,
},
Tsv {
common_property: CompiledCommonPropertySchema,
property: csv::CsvPropertySchema,
common_param: CompiledCommonReaderParam,
param: csv::CsvReaderParam,
},
}

#[derive(Debug, Clone)]
struct CompiledCommonPropertySchema {
struct CompiledCommonReaderParam {
expr: rhai::AST,
}

Expand All @@ -191,32 +200,36 @@ impl Processor for FeatureReader {
) -> Result<(), BoxedError> {
match self {
FeatureReader {
params: CompiledFeatureReaderParam::CityGML { common_property },
} => citygml::read_citygml(common_property, ctx, fw).map_err(|e| e.into()),
params:
CompiledFeatureReaderParam::CityGML {
common_param,
param,
},
} => citygml::read_citygml(common_param, param, ctx, fw).map_err(|e| e.into()),
FeatureReader {
params:
CompiledFeatureReaderParam::Csv {
common_property,
property,
common_param,
param,
},
} => csv::read_csv(
reearth_flow_common::csv::Delimiter::Comma,
common_property,
property,
common_param,
param,
ctx,
fw,
)
.map_err(|e| e.into()),
FeatureReader {
params:
CompiledFeatureReaderParam::Tsv {
common_property,
property,
common_param,
param,
},
} => csv::read_csv(
reearth_flow_common::csv::Delimiter::Tab,
common_property,
property,
common_param,
param,
ctx,
fw,
)
Expand Down
77 changes: 53 additions & 24 deletions engine/runtime/action-processor/src/feature/reader/citygml.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
use std::{
collections::HashMap,
io::{BufRead, BufReader, Cursor},
sync::{Arc, RwLock},
};

use nusamai_citygml::{CityGmlElement, CityGmlReader, Envelope, ParseError, SubTreeReader};
use nusamai_plateau::{appearance::AppearanceStore, models, Entity, GeometricMergedownTransform};
use nusamai_plateau::{
appearance::AppearanceStore, models, Entity, FlattenTreeTransform, GeometricMergedownTransform,
};
use quick_xml::NsReader;
use reearth_flow_common::str::to_hash;
use reearth_flow_common::uri::Uri;
use reearth_flow_runtime::{
channels::ProcessorChannelForwarder, executor_operation::ExecutorContext, node::DEFAULT_PORT,
};
use reearth_flow_types::{geometry::Geometry, Attribute, AttributeValue, Feature};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use url::Url;

use super::CompiledCommonPropertySchema;
use super::CompiledCommonReaderParam;

#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CityGmlReaderParam {
pub(super) flatten: Option<bool>,
}

pub(crate) fn read_citygml(
params: &CompiledCommonPropertySchema,
params: &CompiledCommonReaderParam,
citygml_params: &CityGmlReaderParam,
ctx: ExecutorContext,
fw: &mut dyn ProcessorChannelForwarder,
) -> Result<(), super::errors::FeatureProcessorError> {
Expand Down Expand Up @@ -49,13 +61,20 @@ pub(crate) fn read_citygml(
let mut st = citygml_reader
.start_root(&mut xml_reader)
.map_err(|e| super::errors::FeatureProcessorError::FileCityGmlReader(format!("{:?}", e)))?;
parse_tree_reader(&mut st, base_url, ctx, fw)
.map_err(|e| super::errors::FeatureProcessorError::FileCityGmlReader(format!("{:?}", e)))?;
parse_tree_reader(
&mut st,
citygml_params.flatten.unwrap_or(false),
base_url,
ctx,
fw,
)
.map_err(|e| super::errors::FeatureProcessorError::FileCityGmlReader(format!("{:?}", e)))?;
Ok(())
}

fn parse_tree_reader<R: BufRead>(
st: &mut SubTreeReader<'_, '_, R>,
flatten: bool,
base_url: Url,
ctx: ExecutorContext,
fw: &mut dyn ProcessorChannelForwarder,
Expand Down Expand Up @@ -126,10 +145,8 @@ fn parse_tree_reader<R: BufRead>(
(v[0], v[1], v[2]) = (v[1], v[0], v[2]);
});
}
transformer.transform(&mut entity);

let attributes = entity.root.to_attribute_json();

let gml_id = entity
.root
.id()
Expand All @@ -140,23 +157,35 @@ fn parse_tree_reader<R: BufRead>(
.typename()
.map(|name| AttributeValue::String(name.to_string()))
.unwrap_or(AttributeValue::Null);

let geometry: Geometry = entity.try_into().map_err(|e| {
super::errors::FeatureProcessorError::FileCityGmlReader(format!("{:?}", e))
})?;
let mut feature = Feature::new_with_attributes(ctx.feature.attributes.clone());
feature
.attributes
.insert(Attribute::new("cityGmlAttributes"), attributes.into());
feature.attributes.insert(Attribute::new("gmlName"), name);
feature.attributes.insert(Attribute::new("gmlId"), gml_id);

feature.attributes.insert(
Attribute::new("gmlRootId"),
AttributeValue::String(format!("root_{}", to_hash(base_url.as_str()))),
);
feature.geometry = Some(geometry);
fw.send(ctx.new_with_feature_and_port(feature, DEFAULT_PORT.clone()));
let mut attributes = HashMap::<Attribute, AttributeValue>::from([
(Attribute::new("cityGmlAttributes"), attributes.into()),
(Attribute::new("gmlName"), name),
(Attribute::new("gmlId"), gml_id),
(
Attribute::new("gmlRootId"),
AttributeValue::String(format!("root_{}", to_hash(base_url.as_str()))),
),
]);
attributes.extend(ctx.feature.attributes.clone());
if flatten {
for mut child in FlattenTreeTransform::transform(entity) {
transformer.transform(&mut child);
let geometry: Geometry = child.try_into().map_err(|e| {
super::errors::FeatureProcessorError::FileCityGmlReader(format!("{:?}", e))
})?;
let mut feature: Feature = geometry.into();
feature.extend(attributes.clone());
fw.send(ctx.new_with_feature_and_port(feature, DEFAULT_PORT.clone()));
}
} else {
transformer.transform(&mut entity);
let geometry: Geometry = entity.try_into().map_err(|e| {
super::errors::FeatureProcessorError::FileCityGmlReader(format!("{:?}", e))
})?;
let mut feature: Feature = geometry.into();
feature.extend(attributes.clone());
fw.send(ctx.new_with_feature_and_port(feature, DEFAULT_PORT.clone()));
}
}
Ok(())
}
8 changes: 4 additions & 4 deletions engine/runtime/action-processor/src/feature/reader/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use reearth_flow_types::{Attribute, AttributeValue};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use super::CompiledCommonPropertySchema;
use super::CompiledCommonReaderParam;

#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CsvPropertySchema {
pub struct CsvReaderParam {
pub(super) offset: Option<usize>,
}

pub(crate) fn read_csv(
delimiter: Delimiter,
params: &CompiledCommonPropertySchema,
csv_params: &CsvPropertySchema,
params: &CompiledCommonReaderParam,
csv_params: &CsvReaderParam,
ctx: ExecutorContext,
fw: &mut dyn ProcessorChannelForwarder,
) -> Result<(), super::errors::FeatureProcessorError> {
Expand Down
Loading

0 comments on commit d70301e

Please sign in to comment.