chore(prometheus_remote_write sink): remote write sink rewrite (#18676)
* Refactor prometheus remote write sink
* WIP
* Made tests pass
* Use shared compression
* Clippy
* Some comments
* Use HttpResponse
* Update request builder default
* Update PartitionBatcher to use BatchConfig
* Update PartitionBatcher to use BatchConfig
* Remove zorkwonk
* Make into fns as fns instead
* Spelling
* Don't box the closure
* Clippy
* Only insert timeout if we add the batch to the list
* Allow the timer to remove an item
* Allow partitions to be types other than Vec
* Clippy
* Added test for aggregation
* Allow a custom object to be used for the reducer
* Make aggregating optional
* Added test for non-aggregation
* Clippy
* Component docs
* Feedback from Kyle and Doug
* Use generic compression options
* Default compression to Snappy
* Update docs
* Add snappy to the compression docs
* Remove proptest file

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
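
The bullets above ("Allow a custom object to be used for the reducer", "Make aggregating optional", "Allow partitions to be types other than Vec") boil down to one idea: the batcher no longer hard-codes its batches as `Vec<Prt::Item>` but delegates accumulation to an object implementing `BatchData`. Below is a minimal, self-contained Rust sketch of that pattern. Only the `BatchData` name comes from the diff further down; the trait shape shown here and the `MetricKey`, `Sample`, and `AggregatedSamples` types are assumptions made purely for illustration, not Vector's actual API.

use std::collections::HashMap;

// Assumed, simplified stand-in for the reducer-object trait named in the
// diff below; Vector's real trait may differ.
trait BatchData<T> {
    type Batch;
    /// Fold one item into the in-progress batch.
    fn push_item(&mut self, item: T);
    /// Move the accumulated batch out, leaving the reducer empty for reuse.
    fn take_batch(&mut self) -> Self::Batch;
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct MetricKey {
    name: String,
}

struct Sample {
    key: MetricKey,
    value: f64,
}

// An aggregating reducer: samples sharing a key are summed, so the emitted
// batch holds one entry per series instead of every raw sample.
#[derive(Default)]
struct AggregatedSamples {
    totals: HashMap<MetricKey, f64>,
}

impl BatchData<Sample> for AggregatedSamples {
    type Batch = Vec<(MetricKey, f64)>;

    fn push_item(&mut self, item: Sample) {
        *self.totals.entry(item.key).or_insert(0.0) += item.value;
    }

    fn take_batch(&mut self) -> Self::Batch {
        std::mem::take(&mut self.totals).into_iter().collect()
    }
}

fn main() {
    let mut reducer = AggregatedSamples::default();
    let key = MetricKey { name: "requests_total".into() };
    reducer.push_item(Sample { key: key.clone(), value: 1.0 });
    reducer.push_item(Sample { key: key.clone(), value: 2.0 });
    // One aggregated entry with value 3.0 rather than two raw samples.
    assert_eq!(reducer.take_batch(), vec![(key, 3.0)]);
}

A non-aggregating reducer would simply push each sample into a Vec, which is why the diff below turns the batch into a generic parameter `B` instead of fixing it to `Vec<Prt::Item>`: a sink passes such a reducer to `BatcherSettings::as_reducer_config` together with an item-size calculator, and the batcher stream then yields `(partition_key, batch)` pairs of whatever type the reducer produces.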
StephenWakely authored Oct 24, 2023
1 parent 1eaf8b1 commit 64f73f0
Showing 40 changed files with 1,593 additions and 955 deletions.
55 changes: 26 additions & 29 deletions lib/vector-stream/src/partitioned_batcher.rs
@@ -15,7 +15,7 @@ use vector_core::{partition::Partitioner, time::KeyedTimer, ByteSizeOf};
 
 use crate::batcher::{
     config::BatchConfigParts,
-    data::BatchReduce,
+    data::BatchData,
     limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit},
     BatchConfig,
 };
@@ -155,16 +155,15 @@ impl BatcherSettings {
     }
 
     /// A batcher config using the `ItemBatchSize` trait to determine batch sizes.
-    /// The output is built with the supplied reducer function.
-    pub fn into_reducer_config<I, T, F, S>(
-        self,
+    /// The output is built with the supplied object implementing [`BatchData`].
+    pub fn as_reducer_config<I, T, B>(
+        &self,
         item_size: I,
-        reducer: F,
-    ) -> BatchConfigParts<SizeLimit<I>, BatchReduce<F, S>>
+        reducer: B,
+    ) -> BatchConfigParts<SizeLimit<I>, B>
     where
         I: ItemBatchSize<T>,
-        F: FnMut(&mut S, T),
-        S: Default,
+        B: BatchData<T>,
     {
         BatchConfigParts {
             batch_limiter: SizeLimit {
@@ -173,14 +172,14 @@ impl BatcherSettings {
                 current_size: 0,
                 item_size_calculator: item_size,
             },
-            batch_data: BatchReduce::new(reducer),
+            batch_data: reducer,
             timeout: self.timeout,
         }
     }
 }
 
 #[pin_project]
-pub struct PartitionedBatcher<St, Prt, KT, C, F>
+pub struct PartitionedBatcher<St, Prt, KT, C, F, B>
 where
     Prt: Partitioner,
 {
@@ -193,7 +192,7 @@ where
     /// The store of 'closed' batches. When this is not empty it will be
     /// preferentially flushed prior to consuming any new items from the
     /// underlying stream.
-    closed_batches: Vec<(Prt::Key, Vec<Prt::Item>)>,
+    closed_batches: Vec<(Prt::Key, B)>,
     /// The queue of pending batch expirations
     timer: KT,
     /// The partitioner for this `Batcher`
@@ -203,7 +202,7 @@ where
     stream: Fuse<St>,
 }
 
-impl<St, Prt, C, F> PartitionedBatcher<St, Prt, ExpirationQueue<Prt::Key>, C, F>
+impl<St, Prt, C, F, B> PartitionedBatcher<St, Prt, ExpirationQueue<Prt::Key>, C, F, B>
 where
     St: Stream<Item = Prt::Item>,
     Prt: Partitioner + Unpin,
@@ -226,7 +225,7 @@ where
 }
 
 #[cfg(test)]
-impl<St, Prt, KT, C, F> PartitionedBatcher<St, Prt, KT, C, F>
+impl<St, Prt, KT, C, F, B> PartitionedBatcher<St, Prt, KT, C, F, B>
 where
     St: Stream<Item = Prt::Item>,
     Prt: Partitioner + Unpin,
@@ -247,17 +246,17 @@ where
     }
 }
 
-impl<St, Prt, KT, C, F> Stream for PartitionedBatcher<St, Prt, KT, C, F>
+impl<St, Prt, KT, C, F, B> Stream for PartitionedBatcher<St, Prt, KT, C, F, B>
 where
     St: Stream<Item = Prt::Item>,
     Prt: Partitioner + Unpin,
     Prt::Key: Eq + Hash + Clone,
     Prt::Item: ByteSizeOf,
     KT: KeyedTimer<Prt::Key>,
-    C: BatchConfig<Prt::Item, Batch = Vec<Prt::Item>>,
+    C: BatchConfig<Prt::Item, Batch = B>,
     F: Fn() -> C + Send,
 {
-    type Item = (Prt::Key, Vec<Prt::Item>);
+    type Item = (Prt::Key, B);
 
     fn size_hint(&self) -> (usize, Option<usize>) {
         self.stream.size_hint()
@@ -270,20 +269,18 @@ where
                 return Poll::Ready(this.closed_batches.pop());
             }
             match this.stream.as_mut().poll_next(cx) {
-                Poll::Pending => {
-                    match this.timer.poll_expired(cx) {
-                        // Unlike normal streams, `DelayQueue` can return `None`
-                        // here but still be usable later if more entries are added.
-                        Poll::Pending | Poll::Ready(None) => return Poll::Pending,
-                        Poll::Ready(Some(item_key)) => {
-                            let mut batch = this
-                                .batches
-                                .remove(&item_key)
-                                .expect("batch should exist if it is set to expire");
-                            this.closed_batches.push((item_key, batch.take_batch()));
-                        }
+                Poll::Pending => match this.timer.poll_expired(cx) {
+                    // Unlike normal streams, `DelayQueue` can return `None`
+                    // here but still be usable later if more entries are added.
+                    Poll::Pending | Poll::Ready(None) => return Poll::Pending,
+                    Poll::Ready(Some(item_key)) => {
+                        let mut batch = this
+                            .batches
+                            .remove(&item_key)
+                            .expect("batch should exist if it is set to expire");
+                        this.closed_batches.push((item_key, batch.take_batch()));
                     }
-                }
+                },
                 Poll::Ready(None) => {
                     // Now that the underlying stream is closed, we need to
                     // clear out our batches, including all expiration
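
The comment in the hunk above points out a subtlety worth illustrating: `DelayQueue` can return `Ready(None)` merely because it is currently empty, yet unlike an ordinary stream it becomes productive again once new entries are inserted, so the batcher maps both `Pending` and `Ready(None)` back to `Pending`. Here is a small, self-contained sketch of that behaviour. It is written against tokio-util 0.7 (where `poll_expired` returns `Poll<Option<Expired<T>>>`) and assumes tokio with the `rt`, `macros`, and `time` features; the key string and timeout are arbitrary, and none of this is Vector code.

use std::future::poll_fn;
use std::task::Poll;
use std::time::Duration;
use tokio_util::time::DelayQueue;

#[tokio::main]
async fn main() {
    let mut queue: DelayQueue<&'static str> = DelayQueue::new();

    // An empty DelayQueue reports Ready(None) rather than Pending. For a
    // normal stream that would mean "finished"; this queue is not finished.
    let empty = poll_fn(|cx| {
        Poll::Ready(queue.poll_expired(cx).map(|opt| opt.map(|e| e.into_inner())))
    })
    .await;
    assert_eq!(empty, Poll::Ready(None));

    // The same queue is still usable: insert a key with a short timeout and
    // it is handed back once the timeout elapses.
    queue.insert("batch-key", Duration::from_millis(10));
    let expired = poll_fn(|cx| queue.poll_expired(cx)).await;
    assert_eq!(expired.map(|e| e.into_inner()), Some("batch-key"));
}

In the batcher above, that is exactly the `Poll::Pending | Poll::Ready(None) => return Poll::Pending` arm: an empty expiration queue means "nothing to flush yet", not end of stream.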
Diffs for the remaining 39 changed files are not shown.
