Skip to content

Commit

Permalink
feat(engine): enhance plateau processor with city code extraction and…
Browse files Browse the repository at this point in the history
… metadata handling
  • Loading branch information
miseyu committed Jan 16, 2025
1 parent dd71745 commit 3e7f559
Show file tree
Hide file tree
Showing 17 changed files with 506 additions and 23 deletions.
1 change: 1 addition & 0 deletions engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ parking_lot = "0.12.3"
petgraph = "0.6.5"
pretty_assertions = "1.4.1"
prost = "0.13.4"
quick-xml = "0.37.2"
quick-xml = { version = "0.37.2", features = ["serialize"] }
rand = "0.8.5"
rayon = "1.10.0"
regex = "1.11.1"
Expand Down
1 change: 1 addition & 0 deletions engine/runtime/action-plateau-processor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod mapping;
pub mod plateau3;
pub mod plateau4;
pub(crate) mod types;
1 change: 1 addition & 0 deletions engine/runtime/action-plateau-processor/src/plateau4.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod attribute_flattener;
pub(crate) mod city_code_extractor;
pub(crate) mod errors;
pub(crate) mod mapping;
pub(crate) mod max_lod_extractor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use crate::types::dictionary::Dictionary;

use super::errors::PlateauProcessorError;
use reearth_flow_common::uri::Uri;
use reearth_flow_runtime::{
channels::ProcessorChannelForwarder,
errors::BoxedError,
event::EventHub,
executor_operation::{ExecutorContext, NodeContext},
node::{Port, Processor, ProcessorFactory, DEFAULT_PORT},
};
use reearth_flow_types::{Attribute, AttributeValue};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, str::FromStr};

#[derive(Debug, Clone, Default)]
pub struct CityCodeExtractorFactory;

impl ProcessorFactory for CityCodeExtractorFactory {
fn name(&self) -> &str {
"PLATEAU4.CityCodeExtractor"
}

fn description(&self) -> &str {
"Extracts Codelist"
}

fn parameter_schema(&self) -> Option<schemars::schema::RootSchema> {
Some(schemars::schema_for!(CityCodeExtractorParam))
}

fn categories(&self) -> &[&'static str] {
&["PLATEAU"]
}

fn get_input_ports(&self) -> Vec<Port> {
vec![DEFAULT_PORT.clone()]
}

fn get_output_ports(&self) -> Vec<Port> {
vec![DEFAULT_PORT.clone()]
}

fn build(
&self,
_ctx: NodeContext,
_event_hub: EventHub,
_action: String,
with: Option<HashMap<String, Value>>,
) -> Result<Box<dyn Processor>, BoxedError> {
let params: CityCodeExtractorParam = if let Some(with) = with {
let value: Value = serde_json::to_value(with).map_err(|e| {
PlateauProcessorError::CityCodeExtractorFactory(format!(
"Failed to serialize `with` parameter: {}",
e
))
})?;
serde_json::from_value(value).map_err(|e| {
PlateauProcessorError::CityCodeExtractorFactory(format!(
"Failed to deserialize `with` parameter: {}",
e
))
})?
} else {
return Err(PlateauProcessorError::CityCodeExtractorFactory(
"Missing required parameter `with`".to_string(),
)
.into());
};
let process = CityCodeExtractor {
city_code_attribute: params.city_code_attribute,
codelists_path_attribute: params.codelists_path_attribute,
};
Ok(Box::new(process))
}
}

#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) struct CityCodeExtractorParam {
city_code_attribute: Attribute,
codelists_path_attribute: Attribute,
}

#[derive(Debug, Clone)]
pub(crate) struct CityCodeExtractor {
city_code_attribute: Attribute,
codelists_path_attribute: Attribute,
}

