Skip to content

Commit

Permalink
feat(ingestion_pipeline): concurrency improvements
Browse files Browse the repository at this point in the history
* Remove the clone from the node cache
* Run the node cache futures on tokio
* When flattening streams, do so unordered
  • Loading branch information
timonv committed Jun 14, 2024
1 parent 473e60e commit 8f10ec8
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions swiftide/src/ingestion/ingestion_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,20 @@ impl IngestionPipeline {
let cache = Arc::new(cache);
self.stream = self
.stream
.try_filter(move |node| {
.try_filter_map(move |node| {
let cache = Arc::clone(&cache);
let node = node.clone();
let current_span = tracing::Span::current();
tokio::spawn(current_span.in_scope(|| async move {
if !cache.get(&node).await {
cache.set(&node).await;
tracing::debug!("Node not in cache, passing through");
true
Some(node)
} else {
tracing::debug!("Node in cache, skipping");
false
None
}
}))
.unwrap_or_else(|e| {
tracing::error!("Error filtering cached node: {:?}", e);
true
})
.map_err(anyhow::Error::from)
})
.boxed();
self
Expand Down Expand Up @@ -159,7 +155,7 @@ impl IngestionPipeline {
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.try_flatten()
.try_flatten_unordered(self.concurrency)
.boxed();
self
}
Expand All @@ -185,9 +181,8 @@ impl IngestionPipeline {
)
.map_err(anyhow::Error::from)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.try_flatten()
.try_flatten_unordered(self.concurrency)
.boxed();

self
Expand Down

0 comments on commit 8f10ec8

Please sign in to comment.