From 80abc797846e42f37871222cff4c7fbd090e6d4d Mon Sep 17 00:00:00 2001
From: Vladimir Varankin
Date: Thu, 26 Sep 2024 08:58:42 +0200
Subject: [PATCH] rename internal variables

Signed-off-by: Vladimir Varankin
---
 pkg/blockbuilder/blockbuilder.go | 62 ++++++++++++++++----------------
 1 file changed, 31 insertions(+), 31 deletions(-)

diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go
index 4ffe9b0537c..626bab6cf12 100644
--- a/pkg/blockbuilder/blockbuilder.go
+++ b/pkg/blockbuilder/blockbuilder.go
@@ -110,8 +110,8 @@ func (b *BlockBuilder) stopping(_ error) error {
 func (b *BlockBuilder) running(ctx context.Context) error {
 	// Do initial consumption on start using current time as the point up to which we are consuming.
 	// To avoid small blocks at startup, we consume until the boundary + buffer.
-	cycleEnd := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
-	err := b.nextConsumeCycle(ctx, cycleEnd)
+	cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
+	err := b.nextConsumeCycle(ctx, cycleEndTime)
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
 			return nil
@@ -119,21 +119,21 @@ func (b *BlockBuilder) running(ctx context.Context) error {
 		return err
 	}
 
-	cycleEnd, waitDur := nextCycleEnd(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
+	cycleEndTime, waitDur := nextCycleEnd(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
 	for {
 		select {
 		case <-time.After(waitDur):
-			level.Info(b.logger).Log("msg", "triggering next consume cycle", "cycle_end", cycleEnd)
-			err := b.nextConsumeCycle(ctx, cycleEnd)
+			level.Info(b.logger).Log("msg", "triggering next consume cycle", "cycle_end", cycleEndTime)
+			err := b.nextConsumeCycle(ctx, cycleEndTime)
 			if err != nil && !errors.Is(err, context.Canceled) {
 				// Fail the whole service in case of a non-recoverable error.
-				return fmt.Errorf("consume next cycle until cycle_end %s: %w", cycleEnd, err)
+				return fmt.Errorf("consume next cycle until cycle_end %s: %w", cycleEndTime, err)
 			}
 
 			// If we took more than ConsumeInterval to consume the records, this will immediately start the next consumption.
 			// TODO(codesome): track waitDur < 0, which is the time we ran over. Should have an alert on this.
-			cycleEnd = cycleEnd.Add(b.cfg.ConsumeInterval)
-			waitDur = time.Until(cycleEnd)
+			cycleEndTime = cycleEndTime.Add(b.cfg.ConsumeInterval)
+			waitDur = time.Until(cycleEndTime)
 		case <-ctx.Done():
 			level.Info(b.logger).Log("msg", "context cancelled, stopping")
 			return nil
@@ -169,7 +169,7 @@ func nextCycleEnd(t time.Time, interval, buffer time.Duration) (time.Time, time.
 // nextConsumeCycle manages consumption of currently assigned partitions.
 // The cycleEnd argument indicates the timestamp (relative to Kafka records) up until which to consume from partitions
 // in this cycle. That is, Kafka records produced after the cycleEnd mark will be consumed in the next cycle.
-func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time) error {
+func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEndTime time.Time) error {
 	defer func(t time.Time) {
 		b.blockBuilderMetrics.consumeCycleDuration.Observe(time.Since(t).Seconds())
 	}(time.Now())
@@ -184,14 +184,14 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
 		// in the beginning of the cycle.
 		// Lag is the upperbound number of records we'll have to consume from Kafka to build the blocks.
 		// It's the "upperbound" because the consumption may be stopped earlier if we get records containing
-		// samples with timestamp greater than the cycleEnd timestamp.
+		// samples with timestamp greater than the cycleEndTime timestamp.
 		lag, err := b.getLagForPartition(ctx, partition)
 		if err != nil {
-			level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEnd)
+			level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEndTime)
 			continue
 		}
 		if err := lag.Err; err != nil {
-			level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEnd)
+			level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEndTime)
 			continue
 		}
@@ -203,7 +203,7 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
 		}
 
 		state := partitionStateFromLag(b.logger, lag, b.fallbackOffsetMillis)
-		if err := b.consumePartition(ctx, partition, state, cycleEnd, lag.End.Offset); err != nil {
+		if err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset); err != nil {
 			level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "partition", partition)
 		}
 	}
@@ -288,32 +288,32 @@ func partitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackM
 // consumePartition consumes records from the given partition until the cycleEnd timestamp.
 // If the partition is lagging behind, it takes care of consuming it in sections.
-func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state partitionState, cycleEnd time.Time, cycleEndOffset int64) (err error) {
+func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state partitionState, cycleEndTime time.Time, cycleEndOffset int64) (err error) {
 	builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics)
 	defer runutil.CloseWithErrCapture(&err, builder, "closing tsdb builder")
 
 	// Section is a portion of the partition to process in a single pass. One cycle may process multiple sections if the partition is lagging.