impl Processor for CityCodeExtractor {
fn process(
&mut self,
ctx: ExecutorContext,
fw: &mut dyn ProcessorChannelForwarder,
) -> Result<(), BoxedError> {
let feature = &ctx.feature;
let AttributeValue::String(city_code) = feature
.attributes
.get(&self.city_code_attribute)
.ok_or(PlateauProcessorError::CityCodeExtractor(
"cityCode attribute empty".to_string(),
))
.cloned()?
else {
return Err(PlateauProcessorError::CityCodeExtractor(
"cityCode attribute error".to_string(),
)
.into());
};
let AttributeValue::String(codelists_path) = feature
.attributes
.get(&self.codelists_path_attribute)
.ok_or(PlateauProcessorError::CityCodeExtractor(
"codelists path attribute empty".to_string(),
))
.cloned()?
else {
return Err(PlateauProcessorError::CityCodeExtractor(
"codelists path attribute error".to_string(),
)
.into());
};
let codelists_path = Uri::from_str(&codelists_path)?;
let authorities_path = codelists_path.join("Common_localPublicAuthorities.xml")?;
let storage = ctx.storage_resolver.resolve(&authorities_path)?;
let bytes = storage.get_sync(&authorities_path.as_path())?;
let dic: Dictionary = quick_xml::de::from_str(&String::from_utf8(bytes.to_vec())?)?;
let city_name = dic
.entries
.iter()
.find(|entry| entry.definition.name.value == city_code)
.map(|entry| entry.definition.description.clone());
let mut feature = feature.clone();
if let Some(city_name) = city_name {
feature.insert("cityName", AttributeValue::String(city_name.value));
}
fw.send(ctx.new_with_feature_and_port(feature, DEFAULT_PORT.clone()));
Ok(())
}

fn finish(
&self,
_ctx: NodeContext,
_fw: &mut dyn ProcessorChannelForwarder,
) -> Result<(), BoxedError> {
Ok(())
}

fn name(&self) -> &str {
"CityCodeExtractor"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ pub(super) enum PlateauProcessorError {
MaxLodExtractor(String),
#[error("AttributeFlattener error: {0}")]
AttributeFlattener(String),
#[error("CityCode Extractor Factory error: {0}")]
CityCodeExtractorFactory(String),
#[error("CityCode Extractor error: {0}")]
CityCodeExtractor(String),
}

pub(super) type Result<T, E = PlateauProcessorError> = std::result::Result<T, E>;
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use once_cell::sync::Lazy;
use reearth_flow_runtime::node::{NodeKind, ProcessorFactory};

use super::{
attribute_flattener::AttributeFlattenerFactory, max_lod_extractor::MaxLodExtractorFactory,
udx_folder_extractor::UDXFolderExtractorFactory,
attribute_flattener::AttributeFlattenerFactory, city_code_extractor::CityCodeExtractorFactory,
max_lod_extractor::MaxLodExtractorFactory, udx_folder_extractor::UDXFolderExtractorFactory,
};

pub(crate) static ACTION_FACTORY_MAPPINGS: Lazy<HashMap<String, NodeKind>> = Lazy::new(|| {
let factories: Vec<Box<dyn ProcessorFactory>> = vec![
Box::<UDXFolderExtractorFactory>::default(),
Box::<MaxLodExtractorFactory>::default(),
Box::<AttributeFlattenerFactory>::default(),
Box::<CityCodeExtractorFactory>::default(),
];
factories
.into_iter()
Expand Down
1 change: 1 addition & 0 deletions engine/runtime/action-plateau-processor/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod dictionary;
76 changes: 76 additions & 0 deletions engine/runtime/action-plateau-processor/src/types/dictionary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use serde::Deserialize;

#[derive(Deserialize, Debug, PartialEq, Default)]
#[serde(rename = "Dictionary")]
pub(crate) struct Dictionary {
pub(crate) name: Name,
#[serde(rename = "dictionaryEntry")]
pub(crate) entries: Vec<DictionaryEntry>,
}

#[derive(Deserialize, Debug, PartialEq, Default)]
#[serde(rename = "dictionaryEntry")]
pub(crate) struct DictionaryEntry {
#[serde(rename = "Definition")]
pub(crate) definition: Definition,
}

#[derive(Deserialize, Debug, PartialEq, Default, Clone)]
#[serde(default)]
pub(crate) struct Definition {
#[serde(rename = "@id")]
pub(crate) id: String,
pub(crate) description: Description,
pub(crate) name: Name,
}

#[derive(Deserialize, Debug, PartialEq, Default, Clone)]
#[serde(rename = "description")]
pub(crate) struct Description {
#[serde(rename = "$text")]
pub(crate) value: String,
}

#[derive(Deserialize, Debug, PartialEq, Default, Clone)]
#[serde(rename = "name")]
pub(crate) struct Name {
#[serde(rename = "$text")]
pub(crate) value: String,
}

#[cfg(test)]
mod tests {
use super::*;

const TEST_XML: &str = r#"
<?xml version="1.0" encoding="UTF-8"?>
<gml:Dictionary xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:gml="http://www.opengis.net/gml" xsi:schemaLocation="http://www.opengis.net/gml http://schemas.opengis.net/gml/3.1.1/profiles/SimpleDictionary/1.0.0/gmlSimpleDictionaryProfile.xsd" gml:id="cl_9d8ed669-98d2-430a-8e2c-c4158b869749">
<gml:name>Common_localPublicAuthoritiesType</gml:name>
<gml:dictionaryEntry>
<gml:Definition gml:id="id1">
<gml:description>北海道</gml:description>
<gml:name>01</gml:name>
</gml:Definition>
</gml:dictionaryEntry>
<gml:dictionaryEntry>
<gml:Definition gml:id="id2">
<gml:description>北海道札幌市</gml:description>
<gml:name>01100</gml:name>
</gml:Definition>
</gml:dictionaryEntry>
</gml:Dictionary>
"#;

#[test]
fn test_deserialize_dictionary_entry() {
let dic: Dictionary = quick_xml::de::from_str(TEST_XML).unwrap();
assert_eq!(dic.name.value, "Common_localPublicAuthoritiesType");
assert_eq!(dic.entries.len(), 2);
assert_eq!(dic.entries[0].definition.id, "id1");
assert_eq!(dic.entries[0].definition.description.value, "北海道");
assert_eq!(dic.entries[0].definition.name.value, "01");
assert_eq!(dic.entries[1].definition.id, "id2");
assert_eq!(dic.entries[1].definition.description.value, "北海道札幌市");
assert_eq!(dic.entries[1].definition.name.value, "01100");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl Processor for FeatureFilePathExtractor {
fw: &mut dyn ProcessorChannelForwarder,
) -> Result<(), BoxedError> {
let feature = &ctx.feature;
let base_attributes = feature.attributes.clone();
let expr_engine = Arc::clone(&ctx.expr_engine);
let scope = feature.new_scope(expr_engine, &self.with);
let source_dataset = scope
Expand Down Expand Up @@ -184,7 +185,14 @@ impl Processor for FeatureFilePathExtractor {
} else {
extract_zip(bytes, root_output_path, root_output_storage)?
};
for feature in features {
for mut feature in features {
feature.extend(
base_attributes
.iter()
.filter(|(k, _)| !feature.contains_key(k))
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<_, _>>(),
);
fw.send(ctx.new_with_feature_and_port(feature, DEFAULT_PORT.clone()));
}
} else if source_dataset.is_dir() {
Expand Down
18 changes: 12 additions & 6 deletions engine/runtime/action-processor/src/feature/reader/citygml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ fn parse_tree_reader<R: BufRead>(
let attributes = entity.root.to_attribute_json();
let gml_id = entity.root.id();
let name = entity.root.typename();
let lod = LodMask::find_lods_by_citygml_value(&entity.root);
let metadata = Metadata {
feature_id: gml_id.map(|id| id.to_string()),
feature_type: name.map(|name| name.to_string()),
lod: Some(lod),
};
let mut attributes = HashMap::<Attribute, AttributeValue>::from([
(Attribute::new("cityGmlAttributes"), attributes.into()),
(
Expand All @@ -175,13 +181,13 @@ fn parse_tree_reader<R: BufRead>(
AttributeValue::String(format!("root_{}", to_hash(base_url.as_str()))),
),
]);
if let Some(max_lod) = lod.highest_lod() {
attributes.insert(
Attribute::new("maxLod"),
AttributeValue::String(max_lod.to_string()),
);
}
attributes.extend(base_attributes.clone());
let lod = LodMask::find_lods_by_citygml_value(&entity.root);
let metadata = Metadata {
feature_id: gml_id.map(|id| id.to_string()),
feature_type: name.map(|name| name.to_string()),
lod: Some(lod),
};
let entities = if flatten {
FlattenTreeTransform::transform(entity)
} else {
Expand Down
5 changes: 5 additions & 0 deletions engine/runtime/common/src/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,11 @@ mod tests {
Uri::for_test("file:///foo/hoge/").join("../bar").unwrap(),
"file:///foo/bar"
);
assert_eq!(
Uri::for_test("file:///foo/hoge/fuga/").join("..").unwrap().join("..").unwrap(),
"file:///foo"
);

assert_eq!(
Uri::for_test("ram://foo").join("bar").unwrap(),
"ram://foo/bar"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod helper;

fn main() {
helper::execute("data-convert/plateau4/06-area-urf/workflow.yml");
helper::execute("data-convert/plateau4/02-tran-rwy-trk-squr-wwy/workflow.yml");
}
Loading

0 comments on commit 3e7f559

Please sign in to comment.