Skip to content

Commit

Permalink
feat(ingestion_stream): improved stream developer experience
Browse files Browse the repository at this point in the history
  • Loading branch information
timonv committed Jun 23, 2024
1 parent 9004323 commit 25f165e
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 93 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions swiftide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing = { version = "0.1.40", features = ["log"] }
strum = "0.26.2"
strum_macros = "0.26.4"
num_cpus = "1.16.0"
pin-project-lite = "0.2"

# Integrations
async-openai = { version = "0.23.2", optional = true }
Expand Down
50 changes: 30 additions & 20 deletions swiftide/src/ingestion/ingestion_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Default for IngestionPipeline {
/// Creates a default `IngestionPipeline` with an empty stream, no storage, and a concurrency level equal to the number of CPUs.
fn default() -> Self {
Self {
stream: Box::pin(futures_util::stream::empty()),
stream: IngestionStream::empty(),
storage: Default::default(),
concurrency: num_cpus::get(),
}
Expand All @@ -47,7 +47,7 @@ impl IngestionPipeline {
pub fn from_loader(loader: impl Loader + 'static) -> Self {
let stream = loader.into_stream();
Self {
stream: stream.boxed(),
stream,
..Default::default()
}
}
Expand Down Expand Up @@ -95,7 +95,8 @@ impl IngestionPipeline {
}
.instrument(span)
})
.boxed();
.boxed()
.into();
self
}

Expand All @@ -120,7 +121,8 @@ impl IngestionPipeline {
async move { transformer.transform_node(node).await }.instrument(span)
})
.try_buffer_unordered(concurrency)
.boxed();
.boxed()
.into();

self
}
Expand Down Expand Up @@ -154,7 +156,7 @@ impl IngestionPipeline {
})
.try_buffer_unordered(concurrency) // First get the streams from each future
.try_flatten_unordered(concurrency) // Then flatten all the streams back into one
.boxed();
.boxed().into();
self
}

Expand All @@ -180,7 +182,8 @@ impl IngestionPipeline {
})
.try_buffer_unordered(concurrency)
.try_flatten_unordered(concurrency)
.boxed();
.boxed()
.into();

self
}
Expand Down Expand Up @@ -210,7 +213,7 @@ impl IngestionPipeline {
})
.try_buffer_unordered(self.concurrency)
.try_flatten_unordered(self.concurrency)
.boxed();
.boxed().into();
} else {
self.stream = self
.stream
Expand All @@ -222,7 +225,8 @@ impl IngestionPipeline {
async move { storage.store(node).await }.instrument(span)
})
.try_buffer_unordered(self.concurrency)
.boxed();
.boxed()
.into();
}

self
Expand All @@ -232,7 +236,9 @@ impl IngestionPipeline {
///
/// Useful for rate limiting the ingestion pipeline. Uses tokio_stream::StreamExt::throttle internally which has a granularity of 1ms.
pub fn throttle(mut self, duration: impl Into<Duration>) -> Self {
self.stream = tokio_stream::StreamExt::throttle(self.stream, duration.into()).boxed();
self.stream = tokio_stream::StreamExt::throttle(self.stream, duration.into())
.boxed()
.into();
self
}

Expand All @@ -249,7 +255,8 @@ impl IngestionPipeline {
Err(_e) => None,
}
})
.boxed();
.boxed()
.into();
self
}

Expand All @@ -260,7 +267,8 @@ impl IngestionPipeline {
self.stream = self
.stream
.inspect(|result| tracing::debug!("Processing result: {:?}", result))
.boxed();
.boxed()
.into();
self
}

Expand All @@ -271,7 +279,8 @@ impl IngestionPipeline {
self.stream = self
.stream
.inspect_err(|e| tracing::error!("Error processing node: {:?}", e))
.boxed();
.boxed()
.into();
self
}

Expand All @@ -282,7 +291,8 @@ impl IngestionPipeline {
self.stream = self
.stream
.inspect_ok(|node| tracing::debug!("Processed node: {:?}", node))
.boxed();
.boxed()
.into();
self
}

