diff --git a/engine/Cargo.lock b/engine/Cargo.lock index 778fbd65d..f67638a1a 100644 --- a/engine/Cargo.lock +++ b/engine/Cargo.lock @@ -304,6 +304,15 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4" +dependencies = [ + "critical-section", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -966,6 +975,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "critical-section" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64009896348fc5af4222e9cf7d7d82a95a256c634ebcf61c53e4ea461422242" + [[package]] name = "crossbeam" version = "0.8.4" @@ -1702,6 +1717,31 @@ dependencies = [ "version_check", ] +[[package]] +name = "geo" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7d640a4dd1d1c98b45f4653c841a8ec15f461a71b86bc30533ae64c6f20f268" +dependencies = [ + "float_next_after", + "geo-types", + "geographiclib-rs", + "log", + "num-traits", + "robust 0.2.3", + "rstar 0.10.0", +] + +[[package]] +name = "geo-buffer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267bf0373df2f0b0b05065ebc0c84b97ccd221e19e0cb9442bcffe8fce04c130" +dependencies = [ + "geo", + "geo-types", +] + [[package]] name = "geo-types" version = "0.7.13" @@ -1710,9 +1750,19 @@ checksum = "9ff16065e5720f376fbced200a5ae0f47ace85fd70b7e54269790281353b6d61" dependencies = [ "approx", "num-traits", + "rstar 0.10.0", "serde", ] +[[package]] +name = "geographiclib-rs" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e5ed84f8089c70234b0a8e0aedb6dc733671612ddc0d37c6066052f9781960" +dependencies = [ + "libm", +] + [[package]] name = "geojson" version = "0.24.1" @@ -1947,6 +1997,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hash32" version = "0.3.1" @@ -1976,13 +2035,26 @@ dependencies = [ "serde", ] +[[package]] +name = "heapless" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" +dependencies = [ + "atomic-polyfill", + "hash32 0.2.1", + "rustc_version", + "spin", + "stable_deref_trait", +] + [[package]] name = "heapless" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" dependencies = [ - "hash32", + "hash32 0.3.1", "stable_deref_trait", ] @@ -4217,7 +4289,7 @@ dependencies = [ "reearth-flow-types", "regex", "rhai", - "rstar", + "rstar 0.12.0", "schemars", "serde", "serde_json", @@ -4423,6 +4495,8 @@ dependencies = [ "bytes", "clipper-sys", "float_next_after", + "geo-buffer", + "geo-types", "geojson", "nalgebra", "num-traits", @@ -4430,8 +4504,8 @@ dependencies = [ "nusamai-projection", "parking_lot", "reearth-flow-common", - "robust", - "rstar", + "robust 1.1.0", + "rstar 0.12.0", "serde", "serde_json", "strum_macros", @@ -4882,6 +4956,12 @@ dependencies = [ "serde", ] +[[package]] +name = "robust" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5864e7ef1a6b7bcf1d6ca3f655e65e724ed3b52546a0d0a663c991522f552ea" + [[package]] name = "robust" version = "1.1.0" @@ -4909,13 +4989,24 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rstar" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f39465655a1e3d8ae79c6d9e007f4953bfc5d55297602df9dc38f9ae9f1359a" +dependencies = [ + "heapless 0.7.17", + "num-traits", + "smallvec", +] + [[package]] name = "rstar" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "133315eb94c7b1e8d0cb097e5a710d850263372fd028fff18969de708afc7008" dependencies = [ - "heapless", + "heapless 0.8.0", "num-traits", "smallvec", ] @@ -5593,6 +5684,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" diff --git a/engine/Cargo.toml b/engine/Cargo.toml index 3eddedcb6..4a52a7bb4 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -94,6 +94,8 @@ flate2 = "1.0.34" float_next_after = "1.0.0" futures = "0.3.30" futures-util = "0.3.30" +geo-buffer = "0.2.0" +geo-types = "0.7.13" geojson = "0.24.1" hashbrown = { version = "0.14.5", features = ["serde"] } home = "0.5.9" diff --git a/engine/runtime/action-processor/src/geometry/area_on_area_overlayer.rs b/engine/runtime/action-processor/src/geometry/area_on_area_overlayer.rs index 339eef504..6e697ff9b 100644 --- a/engine/runtime/action-processor/src/geometry/area_on_area_overlayer.rs +++ b/engine/runtime/action-processor/src/geometry/area_on_area_overlayer.rs @@ -1,12 +1,11 @@ use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; -use itertools::Itertools; -use nusamai_projection::crs::EpsgCode; use once_cell::sync::Lazy; -use reearth_flow_geometry::algorithm::intersects::Intersects; +use reearth_flow_geometry::algorithm::bufferable::buffer_polygon; use reearth_flow_geometry::types::geometry::Geometry2D; use reearth_flow_geometry::types::multi_polygon::{MultiPolygon, MultiPolygon2D}; +use reearth_flow_geometry::types::point::Point2D; use reearth_flow_geometry::types::polygon::{Polygon2D, Polygon2DFloat}; use reearth_flow_runtime::executor_operation::Context; use reearth_flow_runtime::node::REJECTED_PORT; @@ -17,8 +16,7 @@ use reearth_flow_runtime::{ executor_operation::{ExecutorContext, NodeContext}, node::{Port, Processor, ProcessorFactory, DEFAULT_PORT}, }; -use reearth_flow_types::AttributeValue; -use reearth_flow_types::{Attribute, Geometry}; +use reearth_flow_types::{Attribute, AttributeValue, Geometry}; use reearth_flow_types::{Feature, GeometryValue}; use rstar::{RTree, RTreeObject}; use schemars::JsonSchema; @@ -27,7 +25,8 @@ use serde_json::Value; use super::errors::GeometryProcessorError; -const EPSILON: f64 = 0.001; +// const EPSILON: f64 = 0.001; +const TOLERANCE: f64 = 0.2; pub static AREA_PORT: Lazy = Lazy::new(|| Port::new("area")); pub static REMNANTS_PORT: Lazy = Lazy::new(|| Port::new("remnants")); @@ -100,9 +99,16 @@ impl ProcessorFactory for AreaOnAreaOverlayerFactory { #[derive(Debug, Clone)] struct PolygonFeature { - attributes: HashMap>, - overlap: u64, - epsg: Option, + feature_id: uuid::Uuid, + geometry: Polygon2D, +} + +impl rstar::RTreeObject for PolygonFeature { + type Envelope = rstar::AABB>; + + fn envelope(&self) -> Self::Envelope { + self.geometry.envelope() + } } #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] @@ -116,11 +122,15 @@ pub struct AreaOnAreaOverlayerParam { #[derive(Debug, Clone)] pub struct AreaOnAreaOverlayer { params: AreaOnAreaOverlayerParam, - buffer: HashMap, RTree>)>, // (complete_grouped, features) + buffer: HashMap, RTree)>, // (complete_grouped, features) previous_group_key: Option, } impl Processor for AreaOnAreaOverlayer { + fn num_threads(&self) -> usize { + 10 + } + fn process( &mut self, ctx: ExecutorContext, @@ -151,11 +161,23 @@ impl Processor for AreaOnAreaOverlayer { let feature = feature.clone(); match geometry { Geometry2D::Polygon(poly) => { - rtree.insert(poly.clone()); + let Some(poly) = buffer_polygon(poly, -TOLERANCE) else { + return Ok(()); + }; + rtree.insert(PolygonFeature { + feature_id: feature.id, + geometry: poly, + }); } Geometry2D::MultiPolygon(mpoly) => { for poly in mpoly.iter() { - rtree.insert(poly.clone()); + let Some(poly) = buffer_polygon(poly, -TOLERANCE) else { + return Ok(()); + }; + rtree.insert(PolygonFeature { + feature_id: feature.id, + geometry: poly, + }); } } _ => { @@ -169,11 +191,23 @@ impl Processor for AreaOnAreaOverlayer { let mut rtree = RTree::new(); match geometry { Geometry2D::Polygon(poly) => { - rtree.insert(poly.clone()); + let Some(poly) = buffer_polygon(poly, -TOLERANCE) else { + return Ok(()); + }; + rtree.insert(PolygonFeature { + feature_id: feature.id, + geometry: poly, + }); } Geometry2D::MultiPolygon(mpoly) => { for poly in mpoly.iter() { - rtree.insert(poly.clone()); + let Some(poly) = buffer_polygon(poly, -TOLERANCE) else { + return Ok(()); + }; + rtree.insert(PolygonFeature { + feature_id: feature.id, + geometry: poly, + }); } } _ => { @@ -220,105 +254,74 @@ impl AreaOnAreaOverlayer { &self, ctx: Context, targets: &[Feature], - rtree: &RTree>, + rtree: &RTree, fw: &mut dyn ProcessorChannelForwarder, ) { - let mut polygon_features = HashMap::::new(); - for target_feature in targets { + let mut polygon_features = HashMap::, u64)>::new(); + let target_features = targets + .iter() + .map(|feature| (feature.id, feature.clone())) + .collect::>(); + for target_feature in target_features.values() { let Some(MultiPolygon(target)) = self.handle_2d_polygon_and_multi_polygon(target_feature) else { continue; }; - let mut out_polygons = Vec::new(); - let mut intersect = false; for target in target.iter() { let candidates = rtree.locate_in_envelope_intersecting(&target.envelope()); for other in candidates { - if target.approx_eq(other, EPSILON) { - continue; - } - if !target.intersects(other) { + if other.feature_id == target_feature.id { continue; } - intersect = true; - let polygon_float = Polygon2DFloat(other.clone()); + let polygon_float = Polygon2DFloat(other.geometry.clone()); match polygon_features.entry(polygon_float) { Entry::Occupied(mut entry) => { - let polygon_feature = entry.get_mut(); - polygon_feature.overlap += 1; - polygon_feature - .attributes - .insert(target_feature.id, target_feature.attributes.clone()); + let (feature_ids, polygon_feature) = entry.get_mut(); + feature_ids.insert(target_feature.id); + feature_ids.insert(other.feature_id); + *polygon_feature += 1; } Entry::Vacant(entry) => { - let mut attributes = HashMap::new(); - for (k, v) in target_feature.iter() { - attributes.insert(k.clone(), v.clone()); - } - entry.insert(PolygonFeature { - attributes: HashMap::from([(target_feature.id, attributes)]), - overlap: 1, - epsg: target_feature.geometry.as_ref().and_then(|g| g.epsg), - }); + let mut feature_ids = HashSet::new(); + feature_ids.insert(target_feature.id); + feature_ids.insert(other.feature_id); + entry.insert((feature_ids, 2)); } } } - if !intersect { - out_polygons.push(target.clone()); - } } - for polygon in out_polygons { - let mut feature = target_feature.clone(); - feature.attributes.insert( - self.params.output_attribute.clone(), - AttributeValue::Number(serde_json::Number::from(1)), - ); - feature.refresh_id(); - feature.geometry = Some(Geometry { - value: GeometryValue::FlowGeometry2D(Geometry2D::Polygon(polygon)), - epsg: feature.geometry.as_ref().and_then(|g| g.epsg), - }); - fw.send(ExecutorContext::new_with_context_feature_and_port( - &ctx, - feature, - AREA_PORT.clone(), - )); - } - } - for (polygon, polygon_feature) in polygon_features.iter() { - let mut feature = Feature::new(); + let mut feature = target_feature.clone(); feature.attributes.insert( self.params.output_attribute.clone(), - AttributeValue::Number(serde_json::Number::from(polygon_feature.overlap)), + AttributeValue::Number(1.into()), ); - let feature_attributes = polygon_feature - .attributes - .values() - .map(|kv| { - kv.iter() - .map(|(k, v)| (k.to_string(), v.clone())) - .collect::>() - }) - .collect_vec(); + fw.send(ExecutorContext::new_with_context_feature_and_port( + &ctx, + feature, + AREA_PORT.clone(), + )); + } + for (polygon, (feature_ids, polygon_feature)) in polygon_features { + let features = feature_ids + .iter() + .filter_map(|feature_id| target_features.get(feature_id)) + .collect::>(); + let mut feature = Feature::new(); feature.attributes.insert( - Attribute::new("features"), - AttributeValue::Array( - feature_attributes - .into_iter() - .map(AttributeValue::Map) - .collect(), - ), + self.params.output_attribute.clone(), + AttributeValue::Number(polygon_feature.into()), ); - let mut polygon_feat = feature.clone(); - polygon_feat.refresh_id(); - polygon_feat.geometry = Some(Geometry { - epsg: polygon_feature.epsg, - value: GeometryValue::FlowGeometry2D(Geometry2D::Polygon(polygon.0.clone())), + for other_feature in features.iter() { + feature.attributes.extend(other_feature.attributes.clone()); + } + feature.geometry = Some(Geometry { + epsg: features.first().unwrap().geometry.as_ref().unwrap().epsg, + value: GeometryValue::FlowGeometry2D(Geometry2D::Polygon(polygon.0)), }); fw.send(ExecutorContext::new_with_context_feature_and_port( &ctx, - polygon_feat, + feature, AREA_PORT.clone(), )); } diff --git a/engine/runtime/geometry/Cargo.toml b/engine/runtime/geometry/Cargo.toml index 3490f0ee4..d60b2aa3c 100644 --- a/engine/runtime/geometry/Cargo.toml +++ b/engine/runtime/geometry/Cargo.toml @@ -20,6 +20,8 @@ approx.workspace = true bytes.workspace = true clipper-sys.workspace = true float_next_after.workspace = true +geo-buffer.workspace = true +geo-types.workspace = true geojson.workspace = true nalgebra.workspace = true num-traits.workspace = true diff --git a/engine/runtime/geometry/src/algorithm/bufferable.rs b/engine/runtime/geometry/src/algorithm/bufferable.rs index 8fb12f476..f32af5446 100644 --- a/engine/runtime/geometry/src/algorithm/bufferable.rs +++ b/engine/runtime/geometry/src/algorithm/bufferable.rs @@ -1,3 +1,5 @@ +use geo_buffer::buffer_multi_polygon as geo_buffer_multi_polygon; + use crate::types::{ coordinate::{Coordinate, Coordinate2D}, line_string::LineString2D, @@ -58,6 +60,18 @@ impl Bufferable for Polygon2D { } } +pub fn buffer_polygon(input_polygon: &Polygon2D, distance: f64) -> Option> { + let result = buffer_multi_polygon(&MultiPolygon2D::new(vec![input_polygon.clone()]), distance); + result.0.first().cloned() +} + +pub fn buffer_multi_polygon( + input_multi_polygon: &MultiPolygon2D, + distance: f64, +) -> MultiPolygon2D { + geo_buffer_multi_polygon(&input_multi_polygon.clone().into(), distance).into() +} + #[cfg(test)] mod tests { use crate::{algorithm::coords_iter::CoordsIter, coord}; diff --git a/engine/runtime/geometry/src/types/line_string.rs b/engine/runtime/geometry/src/types/line_string.rs index d642f84dc..9d3513c37 100644 --- a/engine/runtime/geometry/src/types/line_string.rs +++ b/engine/runtime/geometry/src/types/line_string.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use std::iter::FromIterator; use std::ops::{Index, IndexMut}; +use geo_types::LineString as GeoLineString; use nusamai_geometry::{LineString2 as NLineString2, LineString3 as NLineString3}; use crate::utils::line_string_bounding_rect; @@ -482,3 +483,21 @@ impl LineString { true } } + +impl From> for GeoLineString { + fn from(line_string: LineString2D) -> Self { + GeoLineString(line_string.0.into_iter().map(|c| c.x_y().into()).collect()) + } +} + +impl From> for LineString2D { + fn from(line_string: GeoLineString) -> Self { + LineString2D::new( + line_string + .0 + .into_iter() + .map(|c| coordinate::Coordinate2D::new_(c.x, c.y)) + .collect(), + ) + } +} diff --git a/engine/runtime/geometry/src/types/multi_polygon.rs b/engine/runtime/geometry/src/types/multi_polygon.rs index fee5687a3..0a96a5abd 100644 --- a/engine/runtime/geometry/src/types/multi_polygon.rs +++ b/engine/runtime/geometry/src/types/multi_polygon.rs @@ -2,6 +2,7 @@ use std::iter::FromIterator; use std::ops::Range; use approx::{AbsDiffEq, RelativeEq}; +use geo_types::{MultiPolygon as GeoMultiPolygon, Polygon as GeoPolygon}; use nalgebra::{Point2 as NaPoint2, Point3 as NaPoint3}; use num_traits::Zero; use nusamai_geometry::{MultiPolygon2 as NMultiPolygon2, MultiPolygon3 as NMultiPolygon3}; @@ -273,3 +274,26 @@ where self.0.iter().all(|p| p.is_elevation_zero()) } } + +impl From> for GeoMultiPolygon { + fn from(mpolygon: MultiPolygon2D) -> Self { + GeoMultiPolygon( + mpolygon + .0 + .into_iter() + .map(GeoPolygon::from) + .collect::>(), + ) + } +} + +impl From> for MultiPolygon2D { + fn from(mpolygon: GeoMultiPolygon) -> Self { + let polygons = mpolygon + .0 + .into_iter() + .map(Polygon2D::from) + .collect::>(); + MultiPolygon2D::new(polygons) + } +} diff --git a/engine/runtime/geometry/src/types/polygon.rs b/engine/runtime/geometry/src/types/polygon.rs index ff32bdfb0..2e9abb334 100644 --- a/engine/runtime/geometry/src/types/polygon.rs +++ b/engine/runtime/geometry/src/types/polygon.rs @@ -1,6 +1,7 @@ use std::hash::{Hash, Hasher}; use approx::{AbsDiffEq, RelativeEq}; +use geo_types::Polygon as GeoPolygon; use nalgebra::{Point2 as NaPoint2, Point3 as NaPoint3}; use num_traits::Zero; use nusamai_geometry::{Polygon2 as NPolygon2, Polygon3 as NPolygon3}; @@ -571,3 +572,27 @@ impl Hash for Polygon2DFloat { } } } + +impl From> for GeoPolygon { + fn from(polygon: Polygon2D) -> Self { + let exterior = polygon.exterior().clone().into(); + let interiors = polygon + .interiors() + .iter() + .map(|interior| interior.clone().into()) + .collect(); + GeoPolygon::new(exterior, interiors) + } +} + +impl From> for Polygon2D { + fn from(polygon: GeoPolygon) -> Self { + let exterior = polygon.exterior().clone().into(); + let interiors = polygon + .interiors() + .iter() + .map(|interior| interior.clone().into()) + .collect(); + Polygon2D::new(exterior, interiors) + } +} diff --git a/engine/runtime/runtime/src/executor/processor_node.rs b/engine/runtime/runtime/src/executor/processor_node.rs index 754d16e1f..200d496b5 100644 --- a/engine/runtime/runtime/src/executor/processor_node.rs +++ b/engine/runtime/runtime/src/executor/processor_node.rs @@ -1,3 +1,4 @@ +use std::env; use std::fmt::Debug; use std::sync::atomic::AtomicU32; use std::sync::Arc; @@ -6,6 +7,7 @@ use std::{borrow::Cow, mem::swap}; use crossbeam::channel::Receiver; use futures::Future; +use once_cell::sync::Lazy; use petgraph::graph::NodeIndex; use reearth_flow_action_log::factory::LoggerFactory; use reearth_flow_action_log::{action_error_log, action_log, slow_action_log, ActionLogger}; @@ -28,7 +30,13 @@ use crate::{ use super::receiver_loop::init_select; use super::{execution_dag::ExecutionDag, receiver_loop::ReceiverLoop}; -const SLOW_ACTION_THRESHOLD: Duration = Duration::from_millis(300); +static SLOW_ACTION_THRESHOLD: Lazy = Lazy::new(|| { + env::var("FLOW_RUNTIME_SLOW_ACTION_THRESHOLD") + .ok() + .and_then(|v| v.parse().ok()) + .map(Duration::from_millis) + .unwrap_or(Duration::from_millis(300)) +}); /// A processor in the execution DAG. #[derive(Debug)] @@ -265,7 +273,7 @@ fn process( let now = time::Instant::now(); let result = processor.process(ctx, channel_manager); let elapsed = now.elapsed(); - if elapsed >= SLOW_ACTION_THRESHOLD { + if elapsed >= *SLOW_ACTION_THRESHOLD { slow_action_log!( parent: span, slow_logger, diff --git a/engine/runtime/runtime/src/feature_store.rs b/engine/runtime/runtime/src/feature_store.rs index 22de0bf6b..8d2118ce5 100644 --- a/engine/runtime/runtime/src/feature_store.rs +++ b/engine/runtime/runtime/src/feature_store.rs @@ -1,4 +1,5 @@ use std::fmt::Debug; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use reearth_flow_state::State; @@ -37,7 +38,7 @@ impl Clone for Box { #[async_trait::async_trait] pub trait FeatureWriter: Send + Sync + Debug + FeatureWriterClone { fn edge_id(&self) -> EdgeId; - fn write(&mut self, feature: &Feature) -> Result<(), FeatureWriterError>; + async fn write(&mut self, feature: &Feature) -> Result<(), FeatureWriterError>; async fn flush(&self) -> Result<(), FeatureWriterError>; } @@ -49,11 +50,16 @@ pub fn create_feature_writer(edge_id: EdgeId, state: Arc) -> Box, + thread_counter: Arc, } impl PrimaryKeyLookupFeatureWriter { pub(crate) fn new(edge_id: EdgeId, state: Arc) -> Self { - Self { edge_id, state } + Self { + edge_id, + state, + thread_counter: Arc::new(AtomicU64::new(0)), + } } } @@ -63,14 +69,28 @@ impl FeatureWriter for PrimaryKeyLookupFeatureWriter { self.edge_id.clone() } - fn write(&mut self, feature: &Feature) -> Result<(), FeatureWriterError> { + async fn write(&mut self, feature: &Feature) -> Result<(), FeatureWriterError> { let item: serde_json::Value = feature.clone().into(); - self.state - .append_sync(&item, self.edge_id.to_string().as_str()) - .map_err(|e| FeatureWriterError::Flush(e.to_string())) + self.thread_counter + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let result = self + .state + .append(&item, self.edge_id.to_string().as_str()) + .await + .map_err(|e| FeatureWriterError::Flush(e.to_string())); + self.thread_counter + .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + result } async fn flush(&self) -> Result<(), FeatureWriterError> { + while self + .thread_counter + .load(std::sync::atomic::Ordering::Relaxed) + > 0 + { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } Ok(()) } } diff --git a/engine/runtime/runtime/src/forwarder.rs b/engine/runtime/runtime/src/forwarder.rs index 75577873d..2680ea63b 100644 --- a/engine/runtime/runtime/src/forwarder.rs +++ b/engine/runtime/runtime/src/forwarder.rs @@ -52,13 +52,12 @@ pub struct ChannelManager { impl ChannelManager { #[inline] pub fn send_op(&mut self, ctx: ExecutorContext) -> Result<(), ExecutionError> { - if let Some(writer) = self.feature_writers.get_mut(&ctx.port) { - match writer.write(&ctx.feature) { - Ok(()) => {} - Err(e) => { - self.error_manager.report(e.into()); - } - } + if let Some(writer) = self.feature_writers.get(&ctx.port) { + let mut writer = writer.clone(); + let feature = ctx.feature.clone(); + self.runtime.spawn(async move { + let _ = writer.write(&feature).await; + }); } if let Some((last_sender, senders)) = self.senders.split_last() {