From 1501b492d44edae10962dc4b55ca2c6493599185 Mon Sep 17 00:00:00 2001 From: Yuji Mise Date: Thu, 26 Sep 2024 12:23:23 +0900 Subject: [PATCH] feat(engine): add zstd compression support for geometry attributes --- engine/Cargo.lock | 1 + engine/Cargo.toml | 1 + .../src/geometry/extractor.rs | 4 +- .../action-processor/src/geometry/replacer.rs | 6 +-- engine/runtime/common/Cargo.toml | 1 + engine/runtime/common/src/compress.rs | 37 ++++++++++++++++++ engine/runtime/common/src/lib.rs | 8 ++++ engine/runtime/common/src/str.rs | 8 +++- .../03-tran-rwy-trk-squr-wwy/workflow.yml | 39 ++++++++++++++++--- 9 files changed, 93 insertions(+), 12 deletions(-) create mode 100644 engine/runtime/common/src/compress.rs diff --git a/engine/Cargo.lock b/engine/Cargo.lock index 5e0c830e4..2022bd154 100644 --- a/engine/Cargo.lock +++ b/engine/Cargo.lock @@ -4417,6 +4417,7 @@ dependencies = [ "tokio", "url", "uuid", + "zstd", ] [[package]] diff --git a/engine/Cargo.toml b/engine/Cargo.toml index b4608a075..24114669b 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -162,3 +162,4 @@ uuid = { version = "1.10.0", features = [ "serde", "v4", ] } +zstd = "0.13.0" diff --git a/engine/runtime/action-processor/src/geometry/extractor.rs b/engine/runtime/action-processor/src/geometry/extractor.rs index 9a49a6589..b13be82ed 100644 --- a/engine/runtime/action-processor/src/geometry/extractor.rs +++ b/engine/runtime/action-processor/src/geometry/extractor.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use reearth_flow_common::str::base64_encode; +use reearth_flow_common::compress::compress; use reearth_flow_runtime::{ channels::ProcessorChannelForwarder, errors::BoxedError, @@ -98,7 +98,7 @@ impl Processor for GeometryExtractor { )) })?; let dump = serde_json::to_string(&value)?; - let dump = base64_encode(dump); + let dump = compress(&dump)?; feature .attributes .insert(self.output_attribute.clone(), AttributeValue::String(dump)); diff --git a/engine/runtime/action-processor/src/geometry/replacer.rs b/engine/runtime/action-processor/src/geometry/replacer.rs index 0140bb911..2f26ef697 100644 --- a/engine/runtime/action-processor/src/geometry/replacer.rs +++ b/engine/runtime/action-processor/src/geometry/replacer.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use reearth_flow_common::str::base64_decode; +use reearth_flow_common::compress::decode; use reearth_flow_runtime::{ channels::ProcessorChannelForwarder, errors::BoxedError, @@ -99,8 +99,8 @@ impl Processor for GeometryReplacer { fw.send(ctx.new_with_feature_and_port(feature, DEFAULT_PORT.clone())); return Ok(()); }; - let decoded = base64_decode(dump)?; - let geometry: Geometry = serde_json::from_str(&decoded)?; + let dump = decode(dump.clone())?; + let geometry: Geometry = serde_json::from_str(&dump)?; feature.geometry = Some(geometry); feature.attributes.remove(&self.source_attribute); fw.send(ctx.new_with_feature_and_port(feature, DEFAULT_PORT.clone())); diff --git a/engine/runtime/common/Cargo.toml b/engine/runtime/common/Cargo.toml index c24a9f5e5..f6c3b01b7 100644 --- a/engine/runtime/common/Cargo.toml +++ b/engine/runtime/common/Cargo.toml @@ -34,3 +34,4 @@ thiserror.workspace = true tokio.workspace = true url.workspace = true uuid.workspace = true +zstd.workspace = true diff --git a/engine/runtime/common/src/compress.rs b/engine/runtime/common/src/compress.rs new file mode 100644 index 000000000..e6c11fec5 --- /dev/null +++ b/engine/runtime/common/src/compress.rs @@ -0,0 +1,37 @@ +use crate::{ + str::{base64_decode_byte, base64_encode}, + Error, Result, +}; + +pub fn compress(source: &str) -> Result { + let compressed = zstd::encode_all(source.as_bytes(), 3) + .map_err(|e| Error::Compress(format!("Failed to compress: {}", e)))?; + Ok(base64_encode(&compressed)) +} + +pub fn decode(source: String) -> Result { + let bytes = base64_decode_byte(source)?; + let decoded = zstd::decode_all(bytes.as_slice()) + .map_err(|e| Error::Compress(format!("Failed to decompress: {}", e)))?; + Ok(String::from_utf8_lossy(&decoded).to_string()) +} +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_compress_and_decode() { + let original = "This is a test string to compress and decompress."; + let compressed = compress(original).expect("Compression failed"); + let decompressed = decode(compressed).expect("Decompression failed"); + assert_eq!(original, decompressed); + } + + #[test] + fn test_compress_empty_string() { + let original = ""; + let compressed = compress(original).expect("Compression failed"); + let decompressed = decode(compressed).expect("Decompression failed"); + assert_eq!(original, decompressed); + } +} diff --git a/engine/runtime/common/src/lib.rs b/engine/runtime/common/src/lib.rs index 913556be9..2ca2cedaa 100644 --- a/engine/runtime/common/src/lib.rs +++ b/engine/runtime/common/src/lib.rs @@ -26,6 +26,9 @@ pub enum Error { #[error("JsonError: {0}")] Json(String), + + #[error("CompressError: {0}")] + Compress(String), } impl Error { @@ -60,12 +63,17 @@ impl Error { pub fn json(message: T) -> Self { Self::Json(message.to_string()) } + + pub fn compress(message: T) -> Self { + Self::Compress(message.to_string()) + } } pub type Result = std::result::Result; pub mod collection; pub mod color; +pub mod compress; pub mod csv; pub mod dir; pub mod fs; diff --git a/engine/runtime/common/src/str.rs b/engine/runtime/common/src/str.rs index 2bafc74c5..acb22903c 100644 --- a/engine/runtime/common/src/str.rs +++ b/engine/runtime/common/src/str.rs @@ -17,7 +17,13 @@ pub fn base64_decode>(s: T) -> crate::Result { let result = general_purpose::STANDARD .decode(s) .map_err(|e| crate::Error::Str(format!("{}", e)))?; - String::from_utf8(result).map_err(|e| crate::Error::Str(format!("{}", e))) + Ok(String::from_utf8_lossy(&result).to_string()) +} + +pub fn base64_decode_byte>(s: T) -> crate::Result> { + general_purpose::STANDARD + .decode(s) + .map_err(|e| crate::Error::Str(format!("{}", e))) } pub fn remove_trailing_slash(s: &str) -> String { diff --git a/engine/runtime/examples/plateau/testdata/workflow/quality-check/03-tran-rwy-trk-squr-wwy/workflow.yml b/engine/runtime/examples/plateau/testdata/workflow/quality-check/03-tran-rwy-trk-squr-wwy/workflow.yml index 85868f922..dbd97f46d 100644 --- a/engine/runtime/examples/plateau/testdata/workflow/quality-check/03-tran-rwy-trk-squr-wwy/workflow.yml +++ b/engine/runtime/examples/plateau/testdata/workflow/quality-check/03-tran-rwy-trk-squr-wwy/workflow.yml @@ -254,11 +254,11 @@ graphs: outputPort: default - id: 92f040b7-c8d8-4fb2-a8ff-9b1d7d2a0e09 - name: GeometryExtractor + name: GeometryExtractorCityGml type: action action: GeometryExtractor with: - outputAttribute: dumpGeometry + outputAttribute: dumpGeometryCityGml - id: 79b7bc2a-735a-4f5f-9e6a-ce5d51dbda0e name: TwoDimensionForcer @@ -276,6 +276,13 @@ graphs: - fileIndex - package + - id: 357febd2-3f43-45aa-b9cf-a10e4e3b59aa + name: GeometryExtractor2D + type: action + action: GeometryExtractor + with: + outputAttribute: dumpGeometry2D + - id: 73056fde-2493-44a2-9196-c2f69900a75a name: GeometryCoercerByLineString type: action @@ -301,7 +308,17 @@ graphs: groupBy: - lod - package - outputAttribute: overlap + outputAttribute: lineOverlap + + - id: 6f4352f3-0397-4672-966d-67637a6c9b98 + name: FeatureFilterByLineOverlap + type: action + action: FeatureFilter + with: + conditions: + - expr: | + env.get("__value").lineOverlap > 1 + outputPort: lineOverlap - id: 69f22583-c0c8-4343-b83b-79842ad0d9ce name: Noop @@ -402,11 +419,16 @@ graphs: to: f45d09c3-2ea5-48c6-a215-aadfacb75260 fromPort: default toPort: default - - id: 0cd140d1-5012-49b2-b816-c7c23a3e7da8 + - id: 690e045f-cc7a-406d-b324-5a54fea85158 from: f45d09c3-2ea5-48c6-a215-aadfacb75260 - to: 73056fde-2493-44a2-9196-c2f69900a75a + to: 357febd2-3f43-45aa-b9cf-a10e4e3b59aa fromPort: area toPort: default + - id: 0cd140d1-5012-49b2-b816-c7c23a3e7da8 + from: 357febd2-3f43-45aa-b9cf-a10e4e3b59aa + to: 73056fde-2493-44a2-9196-c2f69900a75a + fromPort: default + toPort: default - id: 9515cd3c-0b1d-45d5-9d68-79a3906869ab from: 73056fde-2493-44a2-9196-c2f69900a75a to: 5b958746-a54b-4e0f-ad26-5c7388167943 @@ -419,9 +441,14 @@ graphs: toPort: default - id: 3cf6418b-5991-4e36-bb93-39b4740378fe from: 46c146fb-857f-4426-a49d-ba70bc8e252f - to: 69f22583-c0c8-4343-b83b-79842ad0d9ce + to: 6f4352f3-0397-4672-966d-67637a6c9b98 fromPort: line toPort: default + - id: 7afe76cf-f121-415c-a86e-91b6d4f233ed + from: 6f4352f3-0397-4672-966d-67637a6c9b98 + to: 69f22583-c0c8-4343-b83b-79842ad0d9ce + fromPort: lineOverlap + toPort: default ## start surface validate - id: 025e8b3f-7884-485b-94e9-b5055040a59b