-	sectionEnd := cycleEnd
-	if sectionEnd.Sub(state.CommitRecordTimestamp) > time.Duration(1.5*float64(b.cfg.ConsumeInterval)) {
+	sectionEndTime := cycleEndTime
+	if sectionEndTime.Sub(state.CommitRecordTimestamp) > time.Duration(1.5*float64(b.cfg.ConsumeInterval)) {
 		// We are lagging behind by more than 1.5*interval or there is no commit. We need to consume the partition in sections.
-		// We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEnd,
-		// i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEnd)
+		// We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEndTime,
+		// i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEndTime)
 		// where T is the CommitRecordTimestamp, the timestamp of the record, whose offset we committed previously.
 		// When there is no kafka commit, we play safe and assume LastSeenOffset, and LastBlockEnd were 0 to not discard any samples unnecessarily.
-		sectionEnd, _ = nextCycleEnd(
+		sectionEndTime, _ = nextCycleEnd(
 			state.CommitRecordTimestamp,
 			b.cfg.ConsumeInterval,
 			b.cfg.ConsumeIntervalBuffer,
 		)
-		level.Info(b.logger).Log("msg", "partition is lagging behind the cycle", "partition", partition, "section_end", sectionEnd, "cycle_end", cycleEnd, "cycle_end_offset", cycleEndOffset, "commit_rec_ts", state.CommitRecordTimestamp)
+		level.Info(b.logger).Log("msg", "partition is lagging behind the cycle", "partition", partition, "section_end", sectionEndTime, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset, "commit_rec_ts", state.CommitRecordTimestamp)
 	}
 
-	for !sectionEnd.After(cycleEnd) {
-		logger := log.With(b.logger, "partition", partition, "section_end", sectionEnd, "cycle_end_offset", cycleEndOffset)
-		state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEnd, cycleEndOffset)
+	for !sectionEndTime.After(cycleEndTime) {
+		logger := log.With(b.logger, "partition", partition, "section_end", sectionEndTime, "cycle_end_offset", cycleEndOffset)
+		state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEndTime, cycleEndOffset)
 		if err != nil {
 			return fmt.Errorf("consume partition %d: %w", partition, err)
 		}
-		sectionEnd = sectionEnd.Add(b.cfg.ConsumeInterval)
+		sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeInterval)
 	}
 
 	return nil
@@ -325,12 +325,12 @@ func (b *BlockBuilder) consumePartitionSection(
 	builder *TSDBBuilder,
 	partition int32,
 	state partitionState,
-	sectionEnd time.Time,
+	sectionEndTime time.Time,
 	cycleEndOffset int64,
 ) (retState partitionState, retErr error) {
 	// Oppose to the section's range (and cycle's range), that include the ConsumeIntervalBuffer, the block's range doesn't.
 	// Thus, truncate the timestamp with ConsumptionInterval here to round the block's range.
-	blockEnd := sectionEnd.Truncate(b.cfg.ConsumeInterval)
+	blockEnd := sectionEndTime.Truncate(b.cfg.ConsumeInterval)
 
 	var numBlocks int
 	defer func(t time.Time, startState partitionState) {
@@ -349,7 +349,7 @@ func (b *BlockBuilder) consumePartitionSection(
 
 		b.blockBuilderMetrics.processPartitionDuration.WithLabelValues(fmt.Sprintf("%d", partition)).Observe(dur.Seconds())
 		level.Info(logger).Log("msg", "done consuming", "duration", dur, "last_block_end", startState.LastBlockEnd, "curr_block_end", blockEnd,
-			"last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset, "cycle_end_offset", cycleEndOffset,
+			"last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset,
 			"num_blocks", numBlocks)
 	}(time.Now(), state)
 
@@ -364,7 +364,7 @@ func (b *BlockBuilder) consumePartitionSection(
 	})
 	defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{b.cfg.Kafka.Topic: {partition}})
 
-	level.Info(logger).Log("msg", "start consuming", "offset", state.Commit.At, "cycle_end_offset", cycleEndOffset)
+	level.Info(logger).Log("msg", "start consuming", "offset", state.Commit.At)
 
 	var (
 		firstRec *kgo.Record
@@ -381,7 +381,7 @@ consumerLoop:
 		// PollFetches can return a non-failed fetch with zero records. In such a case, with only the fetches at hands,
 		// we cannot tell if the consumer has already reached the latest end of the partition, i.e. no more records to consume,
 		// or there is more data in the backlog, and we must retry the poll. That's why the consumer loop above has to guard
-		// the iterations against the partitionState, so it retried the polling up until the expected end of the partition's reached.
+		// the iterations against the cycleEndOffset, so it retried the polling up until the expected end of the partition is reached.
 		fetches := b.kafkaClient.PollFetches(ctx)
 		fetches.EachError(func(_ string, _ int32, err error) {
 			if !errors.Is(err, context.Canceled) {
@@ -398,9 +398,9 @@ consumerLoop:
 				firstRec = rec
 			}
 
-			// Stop consuming after we reached the sectionEnd marker.
+			// Stop consuming after we reached the sectionEndTime marker.
 			// NOTE: the timestamp of the record is when the record was produced relative to distributor's time.
-			if rec.Timestamp.After(sectionEnd) {
+			if rec.Timestamp.After(sectionEndTime) {
 				break consumerLoop
 			}