From 4ea6a0fcaf04b93e01d687eebd1a9e2487af422a Mon Sep 17 00:00:00 2001 From: Yuji Mise Date: Tue, 8 Oct 2024 23:38:40 +0900 Subject: [PATCH] feat(engine): update MVT handling and add error types (#560) * feat(engine): update MVT handling and add error types * fix --- engine/Cargo.lock | 38 + engine/Cargo.toml | 1 + engine/runtime/action-sink/Cargo.toml | 2 + engine/runtime/action-sink/src/errors.rs | 4 + engine/runtime/action-sink/src/file.rs | 1 - engine/runtime/action-sink/src/file/mvt.rs | 844 +----------------- .../runtime/action-sink/src/file/mvt/sink.rs | 424 +++++++++ .../runtime/action-sink/src/file/mvt/slice.rs | 275 ++++++ .../runtime/action-sink/src/file/mvt/tags.rs | 47 + engine/runtime/action-sink/src/file/types.rs | 8 +- .../action-sink/src/file/vector_tile.rs | 110 --- engine/runtime/action-sink/src/mapping.rs | 2 +- .../data-convert/04-luse-lsld/workflow.yml | 10 +- .../runtime/geometry/src/types/line_string.rs | 15 +- engine/runtime/geometry/src/types/polygon.rs | 20 +- engine/tools/mvt/index.html | 6 +- 16 files changed, 842 insertions(+), 965 deletions(-) create mode 100644 engine/runtime/action-sink/src/file/mvt/sink.rs create mode 100644 engine/runtime/action-sink/src/file/mvt/slice.rs create mode 100644 engine/runtime/action-sink/src/file/mvt/tags.rs delete mode 100644 engine/runtime/action-sink/src/file/vector_tile.rs diff --git a/engine/Cargo.lock b/engine/Cargo.lock index d3cfb0e99..58bca0a33 100644 --- a/engine/Cargo.lock +++ b/engine/Cargo.lock @@ -422,6 +422,15 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bincode" +version = "2.0.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f11ea1a0346b94ef188834a65c068a03aec181c94896d481d7a0a40d85b0ce95" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -569,6 +578,20 @@ name = "bytemuck" version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc8b54b395f2fcfbb3d90c47b01c7f444d94d05bdeb775811dec868ac3bbc26" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] [[package]] name = "byteorder" @@ -2639,6 +2662,19 @@ dependencies = [ "selectors", ] +[[package]] +name = "kv-extsort" +version = "0.1.0" +source = "git+https://github.com/MIERUNE/kv-extsort-rs.git#6ac60e183cca6525876b24137cb1b822236e6e0d" +dependencies = [ + "bytemuck", + "crossbeam-channel", + "log", + "rayon", + "tempfile", + "thiserror", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4341,6 +4377,7 @@ version = "0.0.2" dependencies = [ "ahash 0.8.11", "async-trait", + "bincode", "byteorder", "bytes", "cesiumtiles", @@ -4355,6 +4392,7 @@ dependencies = [ "image 0.25.2", "indexmap 2.6.0", "itertools", + "kv-extsort", "nusamai-citygml", "nusamai-geometry", "nusamai-gltf", diff --git a/engine/Cargo.toml b/engine/Cargo.toml index 3d17a0796..cc3a96663 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -78,6 +78,7 @@ async-stream = "0.3.6" async-trait = "0.1.83" async_zip = { version = "0.0.17", features = ["full"] } base64 = "0.22.1" +bincode = { version = "2.0.0-rc.3", default-features = false, features = ["serde", "std"] } byteorder = "1.5.0" bytes = { version = "1.7.2", features = ["serde"] } chrono = { version = "0.4.38", features = ["serde"] } diff --git a/engine/runtime/action-sink/Cargo.toml b/engine/runtime/action-sink/Cargo.toml index 6c2595a90..f5729d2d7 100644 --- a/engine/runtime/action-sink/Cargo.toml +++ b/engine/runtime/action-sink/Cargo.toml @@ -20,6 +20,7 @@ reearth-flow-state.workspace = true reearth-flow-storage.workspace = true reearth-flow-types.workspace = true +kv-extsort = { git = "https://github.com/MIERUNE/kv-extsort-rs.git" } nusamai-citygml.workspace = true nusamai-geometry.workspace = true nusamai-gltf.workspace = true @@ -29,6 +30,7 @@ nusamai-projection.workspace = true ahash.workspace = true async-trait.workspace = true +bincode.workspace = true byteorder.workspace = true bytes.workspace = true cesiumtiles.workspace = true diff --git a/engine/runtime/action-sink/src/errors.rs b/engine/runtime/action-sink/src/errors.rs index 210c68441..24c7015a5 100644 --- a/engine/runtime/action-sink/src/errors.rs +++ b/engine/runtime/action-sink/src/errors.rs @@ -14,6 +14,8 @@ pub enum SinkError { GeoJsonWriterFactory(String), #[error("GeoJson Writer error: {0}")] GeoJsonWriter(String), + #[error("Mvt Writer error: {0}")] + MvtWriter(String), } impl SinkError { @@ -21,3 +23,5 @@ impl SinkError { Self::FileWriter(message.to_string()) } } + +pub type Result = std::result::Result; diff --git a/engine/runtime/action-sink/src/file.rs b/engine/runtime/action-sink/src/file.rs index ed3a32078..0405635b5 100644 --- a/engine/runtime/action-sink/src/file.rs +++ b/engine/runtime/action-sink/src/file.rs @@ -5,5 +5,4 @@ mod gltf; pub(crate) mod mvt; pub(super) mod types; mod util; -mod vector_tile; pub(crate) mod writer; diff --git a/engine/runtime/action-sink/src/file/mvt.rs b/engine/runtime/action-sink/src/file/mvt.rs index d13021517..919807094 100644 --- a/engine/runtime/action-sink/src/file/mvt.rs +++ b/engine/runtime/action-sink/src/file/mvt.rs @@ -1,841 +1,3 @@ -use super::vector_tile::tile; -use crate::errors::SinkError; -use ahash::RandomState; -use flate2::{write::ZlibEncoder, Compression}; -use geomotry_types::GeometryType; -use hashbrown::HashMap as BrownHashMap; -use indexmap::IndexSet; -use reearth_flow_common::uri::Uri; -use reearth_flow_geometry::algorithm::coords_iter::CoordsIter; -use reearth_flow_geometry::types::coordinate::Coordinate; -use reearth_flow_geometry::types::coordinate::Coordinate2D; -use reearth_flow_geometry::types::line_string::LineString; -use reearth_flow_geometry::types::multi_polygon::MultiPolygon2D; -use reearth_flow_geometry::types::polygon::Polygon2D; -use reearth_flow_geometry::types::polygon::Polygon3D; -use reearth_flow_runtime::errors::BoxedError; -use reearth_flow_runtime::event::EventHub; -use reearth_flow_runtime::executor_operation::{ExecutorContext, NodeContext}; -use reearth_flow_runtime::node::{Port, Sink, SinkFactory, DEFAULT_PORT}; -use reearth_flow_storage::resolve::StorageResolver; -use reearth_flow_types::geometry as geomotry_types; -use reearth_flow_types::Attribute; -use reearth_flow_types::AttributeValue; -use reearth_flow_types::Expr; -use rhai::Variant; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; -use serde_json::Value as JsonValue; -use std::collections::HashMap; -use std::fs; -use std::io::Write; -use std::path::Path; -use std::str::FromStr; -use std::sync::Arc; -use std::vec; - -#[derive(Debug, Clone, Default)] -pub struct MVTSinkFactory; - -impl SinkFactory for MVTSinkFactory { - fn name(&self) -> &str { - "MVTWriter" - } - - fn description(&self) -> &str { - "Writes features to a file" - } - - fn parameter_schema(&self) -> Option { - Some(schemars::schema_for!(MVTWriterParam)) - } - - fn categories(&self) -> &[&'static str] { - &["File"] - } - - fn get_input_ports(&self) -> Vec { - vec![DEFAULT_PORT.clone()] - } - - fn prepare(&self) -> Result<(), BoxedError> { - Ok(()) - } - - fn build( - &self, - _ctx: NodeContext, - _event_hub: EventHub, - _action: String, - with: Option>, - ) -> Result, BoxedError> { - let params = if let Some(with) = with { - let value: JsonValue = serde_json::to_value(with).map_err(|e| { - SinkError::BuildFactory(format!("Failed to serialize `with` parameter: {}", e)) - })?; - serde_json::from_value(value).map_err(|e| { - SinkError::BuildFactory(format!("Failed to deserialize `with` parameter: {}", e)) - })? - } else { - return Err( - SinkError::BuildFactory("Missing required parameter `with`".to_string()).into(), - ); - }; - - let sink = MVTWriter { params }; - Ok(Box::new(sink)) - } -} - -#[derive(Debug, Clone)] -pub struct MVTWriter { - pub(super) params: MVTWriterParam, -} - -#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct MVTWriterCommonParam { - pub(super) output: Expr, -} - -#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct MVTWriterParam { - pub(super) output: Expr, - pub(super) min_zoom: u8, - pub(super) max_zoom: u8, -} - -impl Sink for MVTWriter { - fn name(&self) -> &str { - "MVTWriter" - } - - fn process(&mut self, ctx: ExecutorContext) -> Result<(), BoxedError> { - let geometry = ctx.feature.geometry.as_ref().unwrap(); - let attributes = ctx.feature.attributes.clone(); - let geometry_value = geometry.value.clone(); - match geometry_value { - geomotry_types::GeometryValue::None => { - return Err(Box::new(SinkError::FileWriter( - "Unsupported input".to_string(), - ))); - } - geomotry_types::GeometryValue::CityGmlGeometry(city_gml) => { - let storage_resolver = Arc::clone(&ctx.storage_resolver); - let expr_engine = Arc::clone(&ctx.expr_engine); - let output = self.params.output.clone(); - let scope = expr_engine.new_scope(); - let path = scope - .eval::(output.as_ref()) - .unwrap_or_else(|_| output.as_ref().to_string()); - let output = Uri::from_str(path.as_str())?; - match self.handle_city_gml_geometry( - &output, - storage_resolver.clone(), - attributes, - city_gml, - self.params.min_zoom, - self.params.max_zoom, - ) { - Ok(_) => (), - Err(e) => { - return Err(Box::new(SinkError::FileWriter(format!( - "CityGmlGeometry handle Error: {:?}", - e - )))) - } - }; - } - geomotry_types::GeometryValue::FlowGeometry2D(_flow_geom_2d) => { - return Err(Box::new(SinkError::FileWriter( - "Unsupported input".to_string(), - ))); - } - geomotry_types::GeometryValue::FlowGeometry3D(_flow_geom_3d) => { - return Err(Box::new(SinkError::FileWriter( - "Unsupported input".to_string(), - ))); - } - } - - Ok(()) - } - fn finish(&self, _ctx: NodeContext) -> Result<(), BoxedError> { - Ok(()) - } -} - -impl MVTWriter { - fn handle_city_gml_geometry( - &self, - output: &Uri, - storage_resolver: Arc, - attributes: HashMap, - city_gml: geomotry_types::CityGmlGeometry, - min_zoom: u8, - max_zoom: u8, - ) -> Result<(), crate::errors::SinkError> { - let max_detail = 12; // 4096 - let buffer_pixels = 5; - - let mut tiled_mpolys = HashMap::new(); - - let extent = 1 << max_detail; - let buffer = extent * buffer_pixels / 256; - - for entry in city_gml.gml_geometries.iter() { - match entry.ty { - GeometryType::Solid | GeometryType::Surface | GeometryType::Triangle => { - for poly in entry.clone().polygons { - // Early rejection of polygons that are not front-facing. - if poly.exterior().signed_ring_area() >= 0.0 { - continue; - } - - let area = poly.area(); - - // Slice for each zoom level - for zoom in min_zoom..=max_zoom { - // Skip if the polygon is smaller than 4 square subpixels - // - // TODO: emulate the 'tiny-polygon-reduction' of tippecanoe - if area * (4u64.pow(zoom as u32 + max_detail) as f64) < 4.0 { - continue; - } - slice_polygon(zoom, extent, buffer, &poly, &mut tiled_mpolys); - } - } - } - GeometryType::Curve => { - // TODO: implement - } - GeometryType::Point => { - // TODO: implement - } - _ => { - // TODO: implement - } - } - } - - for ((zoom, x, y), mpoly) in tiled_mpolys { - if mpoly.is_empty() { - continue; - } - let feature = SlicedFeature { - geometry: mpoly, - properties: attributes.clone(), - }; - - let default_detail = 12; - let min_detail = 9; - - // let serialized_feats = vec![bytes]; - - let storage = storage_resolver - .resolve(output) - .map_err(crate::errors::SinkError::file_writer)?; - let output_path = output - .path() - .join(Path::new(&format!("{zoom}/{x}/{y}.pbf"))); - if let Some(dir) = output_path.parent() { - fs::create_dir_all(dir).map_err(crate::errors::SinkError::file_writer)?; - } - let path = Path::new(&output_path); - - for detail in (min_detail..=default_detail).rev() { - // Make a MVT tile binary - let bytes = make_tile(detail, &feature)?; - - // Retry with a lower detail level if the compressed tile size is too large - let compressed_size = { - let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); - e.write_all(&bytes) - .map_err(crate::errors::SinkError::file_writer)?; - let compressed_bytes = - e.finish().map_err(crate::errors::SinkError::file_writer)?; - compressed_bytes.len() - }; - if detail != min_detail && compressed_size > 500_000 { - // If the tile is too large, try a lower detail level - continue; - } - storage - .put_sync(path, bytes::Bytes::from(bytes)) - .map_err(crate::errors::SinkError::file_writer)?; - break; - } - } - Ok(()) - } -} - -fn make_tile(default_detail: i32, feature: &SlicedFeature) -> Result, SinkError> { - let mut layers: BrownHashMap = BrownHashMap::new(); - let mut int_ring_buf = Vec::new(); - let mut int_ring_buf2 = Vec::new(); - let extent = 1 << default_detail; - - let mpoly = feature.geometry.clone(); - let mut int_mpoly: MultiPolygon2D = Default::default(); // 2D only - - for poly in &mpoly { - for (ri, ring) in poly.rings().into_iter().enumerate() { - int_ring_buf.clear(); - int_ring_buf.extend(ring.into_iter().map(|c| { - let x = (c.x as f64 * extent as f64 + 0.5) as i16; - let y = (c.y as f64 * extent as f64 + 0.5) as i16; - [x, y] - })); - - // some simplification - { - int_ring_buf2.clear(); - int_ring_buf2.push(int_ring_buf[0]); - for c in int_ring_buf.windows(3) { - let &[prev, curr, next] = c.try_into().unwrap(); - - // Remove duplicate points - if prev == curr { - continue; - } - - // Reject collinear points - let [curr_x, curr_y] = curr; - let [prev_x, prev_y] = prev; - let [next_x, next_y] = next; - if curr != next - && ((next_y - prev_y) as i32 * (curr_x - prev_x) as i32).abs() - == ((curr_y - prev_y) as i32 * (next_x - prev_x) as i32).abs() - { - continue; - } - - int_ring_buf2.push(curr); - } - int_ring_buf2.push(*int_ring_buf.last().unwrap()); - } - - let ls = LineString::new( - int_ring_buf2 - .iter() - .map(|c| Coordinate::new_(c[0], c[1])) - .collect(), - ); - match ri { - 0 => int_mpoly.add_exterior(ls), - _ => int_mpoly.add_interior(ls), - } - } - } - - // encode geometry - let mut geom_enc = GeometryEncoder::new(); - for poly in &int_mpoly { - let exterior = poly.exterior(); - if exterior.signed_ring_area() < 0.0 { - geom_enc.add_ring(exterior.into_iter().map(|c| [c.x, c.y])); - for interior in poly.interiors() { - if interior.signed_ring_area() < 0.0 { - geom_enc.add_ring(interior.into_iter().map(|c| [c.x, c.y])); - } - } - } - } - let geometry = geom_enc.into_vec(); - if geometry.is_empty() { - return Ok(vec![]); - } - - let tags: Vec = Vec::new(); - - for (key, value) in &feature.properties { - let layer = layers.entry_ref(key.type_name()).or_default(); - let mut tags_clone = tags.clone(); - convert_properties(&mut tags_clone, &mut layer.tags_enc, key.type_name(), value); - - let geomery_cloned = geometry.clone(); - layer.features.push(vector_tile::tile::Feature { - id: Some( - key.type_name() - .as_bytes() - .iter() - .fold(5381u64, |a, c| a.wrapping_mul(33) ^ *c as u64), - ), - tags: tags_clone, - r#type: Some(vector_tile::tile::GeomType::Polygon as i32), - geometry: geomery_cloned, - }); - } - - let layers = layers - .into_iter() - .flat_map(|(name, layer_data)| { - if layer_data.features.is_empty() { - return None; - } - let (keys, values) = layer_data.tags_enc.into_keys_and_values(); - Some(vector_tile::tile::Layer { - version: 2, - name: name.to_string(), - features: layer_data.features, - keys, - values, - extent: Some(extent), - }) - }) - .collect(); - - let tile = vector_tile::Tile { layers }; - - let bytes = tile.encode_to_vec(); - Ok(bytes) -} - -const GEOM_COMMAND_MOVE_TO: u32 = 1; -const GEOM_COMMAND_LINE_TO: u32 = 2; -const GEOM_COMMAND_CLOSE_PATH: u32 = 7; - -const GEOM_COMMAND_MOVE_TO_WITH_COUNT1: u32 = 1 << 3 | GEOM_COMMAND_MOVE_TO; -const GEOM_COMMAND_CLOSE_PATH_WITH_COUNT1: u32 = 1 << 3 | GEOM_COMMAND_CLOSE_PATH; - -pub struct GeometryEncoder { - buf: Vec, - prev_x: i16, - prev_y: i16, -} - -/// Utility for encoding MVT geometries. -impl GeometryEncoder { - pub fn new() -> Self { - Self { - buf: Vec::new(), - prev_x: 0, - prev_y: 0, - } - } - - #[inline] - pub fn into_vec(self) -> Vec { - self.buf - } - - pub fn add_ring(&mut self, iterable: impl IntoIterator) { - let mut iter = iterable.into_iter(); - let Some([first_x, first_y]) = iter.next() else { - return; - }; - let dx = (first_x - self.prev_x) as i32; - let dy = (first_y - self.prev_y) as i32; - (self.prev_x, self.prev_y) = (first_x, first_y); - - // move to - self.buf - .extend([GEOM_COMMAND_MOVE_TO_WITH_COUNT1, zigzag(dx), zigzag(dy)]); - - // line to - let lineto_cmd_pos = self.buf.len(); - self.buf.push(GEOM_COMMAND_LINE_TO); // length will be updated later - let mut count = 0; - for [x, y] in iter { - let dx = (x - self.prev_x) as i32; - let dy = (y - self.prev_y) as i32; - (self.prev_x, self.prev_y) = (x, y); - if dx != 0 || dy != 0 { - self.buf.extend([zigzag(dx), zigzag(dy)]); - count += 1; - } - } - debug_assert!(count >= 2); - self.buf[lineto_cmd_pos] = GEOM_COMMAND_LINE_TO | count << 3; - - // close path - self.buf.push(GEOM_COMMAND_CLOSE_PATH_WITH_COUNT1); - } -} - -impl Default for GeometryEncoder { - fn default() -> Self { - Self::new() - } -} - -#[inline] -fn zigzag(v: i32) -> u32 { - ((v << 1) ^ (v >> 31)) as u32 -} - -#[derive(Default)] -pub struct TagsEncoder { - keys: IndexSet, - values: IndexSet, -} - -/// Utility for encoding MVT attributes (tags). -impl TagsEncoder { - #[allow(dead_code)] - pub fn new() -> Self { - Default::default() - } - - #[inline] - pub fn add(&mut self, key: &str, value: Value) -> [u32; 2] { - let key_idx = match self.keys.get_index_of(key) { - None => self.keys.insert_full(key.to_string()).0, - Some(idx) => idx, - }; - let value_idx = match self.values.get_index_of(&value) { - None => self.values.insert_full(value).0, - Some(idx) => idx, - }; - [key_idx as u32, value_idx as u32] - } - - #[inline] - pub fn into_keys_and_values(self) -> (Vec, Vec) { - let keys = self.keys.into_iter().collect(); - let values = self - .values - .into_iter() - .map(|v| v.into_tile_value()) - .collect(); - (keys, values) - } -} - -/// Wrapper for MVT Values -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum Value { - String(String), - Float([u8; 4]), - Double([u8; 8]), - Uint(u64), - SInt(i64), - Bool(bool), -} - -impl Value { - pub fn into_tile_value(self) -> tile::Value { - use Value::*; - match self { - String(v) => tile::Value { - string_value: Some(v), - ..Default::default() - }, - Float(v) => tile::Value { - float_value: Some(f32::from_ne_bytes(v)), - ..Default::default() - }, - Double(v) => tile::Value { - double_value: Some(f64::from_ne_bytes(v)), - ..Default::default() - }, - Uint(v) => tile::Value { - uint_value: Some(v), - ..Default::default() - }, - SInt(v) => tile::Value { - sint_value: Some(v), - ..Default::default() - }, - Bool(v) => tile::Value { - bool_value: Some(v), - ..Default::default() - }, - } - } -} - -impl From<&str> for Value { - fn from(v: &str) -> Self { - Value::String(v.to_string()) - } -} -impl From for Value { - fn from(v: String) -> Self { - Value::String(v) - } -} -impl From for Value { - fn from(v: u64) -> Self { - Value::Uint(v) - } -} -impl From for Value { - fn from(v: u32) -> Self { - Value::Uint(v as u64) - } -} -impl From for Value { - fn from(v: i64) -> Self { - if v >= 0 { - Value::Uint(v as u64) - } else { - Value::SInt(v) - } - } -} -impl From for Value { - fn from(v: i32) -> Self { - if v >= 0 { - Value::Uint(v as u64) - } else { - Value::SInt(v as i64) - } - } -} -impl From for Value { - fn from(v: f32) -> Self { - Value::Float(v.to_ne_bytes()) - } -} -impl From for Value { - fn from(v: f64) -> Self { - Value::Double(v.to_ne_bytes()) - } -} -impl From for Value { - fn from(v: bool) -> Self { - Value::Bool(v) - } -} - -#[derive(Default)] -struct LayerData { - pub features: Vec, - pub tags_enc: TagsEncoder, -} - -#[derive(Serialize, Deserialize)] -struct SlicedFeature { - geometry: MultiPolygon2D, - properties: HashMap, -} -use super::vector_tile; -use prost::Message; - -pub fn convert_properties( - tags: &mut Vec, - tags_enc: &mut TagsEncoder, - name: &str, - tree: &AttributeValue, -) { - match &tree { - AttributeValue::Null => { - // ignore - } - AttributeValue::String(v) => { - tags.extend(tags_enc.add(name, v.clone().into())); - } - AttributeValue::Bool(v) => { - tags.extend(tags_enc.add(name, (*v).into())); - } - AttributeValue::Number(v) => { - if let Some(v) = v.as_u64() { - tags.extend(tags_enc.add(name, v.into())); - } else if let Some(v) = v.as_i64() { - tags.extend(tags_enc.add(name, v.into())); - } else if let Some(v) = v.as_f64() { - tags.extend(tags_enc.add(name, v.into())); - } - } - AttributeValue::Array(_arr) => { - // ignore non-root attributes - } - AttributeValue::Bytes(_v) => { - // ignore non-root attributes - } - AttributeValue::Map(obj) => { - for (key, value) in obj { - convert_properties(tags, tags_enc, key, value); - } - } - AttributeValue::DateTime(v) => { - tags.extend(tags_enc.add(name, v.to_string().into())); - } - } -} - -fn slice_polygon( - zoom: u8, - extent: u32, - buffer: u32, - poly: &Polygon3D, - out: &mut HashMap<(u8, u32, u32), MultiPolygon2D>, -) { - let z_scale = (1 << zoom) as f64; - let buf_width = buffer as f64 / extent as f64; - let mut new_ring_buffer: Vec<[f64; 2]> = - Vec::with_capacity(poly.exterior().into_iter().len() + 1); - - // Slice along Y-axis - let y_range = { - let (min_y, max_y) = poly - .exterior() - .into_iter() - .fold((f64::MAX, f64::MIN), |(min_y, max_y), c| { - (min_y.min(c.y), max_y.max(c.y)) - }); - (min_y * z_scale).floor() as u32..(max_y * z_scale).ceil() as u32 - }; - - let mut y_sliced_polys = Vec::with_capacity(y_range.len()); - - for yi in y_range.clone() { - let k1 = (yi as f64 - buf_width) / z_scale; - let k2 = ((yi + 1) as f64 + buf_width) / z_scale; - - let mut y_sliced_poly: Polygon2D = - Polygon2D::::new(LineString::new(vec![]), vec![]); - - for ring in poly.rings() { - if ring.coords_count() == 0 { - continue; - } - - new_ring_buffer.clear(); - ring.into_iter() - .fold(None, |a, b| { - let Some(a) = a else { return Some(b) }; - - if a.y < k1 { - if b.y > k1 { - let x = (b.x - a.x) * (k1 - a.y) / (b.y - a.y) + a.x; - new_ring_buffer.push([x, k1]) - } - } else if a.y > k2 { - if b.y < k2 { - let x = (b.x - a.x) * (k2 - a.y) / (b.y - a.y) + a.x; - new_ring_buffer.push([x, k2]) - } - } else { - new_ring_buffer.push([a.x, a.y]) - } - - if b.y < k1 && a.y > k1 { - let x = (b.x - a.x) * (k1 - a.y) / (b.y - a.y) + a.x; - new_ring_buffer.push([x, k1]) - } else if b.y > k2 && a.y < k2 { - let x = (b.x - a.x) * (k2 - a.y) / (b.y - a.y) + a.x; - new_ring_buffer.push([x, k2]) - } - - Some(b) - }) - .unwrap(); - - let coordinates: Vec> = new_ring_buffer - .clone() - .into_iter() - .map(|[x, y]| Coordinate { x, y, z: 00 }) - .map(|c| Coordinate2D::new_(c.x, c.y)) - .collect(); - - let linestring = LineString::from(coordinates); - - y_sliced_poly.add_ring(linestring); - } - - y_sliced_polys.push(y_sliced_poly); - } - - let mut norm_coords_buf = Vec::new(); - - // Slice along X-axis - for (yi, y_sliced_poly) in y_range.zip(y_sliced_polys.iter()) { - let x_range = { - let (min_x, max_x) = y_sliced_poly - .exterior() - .into_iter() - .fold((f64::MAX, f64::MIN), |(min_x, max_x), c| { - (min_x.min(c.x), max_x.max(c.x)) - }); - (min_x * z_scale).floor() as i32..(max_x * z_scale).ceil() as i32 - }; - - for xi in x_range { - let k1 = (xi as f64 - buf_width) / z_scale; - let k2 = ((xi + 1) as f64 + buf_width) / z_scale; - - let key = ( - zoom, - xi.rem_euclid(1 << zoom) as u32, // handling geometry crossing the antimeridian - yi, - ); - let tile_mpoly = out.entry(key).or_default(); - - for (ri, ring) in y_sliced_poly.rings().into_iter().enumerate() { - if ring.coords_count() == 0 { - continue; - } - - new_ring_buffer.clear(); - ring.into_iter() - .fold(None, |a, b| { - let Some(a) = a else { return Some(b) }; - - if a.x < k1 { - if b.x > k1 { - let y = (b.y - a.y) * (k1 - a.x) / (b.x - a.x) + a.y; - new_ring_buffer.push([k1, y]) - } - } else if a.x > k2 { - if b.x < k2 { - let y = (b.y - a.y) * (k2 - a.x) / (b.x - a.x) + a.y; - new_ring_buffer.push([k2, y]) - } - } else { - new_ring_buffer.push([a.x, a.y]) - } - - if b.x < k1 && a.x > k1 { - let y = (b.y - a.y) * (k1 - a.x) / (b.x - a.x) + a.y; - new_ring_buffer.push([k1, y]) - } else if b.x > k2 && a.x < k2 { - let y = (b.y - a.y) * (k2 - a.x) / (b.x - a.x) + a.y; - new_ring_buffer.push([k2, y]) - } - - Some(b) - }) - .unwrap(); - - // get integer coordinates and simplify the ring - { - norm_coords_buf.clear(); - norm_coords_buf.extend(new_ring_buffer.iter().map(|&[x, y]| { - let tx = (x * z_scale - xi as f64) as i16; - let ty = (y * z_scale - yi as f64) as i16; - [tx, ty] - })); - - // remove closing point if exists - if norm_coords_buf.len() >= 2 - && norm_coords_buf[0] == *norm_coords_buf.last().unwrap() - { - norm_coords_buf.pop(); - } - - if norm_coords_buf.len() < 3 { - continue; - } - } - - // let mut ring = LineString::from(norm_coords_buf); - let mut ls = LineString::new( - norm_coords_buf - .iter() - .map(|c| Coordinate::new_(c[0], c[1])) - .collect(), - ); - ls.reverse_inplace(); - - match ri { - 0 => tile_mpoly.add_exterior(ls), - _ => tile_mpoly.add_interior(ls), - }; - } - } - } -} +pub(crate) mod sink; +pub(crate) mod slice; +pub(crate) mod tags; diff --git a/engine/runtime/action-sink/src/file/mvt/sink.rs b/engine/runtime/action-sink/src/file/mvt/sink.rs new file mode 100644 index 000000000..425785ab0 --- /dev/null +++ b/engine/runtime/action-sink/src/file/mvt/sink.rs @@ -0,0 +1,424 @@ +use std::collections::HashMap; +use std::convert::Infallible; +use std::io::Write; +use std::path::Path; +use std::str::FromStr; +use std::sync::Arc; +use std::vec; + +use flate2::{write::ZlibEncoder, Compression}; +use itertools::Itertools; +use nusamai_geometry::MultiPolygon as NMultiPolygon; +use nusamai_mvt::geometry::GeometryEncoder; +use nusamai_mvt::tag::TagsEncoder; +use nusamai_mvt::tileid::TileIdMethod; +use nusamai_mvt::vector_tile; +use prost::Message; +use rayon::iter::ParallelBridge; +use rayon::iter::ParallelIterator; +use reearth_flow_common::uri::Uri; +use reearth_flow_runtime::errors::BoxedError; +use reearth_flow_runtime::event::EventHub; +use reearth_flow_runtime::executor_operation::Context; +use reearth_flow_runtime::executor_operation::{ExecutorContext, NodeContext}; +use reearth_flow_runtime::node::{Port, Sink, SinkFactory, DEFAULT_PORT}; +use reearth_flow_types::geometry as geometry_types; +use reearth_flow_types::Expr; +use reearth_flow_types::Feature; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; + +use super::slice::slice_cityobj_geoms; +use super::tags::convert_properties; +use crate::errors::SinkError; + +#[derive(Debug, Clone, Default)] +pub struct MVTSinkFactory; + +impl SinkFactory for MVTSinkFactory { + fn name(&self) -> &str { + "MVTWriter" + } + + fn description(&self) -> &str { + "Writes features to a file" + } + + fn parameter_schema(&self) -> Option { + Some(schemars::schema_for!(MVTWriterParam)) + } + + fn categories(&self) -> &[&'static str] { + &["File"] + } + + fn get_input_ports(&self) -> Vec { + vec![DEFAULT_PORT.clone()] + } + + fn prepare(&self) -> Result<(), BoxedError> { + Ok(()) + } + + fn build( + &self, + _ctx: NodeContext, + _event_hub: EventHub, + _action: String, + with: Option>, + ) -> Result, BoxedError> { + let params = if let Some(with) = with { + let value: JsonValue = serde_json::to_value(with).map_err(|e| { + SinkError::BuildFactory(format!("Failed to serialize `with` parameter: {}", e)) + })?; + serde_json::from_value(value).map_err(|e| { + SinkError::BuildFactory(format!("Failed to deserialize `with` parameter: {}", e)) + })? + } else { + return Err( + SinkError::BuildFactory("Missing required parameter `with`".to_string()).into(), + ); + }; + + let sink = MVTWriter { + buffer: Vec::new(), + params, + }; + Ok(Box::new(sink)) + } +} + +#[derive(Debug, Clone)] +pub struct MVTWriter { + pub(super) params: MVTWriterParam, + pub(super) buffer: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct MVTWriterCommonParam { + pub(super) output: Expr, +} + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct MVTWriterParam { + pub(super) output: Expr, + pub(super) min_zoom: u8, + pub(super) max_zoom: u8, +} + +impl Sink for MVTWriter { + fn name(&self) -> &str { + "MVTWriter" + } + + fn process(&mut self, ctx: ExecutorContext) -> Result<(), BoxedError> { + let Some(geometry) = ctx.feature.geometry.as_ref() else { + return Err(Box::new(SinkError::FileWriter( + "Unsupported input".to_string(), + ))); + }; + let geometry_value = geometry.value.clone(); + match geometry_value { + geometry_types::GeometryValue::CityGmlGeometry(_) => { + self.buffer.push(ctx.feature.clone()); + } + _ => { + return Err(Box::new(SinkError::MvtWriter( + "Unsupported input".to_string(), + ))); + } + } + + Ok(()) + } + fn finish(&self, ctx: NodeContext) -> Result<(), BoxedError> { + let upstream = &self.buffer; + let tile_id_conv = TileIdMethod::Hilbert; + let expr_engine = Arc::clone(&ctx.expr_engine); + let output = self.params.output.clone(); + let scope = expr_engine.new_scope(); + let path = scope + .eval::(output.as_ref()) + .unwrap_or_else(|_| output.as_ref().to_string()); + let output = Uri::from_str(path.as_str())?; + + std::thread::scope(|scope| { + let (sender_sliced, receiver_sliced) = std::sync::mpsc::sync_channel(2000); + let (sender_sorted, receiver_sorted) = std::sync::mpsc::sync_channel(2000); + scope.spawn(|| { + let _ = geometry_slicing_stage(upstream, tile_id_conv, sender_sliced, &self.params); + }); + scope.spawn(|| { + let _ = feature_sorting_stage(receiver_sliced, sender_sorted); + }); + scope.spawn(|| { + let _ = + tile_writing_stage(ctx.as_context(), &output, receiver_sorted, tile_id_conv); + }); + }); + Ok(()) + } +} + +fn geometry_slicing_stage( + upstream: &[Feature], + tile_id_conv: TileIdMethod, + sender_sliced: std::sync::mpsc::SyncSender<(u64, Vec)>, + mvt_options: &MVTWriterParam, +) -> crate::errors::Result<()> { + let bincode_config = bincode::config::standard(); + + // Convert CityObjects to sliced features + upstream.iter().par_bridge().try_for_each(|feature| { + let max_detail = 12; // 4096 + let buffer_pixels = 5; + slice_cityobj_geoms( + feature, + mvt_options.min_zoom, + mvt_options.max_zoom, + max_detail, + buffer_pixels, + |(z, x, y, typename), mpoly| { + let feature = super::slice::SlicedFeature { + typename, + geometry: mpoly, + properties: feature.attributes.clone(), + }; + let bytes = bincode::serde::encode_to_vec(&feature, bincode_config).unwrap(); + let tile_id = tile_id_conv.zxy_to_id(z, x, y); + if sender_sliced.send((tile_id, bytes)).is_err() { + return Err(crate::errors::SinkError::MvtWriter("Canceled".to_string())); + }; + Ok(()) + }, + ) + })?; + Ok(()) +} + +fn feature_sorting_stage( + receiver_sliced: std::sync::mpsc::Receiver<(u64, Vec)>, + sender_sorted: std::sync::mpsc::SyncSender<(u64, Vec>)>, +) -> crate::errors::Result<()> { + let config = kv_extsort::SortConfig::default().max_chunk_bytes(256 * 1024 * 1024); + + let sorted_iter = kv_extsort::sort( + receiver_sliced + .into_iter() + .map(|(tile_id, body)| std::result::Result::<_, Infallible>::Ok((tile_id, body))), + config, + ); + + for ((_, tile_id), grouped) in &sorted_iter.chunk_by(|feat| match feat { + Ok((tile_id, _)) => (false, *tile_id), + Err(_) => (true, 0), + }) { + let grouped = grouped + .into_iter() + .map_ok(|(_, serialized_feats)| serialized_feats) + .collect::, _>>(); + match grouped { + Ok(serialized_feats) => { + if sender_sorted.send((tile_id, serialized_feats)).is_err() { + return Err(crate::errors::SinkError::MvtWriter("Canceled".to_string())); + } + } + Err(kv_extsort::Error::Canceled) => { + return Err(crate::errors::SinkError::MvtWriter("Canceled".to_string())); + } + Err(err) => { + return Err(crate::errors::SinkError::MvtWriter(format!( + "Failed to sort features: {:?}", + err + ))); + } + } + } + + Ok(()) +} + +#[derive(Default)] +struct LayerData { + pub features: Vec, + pub tags_enc: TagsEncoder, +} + +fn tile_writing_stage( + ctx: Context, + output_path: &Uri, + receiver_sorted: std::sync::mpsc::Receiver<(u64, Vec>)>, + tile_id_conv: TileIdMethod, +) -> crate::errors::Result<()> { + let default_detail = 12; + let min_detail = 9; + + let storage = ctx + .storage_resolver + .resolve(output_path) + .map_err(|e| crate::errors::SinkError::MvtWriter(format!("{:?}", e)))?; + + receiver_sorted + .into_iter() + .par_bridge() + .try_for_each(|(tile_id, serialized_feats)| { + let (zoom, x, y) = tile_id_conv.id_to_zxy(tile_id); + + let path = output_path + .join(Path::new(&format!("{zoom}/{x}/{y}.pbf"))) + .map_err(|e| crate::errors::SinkError::MvtWriter(format!("{:?}", e)))?; + for detail in (min_detail..=default_detail).rev() { + // Make a MVT tile binary + let bytes = make_tile(detail, &serialized_feats)?; + + // Retry with a lower detail level if the compressed tile size is too large + let compressed_size = { + let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); + e.write_all(&bytes) + .map_err(|e| crate::errors::SinkError::MvtWriter(format!("{:?}", e)))?; + let compressed_bytes = e + .finish() + .map_err(|e| crate::errors::SinkError::MvtWriter(format!("{:?}", e)))?; + compressed_bytes.len() + }; + if detail != min_detail && compressed_size > 500_000 { + // If the tile is too large, try a lower detail level + continue; + } + storage + .put_sync(&path.path(), bytes::Bytes::from(bytes)) + .map_err(|e| crate::errors::SinkError::MvtWriter(format!("{:?}", e)))?; + break; + } + Ok::<(), crate::errors::SinkError>(()) + })?; + Ok(()) +} + +fn make_tile(default_detail: i32, serialized_feats: &[Vec]) -> crate::errors::Result> { + let mut layers: HashMap = HashMap::new(); + let mut int_ring_buf = Vec::new(); + let mut int_ring_buf2 = Vec::new(); + let extent = 1 << default_detail; + let bincode_config = bincode::config::standard(); + + for serialized_feat in serialized_feats { + let (feature, _): (super::slice::SlicedFeature, _) = + bincode::serde::decode_from_slice(serialized_feat, bincode_config).map_err(|err| { + crate::errors::SinkError::MvtWriter(format!( + "Failed to deserialize a sliced feature: {:?}", + err + )) + })?; + + let mpoly = feature.geometry; + let mut int_mpoly = NMultiPolygon::<[i16; 2]>::new(); + + for poly in &mpoly { + for (ri, ring) in poly.rings().enumerate() { + int_ring_buf.clear(); + int_ring_buf.extend(ring.into_iter().map(|[x, y]| { + let x = (x * extent as f64 + 0.5) as i16; + let y = (y * extent as f64 + 0.5) as i16; + [x, y] + })); + + // some simplification + { + int_ring_buf2.clear(); + int_ring_buf2.push(int_ring_buf[0]); + for c in int_ring_buf.windows(3) { + let &[prev, curr, next] = c.try_into().unwrap(); + + // Remove duplicate points + if prev == curr { + continue; + } + + // Reject collinear points + let [curr_x, curr_y] = curr; + let [prev_x, prev_y] = prev; + let [next_x, next_y] = next; + if curr != next + && ((next_y - prev_y) as i32 * (curr_x - prev_x) as i32).abs() + == ((curr_y - prev_y) as i32 * (next_x - prev_x) as i32).abs() + { + continue; + } + + int_ring_buf2.push(curr); + } + int_ring_buf2.push(*int_ring_buf.last().unwrap()); + } + + match ri { + 0 => int_mpoly.add_exterior(int_ring_buf2.drain(..)), + _ => int_mpoly.add_interior(int_ring_buf2.drain(..)), + } + } + } + + // encode geometry + let mut geom_enc = GeometryEncoder::new(); + for poly in &int_mpoly { + let exterior = poly.exterior(); + if exterior.signed_ring_area() > 0.0 { + geom_enc.add_ring(&exterior); + for interior in poly.interiors() { + if interior.is_cw() { + geom_enc.add_ring(&interior); + } + } + } + } + let geometry = geom_enc.into_vec(); + if geometry.is_empty() { + continue; + } + + let mut tags: Vec = Vec::new(); + + let layer = { + let layer = layers.entry(feature.typename).or_default(); + + // Encode attributes as MVT tags + for (key, value) in &feature.properties { + convert_properties(&mut tags, &mut layer.tags_enc, key.as_ref(), value); + } + layer + }; + + layer.features.push(vector_tile::tile::Feature { + id: None, + tags, + r#type: Some(vector_tile::tile::GeomType::Polygon as i32), + geometry, + }); + } + + let layers = layers + .into_iter() + .flat_map(|(name, layer_data)| { + if layer_data.features.is_empty() { + return None; + } + let (keys, values) = layer_data.tags_enc.into_keys_and_values(); + Some(vector_tile::tile::Layer { + version: 2, + name: name.to_string(), + features: layer_data.features, + keys, + values, + extent: Some(extent), + }) + }) + .collect(); + + let tile = vector_tile::Tile { layers }; + + let bytes = tile.encode_to_vec(); + Ok(bytes) +} diff --git a/engine/runtime/action-sink/src/file/mvt/slice.rs b/engine/runtime/action-sink/src/file/mvt/slice.rs new file mode 100644 index 000000000..15c5405eb --- /dev/null +++ b/engine/runtime/action-sink/src/file/mvt/slice.rs @@ -0,0 +1,275 @@ +use std::collections::HashMap; + +use nusamai_geometry::{LineString2, MultiPolygon2, Polygon2}; +use nusamai_mvt::webmercator::lnglat_to_web_mercator; +use reearth_flow_types::{Attribute, AttributeValue, Feature, GeometryType, GeometryValue}; +use serde::{Deserialize, Serialize}; + +pub type TileZXYName = (u8, u32, u32, String); + +#[derive(Serialize, Deserialize)] +pub(super) struct SlicedFeature<'a> { + pub(super) typename: String, + pub(super) geometry: MultiPolygon2<'a>, + pub(super) properties: HashMap, +} + +pub(super) fn slice_cityobj_geoms( + feature: &Feature, + min_z: u8, + max_z: u8, + max_detail: u32, + buffer_pixels: u32, + f: impl Fn(TileZXYName, MultiPolygon2) -> Result<(), crate::errors::SinkError>, +) -> Result<(), crate::errors::SinkError> { + let mut tiled_mpolys = HashMap::new(); + + let extent = 1 << max_detail; + let buffer = extent * buffer_pixels / 256; + let Some(geometry) = feature.geometry.as_ref() else { + return Err(crate::errors::SinkError::MvtWriter( + "Feature does not have geometry".to_string(), + )); + }; + let GeometryValue::CityGmlGeometry(city_geometry) = &geometry.value else { + return Err(crate::errors::SinkError::MvtWriter( + "Feature does not have geometry value".to_string(), + )); + }; + + city_geometry + .gml_geometries + .iter() + .for_each(|entry| match entry.ty { + GeometryType::Solid + | GeometryType::Surface + | GeometryType::Triangle + | GeometryType::CompositeSurface + | GeometryType::MultiSurface => { + for flow_poly in entry.polygons.iter() { + let idx_poly: Polygon2 = flow_poly.clone().into(); + let poly = idx_poly.transform(|[lng, lat]| { + let (mx, my) = lnglat_to_web_mercator(*lng, *lat); + [mx, my] + }); + + if !poly.exterior().is_cw() { + continue; + } + let area = poly.area(); + let Some(typename) = entry.feature_type.as_ref() else { + continue; + }; + + for zoom in min_z..=max_z { + // Skip if the polygon is smaller than 4 square subpixels + // + // TODO: emulate the 'tiny-polygon-reduction' of tippecanoe + if area * (4u64.pow(zoom as u32 + max_detail) as f64) < 4.0 { + continue; + } + slice_polygon(zoom, extent, buffer, &poly, typename, &mut tiled_mpolys); + } + } + } + GeometryType::Curve | GeometryType::MultiCurve => { + unimplemented!() + } + GeometryType::Point | GeometryType::MultiPoint => { + unimplemented!() + } + GeometryType::Tin => { + unimplemented!() + } + }); + + for ((z, x, y, typename), mpoly) in tiled_mpolys { + if mpoly.is_empty() { + continue; + } + f((z, x, y, typename), mpoly)?; + } + + Ok(()) + + // TODO: linestring, point +} + +fn slice_polygon( + zoom: u8, + extent: u32, + buffer: u32, + poly: &Polygon2, + typename: &str, + out: &mut HashMap<(u8, u32, u32, String), MultiPolygon2>, +) { + let z_scale = (1 << zoom) as f64; + let buf_width = buffer as f64 / extent as f64; + let mut new_ring_buffer: Vec<[f64; 2]> = Vec::with_capacity(poly.exterior().len() + 1); + + // Slice along Y-axis + let y_range = { + let (min_y, max_y) = poly + .exterior() + .iter() + .fold((f64::MAX, f64::MIN), |(min_y, max_y), c| { + (min_y.min(c[1]), max_y.max(c[1])) + }); + (min_y * z_scale).floor() as u32..(max_y * z_scale).ceil() as u32 + }; + + let mut y_sliced_polys = Vec::with_capacity(y_range.len()); + + for yi in y_range.clone() { + let k1 = (yi as f64 - buf_width) / z_scale; + let k2 = ((yi + 1) as f64 + buf_width) / z_scale; + let mut y_sliced_poly = Polygon2::new(); + + // todo?: check interior bbox to optimize + + for ring in poly.rings() { + if ring.raw_coords().is_empty() { + continue; + } + + new_ring_buffer.clear(); + ring.iter_closed() + .fold(None, |a, b| { + let Some(a) = a else { return Some(b) }; + + if a[1] < k1 { + if b[1] > k1 { + let x = (b[0] - a[0]) * (k1 - a[1]) / (b[1] - a[1]) + a[0]; + // let z = (b[2] - a[2]) * (k1 - a[1]) / (b[1] - a[1]) + a[2]; + new_ring_buffer.push([x, k1]) + } + } else if a[1] > k2 { + if b[1] < k2 { + let x = (b[0] - a[0]) * (k2 - a[1]) / (b[1] - a[1]) + a[0]; + // let z = (b[2] - a[2]) * (k2 - a[1]) / (b[1] - a[1]) + a[2]; + new_ring_buffer.push([x, k2]) + } + } else { + new_ring_buffer.push(a) + } + + if b[1] < k1 && a[1] > k1 { + let x = (b[0] - a[0]) * (k1 - a[1]) / (b[1] - a[1]) + a[0]; + // let z = (b[2] - a[2]) * (k1 - a[1]) / (b[1] - a[1]) + a[2]; + new_ring_buffer.push([x, k1]) + } else if b[1] > k2 && a[1] < k2 { + let x = (b[0] - a[0]) * (k2 - a[1]) / (b[1] - a[1]) + a[0]; + // let z = (b[2] - a[2]) * (k2 - a[1]) / (b[1] - a[1]) + a[2]; + new_ring_buffer.push([x, k2]) + } + + Some(b) + }) + .unwrap(); + + y_sliced_poly.add_ring(new_ring_buffer.iter().copied()); + } + + y_sliced_polys.push(y_sliced_poly); + } + + let mut norm_coords_buf = Vec::new(); + + // Slice along X-axis + for (yi, y_sliced_poly) in y_range.zip(y_sliced_polys.iter()) { + let x_range = { + let (min_x, max_x) = y_sliced_poly + .exterior() + .iter() + .fold((f64::MAX, f64::MIN), |(min_x, max_x), c| { + (min_x.min(c[0]), max_x.max(c[0])) + }); + (min_x * z_scale).floor() as i32..(max_x * z_scale).ceil() as i32 + }; + + for xi in x_range { + let k1 = (xi as f64 - buf_width) / z_scale; + let k2 = ((xi + 1) as f64 + buf_width) / z_scale; + + // todo?: check interior bbox to optimize ... + + let key = ( + zoom, + xi.rem_euclid(1 << zoom) as u32, // handling geometry crossing the antimeridian + yi, + typename.to_string(), + ); + let tile_mpoly = out.entry(key).or_default(); + + for (ri, ring) in y_sliced_poly.rings().enumerate() { + if ring.raw_coords().is_empty() { + continue; + } + + new_ring_buffer.clear(); + ring.iter_closed() + .fold(None, |a, b| { + let Some(a) = a else { return Some(b) }; + + if a[0] < k1 { + if b[0] > k1 { + let y = (b[1] - a[1]) * (k1 - a[0]) / (b[0] - a[0]) + a[1]; + // let z = (b[2] - a[2]) * (k1 - a[0]) / (b[0] - a[0]) + a[2]; + new_ring_buffer.push([k1, y]) + } + } else if a[0] > k2 { + if b[0] < k2 { + let y = (b[1] - a[1]) * (k2 - a[0]) / (b[0] - a[0]) + a[1]; + // let z = (b[2] - a[2]) * (k2 - a[0]) / (b[0] - a[0]) + a[2]; + new_ring_buffer.push([k2, y]) + } + } else { + new_ring_buffer.push(a) + } + + if b[0] < k1 && a[0] > k1 { + let y = (b[1] - a[1]) * (k1 - a[0]) / (b[0] - a[0]) + a[1]; + // let z = (b[2] - a[2]) * (k1 - a[0]) / (b[0] - a[0]) + a[2]; + new_ring_buffer.push([k1, y]) + } else if b[0] > k2 && a[0] < k2 { + let y = (b[1] - a[1]) * (k2 - a[0]) / (b[0] - a[0]) + a[1]; + // let z = (b[2] - a[2]) * (k2 - a[0]) / (b[0] - a[0]) + a[2]; + new_ring_buffer.push([k2, y]) + } + + Some(b) + }) + .unwrap(); + + // get integer coordinates and simplify the ring + { + norm_coords_buf.clear(); + norm_coords_buf.extend(new_ring_buffer.iter().map(|&[x, y]| { + let tx = x * z_scale - xi as f64; + let ty = y * z_scale - yi as f64; + [tx, ty] + })); + + // remove closing point if exists + if norm_coords_buf.len() >= 2 + && norm_coords_buf[0] == *norm_coords_buf.last().unwrap() + { + norm_coords_buf.pop(); + } + + if norm_coords_buf.len() < 3 { + continue; + } + } + + let mut ring = LineString2::from_raw(norm_coords_buf.clone().into()); + ring.reverse_inplace(); + + match ri { + 0 => tile_mpoly.add_exterior(ring.iter()), + _ => tile_mpoly.add_interior(ring.iter()), + }; + } + } + } +} diff --git a/engine/runtime/action-sink/src/file/mvt/tags.rs b/engine/runtime/action-sink/src/file/mvt/tags.rs new file mode 100644 index 000000000..37e39696f --- /dev/null +++ b/engine/runtime/action-sink/src/file/mvt/tags.rs @@ -0,0 +1,47 @@ +use nusamai_mvt::tag::TagsEncoder; +use reearth_flow_types::AttributeValue; + +pub fn convert_properties( + tags: &mut Vec, + tags_enc: &mut TagsEncoder, + name: &str, + tree: &AttributeValue, +) { + match &tree { + AttributeValue::Null => { + // ignore + } + AttributeValue::String(v) => { + tags.extend(tags_enc.add(name, v.clone().into())); + } + AttributeValue::Bool(v) => { + tags.extend(tags_enc.add(name, (*v).into())); + } + AttributeValue::Number(v) => { + if let Some(v) = v.as_u64() { + tags.extend(tags_enc.add(name, v.into())); + } else if let Some(v) = v.as_i64() { + tags.extend(tags_enc.add(name, v.into())); + } else if let Some(v) = v.as_f64() { + tags.extend(tags_enc.add(name, v.into())); + } else { + // Handle any remaining number types by converting to string + tags.extend(tags_enc.add(name, v.to_string().into())); + } + } + AttributeValue::Array(_arr) => { + // ignore non-root attributes + } + AttributeValue::Bytes(_v) => { + // ignore non-root attributes + } + AttributeValue::Map(obj) => { + for (key, value) in obj { + convert_properties(tags, tags_enc, key, value); + } + } + AttributeValue::DateTime(v) => { + tags.extend(tags_enc.add(name, v.to_string().into())); + } + } +} diff --git a/engine/runtime/action-sink/src/file/types.rs b/engine/runtime/action-sink/src/file/types.rs index fe96aa622..e977c38c1 100644 --- a/engine/runtime/action-sink/src/file/types.rs +++ b/engine/runtime/action-sink/src/file/types.rs @@ -1,4 +1,4 @@ -pub(super) mod material; -pub(super) mod metadata; -pub(super) mod slice; -pub(super) mod tree; +pub(crate) mod material; +pub(crate) mod metadata; +pub(crate) mod slice; +pub(crate) mod tree; diff --git a/engine/runtime/action-sink/src/file/vector_tile.rs b/engine/runtime/action-sink/src/file/vector_tile.rs deleted file mode 100644 index 79589ead0..000000000 --- a/engine/runtime/action-sink/src/file/vector_tile.rs +++ /dev/null @@ -1,110 +0,0 @@ -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Tile { - #[prost(message, repeated, tag = "3")] - pub layers: ::prost::alloc::vec::Vec, -} -/// Nested message and enum types in `Tile`. -pub mod tile { - /// Variant type encoding - /// The use of values is described in section 4.1 of the specification - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Value { - /// Exactly one of these values must be present in a valid message - #[prost(string, optional, tag = "1")] - pub string_value: ::core::option::Option<::prost::alloc::string::String>, - #[prost(float, optional, tag = "2")] - pub float_value: ::core::option::Option, - #[prost(double, optional, tag = "3")] - pub double_value: ::core::option::Option, - #[prost(int64, optional, tag = "4")] - pub int_value: ::core::option::Option, - #[prost(uint64, optional, tag = "5")] - pub uint_value: ::core::option::Option, - #[prost(sint64, optional, tag = "6")] - pub sint_value: ::core::option::Option, - #[prost(bool, optional, tag = "7")] - pub bool_value: ::core::option::Option, - } - /// Features are described in section 4.2 of the specification - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Feature { - #[prost(uint64, optional, tag = "1", default = "0")] - pub id: ::core::option::Option, - /// Tags of this feature are encoded as repeated pairs of - /// integers. - /// A detailed description of tags is located in sections - /// 4.2 and 4.4 of the specification - #[prost(uint32, repeated, tag = "2")] - pub tags: ::prost::alloc::vec::Vec, - /// The type of geometry stored in this feature. - #[prost(enumeration = "GeomType", optional, tag = "3", default = "Unknown")] - pub r#type: ::core::option::Option, - /// Contains a stream of commands and parameters (vertices). - /// A detailed description on geometry encoding is located in - /// section 4.3 of the specification. - #[prost(uint32, repeated, tag = "4")] - pub geometry: ::prost::alloc::vec::Vec, - } - /// Layers are described in section 4.1 of the specification - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Layer { - /// Any compliant implementation must first read the version - /// number encoded in this message and choose the correct - /// implementation for this version number before proceeding to - /// decode other parts of this message. - #[prost(uint32, required, tag = "15", default = "1")] - pub version: u32, - #[prost(string, required, tag = "1")] - pub name: ::prost::alloc::string::String, - /// The actual features in this tile. - #[prost(message, repeated, tag = "2")] - pub features: ::prost::alloc::vec::Vec, - /// Dictionary encoding for keys - #[prost(string, repeated, tag = "3")] - pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - /// Dictionary encoding for values - #[prost(message, repeated, tag = "4")] - pub values: ::prost::alloc::vec::Vec, - /// Although this is an "optional" field it is required by the specification. - /// See - #[prost(uint32, optional, tag = "5", default = "4096")] - pub extent: ::core::option::Option, - } - /// GeomType is described in section 4.3.4 of the specification - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] - #[repr(i32)] - pub enum GeomType { - Unknown = 0, - Point = 1, - Linestring = 2, - Polygon = 3, - } - impl GeomType { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - GeomType::Unknown => "UNKNOWN", - GeomType::Point => "POINT", - GeomType::Linestring => "LINESTRING", - GeomType::Polygon => "POLYGON", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "UNKNOWN" => Some(Self::Unknown), - "POINT" => Some(Self::Point), - "LINESTRING" => Some(Self::Linestring), - "POLYGON" => Some(Self::Polygon), - _ => None, - } - } - } -} diff --git a/engine/runtime/action-sink/src/mapping.rs b/engine/runtime/action-sink/src/mapping.rs index d31ae6634..b33d14826 100644 --- a/engine/runtime/action-sink/src/mapping.rs +++ b/engine/runtime/action-sink/src/mapping.rs @@ -7,7 +7,7 @@ use crate::{ echo::EchoSinkFactory, file::{ cesium3dtiles::Cesium3DTilesSinkFactory, geojson::GeoJsonWriterFactory, - mvt::MVTSinkFactory, writer::FileWriterSinkFactory, + mvt::sink::MVTSinkFactory, writer::FileWriterSinkFactory, }, noop::NoopSinkFactory, }; diff --git a/engine/runtime/examples/plateau/testdata/workflow/data-convert/04-luse-lsld/workflow.yml b/engine/runtime/examples/plateau/testdata/workflow/data-convert/04-luse-lsld/workflow.yml index 55a826744..aa21a561c 100644 --- a/engine/runtime/examples/plateau/testdata/workflow/data-convert/04-luse-lsld/workflow.yml +++ b/engine/runtime/examples/plateau/testdata/workflow/data-convert/04-luse-lsld/workflow.yml @@ -121,9 +121,15 @@ graphs: } - id: b4862d31-4bb2-49b1-8f0d-6d58dd4cb385 - name: NoopLuse + name: mvtWriter type: action - action: NoopSink + action: MVTWriter + with: + format: mvt + minZoom: 12 + maxZoom: 18 + output: | + env.get("outputPath") edges: - id: 7b81f501-3f07-4cec-bf9b-9cefcebdf47d diff --git a/engine/runtime/geometry/src/types/line_string.rs b/engine/runtime/geometry/src/types/line_string.rs index 4bc68289f..184a33ac9 100644 --- a/engine/runtime/geometry/src/types/line_string.rs +++ b/engine/runtime/geometry/src/types/line_string.rs @@ -298,7 +298,7 @@ impl IndexMut for LineString { } } -impl<'a> From> for LineString { +impl<'a> From> for LineString2D { #[inline] fn from(coords: NLineString2<'a>) -> Self { LineString2D::new( @@ -310,7 +310,18 @@ impl<'a> From> for LineString { } } -impl<'a> From> for LineString { +impl<'a> From> for NLineString2<'a> { + #[inline] + fn from(coords: LineString3D) -> Self { + let mut line_string = NLineString2::new(); + for coord in coords.iter() { + line_string.push([coord.x, coord.y]); + } + line_string + } +} + +impl<'a> From> for LineString3D { #[inline] fn from(coords: NLineString3<'a>) -> Self { LineString3D::new( diff --git a/engine/runtime/geometry/src/types/polygon.rs b/engine/runtime/geometry/src/types/polygon.rs index 2e9abb334..461cdb6af 100644 --- a/engine/runtime/geometry/src/types/polygon.rs +++ b/engine/runtime/geometry/src/types/polygon.rs @@ -4,7 +4,7 @@ use approx::{AbsDiffEq, RelativeEq}; use geo_types::Polygon as GeoPolygon; use nalgebra::{Point2 as NaPoint2, Point3 as NaPoint3}; use num_traits::Zero; -use nusamai_geometry::{Polygon2 as NPolygon2, Polygon3 as NPolygon3}; +use nusamai_geometry::{LineString2 as NLineString2, Polygon2 as NPolygon2, Polygon3 as NPolygon3}; use nusamai_projection::vshift::Jgd2011ToWgs84; use serde::{Deserialize, Serialize}; @@ -381,6 +381,24 @@ impl<'a> From> for Polygon2D { } } +impl<'a> From> for NPolygon2<'a> { + #[inline] + fn from(poly: Polygon3D) -> Self { + let interiors: Vec = poly + .interiors() + .iter() + .map(|interior| interior.clone().into()) + .collect(); + let mut npoly = NPolygon2::new(); + let exterior: NLineString2 = poly.exterior().clone().into(); + npoly.add_ring(&exterior); + for interior in interiors.iter() { + npoly.add_ring(interior); + } + npoly + } +} + impl<'a> From> for Polygon { #[inline] fn from(poly: NPolygon3<'a>) -> Self { diff --git a/engine/tools/mvt/index.html b/engine/tools/mvt/index.html index 57629a542..db9a69222 100644 --- a/engine/tools/mvt/index.html +++ b/engine/tools/mvt/index.html @@ -36,8 +36,8 @@ const viewer = new Cesium.Viewer("cesiumContainer", {}); const yourMvt = new CesiumMVTImageryProvider.CesiumMVTImageryProvider({ - urlTemplate: "http://localhost:8000/dist/{z}/{x}/{y}.mvt", - layerName: "luse", + urlTemplate: "http://localhost:8004/dist/{z}/{x}/{y}.pbf", + layerName: "luse:LandUse", style: feature => { return { fillStyle: "white" // 白で塗りつぶす @@ -47,7 +47,7 @@ viewer.scene.imageryLayers.addImageryProvider(yourMvt); viewer.camera.setView({ - destination: Cesium.Cartesian3.fromDegrees(139.8393542,35.8249542, 5000.0) + destination: Cesium.Cartesian3.fromDegrees(138.2922289,36.0921473, 5000.0) });