Skip to content

Commit

Permalink
Avoid the use of SinkExt in src/topology/builder.rs
Browse files Browse the repository at this point in the history
I'm starting to experiment with the removal of `Sink` implementation for
`Fanout` for #10144. My in-flight work is starting to sprawl uncomfortably so
this is a small patch to remove reliance on one of the related `Sink` traits.

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
  • Loading branch information
blt committed Feb 4, 2022
1 parent f1400d9 commit f5d1d24
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
time::Instant,
};

use futures::{stream::FuturesOrdered, FutureExt, SinkExt, StreamExt, TryFutureExt};
use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryFutureExt};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
Expand Down Expand Up @@ -658,7 +658,7 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (output, control) = Fanout::new();
let (fanout, control) = Fanout::new();

let input_rx = crate::utilization::wrap(input_rx);

Expand All @@ -674,15 +674,15 @@ fn build_task_transform(
let transform = t
.transform(Box::pin(filtered))
.flat_map(|events| futures::stream::iter(events.into_events()))
.map(Ok)
.forward(output.with(|event: Event| async {
.inspect(|event: &Event| {
emit!(&EventsSent {
count: 1,
byte_size: event.size_of(),
output: None,
});
Ok(event)
}))
})
.map(Ok)
.forward(fanout)
.boxed()
.map_ok(|_| {
debug!("Finished.");
Expand Down

0 comments on commit f5d1d24

Please sign in to comment.