refactor(engine): replace sender with buffer in MVTWriter and update …
miseyu authored Oct 21, 2024
1 parent 40a4d1f commit adffad5
Showing 7 changed files with 90 additions and 75 deletions.
11 changes: 4 additions & 7 deletions engine/runtime/action-sink/src/file/cesium3dtiles/sink.rs
@@ -139,19 +139,16 @@ impl Sink for Cesium3DTilesWriter {
 
     fn process(&mut self, ctx: ExecutorContext) -> Result<(), BoxedError> {
         let Some(geometry) = ctx.feature.geometry.as_ref() else {
-            return Err(Box::new(SinkError::FileWriter(
-                "Unsupported input".to_string(),
-            )));
+            return Err(SinkError::Cesium3DTilesWriter("Unsupported input".to_string()).into());
         };
         let geometry_value = geometry.value.clone();
+        let feature = ctx.feature;
         match geometry_value {
             geometry_types::GeometryValue::CityGmlGeometry(_) => {
-                self.buffer.push(ctx.feature.clone());
+                self.buffer.push(feature);
             }
             _ => {
-                return Err(Box::new(SinkError::Cesium3DTilesWriter(
-                    "Unsupported input".to_string(),
-                )));
+                return Err(SinkError::Cesium3DTilesWriter("Unsupported input".to_string()).into());
             }
         }

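The Cesium3DTilesWriter hunk above swaps manual Box::new(...) wrapping for `.into()`. A minimal sketch of why that compiles, using stand-in types (the real SinkError and BoxedError live in the action-sink and runtime crates; the thiserror derive is an assumption about their shape):

// Assumed alias mirroring the crate's boxed error type.
type BoxedError = Box<dyn std::error::Error + Send + Sync>;

// Assumed simplification of the crate's SinkError.
#[derive(Debug, thiserror::Error)]
enum SinkError {
    #[error("Cesium3DTilesWriter error: {0}")]
    Cesium3DTilesWriter(String),
}

fn process(supported: bool) -> Result<(), BoxedError> {
    if !supported {
        // Box<dyn Error + Send + Sync> implements From<E: Error + Send + Sync>,
        // so `.into()` performs the boxing that Box::new(...) did explicitly.
        return Err(SinkError::Cesium3DTilesWriter("Unsupported input".to_string()).into());
    }
    Ok(())
}

fn main() {
    assert!(process(false).is_err());
    assert!(process(true).is_ok());
}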
103 changes: 45 additions & 58 deletions engine/runtime/action-sink/src/file/mvt/sink.rs
@@ -4,7 +4,6 @@ use std::io::Write;
 use std::path::Path;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::thread;
 use std::vec;
 
 use flate2::{write::ZlibEncoder, Compression};
@@ -83,7 +82,7 @@ impl SinkFactory for MVTSinkFactory {
         };
 
         let sink = MVTWriter {
-            sender: None,
+            buffer: Vec::new(),
             params,
         };
         Ok(Box::new(sink))
@@ -93,7 +92,7 @@
 #[derive(Debug, Clone)]
 pub struct MVTWriter {
     pub(super) params: MVTWriterParam,
-    pub(super) sender: Option<std::sync::mpsc::SyncSender<Feature>>,
+    pub(super) buffer: Vec<Feature>,
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
@@ -116,55 +115,16 @@ impl Sink for MVTWriter {
         "MVTWriter"
     }
 
-    fn initialize(&mut self, ctx: NodeContext) {
-        let tile_id_conv = TileIdMethod::Hilbert;
-        let expr_engine = Arc::clone(&ctx.expr_engine);
-        let output = self.params.output.clone();
-        let scope = expr_engine.new_scope();
-        let path = scope
-            .eval::<String>(output.as_ref())
-            .unwrap_or_else(|_| output.as_ref().to_string());
-        let output = Uri::from_str(path.as_str()).expect("Failed to parse output path");
-
-        let (sender_upstream, receiver_upstream) = std::sync::mpsc::sync_channel(2000);
-        let (sender_sliced, receiver_sliced) = std::sync::mpsc::sync_channel(2000);
-        let (sender_sorted, receiver_sorted) = std::sync::mpsc::sync_channel(2000);
-        self.sender = Some(sender_upstream);
-        let params = self.params.clone();
-        thread::spawn(move || {
-            let _ = geometry_slicing_stage(receiver_upstream, tile_id_conv, sender_sliced, &params);
-        });
-        thread::spawn(move || {
-            let _ = feature_sorting_stage(receiver_sliced, sender_sorted);
-        });
-        thread::spawn(move || {
-            let pool = rayon::ThreadPoolBuilder::new()
-                .use_current_thread()
-                .build()
-                .unwrap();
-            pool.install(|| {
-                let _ =
-                    tile_writing_stage(ctx.as_context(), &output, receiver_sorted, tile_id_conv);
-            })
-        });
-    }
-
     fn process(&mut self, ctx: ExecutorContext) -> Result<(), BoxedError> {
-        let Some(geometry) = ctx.feature.geometry.as_ref() else {
-            return Err(Box::new(SinkError::FileWriter(
+        let feature = ctx.feature;
+        let Some(geometry) = feature.geometry.as_ref() else {
+            return Err(Box::new(SinkError::MvtWriter(
                 "Unsupported input".to_string(),
             )));
         };
-        let sender = self
-            .sender
-            .as_ref()
-            .ok_or_else(|| SinkError::FileWriter("Failed to get sender".to_string()))?;
-        let geometry_value = geometry.value.clone();
-        match geometry_value {
+        match geometry.value {
             geometry_types::GeometryValue::CityGmlGeometry(_) => {
-                sender.send(ctx.feature.clone()).map_err(|e| {
-                    SinkError::FileWriter(format!("Failed to send feature: {:?}", e))
-                })?;
+                self.buffer.push(feature);
             }
             _ => {
                 return Err(Box::new(SinkError::MvtWriter(
@@ -175,32 +135,59 @@ impl Sink for MVTWriter {
 
         Ok(())
     }
 
-    #[allow(dropping_references)]
-    fn finish(&self, _ctx: NodeContext) -> Result<(), BoxedError> {
-        let sender = self
-            .sender
-            .as_ref()
-            .ok_or_else(|| SinkError::FileWriter("Failed to get sender".to_string()))?;
-        drop(sender);
+    fn finish(&self, ctx: NodeContext) -> Result<(), BoxedError> {
+        let upstream = &self.buffer;
+        let tile_id_conv = TileIdMethod::Hilbert;
+        let expr_engine = Arc::clone(&ctx.expr_engine);
+        let output = self.params.output.clone();
+        let scope = expr_engine.new_scope();
+        let path = scope
+            .eval::<String>(output.as_ref())
+            .unwrap_or_else(|_| output.as_ref().to_string());
+        let output = Uri::from_str(path.as_str())?;
+
+        std::thread::scope(|scope| {
+            let (sender_sliced, receiver_sliced) = std::sync::mpsc::sync_channel(2000);
+            let (sender_sorted, receiver_sorted) = std::sync::mpsc::sync_channel(2000);
+            scope.spawn(|| {
+                let _ = geometry_slicing_stage(upstream, tile_id_conv, sender_sliced, &self.params);
+            });
+            scope.spawn(|| {
+                let _ = feature_sorting_stage(receiver_sliced, sender_sorted);
+            });
+            scope.spawn(|| {
+                let pool = rayon::ThreadPoolBuilder::new()
+                    .use_current_thread()
+                    .build()
+                    .unwrap();
+                pool.install(|| {
+                    let _ = tile_writing_stage(
+                        ctx.as_context(),
+                        &output,
+                        receiver_sorted,
+                        tile_id_conv,
+                    );
+                })
+            });
+        });
         Ok(())
     }
 }
 
 fn geometry_slicing_stage(
-    upstream: std::sync::mpsc::Receiver<Feature>,
+    upstream: &[Feature],
     tile_id_conv: TileIdMethod,
     sender_sliced: std::sync::mpsc::SyncSender<(u64, Vec<u8>)>,
     mvt_options: &MVTWriterParam,
 ) -> crate::errors::Result<()> {
     let bincode_config = bincode::config::standard();
 
     // Convert CityObjects to sliced features
-    upstream.into_iter().par_bridge().try_for_each(|feature| {
+    upstream.iter().par_bridge().try_for_each(|feature| {
         let max_detail = 12; // 4096
         let buffer_pixels = 5;
         slice_cityobj_geoms(
-            &feature,
+            feature,
             &mvt_options.layer_name,
             mvt_options.min_zoom,
             mvt_options.max_zoom,
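The core of the refactor is visible in the new finish above: instead of spawning detached threads in initialize and feeding them through an mpsc sender, the sink buffers features and runs the whole pipeline once at the end inside std::thread::scope, so the stages can borrow self.buffer without 'static bounds or clones. It also removes the Option<SyncSender> field and the old drop(sender) "shutdown", which only dropped a reference (hence the deleted #[allow(dropping_references)]) and never actually closed the channel. A minimal sketch of the scoped-thread pipeline pattern with simplified stand-ins (plain String items and trivial stages; the real code additionally parallelizes the slicing stage with rayon's par_bridge and runs the tile writer on a rayon pool):

use std::sync::mpsc;
use std::thread;

// Simplified pipeline over a borrowed buffer; the stages stand in for the
// crate's geometry_slicing_stage / feature_sorting_stage / tile_writing_stage.
fn run_pipeline(buffer: &[String]) {
    // Scoped threads may borrow `buffer` because every thread spawned in the
    // scope is joined before `thread::scope` returns.
    thread::scope(|scope| {
        let (sender_sliced, receiver_sliced) = mpsc::sync_channel::<String>(2000);
        let (sender_sorted, receiver_sorted) = mpsc::sync_channel::<String>(2000);

        // Stage 1: read straight from the in-memory buffer.
        scope.spawn(move || {
            for feature in buffer {
                let _ = sender_sliced.send(feature.clone());
            }
            // sender_sliced is dropped here, which closes the channel and
            // lets stage 2's receiver loop terminate.
        });

        // Stage 2: transform and forward.
        scope.spawn(move || {
            for sliced in receiver_sliced {
                let _ = sender_sorted.send(sliced.to_uppercase());
            }
        });

        // Stage 3: final consumer (the tile writer in the real sink).
        scope.spawn(move || {
            for sorted in receiver_sorted {
                println!("write: {sorted}");
            }
        });
    });
}

fn main() {
    let buffer: Vec<String> = vec!["bldg_a".into(), "bldg_b".into()];
    run_pipeline(&buffer);
}

The bounded sync_channel(2000) keeps the same backpressure behaviour the long-lived threads had, while moving the pipeline into finish means it only starts once all upstream features have been collected.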
2 changes: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 mod helper;
 
 fn main() {
-    helper::execute("data-convert/04-luse-lsld/workflow.yml");
+    helper::execute("data-convert/01-bldg/workflow.yml");
 }
2 changes: 2 additions & 0 deletions engine/runtime/runtime/src/errors.rs
@@ -86,6 +86,8 @@ pub enum ExecutionError {
     RestoreRecordWriter(#[source] DeserializationError),
     #[error("Source error: {0}")]
     Source(#[source] BoxedError),
+    #[error("Processor error: {0}")]
+    Processor(#[source] BoxedError),
     #[error("Sink error: {0}")]
     Sink(#[source] BoxedError),
     #[error("ChannelManager error: {0}")]
13 changes: 11 additions & 2 deletions engine/runtime/runtime/src/executor/processor_node.rs
@@ -85,7 +85,7 @@ impl<F: Future + Unpin + Debug> ProcessorNode<F> {
             panic!("Must pass in a node")
         };
         let node_handle = node.handle.clone();
-        let NodeKind::Processor(mut processor) = kind else {
+        let NodeKind::Processor(processor) = kind else {
             panic!("Must pass in a processor node");
         };
         let (node_handles, receivers) = dag.collect_receivers(node_index);
@@ -117,7 +117,6 @@ impl<F: Future + Unpin + Debug> ProcessorNode<F> {
         let expr_engine = Arc::clone(&ctx.expr_engine);
         let storage_resolver = Arc::clone(&ctx.storage_resolver);
         let kv_store = Arc::clone(&ctx.kv_store);
-        processor.initialize(ctx);
         let num_threads = processor.num_threads();
         Self {
             node_handle,
@@ -167,6 +166,16 @@ impl<F: Future + Unpin + Debug> ReceiverLoop for ProcessorNode<F> {
         let span = self.span.clone();
         let logger = self.logger.clone();
         let now = time::Instant::now();
+        let processor = Arc::clone(&self.processor);
+        processor
+            .write()
+            .initialize(NodeContext::new(
+                self.expr_engine.clone(),
+                self.storage_resolver.clone(),
+                self.logger_factory.clone(),
+                self.kv_store.clone(),
+            ))
+            .map_err(ExecutionError::Processor)?;
        action_log!(
             parent: span, logger, "{:?} process start...", self.processor.read().name(),
         );
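With this change, initialize is no longer called in the constructor; it runs at the top of the receiver loop, where the processor already sits behind a shared lock, and any failure is surfaced through the new ExecutionError::Processor variant instead of being impossible to report. A minimal sketch of that shape with simplified stand-ins (std RwLock here, NodeContext reduced away; the crate's actual lock and context types may differ):

use std::sync::{Arc, RwLock};

type BoxedError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Debug)]
enum ExecutionError {
    Processor(BoxedError),
}

trait Processor: Send + Sync {
    // Fallible initialize, mirroring the new trait signature.
    fn initialize(&mut self) -> Result<(), BoxedError> {
        Ok(())
    }
}

#[derive(Debug)]
struct Noop;

impl Processor for Noop {}

fn receiver_loop(processor: Arc<RwLock<Box<dyn Processor>>>) -> Result<(), ExecutionError> {
    // Initialize lazily, right before processing starts, and map failures
    // into the executor's error type rather than ignoring them.
    processor
        .write()
        .expect("processor lock poisoned")
        .initialize()
        .map_err(ExecutionError::Processor)?;
    // ... the actual select/receive loop would follow here ...
    Ok(())
}

fn main() {
    let processor: Arc<RwLock<Box<dyn Processor>>> = Arc::new(RwLock::new(Box::new(Noop)));
    assert!(receiver_loop(processor).is_ok());
}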
24 changes: 21 additions & 3 deletions engine/runtime/runtime/src/executor/sink_node.rs
@@ -9,7 +9,9 @@ use std::{
 use crossbeam::channel::Receiver;
 use futures::Future;
 use petgraph::graph::NodeIndex;
-use reearth_flow_action_log::{action_log, ActionLogger};
+use reearth_flow_action_log::{action_log, factory::LoggerFactory, ActionLogger};
+use reearth_flow_eval_expr::engine::Engine;
+use reearth_flow_storage::resolve::StorageResolver;
 use tokio::runtime::Handle;
 use tracing::info_span;
 
@@ -19,6 +21,7 @@ use crate::{
     errors::ExecutionError,
     event::Event,
     executor_operation::{ExecutorContext, ExecutorOperation, NodeContext},
+    kvs::KvStore,
     node::{NodeHandle, Sink},
 };
 
@@ -46,7 +49,11 @@ pub struct SinkNode<F> {
     #[allow(dead_code)]
     runtime: Arc<Handle>,
     logger: Arc<ActionLogger>,
+    logger_factory: Arc<LoggerFactory>,
     span: tracing::Span,
+    expr_engine: Arc<Engine>,
+    storage_resolver: Arc<StorageResolver>,
+    kv_store: Arc<Box<dyn KvStore>>,
 }
 
 impl<F: Future + Unpin + Debug> SinkNode<F> {
@@ -62,7 +69,7 @@ impl<F: Future + Unpin + Debug> SinkNode<F> {
             panic!("Must pass in a node")
         };
         let node_handle = node.handle.clone();
-        let NodeKind::Sink(mut sink) = kind else {
+        let NodeKind::Sink(sink) = kind else {
             panic!("Must pass in a sink node");
         };
 
@@ -79,7 +86,6 @@ impl<F: Future + Unpin + Debug> SinkNode<F> {
             "workflow.id" = dag.id.to_string().as_str(),
             "node.id" = node_handle.id.to_string().as_str(),
         );
-        sink.initialize(ctx);
         Self {
             node_handle,
             node_handles,
@@ -91,6 +97,10 @@ impl<F: Future + Unpin + Debug> SinkNode<F> {
             runtime,
             logger: Arc::new(logger),
             span,
+            logger_factory: ctx.logger.clone(),
+            expr_engine: ctx.expr_engine.clone(),
+            storage_resolver: ctx.storage_resolver.clone(),
+            kv_store: ctx.kv_store.clone(),
         }
     }
 
@@ -118,6 +128,14 @@ impl<F: Future + Unpin + Debug> ReceiverLoop for SinkNode<F> {
         let span = self.span.clone();
         let logger = self.logger.clone();
         let mut sel = init_select(&receivers);
+        self.sink
+            .initialize(NodeContext {
+                logger: self.logger_factory.clone(),
+                expr_engine: self.expr_engine.clone(),
+                kv_store: self.kv_store.clone(),
+                storage_resolver: self.storage_resolver.clone(),
+            })
+            .map_err(ExecutionError::Sink)?;
         action_log!(
             parent: span, logger, "{:?} process start...", self.sink.name(),
         );
10 changes: 6 additions & 4 deletions engine/runtime/runtime/src/node.rs
@@ -299,7 +299,9 @@ impl Clone for Box<dyn ProcessorFactory> {
 }
 
 pub trait Processor: Send + Sync + Debug + ProcessorClone {
-    fn initialize(&mut self, _ctx: NodeContext) {}
+    fn initialize(&mut self, _ctx: NodeContext) -> Result<(), BoxedError> {
+        Ok(())
+    }
     fn num_threads(&self) -> usize {
         1
     }
@@ -358,7 +360,9 @@ impl Clone for Box<dyn SinkFactory> {
 }
 
 pub trait Sink: Send + Debug + SinkClone {
-    fn initialize(&mut self, _ctx: NodeContext) {}
+    fn initialize(&mut self, _ctx: NodeContext) -> Result<(), BoxedError> {
+        Ok(())
+    }
 
     fn name(&self) -> &str;
     fn process(&mut self, ctx: ExecutorContext) -> Result<(), BoxedError>;
@@ -421,8 +425,6 @@ pub struct Router {
 }
 
 impl Processor for Router {
-    fn initialize(&mut self, _ctx: NodeContext) {}
-
     fn process(
         &mut self,
         ctx: ExecutorContext,

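For implementers, the trait change above means initialize is now fallible but still optional: the default returns Ok(()), which is why Router's empty override could simply be deleted. A minimal sketch of a sink written against a simplified version of the new trait (contexts reduced to unit structs; not the crate's real NodeContext/ExecutorContext):

type BoxedError = Box<dyn std::error::Error + Send + Sync>;

// Stand-ins for the runtime's NodeContext / ExecutorContext.
struct NodeContext;
struct ExecutorContext;

// Simplified shape of the updated Sink trait.
trait Sink: Send {
    fn initialize(&mut self, _ctx: NodeContext) -> Result<(), BoxedError> {
        Ok(())
    }
    fn name(&self) -> &str;
    fn process(&mut self, ctx: ExecutorContext) -> Result<(), BoxedError>;
    fn finish(&self, _ctx: NodeContext) -> Result<(), BoxedError> {
        Ok(())
    }
}

struct CountingSink {
    seen: usize,
}

impl Sink for CountingSink {
    // No initialize override needed: the default Ok(()) applies.
    fn name(&self) -> &str {
        "CountingSink"
    }
    fn process(&mut self, _ctx: ExecutorContext) -> Result<(), BoxedError> {
        self.seen += 1;
        Ok(())
    }
}

fn main() -> Result<(), BoxedError> {
    let mut sink = CountingSink { seen: 0 };
    sink.initialize(NodeContext)?;
    sink.process(ExecutorContext)?;
    sink.finish(NodeContext)?;
    assert_eq!(sink.seen, 1);
    Ok(())
}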