rename internal variables
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
narqo committed Sep 26, 2024
1 parent f61c7ea commit 80abc79
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions pkg/blockbuilder/blockbuilder.go
@@ -110,30 +110,30 @@ func (b *BlockBuilder) stopping(_ error) error {
func (b *BlockBuilder) running(ctx context.Context) error {
// Do initial consumption on start using current time as the point up to which we are consuming.
// To avoid small blocks at startup, we consume until the <consume interval> boundary + buffer.
- cycleEnd := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
- err := b.nextConsumeCycle(ctx, cycleEnd)
+ cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
+ err := b.nextConsumeCycle(ctx, cycleEndTime)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

- cycleEnd, waitDur := nextCycleEnd(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
+ cycleEndTime, waitDur := nextCycleEnd(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
for {
select {
case <-time.After(waitDur):
- level.Info(b.logger).Log("msg", "triggering next consume cycle", "cycle_end", cycleEnd)
- err := b.nextConsumeCycle(ctx, cycleEnd)
+ level.Info(b.logger).Log("msg", "triggering next consume cycle", "cycle_end", cycleEndTime)
+ err := b.nextConsumeCycle(ctx, cycleEndTime)
if err != nil && !errors.Is(err, context.Canceled) {
// Fail the whole service in case of a non-recoverable error.
- return fmt.Errorf("consume next cycle until cycle_end %s: %w", cycleEnd, err)
+ return fmt.Errorf("consume next cycle until cycle_end %s: %w", cycleEndTime, err)
}

// If we took more than ConsumeInterval to consume the records, this will immediately start the next consumption.
// TODO(codesome): track waitDur < 0, which is the time we ran over. Should have an alert on this.
- cycleEnd = cycleEnd.Add(b.cfg.ConsumeInterval)
- waitDur = time.Until(cycleEnd)
+ cycleEndTime = cycleEndTime.Add(b.cfg.ConsumeInterval)
+ waitDur = time.Until(cycleEndTime)
case <-ctx.Done():
level.Info(b.logger).Log("msg", "context cancelled, stopping")
return nil
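
Neither cycleEndAtStartup nor nextCycleEnd is expanded in this diff. Purely as a hedged illustration of the "<consume interval> boundary + buffer" arithmetic the comments above describe, here is a minimal sketch (assumed shape and names, not this repository's code; only the standard time package is needed):

// Sketch only: assumes the cycle boundary is an interval tick plus the configured buffer.
func exampleCycleEndAtStartup(now time.Time, interval, buffer time.Duration) time.Time {
	// Most recent "interval boundary + buffer" that is not in the future.
	cycleEnd := now.Truncate(interval).Add(buffer)
	if cycleEnd.After(now) {
		cycleEnd = cycleEnd.Add(-interval)
	}
	return cycleEnd
}

func exampleNextCycleEnd(now time.Time, interval, buffer time.Duration) (time.Time, time.Duration) {
	// Upcoming "interval boundary + buffer" and how long to wait for it.
	cycleEnd := now.Truncate(interval).Add(interval).Add(buffer)
	// If we are still inside the previous boundary's buffer window, use that boundary instead.
	if cycleEnd.Sub(now) > interval {
		cycleEnd = cycleEnd.Add(-interval)
	}
	return cycleEnd, cycleEnd.Sub(now)
}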
@@ -169,7 +169,7 @@ func nextCycleEnd(t time.Time, interval, buffer time.Duration) (time.Time, time.
// nextConsumeCycle manages consumption of currently assigned partitions.
// The cycleEnd argument indicates the timestamp (relative to Kafka records) up until which to consume from partitions
// in this cycle. That is, Kafka records produced after the cycleEnd mark will be consumed in the next cycle.
- func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time) error {
+ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEndTime time.Time) error {
defer func(t time.Time) {
b.blockBuilderMetrics.consumeCycleDuration.Observe(time.Since(t).Seconds())
}(time.Now())
@@ -184,14 +184,14 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
// in the beginning of the cycle.
// Lag is the upperbound number of records we'll have to consume from Kafka to build the blocks.
// It's the "upperbound" because the consumption may be stopped earlier if we get records containing
- // samples with timestamp greater than the cycleEnd timestamp.
+ // samples with timestamp greater than the cycleEndTime timestamp.
lag, err := b.getLagForPartition(ctx, partition)
if err != nil {
- level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEnd)
+ level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEndTime)
continue
}
if err := lag.Err; err != nil {
- level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEnd)
+ level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEndTime)
continue
}

@@ -203,7 +203,7 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
}

state := partitionStateFromLag(b.logger, lag, b.fallbackOffsetMillis)
- if err := b.consumePartition(ctx, partition, state, cycleEnd, lag.End.Offset); err != nil {
+ if err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset); err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "partition", partition)
}
}
@@ -288,32 +288,32 @@ func partitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackM

// consumePartition consumes records from the given partition until the cycleEnd timestamp.
// If the partition is lagging behind, it takes care of consuming it in sections.
- func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state partitionState, cycleEnd time.Time, cycleEndOffset int64) (err error) {
+ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state partitionState, cycleEndTime time.Time, cycleEndOffset int64) (err error) {
builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics)
defer runutil.CloseWithErrCapture(&err, builder, "closing tsdb builder")

// Section is a portion of the partition to process in a single pass. One cycle may process multiple sections if the partition is lagging.
- sectionEnd := cycleEnd
- if sectionEnd.Sub(state.CommitRecordTimestamp) > time.Duration(1.5*float64(b.cfg.ConsumeInterval)) {
+ sectionEndTime := cycleEndTime
+ if sectionEndTime.Sub(state.CommitRecordTimestamp) > time.Duration(1.5*float64(b.cfg.ConsumeInterval)) {
// We are lagging behind by more than 1.5*interval or there is no commit. We need to consume the partition in sections.
- // We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEnd,
- // i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEnd)
+ // We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEndTime,
+ // i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEndTime)
// where T is the CommitRecordTimestamp, the timestamp of the record, whose offset we committed previously.
// When there is no Kafka commit, we play it safe and assume LastSeenOffset and LastBlockEnd were 0, so as not to discard any samples unnecessarily.
- sectionEnd, _ = nextCycleEnd(
+ sectionEndTime, _ = nextCycleEnd(
state.CommitRecordTimestamp,
b.cfg.ConsumeInterval,
b.cfg.ConsumeIntervalBuffer,
)
- level.Info(b.logger).Log("msg", "partition is lagging behind the cycle", "partition", partition, "section_end", sectionEnd, "cycle_end", cycleEnd, "cycle_end_offset", cycleEndOffset, "commit_rec_ts", state.CommitRecordTimestamp)
+ level.Info(b.logger).Log("msg", "partition is lagging behind the cycle", "partition", partition, "section_end", sectionEndTime, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset, "commit_rec_ts", state.CommitRecordTimestamp)
}
- for !sectionEnd.After(cycleEnd) {
- logger := log.With(b.logger, "partition", partition, "section_end", sectionEnd, "cycle_end_offset", cycleEndOffset)
- state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEnd, cycleEndOffset)
+ for !sectionEndTime.After(cycleEndTime) {
+ logger := log.With(b.logger, "partition", partition, "section_end", sectionEndTime, "cycle_end_offset", cycleEndOffset)
+ state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEndTime, cycleEndOffset)
if err != nil {
return fmt.Errorf("consume partition %d: %w", partition, err)
}
- sectionEnd = sectionEnd.Add(b.cfg.ConsumeInterval)
+ sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeInterval)
}

return nil
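
To make the sectioning above concrete, here is a small hedged walk-through with invented values (1h ConsumeInterval, 15m buffer): if the last committed record was produced at 09:40 and the cycle ends at 13:15, the lag exceeds 1.5×interval, so the partition is consumed in sections ending at 10:15, 11:15, 12:15 and finally 13:15.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustration only: how section end times advance for a lagging partition.
	// The values are invented; the real ones come from the partition state and the config.
	interval := time.Hour
	cycleEndTime := time.Date(2024, 9, 26, 13, 15, 0, 0, time.UTC)
	sectionEndTime := time.Date(2024, 9, 26, 10, 15, 0, 0, time.UTC) // first boundary+buffer after the last commit
	for !sectionEndTime.After(cycleEndTime) {
		fmt.Println("consume section ending at", sectionEndTime) // 10:15, 11:15, 12:15, 13:15
		sectionEndTime = sectionEndTime.Add(interval)
	}
}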
@@ -325,12 +325,12 @@ func (b *BlockBuilder) consumePartitionSection(
builder *TSDBBuilder,
partition int32,
state partitionState,
- sectionEnd time.Time,
+ sectionEndTime time.Time,
cycleEndOffset int64,
) (retState partitionState, retErr error) {
// Unlike the section's range (and the cycle's range), which include the ConsumeIntervalBuffer, the block's range doesn't.
// Thus, truncate the timestamp by ConsumeInterval here to round off the block's range.
- blockEnd := sectionEnd.Truncate(b.cfg.ConsumeInterval)
+ blockEnd := sectionEndTime.Truncate(b.cfg.ConsumeInterval)

var numBlocks int
defer func(t time.Time, startState partitionState) {
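
As a quick, hedged illustration of the Truncate call above (invented values): with a 1h ConsumeInterval and a 15m buffer, a section ending at 13:15 produces a block range ending at 13:00, because the block's range excludes the buffer.

// Example values only, showing the effect of truncating the section end by the interval.
sectionEndTime := time.Date(2024, 9, 26, 13, 15, 0, 0, time.UTC) // interval boundary + 15m buffer
blockEnd := sectionEndTime.Truncate(time.Hour)                   // 2024-09-26 13:00:00 UTC
_ = blockEnd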
@@ -349,7 +349,7 @@ func (b *BlockBuilder) consumePartitionSection(
b.blockBuilderMetrics.processPartitionDuration.WithLabelValues(fmt.Sprintf("%d", partition)).Observe(dur.Seconds())
level.Info(logger).Log("msg", "done consuming", "duration", dur,
"last_block_end", startState.LastBlockEnd, "curr_block_end", blockEnd,
"last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset, "cycle_end_offset", cycleEndOffset,
"last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset,
"num_blocks", numBlocks)
}(time.Now(), state)

@@ -364,7 +364,7 @@ func (b *BlockBuilder) consumePartitionSection(
})
defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{b.cfg.Kafka.Topic: {partition}})

- level.Info(logger).Log("msg", "start consuming", "offset", state.Commit.At, "cycle_end_offset", cycleEndOffset)
+ level.Info(logger).Log("msg", "start consuming", "offset", state.Commit.At)

var (
firstRec *kgo.Record
@@ -381,7 +381,7 @@ consumerLoop:
// PollFetches can return a non-failed fetch with zero records. In such a case, with only the fetches at hands,
// we cannot tell if the consumer has already reached the latest end of the partition, i.e. no more records to consume,
// or there is more data in the backlog, and we must retry the poll. That's why the consumer loop above has to guard
- // the iterations against the partitionState, so it retried the polling up until the expected end of the partition's reached.
+ // the iterations against the cycleEndOffset, so it retried the polling up until the expected end of the partition is reached.
fetches := b.kafkaClient.PollFetches(ctx)
fetches.EachError(func(_ string, _ int32, err error) {
if !errors.Is(err, context.Canceled) {
Expand All @@ -398,9 +398,9 @@ consumerLoop:
firstRec = rec
}

- // Stop consuming after we reached the sectionEnd marker.
+ // Stop consuming after we reached the sectionEndTime marker.
// NOTE: the timestamp of the record is when the record was produced relative to distributor's time.
- if rec.Timestamp.After(sectionEnd) {
+ if rec.Timestamp.After(sectionEndTime) {
break consumerLoop
}
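
The rest of the consumer loop is collapsed in this view. As a hedged sketch of the guard described in the comment above, i.e. keep polling until the cycle's end offset has been reached, and end the section at the first record stamped after sectionEndTime (assumed shape, not this repository's code; assumes the franz-go kgo client and go-kit log, with simplified error handling):

// Sketch only. Assumed imports: "context", "errors", "time",
// "github.com/go-kit/log", "github.com/go-kit/log/level",
// "github.com/twmb/franz-go/pkg/kgo".
func consumeSectionSketch(ctx context.Context, client *kgo.Client, logger log.Logger,
	sectionEndTime time.Time, cycleEndOffset, lastSeenOffset int64) int64 {
	for lastSeenOffset < cycleEndOffset-1 && ctx.Err() == nil {
		fetches := client.PollFetches(ctx) // may legitimately return zero records
		fetches.EachError(func(_ string, _ int32, err error) {
			if !errors.Is(err, context.Canceled) {
				level.Error(logger).Log("msg", "failed to fetch records", "err", err)
			}
		})
		for iter := fetches.RecordIter(); !iter.Done(); {
			rec := iter.Next()
			if rec.Timestamp.After(sectionEndTime) {
				return lastSeenOffset // this section is done; the next one resumes from here
			}
			// ... feed rec into the TSDB builder here ...
			lastSeenOffset = rec.Offset
		}
	}
	return lastSeenOffset
}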
