feat(engine): update example workflow path and enhance error messages…
miseyu authored Dec 5, 2024
1 parent 6f877fd commit 15cf563
Showing 7 changed files with 167 additions and 35 deletions.
23 changes: 21 additions & 2 deletions engine/docs/mdbook/src/action.md
@@ -1075,6 +1075,9 @@ Sorts features by attributes
"Attribute": {
"type": "string"
},
"Expr": {
"type": "string"
},
"Order": {
"type": "string",
"enum": [
@@ -1085,12 +1088,28 @@ Sorts features by attributes
"SortBy": {
"type": "object",
"required": [
"attribute",
"order"
],
"properties": {
"attribute": {
"$ref": "#/definitions/Attribute"
"anyOf": [
{
"$ref": "#/definitions/Attribute"
},
{
"type": "null"
}
]
},
"attributeValue": {
"anyOf": [
{
"$ref": "#/definitions/Expr"
},
{
"type": "null"
}
]
},
"order": {
"$ref": "#/definitions/Order"
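
For orientation, a minimal sketch of what the loosened SortBy contract allows: `attribute` and `attributeValue` are now both optional, with only `order` still required, so an entry can either name a stored attribute or supply an expression. The types below are local stand-ins that mirror the schema, not the crate's real definitions; `cityCode` is a made-up attribute, and the `"asc"`/`"desc"` values and the bare `__feature_type` expression are assumptions.

```rust
use serde::Deserialize;

// Local stand-ins mirroring the updated SortBy schema (illustrative only).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
enum Order {
    Asc,
    Desc,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SortBy {
    attribute: Option<String>,       // stand-in for #/definitions/Attribute
    attribute_value: Option<String>, // stand-in for #/definitions/Expr
    order: Order,
}

fn main() {
    // Sort on a stored attribute, as before ("cityCode" is hypothetical).
    let by_attribute = r#"{ "attribute": "cityCode", "order": "asc" }"#;
    // Sort on a computed value instead; "__feature_type" is a hypothetical
    // expression that just reads a variable the engine places in scope.
    let by_expression = r#"{ "attributeValue": "__feature_type", "order": "desc" }"#;

    let a: SortBy = serde_json::from_str(by_attribute).unwrap();
    let b: SortBy = serde_json::from_str(by_expression).unwrap();
    println!("{a:?}\n{b:?}");
}
```
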
83 changes: 71 additions & 12 deletions engine/runtime/action-processor/src/feature/sorter.rs
@@ -1,4 +1,4 @@
use std::{cmp::Ordering, collections::HashMap};
use std::{cmp::Ordering, collections::HashMap, sync::Arc};

use reearth_flow_runtime::{
channels::ProcessorChannelForwarder,
@@ -7,7 +7,7 @@ use reearth_flow_runtime::{
executor_operation::{ExecutorContext, NodeContext},
node::{Port, Processor, ProcessorFactory, DEFAULT_PORT},
};
use reearth_flow_types::{Attribute, Feature};
use reearth_flow_types::{Attribute, AttributeValue, Expr, Feature};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -44,12 +44,12 @@ impl ProcessorFactory for FeatureSorterFactory {

fn build(
&self,
_ctx: NodeContext,
ctx: NodeContext,
_event_hub: EventHub,
_action: String,
with: Option<HashMap<String, Value>>,
) -> Result<Box<dyn Processor>, BoxedError> {
let params: FeatureSorterParam = if let Some(with) = with {
let params: FeatureSorterParam = if let Some(with) = with.clone() {
let value: Value = serde_json::to_value(with).map_err(|e| {
FeatureProcessorError::SorterFactory(format!(
"Failed to serialize `with` parameter: {}",
@@ -68,8 +68,35 @@ impl ProcessorFactory for FeatureSorterFactory {
)
.into());
};

let expr_engine = Arc::clone(&ctx.expr_engine);
let mut sort_by = Vec::new();
for sort in &params.sort_by {
let attribute_value = if let Some(attribute_value) = &sort.attribute_value {
Some(
expr_engine
.compile(attribute_value.as_ref())
.map_err(|e| FeatureProcessorError::FilterFactory(format!("{:?}", e)))?,
)
} else {
if sort.attribute.is_none() {
return Err(FeatureProcessorError::FilterFactory(
"Either `attribute` or `attributeValue` is required".to_string(),
)
.into());
}
None
};
sort_by.push(CompiledSortBy {
attribute: sort.attribute.clone(),
attribute_value,
order: sort.order.clone(),
});
}

let process = FeatureSorter {
params,
global_params: with,
params: FeatureSorterCompiledParam { sort_by },
buffer: vec![],
};
Ok(Box::new(process))
@@ -78,7 +105,8 @@

#[derive(Debug, Clone)]
pub struct FeatureSorter {
params: FeatureSorterParam,
global_params: Option<HashMap<String, serde_json::Value>>,
params: FeatureSorterCompiledParam,
buffer: Vec<Feature>,
}

@@ -88,10 +116,23 @@ pub struct FeatureSorterParam {
sort_by: Vec<SortBy>,
}

#[derive(Debug, Clone)]
pub struct FeatureSorterCompiledParam {
sort_by: Vec<CompiledSortBy>,
}

#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
struct SortBy {
attribute: Attribute,
attribute: Option<Attribute>,
attribute_value: Option<Expr>,
order: Order,
}

#[derive(Debug, Clone)]
struct CompiledSortBy {
attribute: Option<Attribute>,
attribute_value: Option<rhai::AST>,
order: Order,
}

@@ -126,16 +167,34 @@ impl Processor for FeatureSorter {
.sort_by
.iter()
.map(|sort_by| {
let attribute = &sort_by.attribute;
let (a, b) = if let Some(attribute) = &sort_by.attribute {
(
a.attributes.get(attribute).cloned(),
b.attributes.get(attribute).cloned(),
)
} else if let Some(attribute_value) = &sort_by.attribute_value {
let a_scope = a.new_scope(ctx.expr_engine.clone(), &self.global_params);
let b_scope = b.new_scope(ctx.expr_engine.clone(), &self.global_params);
(
a_scope
.eval_ast::<String>(attribute_value)
.map(AttributeValue::String)
.ok(),
b_scope
.eval_ast::<String>(attribute_value)
.map(AttributeValue::String)
.ok(),
)
} else {
(None, None)
};
let order = &sort_by.order;
let a = a.attributes.get(attribute);
let b = b.attributes.get(attribute);
match (a, b) {
(Some(a), Some(b)) => {
if *order == Order::Asc {
a.partial_cmp(b)
a.partial_cmp(&b)
} else {
b.partial_cmp(a)
b.partial_cmp(&a)
}
}
_ => None,
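
A rough sketch of the comparison the reworked sorter performs: for each SortBy entry it takes either the stored attribute or the value produced by a pre-compiled expression, compares the two options under the requested order, and falls through to the next key on ties. A plain closure stands in for the compiled rhai AST, and all type names here are simplified stand-ins rather than the crate's Feature/AttributeValue types.

```rust
use std::{cmp::Ordering, collections::HashMap};

#[derive(Clone, Copy, PartialEq)]
enum Order {
    Asc,
    Desc,
}

// Stand-in for a feature: just a bag of string attributes.
type Feature = HashMap<String, String>;

// Stand-in for one compiled SortBy entry: either a stored attribute name or a
// "compiled expression" (here, a closure) that computes a sort value.
struct CompiledSortBy {
    attribute: Option<String>,
    attribute_value: Option<Box<dyn Fn(&Feature) -> Option<String>>>,
    order: Order,
}

fn sort_key(sort_by: &CompiledSortBy, f: &Feature) -> Option<String> {
    if let Some(attr) = &sort_by.attribute {
        f.get(attr).cloned()
    } else if let Some(eval) = &sort_by.attribute_value {
        eval(f)
    } else {
        None
    }
}

fn compare(sorts: &[CompiledSortBy], a: &Feature, b: &Feature) -> Ordering {
    sorts
        .iter()
        .map(|s| {
            let (ka, kb) = (sort_key(s, a), sort_key(s, b));
            match (ka, kb) {
                (Some(ka), Some(kb)) if s.order == Order::Asc => ka.partial_cmp(&kb),
                (Some(ka), Some(kb)) => kb.partial_cmp(&ka),
                // Missing keys are treated as incomparable and skipped.
                _ => None,
            }
        })
        .find(|o| *o != Some(Ordering::Equal))
        .flatten()
        .unwrap_or(Ordering::Equal)
}

fn main() {
    let mut features: Vec<Feature> = vec![
        HashMap::from([("name".to_string(), "b".to_string())]),
        HashMap::from([("name".to_string(), "a".to_string())]),
    ];
    // One key on a stored attribute; an expression-based key would supply
    // `attribute_value` instead (the closure standing in for the compiled AST).
    let sorts = vec![CompiledSortBy {
        attribute: Some("name".to_string()),
        attribute_value: None,
        order: Order::Asc,
    }];
    features.sort_by(|a, b| compare(&sorts, a, b));
    println!("{features:?}");
}
```
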
39 changes: 30 additions & 9 deletions engine/runtime/action-sink/src/file/cesium3dtiles/sink.rs
@@ -131,7 +131,7 @@ impl SinkFactory for Cesium3DTilesSinkFactory {
pub struct Cesium3DTilesWriter {
pub(super) global_params: Option<HashMap<String, serde_json::Value>>,
pub(super) params: Cesium3DTilesWriterCompiledParam,
pub(super) buffer: HashMap<Uri, Vec<Feature>>,
pub(super) buffer: HashMap<(Uri, String), Vec<Feature>>,
}

#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
@@ -157,6 +157,11 @@ impl Sink for Cesium3DTilesWriter {
}

fn process(&mut self, ctx: ExecutorContext) -> Result<(), BoxedError> {
let Some(feature_type) = &ctx.feature.feature_type() else {
return Err(
SinkError::Cesium3DTilesWriter("Failed to get feature type".to_string()).into(),
);
};
let geometry = &ctx.feature.geometry;
if geometry.is_empty() {
return Err(SinkError::Cesium3DTilesWriter("Unsupported input".to_string()).into());
@@ -175,12 +180,22 @@
.eval_ast::<String>(&output)
.map_err(|e| SinkError::Cesium3DTilesWriter(format!("{:?}", e)))?;
let output = Uri::from_str(path.as_str())?;
let buffer = self.buffer.entry(output).or_default();
let buffer = self
.buffer
.entry((output, feature_type.clone()))
.or_default();
buffer.push(feature);
Ok(())
}
fn finish(&self, ctx: NodeContext) -> Result<(), BoxedError> {
for (output, buffer) in &self.buffer {
let mut features = HashMap::<Uri, Vec<(String, Vec<Feature>)>>::new();
for ((output, feature_type), buffer) in &self.buffer {
features
.entry(output.clone())
.or_default()
.push((feature_type.clone(), buffer.clone()));
}
for (output, buffer) in &features {
self.write(ctx.as_context(), buffer, output)?;
}
Ok(())
@@ -191,21 +206,27 @@ impl Cesium3DTilesWriter {
pub fn write(
&self,
ctx: Context,
upstream: &[Feature],
upstream: &Vec<(String, Vec<Feature>)>,
output: &Uri,
) -> crate::errors::Result<()> {
let tile_id_conv = TileIdMethod::Hilbert;
let attach_texture = self.params.attach_texture.unwrap_or(false);
let Some(first) = upstream.first() else {
return Ok(());
};
let schema: nusamai_citygml::schema::Schema = first.into();
let mut features = Vec::new();
let mut schema: nusamai_citygml::schema::Schema = Default::default();
for (feature_type, upstream) in upstream {
let Some(feature) = upstream.first() else {
continue;
};
let typedef: nusamai_citygml::schema::TypeDef = feature.into();
schema.types.insert(feature_type.clone(), typedef);
features.extend(upstream.clone().into_iter());
}
std::thread::scope(|scope| {
let (sender_sliced, receiver_sliced) = std::sync::mpsc::sync_channel(2000);
let (sender_sorted, receiver_sorted) = std::sync::mpsc::sync_channel(2000);
scope.spawn(|| {
let result = geometry_slicing_stage(
upstream,
&features,
tile_id_conv,
sender_sliced,
self.params.min_zoom,
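
The sink change above keys the buffer by (output URI, feature type) during process() and regroups it per output in finish(), so each output is written once with a schema that covers every feature type it received. A minimal sketch of that regrouping step, with plain Strings standing in for Uri and Feature and made-up example values:

```rust
use std::collections::HashMap;

fn main() {
    // Stand-in buffer: keyed by (output URI, feature type), as in process().
    let mut buffer: HashMap<(String, String), Vec<String>> = HashMap::new();
    buffer
        .entry(("file:///tiles".into(), "bldg:Building".into()))
        .or_default()
        .extend(["b1".to_string(), "b2".to_string()]);
    buffer
        .entry(("file:///tiles".into(), "tran:Road".into()))
        .or_default()
        .push("r1".to_string());

    // Regroup into output URI -> [(feature type, features)], as in finish(),
    // so each output gets a single write covering all of its feature types.
    let mut grouped: HashMap<String, Vec<(String, Vec<String>)>> = HashMap::new();
    for ((output, feature_type), features) in &buffer {
        grouped
            .entry(output.clone())
            .or_default()
            .push((feature_type.clone(), features.clone()));
    }

    for (output, per_type) in &grouped {
        println!("{output}: {} feature type(s)", per_type.len());
    }
}
```
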
@@ -1,5 +1,5 @@
mod helper;

fn main() {
helper::execute("data-convert/10-wtr/workflow.yml");
helper::execute("data-convert/01-bldg/workflow.yml");
}
5 changes: 4 additions & 1 deletion engine/runtime/gltf/src/metadata.rs
@@ -48,7 +48,10 @@ impl<'a> MetadataEncoder<'a> {
attributes: &HashMap<String, AttributeValue>,
) -> crate::errors::Result<usize> {
let Some(TypeDef::Feature(feature_def)) = self.original_schema.types.get(typename) else {
return Err(crate::errors::Error::metadata("Feature type not found"));
return Err(crate::errors::Error::metadata(format!(
"Feature type not found: {}",
typename
)));
};

let typename = typename.replace(':', "_");
27 changes: 19 additions & 8 deletions engine/runtime/types/src/feature.rs
@@ -135,6 +135,13 @@ impl From<&Feature> for nusamai_citygml::schema::Schema {
let Some(feature_type) = v.feature_type() else {
return schema;
};
schema.types.insert(feature_type, v.into());
schema
}
}

impl From<&Feature> for nusamai_citygml::schema::TypeDef {
fn from(v: &Feature) -> Self {
let mut attributes = nusamai_citygml::schema::Map::default();
for (k, v) in v
.attributes
@@ -143,14 +150,10 @@ impl From<&Feature> for nusamai_citygml::schema::Schema {
{
attributes.insert(k.to_string(), v.clone().into());
}
schema.types.insert(
feature_type,
nusamai_citygml::schema::TypeDef::Feature(nusamai_citygml::schema::FeatureTypeDef {
attributes,
additional_attributes: true,
}),
);
schema
nusamai_citygml::schema::TypeDef::Feature(nusamai_citygml::schema::FeatureTypeDef {
attributes,
additional_attributes: true,
})
}
}

@@ -350,6 +353,14 @@ impl Feature {
.collect::<serde_json::Map<_, _>>(),
);
scope.set("__value", value);
scope.set(
"__feature_type",
serde_json::Value::String(self.feature_type().unwrap_or_default()),
);
scope.set(
"__feature_id",
serde_json::Value::String(self.feature_id().unwrap_or_default()),
);
if let Some(with) = with {
for (k, v) in with {
scope.set(k, v.clone());
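
The feature.rs refactor splits the schema conversion so the per-type definition is reusable on its own, which is what lets the Cesium sink assemble a multi-type schema. A small sketch of that delegation pattern using simplified stand-in types (not the real nusamai_citygml definitions):

```rust
use std::collections::HashMap;

// Local stand-ins for the nusamai_citygml schema types (illustrative only).
#[derive(Debug, Clone)]
struct TypeDef {
    attributes: Vec<String>,
}

#[derive(Debug, Default)]
struct Schema {
    types: HashMap<String, TypeDef>,
}

#[derive(Debug)]
struct Feature {
    feature_type: Option<String>,
    attributes: Vec<String>,
}

// One conversion builds the per-type definition...
impl From<&Feature> for TypeDef {
    fn from(v: &Feature) -> Self {
        TypeDef {
            attributes: v.attributes.clone(),
        }
    }
}

// ...and the schema conversion just delegates to it, so callers that need to
// combine several feature types (as the Cesium sink now does) can use the
// TypeDef conversion directly.
impl From<&Feature> for Schema {
    fn from(v: &Feature) -> Self {
        let mut schema = Schema::default();
        let Some(feature_type) = v.feature_type.clone() else {
            return schema;
        };
        schema.types.insert(feature_type, v.into());
        schema
    }
}

fn main() {
    let f = Feature {
        feature_type: Some("bldg:Building".to_string()),
        attributes: vec!["height".to_string()],
    };
    let schema: Schema = (&f).into();
    println!("{schema:?}");
}
```
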
23 changes: 21 additions & 2 deletions engine/schema/actions.json
@@ -1083,6 +1083,9 @@
"Attribute": {
"type": "string"
},
"Expr": {
"type": "string"
},
"Order": {
"type": "string",
"enum": [
@@ -1093,12 +1096,28 @@
"SortBy": {
"type": "object",
"required": [
"attribute",
"order"
],
"properties": {
"attribute": {
"$ref": "#/definitions/Attribute"
"anyOf": [
{
"$ref": "#/definitions/Attribute"
},
{
"type": "null"
}
]
},
"attributeValue": {
"anyOf": [
{
"$ref": "#/definitions/Expr"
},
{
"type": "null"
}
]
},
"order": {
"$ref": "#/definitions/Order"
