Extract the ring-ish structure implicit in Fanout
One of the oddities of `Fanout` was its use of an index `i` into the sinks. The
index was partially preserved across polls but was not generally used when
looping. It is my understanding that `Fanout.i` was ultimately vestigial and
that any actual indexing was reset on each poll. I think, as a result, we would
also repoll the same sink multiple times when removals happened, which should be
rare in practice but was possible. I have extracted the vector and index munging
into a `Store` type. We should no longer poll underlying sinks multiple times,
and calling code no longer has to munge indexes, although it must manually
advance/reset a 'cursor' because we're changing the shape of the collection
while iterating over it.
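
To make the cursor discipline concrete, here is a minimal sketch of the pattern described above. It is not the code from this commit: the `Store` below is a simplified, key-less stand-in, and `visit_all` and its `keep` predicate are hypothetical names used only for illustration. The point it shows is that the caller advances the cursor only when the current element is kept, so the element swapped into the hole by a removal is still visited and nothing is visited twice.

```rust
/// Simplified, key-less stand-in for the `Store` type: a vector plus a cursor.
struct Store<V> {
    index: usize,
    inner: Vec<V>,
}

impl<V> Store<V> {
    fn new(inner: Vec<V>) -> Self {
        Self { index: 0, inner }
    }

    /// Element under the cursor, or None once the cursor is past the end.
    fn get_mut(&mut self) -> Option<&mut V> {
        self.inner.get_mut(self.index)
    }

    /// Removes the element under the cursor; the last element takes its place.
    fn remove(&mut self) -> V {
        self.inner.swap_remove(self.index)
    }

    fn advance(&mut self) {
        self.index += 1;
    }

    fn reset_cursor(&mut self) {
        self.index = 0;
    }
}

/// Hypothetical helper: visits every element once, removing those for which
/// `keep` returns false, and returns the order in which elements were seen.
fn visit_all(store: &mut Store<u32>, keep: impl Fn(u32) -> bool) -> Vec<u32> {
    let mut visited = Vec::new();
    store.reset_cursor();
    while let Some(v) = store.get_mut() {
        let v = *v;
        visited.push(v);
        if keep(v) {
            // Only advance when the current slot was kept.
            store.advance();
        } else {
            // Do not advance: the swapped-in element now sits under the cursor.
            store.remove();
        }
    }
    visited
}

fn main() {
    let mut store = Store::new(vec![1, 2, 3, 4]);
    let visited = visit_all(&mut store, |v| v != 2);
    println!("visited = {:?}, remaining = {:?}", visited, store.inner);
}
```

In this sketch the caller, not the store, decides whether to advance. That mirrors the manual advance/reset requirement mentioned above.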

The primary difference here is the use of `swap_remove` instead of
`remove`. This saves shifting the tail of the vector on removal, at the cost of
no longer preserving the order of the sinks.
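
As an illustration of that trade-off, the following sketch compares the two standard `Vec` methods on the same input. The helper `remove_both` is a hypothetical name introduced only for this comparison and is not part of the change.

```rust
/// Removes index `idx` from two copies of `items`, once with `Vec::remove`
/// and once with `Vec::swap_remove`, returning both results for comparison.
fn remove_both(items: &[char], idx: usize) -> (Vec<char>, Vec<char>) {
    let mut shifted = items.to_vec();
    let mut swapped = items.to_vec();

    // `remove` keeps the order by shifting every later element left: O(n).
    let _removed = shifted.remove(idx);
    // `swap_remove` moves the last element into the hole: O(1), order changes.
    let _removed = swapped.swap_remove(idx);

    (shifted, swapped)
}

fn main() {
    let (shifted, swapped) = remove_both(&['a', 'b', 'c', 'd'], 1);
    println!("remove:      {:?}", shifted);
    println!("swap_remove: {:?}", swapped);
}
```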

I expect no performance change here. I think, ultimately, this is a stepping
stone to getting the logic here very explicit so we can start to do broadcasting
in a way that is not impeded by slow receivers downstream.

REF #10144
REF #10912

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
blt committed Feb 2, 2022
1 parent 251a7b4 commit 8dae1a3
Showing 1 changed file with 132 additions and 57 deletions.
189 changes: 132 additions & 57 deletions lib/vector-core/src/fanout.rs
@@ -30,9 +30,106 @@ impl fmt::Debug for ControlMessage {

pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;

struct Store<K, V> {
index: usize,
inner: Vec<(K, V)>,
}

impl<K, V> Store<K, V>
where
K: PartialEq,
{
fn with_capacity(capacity: usize) -> Self {
Self {
index: 0,
inner: Vec::with_capacity(capacity),
}
}

fn len(&self) -> usize {
self.inner.len()
}

/// # Panics
///
/// Function will panic if the key is duplicate
fn push(&mut self, key: K, val: V) {
assert!(
!self.inner.iter().any(|(k, _)| k == &key),
"Duplicate output id in fanout"
);

self.inner.push((key, val));
}

/// Remove a stored element by its key
///
/// This function will remove an element from the store by its key.
///
/// This function is O(n).
///
/// # Panics
///
/// Function will panic if the inner cursor is in any state but initial.
fn remove_by_key(&mut self, key: &K) -> Option<V> {
assert!(
self.index == 0,
"Cursor is not in its initial state, cannot remove by key safely"
);

match self.inner.iter().position(|(k, _)| k == key) {
Some(idx) => Some(self.inner.swap_remove(idx).1),
None => None,
}
}

/// Removes the element at the current cursor
///
/// This function will remove the element at the current cursor
/// position. The element at the end of the inner store will be swapped to
/// the current cursor position and, so, the cursor SHOULD NOT be advanced
/// after calling `remove`. Doing so may cause elements to be skipped by
/// calls to `get_mut`.
///
/// This function is O(1).
///
fn remove(&mut self) {
self.inner.swap_remove(self.index);
}

fn replace(&mut self, key: &K, val: V) {
// NOTE if we added K: Ord we could use a binary search, although pushes
// would no longer be O(1). This function should be called rarely in
// practice so it kinda doesn't matter.
if let Some((_, existing)) = self.inner.iter_mut().find(|(k, _)| k == key) {
*existing = val;
} else {
panic!("Tried to replace a sink that's not already present");
}
}

/// Return a mutable reference to the value at the current cursor position, or
/// None if the cursor has gone past the end
fn get_mut(&mut self) -> Option<&mut V> {
self.inner.get_mut(self.index).map(|(_, v)| v)
}

/// Advance the internal cursor
///
/// This does not bounds-check or wrap. Once the cursor has moved past the end
/// of the inner store, `get_mut` returns None.
fn advance(&mut self) {
self.index += 1;
}

/// Reset the internal cursor
fn reset_cursor(&mut self) {
self.index = 0;
}
}

pub struct Fanout {
sinks: Vec<(ComponentKey, Option<GenericEventSink>)>,
i: usize,
sinks: Store<ComponentKey, Option<GenericEventSink>>,
control_channel: mpsc::UnboundedReceiver<ControlMessage>,
}

@@ -41,8 +138,7 @@ impl Fanout {
let (control_tx, control_rx) = mpsc::unbounded_channel();

let fanout = Self {
sinks: vec![],
i: 0,
sinks: Store::with_capacity(16), // arbitrary smallish value
control_channel: control_rx,
};

@@ -54,39 +150,26 @@ impl Fanout {
/// # Panics
///
/// Function will panic if a sink with the same ID is already present.
pub fn add(&mut self, id: ComponentKey, sink: GenericEventSink) {
assert!(
!self.sinks.iter().any(|(n, _)| n == &id),
"Duplicate output id in fanout"
);

self.sinks.push((id, Some(sink)));
fn add(&mut self, id: ComponentKey, sink: GenericEventSink) {
self.sinks.push(id, Some(sink));
}

fn remove(&mut self, id: &ComponentKey) {
let i = self.sinks.iter().position(|(n, _)| n == id);
let i = i.expect("Didn't find output in fanout");

let (_id, removed) = self.sinks.remove(i);
let removed = self
.sinks
.remove_by_key(id)
.expect("Didn't find output in fanout");

if let Some(mut removed) = removed {
tokio::spawn(async move { removed.close().await });
}

if self.i > i {
self.i -= 1;
}
}

fn replace(&mut self, id: &ComponentKey, sink: Option<GenericEventSink>) {
if let Some((_, existing)) = self.sinks.iter_mut().find(|(n, _)| n == id) {
*existing = sink;
} else {
panic!("Tried to replace a sink that's not already present");
}
self.sinks.replace(id, sink);
}

pub fn process_control_messages(&mut self, cx: &mut Context<'_>) {
fn process_control_messages(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(message)) = self.control_channel.poll_recv(cx) {
match message {
ControlMessage::Add(id, sink) => self.add(id, sink),
@@ -97,15 +180,21 @@ impl Fanout {
}

#[inline]
fn handle_sink_error(&mut self, index: usize) -> Result<(), ()> {
/// Handles an errored sink by removing it from fanout rotation.
///
/// # Errors
///
/// This function will error-out if the underlying sink store has only a
/// single remaining member.
fn handle_sink_error(&mut self) -> Result<(), ()> {
// If there's only one sink, propagate the error to the source ASAP
// so it stops reading from its input. If there are multiple sinks,
// keep pushing to the non-errored ones (while the errored sink
// triggers a more graceful shutdown).
if self.sinks.len() == 1 {
Err(())
} else {
self.sinks.remove(index);
self.sinks.remove();
Ok(())
}
}
@@ -121,26 +210,22 @@ impl Fanout {
&mut Context<'_>,
) -> Poll<Result<(), ()>>,
{
self.sinks.reset_cursor();
self.process_control_messages(cx);

let mut poll_result = Poll::Ready(Ok(()));

let mut i = 0;
while let Some((_, sink)) = self.sinks.get_mut(i) {
while let Some(sink) = self.sinks.get_mut() {
if let Some(sink) = sink {
match poll(sink.as_mut(), cx) {
Poll::Pending => poll_result = Poll::Pending,
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(())) => {
self.handle_sink_error(i)?;
continue;
}
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => self.sinks.advance(),
Poll::Ready(Err(())) => self.handle_sink_error()?,
}
}
i += 1;
// TODO it's not clear to me why we wouldn't return Pending if the
// value is None, like we do in `poll_ready`.
}

poll_result
Poll::Ready(Ok(()))
}
}

@@ -150,14 +235,15 @@ impl Sink<Event> for Fanout {
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
let this = self.get_mut();

this.sinks.reset_cursor();
this.process_control_messages(cx);

while let Some((_, sink)) = this.sinks.get_mut(this.i) {
while let Some(sink) = this.sinks.get_mut() {
match sink {
Some(sink) => match sink.as_mut().poll_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => this.i += 1,
Poll::Ready(Err(())) => this.handle_sink_error(this.i)?,
Poll::Ready(Ok(())) => this.sinks.advance(),
Poll::Ready(Err(())) => this.handle_sink_error()?,
},
// process_control_messages ended because control channel returned
// Pending so it's fine to return Pending here since the control
@@ -166,33 +252,22 @@ impl Sink<Event> for Fanout {
}
}

this.i = 0;

Poll::Ready(Ok(()))
}

fn start_send(mut self: Pin<&mut Self>, item: Event) -> Result<(), ()> {
let mut items = vec![item; self.sinks.len()];

let mut i = 1;
while let Some((_, sink)) = self.sinks.get_mut(i) {
self.sinks.reset_cursor();
while let Some(sink) = self.sinks.get_mut() {
if let Some(sink) = sink.as_mut() {
let item = items.pop().unwrap();
if sink.as_mut().start_send(item).is_err() {
self.handle_sink_error(i)?;
self.handle_sink_error()?;
continue;
}
}
i += 1;
}

if let Some((_, sink)) = self.sinks.first_mut() {
if let Some(sink) = sink.as_mut() {
let item = items.pop().unwrap();
if sink.as_mut().start_send(item).is_err() {
self.handle_sink_error(0)?;
}
}
self.sinks.advance();
}

Ok(())