diff --git a/benches/isolated_buffering.rs b/benches/isolated_buffering.rs index 4b39fe91cf4bd..fb8503c2cd626 100644 --- a/benches/isolated_buffering.rs +++ b/benches/isolated_buffering.rs @@ -3,7 +3,7 @@ use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput} use futures::{ compat::{Future01CompatExt, Stream01CompatExt}, stream::BoxStream, - StreamExt, + SinkExt, StreamExt, }; use futures01::{stream, Sink, Stream}; use tempfile::tempdir; @@ -70,6 +70,32 @@ fn benchmark_buffers(c: &mut Criterion) { ); }); + group.bench_function("channels/futures", |b| { + b.iter_batched( + || { + let rt = runtime(); + + let (writer, mut reader) = futures::channel::mpsc::channel(100); + + let read_handle = rt.spawn(async move { while reader.next().await.is_some() {} }); + + (rt, writer, read_handle) + }, + |(mut rt, mut writer, read_handle)| { + let write_handle = rt.spawn(async move { + let mut stream = random_events(line_size).take(num_lines as u64).compat(); + while let Some(e) = stream.next().await { + writer.send(e).await.unwrap(); + } + }); + + rt.block_on(write_handle).unwrap(); + rt.block_on(read_handle).unwrap(); + }, + BatchSize::SmallInput, + ); + }); + group.bench_function("channels/tokio", |b| { b.iter_batched( || { diff --git a/src/buffers/mod.rs b/src/buffers/mod.rs index d9b75428a80df..f46ae7744f0f9 100644 --- a/src/buffers/mod.rs +++ b/src/buffers/mod.rs @@ -1,7 +1,7 @@ -use crate::{config::Resource, sink::BoundedSink, Event}; +use crate::{config::Resource, Event}; #[cfg(feature = "leveldb")] use futures::compat::{Sink01CompatExt, Stream01CompatExt}; -use futures::{Sink, Stream}; +use futures::{channel::mpsc, Sink, SinkExt, Stream}; use futures01::task::AtomicTask; use pin_project::pin_project; use serde::{Deserialize, Serialize}; @@ -16,7 +16,6 @@ use std::{ }; #[cfg(feature = "leveldb")] use tokio::stream::StreamExt; -use tokio::sync::mpsc; #[cfg(feature = "leveldb")] pub mod disk; @@ -72,7 +71,9 @@ impl BufferInputCloner { pub fn get(&self) -> Box + Send> { match self { BufferInputCloner::Memory(tx, when_full) => { - let inner = BoundedSink::new(tx.clone()); + let inner = tx + .clone() + .sink_map_err(|error| error!(message = "Sender error.", %error)); if when_full == &WhenFull::DropNewest { Box::new(DropWhenFull::new(inner)) } else { diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 4db16f7f57e9a..4b260c9ad4155 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -19,10 +19,7 @@ use std::{ sync::{Arc, Mutex}, }; use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire}; -use tokio::{ - sync::mpsc, - time::{timeout, Duration}, -}; +use tokio::time::{timeout, Duration}; pub struct Pieces { pub inputs: HashMap)>, @@ -56,7 +53,7 @@ pub async fn build_pieces( .iter() .filter(|(name, _)| diff.sources.contains_new(&name)) { - let (tx, rx) = mpsc::channel(1000); + let (tx, rx) = tokio::sync::mpsc::channel(1000); let pipeline = Pipeline::from_sender(tx, vec![]); let typetag = source.source_type(); @@ -115,7 +112,7 @@ pub async fn build_pieces( Ok(transform) => transform, }; - let (input_tx, input_rx) = mpsc::channel(100); + let (input_tx, input_rx) = futures::channel::mpsc::channel(100); let input_tx = buffers::BufferInputCloner::Memory(input_tx, buffers::WhenFull::Block); let (output, control) = Fanout::new(); diff --git a/src/transforms/util/runtime_transform.rs b/src/transforms/util/runtime_transform.rs index d5be5f9cf5ba0..442cb0eccbd74 100644 --- a/src/transforms/util/runtime_transform.rs +++ b/src/transforms/util/runtime_transform.rs @@ -81,6 +81,7 @@ where Box::pin( input_rx .map(Message::Process) + .fuse() .into_future() .map(move |(first, rest)| { // The first message is always `Message::Init`.