Remove extra channel hop from source output path #10912

Open
lukesteensen opened this issue Jan 19, 2022 · 0 comments
Labels
  • domain: performance (Anything related to Vector's performance)
  • domain: sources (Anything related to Vector's sources)
  • domain: topology (Anything related to Vector's topology code)
  • type: enhancement (A value-adding code change that enhances its existing functionality.)

Comments

@lukesteensen
Member

The way that sources currently plug into the topology is a bit of a special case. They are passed a SourceSender when they're built, which is essentially a glorified channel Sender. The corresponding Receiver for that channel lives in what we call a "pump" task, which has the simple job of shoveling events written to that channel into the source's Fanout instance. The Fanout instance itself contains all of the inputs configured to draw from that particular source. An "input" in this case refers to what's basically another glorified channel Sender whose Receiver feeds the downstream component task.

So at a high level, the path of an event out of a source looks something like:

source task -> channel -> pump task -> fanout -> channel -> downstream component
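To make the two hops concrete, here is a minimal sketch of the current shape. It assumes std threads and bounded `std::sync::mpsc` channels as stand-ins for Vector's async tasks and channels; the `Event` alias, the `pump` body, and `run_pipeline` are illustrative, not Vector's code.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::thread;

// Illustrative stand-in for Vector's event type.
type Event = String;

// The "pump": drains the source's channel (hop 1) and writes a copy of each
// event into every downstream input channel (hop 2), i.e. the fanout.
fn pump(rx: Receiver<Event>, outputs: Vec<SyncSender<Event>>) {
    for event in rx {
        for out in &outputs {
            out.send(event.clone()).expect("downstream input closed");
        }
    }
    // The loop ends once every source-side sender is dropped; dropping
    // `outputs` then closes the downstream channels.
}

fn run_pipeline() -> (Vec<Event>, Vec<Event>) {
    // Hop 1: source task -> pump task.
    let (source_tx, source_rx) = mpsc::sync_channel::<Event>(4);
    // Hop 2: fanout -> two downstream components.
    let (a_tx, a_rx) = mpsc::sync_channel::<Event>(4);
    let (b_tx, b_rx) = mpsc::sync_channel::<Event>(4);

    let pump_task = thread::spawn(move || pump(source_rx, vec![a_tx, b_tx]));

    // The "source task" only ever sees its own channel sender.
    let source_task = thread::spawn(move || {
        for i in 0..3 {
            source_tx.send(format!("event-{i}")).unwrap();
        }
    });

    source_task.join().unwrap();
    pump_task.join().unwrap();

    // Each downstream component drains its own input channel.
    (a_rx.iter().collect(), b_rx.iter().collect())
}

fn main() {
    let (a, b) = run_pipeline();
    println!("downstream A got {a:?}");
    println!("downstream B got {b:?}");
}
```

In this shape an event can be waiting in the first channel, in the pump's hands, or in a downstream channel, so the total buffering between a source and a downstream component depends on both channel capacities.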

This has a number of downsides:

  1. Two channels make buffer sizes harder to reason about
  2. The pump task feels like redundant CPU work and an unnecessary serialization point
  3. The pump task has gotten more complex with the addition of multiple outputs
  4. The actual source task and pump task are treated differently in the RunningTopology struct, adding complexity and making it harder to reason about

One potential solution here is to embed the fanout directly within SourceSender. This would remove the extra hop and the extra task, hopefully improving efficiency and making sources less of a special case in how they integrate with the topology.
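A rough sketch of what embedding the fanout in the sender could look like, assuming std `SyncSender`s as the downstream inputs. This `SourceSender` and `run_direct` are hypothetical and deliberately ignore the control channel used for config reloads; the point is only that a struct holding `Clone` senders can itself derive `Clone` and write straight to each input with no pump task.

```rust
use std::sync::mpsc::{self, SendError, SyncSender};

// Illustrative stand-in for Vector's event type.
type Event = String;

// Hypothetical SourceSender that owns the fanout's outputs directly. Because
// `SyncSender` is `Clone`, the whole struct can derive `Clone`, and every
// clone writes straight into the downstream input channels: one hop, no pump.
#[derive(Clone)]
struct SourceSender {
    outputs: Vec<SyncSender<Event>>,
}

impl SourceSender {
    fn send(&self, event: Event) -> Result<(), SendError<Event>> {
        for out in &self.outputs {
            out.send(event.clone())?;
        }
        Ok(())
    }
}

fn run_direct() -> (Vec<Event>, Vec<Event>) {
    let (a_tx, a_rx) = mpsc::sync_channel::<Event>(8);
    let (b_tx, b_rx) = mpsc::sync_channel::<Event>(8);

    let sender = SourceSender { outputs: vec![a_tx, b_tx] };
    // A source may hand clones to several of its own tasks.
    let sender2 = sender.clone();
    sender.send("x".to_string()).unwrap();
    sender2.send("y".to_string()).unwrap();

    // Dropping every clone closes the downstream channels.
    drop(sender);
    drop(sender2);
    (a_rx.iter().collect(), b_rx.iter().collect())
}

fn main() {
    let (a, b) = run_direct();
    println!("A: {a:?}, B: {b:?}");
}
```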

That change would require quite a bit of work on Fanout, mostly because SourceSender is required to be Clone. Internally, Fanout is mostly a collection of futures::Sinks and a ControlMessage channel receiver. The sinks can be handled relatively easily by changing the bound to CloneableSink from the buffers crate: in the common case they're already channel senders, which are Clone, and the remaining case, TapSink, can easily be made Clone.
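The general technique for cloning a collection of heterogeneous boxed sinks can be sketched with the common "`clone_box`" pattern. This is not the definition of CloneableSink in the buffers crate; the `EventSink` trait, `TapLike` (a hypothetical stand-in for TapSink), and this `Fanout` are assumptions made for illustration, and they are synchronous rather than `futures::Sink`-based.

```rust
use std::sync::mpsc::{self, SyncSender};
use std::sync::{Arc, Mutex};

// Illustrative stand-in for Vector's event type.
type Event = String;

// Hypothetical object-safe sink trait. `clone_box` is the usual trick for
// cloning a `Box<dyn Trait>`, since `Clone` itself is not object safe.
trait EventSink: Send {
    fn send_event(&mut self, event: Event) -> Result<(), String>;
    fn clone_box(&self) -> Box<dyn EventSink>;
}

impl Clone for Box<dyn EventSink> {
    fn clone(&self) -> Self {
        self.clone_box()
    }
}

// Channel senders are already `Clone`, so they qualify with no extra work.
impl EventSink for SyncSender<Event> {
    fn send_event(&mut self, event: Event) -> Result<(), String> {
        self.send(event).map_err(|e| e.to_string())
    }
    fn clone_box(&self) -> Box<dyn EventSink> {
        Box::new(self.clone())
    }
}

// Hypothetical tap-like sink. Keeping its state behind an `Arc` is what makes
// it cheap to clone; every clone records into the same buffer.
#[derive(Clone)]
struct TapLike {
    seen: Arc<Mutex<Vec<Event>>>,
}

impl EventSink for TapLike {
    fn send_event(&mut self, event: Event) -> Result<(), String> {
        self.seen.lock().unwrap().push(event);
        Ok(())
    }
    fn clone_box(&self) -> Box<dyn EventSink> {
        Box::new(self.clone())
    }
}

// Once every sink is cloneable, the fanout can simply derive `Clone`.
#[derive(Clone)]
struct Fanout {
    sinks: Vec<Box<dyn EventSink>>,
}

impl Fanout {
    fn send_all(&mut self, event: Event) -> Result<(), String> {
        for sink in &mut self.sinks {
            sink.send_event(event.clone())?;
        }
        Ok(())
    }
}

fn run_cloned_fanout() -> (Vec<Event>, Vec<Event>) {
    let (tx, rx) = mpsc::sync_channel::<Event>(8);
    let tap = TapLike { seen: Arc::new(Mutex::new(Vec::new())) };
    let seen = Arc::clone(&tap.seen);

    let mut fanout = Fanout {
        sinks: vec![Box::new(tx) as Box<dyn EventSink>, Box::new(tap) as Box<dyn EventSink>],
    };
    let mut copy = fanout.clone();
    fanout.send_all("a".to_string()).unwrap();
    copy.send_all("b".to_string()).unwrap();

    // Dropping both fanouts drops every channel sender, ending `rx.iter()`.
    drop(fanout);
    drop(copy);
    let from_channel: Vec<Event> = rx.iter().collect();
    let from_tap = seen.lock().unwrap().clone();
    (from_channel, from_tap)
}

fn main() {
    let (from_channel, from_tap) = run_cloned_fanout();
    println!("channel: {from_channel:?}, tap: {from_tap:?}");
}
```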

The channel receiver is a bit more complex. Its purpose is to receive ControlMessages that add or remove sinks during config reloads. It's currently an unbounded channel because, in theory, an unbounded number of reloads could happen between reads of the channel, and the channel is currently only read when an event is received. We could simply switch it to an unbounded broadcast channel if we found a suitable implementation (I haven't seen one), or to a bounded broadcast channel with a bound high enough to be effectively unbounded (this might be OK, since reloads are rare), but it could be worth taking the time here to rethink the pattern more deeply.
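The "only read when an event is received" behavior, and why it pushes toward an unbounded channel, can be sketched as follows. The `ControlMessage` variants, `SinkId`, and this `Fanout` are hypothetical simplifications, not Vector's definitions. Note also that `std::sync::mpsc::Receiver` is not `Clone`, so a cloned Fanout could not share this receiver; each clone would need its own copy of every message, which is the broadcast requirement described above.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};

// Illustrative stand-ins; not Vector's actual definitions.
type Event = String;
type SinkId = &'static str;

// Hypothetical control messages covering the add/remove role described above.
enum ControlMessage {
    Add(SinkId, SyncSender<Event>),
    Remove(SinkId),
}

struct Fanout {
    sinks: Vec<(SinkId, SyncSender<Event>)>,
    // Created with `mpsc::channel`, which is unbounded.
    control: Receiver<ControlMessage>,
}

impl Fanout {
    fn send(&mut self, event: Event) {
        // Control messages are only looked at when an event arrives, so any
        // number of reloads can queue up in between.
        while let Ok(msg) = self.control.try_recv() {
            match msg {
                ControlMessage::Add(id, tx) => self.sinks.push((id, tx)),
                ControlMessage::Remove(id) => self.sinks.retain(|(sink_id, _)| *sink_id != id),
            }
        }
        for (_, tx) in &self.sinks {
            tx.send(event.clone()).expect("downstream input closed");
        }
    }
}

fn run_reloads() -> (Vec<Event>, Vec<Event>) {
    let (control_tx, control_rx) = mpsc::channel();
    let mut fanout = Fanout { sinks: Vec::new(), control: control_rx };
    let (a_tx, a_rx) = mpsc::sync_channel::<Event>(8);
    let (b_tx, b_rx) = mpsc::sync_channel::<Event>(8);

    // Two "reloads" arrive before any event; both wait in the control channel.
    control_tx.send(ControlMessage::Add("a", a_tx)).unwrap();
    control_tx.send(ControlMessage::Add("b", b_tx)).unwrap();
    fanout.send("e1".to_string());

    // A later reload removes "a"; it only takes effect on the next event.
    control_tx.send(ControlMessage::Remove("a")).unwrap();
    fanout.send("e2".to_string());

    // Dropping the fanout closes "b"; "a" was already dropped on removal.
    drop(fanout);
    (a_rx.iter().collect(), b_rx.iter().collect())
}

fn main() {
    let (a, b) = run_reloads();
    println!("a: {a:?}, b: {b:?}");
}
```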

There are some warts that make the current fanout implementation confusing:

  • the Sink trait
  • the i struct member for tracking sink index across polls
  • the Replace control message
  • the fact that the control channel needs to be unbounded
  • etc.

It's possible that by addressing those warts in some way we end up in a situation where we no longer need or want an unbounded broadcast channel in order to make Fanout cloneable, but the required adjustments to the mechanics of config reloading are probably outside the scope of this issue.

@lukesteensen added the type: enhancement, domain: topology, domain: sources, and domain: performance labels Jan 19, 2022
@blt self-assigned this Jan 31, 2022
blt added a commit that referenced this issue Feb 1, 2022
One of the oddities of `Fanout` was the use of an `i` to index sinks. This was,
partially, preserved across polls but was not in general use when looping. It is
my understanding that `Fanout.i` was ultimately vestigial and any actual
indexing was reset each poll. I think, as a result, we would also repoll the
same sink multiple times when removals happened, which should be rare in
practice but was possible. I have extracted the vector and index munging into a
`Store` type. We should now no longer poll underlying sinks multiple times and
calling code does not have to munge indexes, although it is required to manually
advance/reset a 'cursor' because we're changing the shape of an iterator while
iterating it.
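A minimal sketch of the kind of `Store` described here, assuming a plain `Vec` plus a cursor. The method names (`current`, `advance`, `remove_current`, `reset`) and the `visit_and_drop_evens` driver are hypothetical, not the ones in the commit; the sketch only shows why the cursor must not advance after a `swap_remove`.

```rust
// Hypothetical `Store`: owns the items and a cursor so callers never do raw
// index arithmetic while the collection changes shape under them.
struct Store<T> {
    items: Vec<T>,
    cursor: usize,
}

impl<T> Store<T> {
    fn new(items: Vec<T>) -> Self {
        Store { items, cursor: 0 }
    }

    // Item under the cursor, or `None` once this pass has seen everything.
    fn current(&self) -> Option<&T> {
        self.items.get(self.cursor)
    }

    // Move to the next item.
    fn advance(&mut self) {
        self.cursor += 1;
    }

    // Remove the item under the cursor. The cursor stays put, because
    // `swap_remove` just moved the last, not-yet-visited item into this slot.
    fn remove_current(&mut self) -> T {
        self.items.swap_remove(self.cursor)
    }

    // Start a new pass, e.g. at the top of the next poll.
    fn reset(&mut self) {
        self.cursor = 0;
    }
}

// Visit every item exactly once, dropping even values along the way.
fn visit_and_drop_evens(store: &mut Store<u32>) -> Vec<u32> {
    let mut visited = Vec::new();
    store.reset();
    while let Some(value) = store.current().copied() {
        visited.push(value);
        if value % 2 == 0 {
            store.remove_current();
        } else {
            store.advance();
        }
    }
    visited
}

fn main() {
    let mut store = Store::new(vec![1, 2, 4, 3]);
    let visited = visit_and_drop_evens(&mut store);
    println!("visited {visited:?}, kept {:?}", store.items);
}
```

Starting from `[1, 2, 4, 3]`, each item is visited exactly once (3 is reached before 4 because `swap_remove` moved it into the freed slot), and `[1, 3]` remains.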

The primary difference here is the use of `swap_remove` instead of
`remove`. This saves a shift.
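For reference, the difference between the two standard-library `Vec` methods is easy to see on a small vector. `swap_remove` is O(1) but does not preserve order; that is presumably acceptable for a fanout because every sink receives every event regardless of its position, though that reading is an assumption here.

```rust
// Compare `Vec::remove` and `Vec::swap_remove` on the same input.
fn compare_removals() -> (Vec<char>, Vec<char>) {
    let mut shifted = vec!['a', 'b', 'c', 'd'];
    // `remove` shifts every later element left by one: O(n), order preserved.
    shifted.remove(1);

    let mut swapped = vec!['a', 'b', 'c', 'd'];
    // `swap_remove` moves the last element into the hole: O(1), order not preserved.
    swapped.swap_remove(1);

    (shifted, swapped)
}

fn main() {
    let (shifted, swapped) = compare_removals();
    println!("remove: {shifted:?}, swap_remove: {swapped:?}");
}
```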

I expect no performance change here. I think, ultimately, this is a stepping
stone to getting the logic here very explicit so we can start to do broadcasting
in a way that is not impeded by slow receivers downstream.

REF #10144
REF #10912

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
blt added a commit that referenced this issue Feb 2, 2022