Expand Down Expand Up @@ -333,7 +343,6 @@ mod tests {
use super::*;
use crate::ingestion::IngestionNode;
use crate::traits::*;
use futures_util::stream;
use mockall::Sequence;

/// Tests a simple run of the ingestion pipeline.
Expand All @@ -351,7 +360,7 @@ mod tests {
.expect_into_stream()
.times(1)
.in_sequence(&mut seq)
.returning(|| Box::pin(stream::iter(vec![Ok(IngestionNode::default())])));
.returning(|| vec![Ok(IngestionNode::default())].into());

transformer.expect_transform_node().returning(|mut node| {
node.chunk = "transformed".to_string();
Expand All @@ -363,7 +372,7 @@ mod tests {
.expect_batch_transform()
.times(1)
.in_sequence(&mut seq)
.returning(|nodes| Box::pin(stream::iter(nodes.into_iter().map(Ok))));
.returning(|nodes| IngestionStream::iter(nodes.into_iter().map(Ok)));
batch_transformer.expect_concurrency().returning(|| None);

chunker
Expand All @@ -377,7 +386,7 @@ mod tests {
node.chunk = format!("transformed_chunk_{}", i);
nodes.push(Ok(node));
}
Box::pin(stream::iter(nodes))
nodes.into()
});
chunker.expect_concurrency().returning(|| None);

Expand Down Expand Up @@ -409,7 +418,7 @@ mod tests {
.expect_into_stream()
.times(1)
.in_sequence(&mut seq)
.returning(|| Box::pin(stream::iter(vec![Ok(IngestionNode::default())])));
.returning(|| vec![Ok(IngestionNode::default())].into());
transformer
.expect_transform_node()
.returning(|_node| Err(anyhow::anyhow!("Error transforming node")));
Expand All @@ -435,11 +444,12 @@ mod tests {
.times(1)
.in_sequence(&mut seq)
.returning(|| {
Box::pin(stream::iter(vec![
vec![
Ok(IngestionNode::default()),
Ok(IngestionNode::default()),
Ok(IngestionNode::default()),
]))
]
.into()
});
transformer
.expect_transform_node()
Expand Down
97 changes: 60 additions & 37 deletions swiftide/src/ingestion/ingestion_stream.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,67 @@
#![allow(clippy::from_over_into)]
//! This module defines the `IngestionStream` type, which is used for handling asynchronous streams of `IngestionNode` items in the ingestion pipeline.
//!
//! The `IngestionStream` type is a pinned, boxed, dynamically-dispatched stream that yields `Result<IngestionNode>` items. This type is essential for managing
//! and processing large volumes of data asynchronously, ensuring efficient and scalable ingestion workflows.
use anyhow::Result;
use futures_util::stream::Stream;
use futures_util::stream::{self, Stream};
use pin_project_lite::pin_project;
use std::pin::Pin;

use super::IngestionNode;

/// A type alias for a pinned, boxed, dynamically-dispatched stream of `IngestionNode` items.
///
/// This type is used in the ingestion pipeline to handle asynchronous streams of data. Each item in the stream is a `Result<IngestionNode>`,
/// allowing for error handling during the ingestion process. The `Send` trait is implemented to ensure that the stream can be safely sent
/// across threads, enabling concurrent processing.
///
/// # Type Definition
/// - `Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>`
///
/// # Components
/// - `Pin`: Ensures that the memory location of the stream is fixed, which is necessary for certain asynchronous operations.
/// - `Box<dyn Stream<Item = Result<IngestionNode>>>`: A heap-allocated, dynamically-dispatched stream that yields `Result<IngestionNode>` items.
/// - `Send`: Ensures that the stream can be sent across thread boundaries, facilitating concurrent processing.
///
/// # Usage
/// The `IngestionStream` type is typically used in the ingestion pipeline to process data asynchronously. It allows for efficient handling
/// of large volumes of data by leveraging Rust's asynchronous capabilities.
///
/// # Error Handling
/// Each item in the stream is a `Result<IngestionNode>`, which means that errors can be propagated and handled during the ingestion process.
/// This design allows for robust error handling and ensures that the ingestion pipeline can gracefully handle failures.
///
/// # Performance Considerations
/// The use of `Pin` and `Box` ensures that the stream's memory location is fixed and heap-allocated, respectively. This design choice is
/// crucial for asynchronous operations that require stable memory addresses. Additionally, the `Send` trait enables concurrent processing,
/// which can significantly improve performance in multi-threaded environments.
///
/// # Edge Cases
/// - The stream may yield errors (`Err` variants) instead of valid `IngestionNode` items. These errors should be handled appropriately
/// to ensure the robustness of the ingestion pipeline.
/// - The stream must be pinned to ensure that its memory location remains fixed, which is necessary for certain asynchronous operations.
pub type IngestionStream = Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>;
pub use futures_util::{StreamExt, TryStreamExt};

// `pin_project!` generates a safe pin projection for the wrapper:
// the `#[pin]` attribute marks `inner` as structurally pinned, so a
// `Pin<&mut IngestionStream>` can be projected to a pinned reference
// to `inner` without any unsafe code. This is what lets the `Stream`
// impl below delegate `poll_next` to the boxed inner stream.
pin_project! {
    /// An asynchronous stream of `IngestionNode` items.
    ///
    /// Wraps an internal stream of `Result<IngestionNode>` items.
    ///
    /// Streams, iterators and vectors of `Result<IngestionNode>` can be converted into an `IngestionStream`.
    pub struct IngestionStream {
        // Boxed + dyn-dispatched so any concrete stream type can back this
        // wrapper; `Send` keeps the stream usable across thread boundaries.
        #[pin]
        inner: Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>,
    }
}

impl Stream for IngestionStream {
    type Item = Result<IngestionNode>;

    /// Polls the wrapped stream, forwarding its items unchanged.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Project through the pin and delegate straight to `inner`.
        self.project().inner.poll_next(cx)
    }
}

/// Converts a vector of results into an `IngestionStream`, enabling
/// `vec![Ok(node)].into()` at call sites.
///
/// Implemented as `From` rather than `Into`: the standard library's
/// blanket impl derives the matching `Into` automatically, so all
/// existing `.into()` callers keep working, and the
/// `clippy::from_over_into` lint is satisfied without an `allow`.
impl From<Vec<Result<IngestionNode>>> for IngestionStream {
    fn from(nodes: Vec<Result<IngestionNode>>) -> Self {
        IngestionStream::iter(nodes)
    }
}

/// Wraps an already-boxed stream without re-boxing it, enabling the
/// `.boxed().into()` pattern used throughout the pipeline.
///
/// Implemented as `From` rather than `Into`: the standard library's
/// blanket impl derives the matching `Into` automatically, so existing
/// call sites keep working, and the `clippy::from_over_into` lint is
/// satisfied without an `allow`.
impl From<Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>> for IngestionStream {
    fn from(inner: Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>) -> Self {
        IngestionStream { inner }
    }
}

impl IngestionStream {
    /// Creates a stream that yields no items and terminates immediately.
    pub fn empty() -> Self {
        Self {
            inner: Box::pin(stream::empty()),
        }
    }

    /// Builds an `IngestionStream` from any iterator of
    /// `Result<IngestionNode>` items.
    ///
    /// The `'static` bound guarantees the iterator owns (or statically
    /// borrows) everything it yields, so it outlives the boxed stream;
    /// the `Send` bounds on both the iterator source and its `IntoIter`
    /// keep the resulting stream safe to move across threads.
    pub fn iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = Result<IngestionNode>> + Send + 'static,
        <I as IntoIterator>::IntoIter: Send,
    {
        Self {
            inner: Box::pin(stream::iter(iter)),
        }
    }
}
7 changes: 3 additions & 4 deletions swiftide/src/integrations/qdrant/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use anyhow::Result;
use async_trait::async_trait;
use futures_util::{stream, StreamExt};

use crate::{
ingestion::{IngestionNode, IngestionStream},
Expand Down Expand Up @@ -82,7 +81,7 @@ impl Persist for Qdrant {
.collect::<Result<Vec<_>>>();

if points.is_err() {
return stream::iter(vec![Err(points.unwrap_err())]).boxed();
return vec![Err(points.unwrap_err())].into();
}

let points = points.unwrap();
Expand All @@ -93,9 +92,9 @@ impl Persist for Qdrant {
.await;

if result.is_ok() {
stream::iter(nodes.into_iter().map(Ok)).boxed()
IngestionStream::iter(nodes.into_iter().map(Ok))
} else {
stream::iter(vec![Err(result.unwrap_err())]).boxed()
vec![Err(result.unwrap_err())].into()
}
}
}
9 changes: 4 additions & 5 deletions swiftide/src/integrations/redis/persist.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use futures_util::{stream, StreamExt};

use crate::{
ingestion::{IngestionNode, IngestionStream},
Expand Down Expand Up @@ -58,7 +57,7 @@ impl Persist for Redis {
.collect::<Result<Vec<_>>>();

if args.is_err() {
return stream::iter(vec![Err(args.unwrap_err())]).boxed();
return vec![Err(args.unwrap_err())].into();
}

let args = args.unwrap();
Expand All @@ -70,12 +69,12 @@ impl Persist for Redis {
.context("Error persisting to redis");

if result.is_ok() {
stream::iter(nodes.into_iter().map(Ok)).boxed()
IngestionStream::iter(nodes.into_iter().map(Ok))
} else {
stream::iter(vec![Err(result.unwrap_err())]).boxed()
IngestionStream::iter([Err(result.unwrap_err())])
}
} else {
stream::iter(vec![Err(anyhow::anyhow!("Failed to connect to Redis"))]).boxed()
IngestionStream::iter([Err(anyhow::anyhow!("Failed to connect to Redis"))])
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions swiftide/src/loaders/file_loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{ingestion::IngestionNode, ingestion::IngestionStream, Loader};
use futures_util::{stream, StreamExt};
use std::path::PathBuf;

/// The `FileLoader` struct is responsible for loading files from a specified directory,
Expand Down Expand Up @@ -103,7 +102,7 @@ impl Loader for FileLoader {
})
});

stream::iter(file_paths).boxed()
IngestionStream::iter(file_paths)
}
}

Expand Down
Loading

0 comments on commit 25f165e

Please sign in to comment.