diff --git a/development/mimir-ingest-storage/config/datasource-mimir.yaml b/development/mimir-ingest-storage/config/datasource-mimir.yaml
index 0bb8266d750..3f614d8d923 100644
--- a/development/mimir-ingest-storage/config/datasource-mimir.yaml
+++ b/development/mimir-ingest-storage/config/datasource-mimir.yaml
@@ -10,3 +10,9 @@ datasources:
   jsonData:
     prometheusType: Mimir
     timeInterval: 5s
+- name: Jaeger
+  type: jaeger
+  access: proxy
+  uid: jaeger
+  orgID: 1
+  url: http://jaeger:16686/
diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml
index 0fdaa8f82f3..26cc493eb35 100644
--- a/development/mimir-ingest-storage/config/mimir.yaml
+++ b/development/mimir-ingest-storage/config/mimir.yaml
@@ -15,12 +15,14 @@ ingest_storage:
     address: kafka_1:9092
     topic: mimir-ingest
     last_produced_offset_poll_interval: 500ms
+    replay_concurrency: 3
+    replay_shards: 8
 
 ingester:
   track_ingester_owned_series: false # suppress log messages in c-61 about empty ring; doesn't affect testing
 
   partition_ring:
-    min_partition_owners_count: 2
+    min_partition_owners_count: 1
     min_partition_owners_duration: 10s
     delete_inactive_partition_after: 1m
@@ -98,3 +100,6 @@ limits:
 
 runtime_config:
   file: ./config/runtime.yaml
+
+server:
+  log_level: debug
diff --git a/development/mimir-ingest-storage/docker-compose.jsonnet b/development/mimir-ingest-storage/docker-compose.jsonnet
index 631fb8503d0..2d9c4c39d03 100644
--- a/development/mimir-ingest-storage/docker-compose.jsonnet
+++ b/development/mimir-ingest-storage/docker-compose.jsonnet
@@ -12,6 +12,7 @@ std.manifestYamlDoc({
     self.kafka_1 +
     self.kafka_2 +
     self.kafka_3 +
+    self.jaeger +
     {},
 
   write:: {
@@ -231,6 +232,22 @@ std.manifestYamlDoc({
     },
   },
 
+  jaeger:: {
+    jaeger: {
+      image: 'jaegertracing/all-in-one',
+      ports: ['16686:16686', '14268'],
+    },
+  },
+
+  local jaegerEnv(appName) = {
+    JAEGER_AGENT_HOST: 'jaeger',
+    JAEGER_AGENT_PORT: 6831,
+    JAEGER_SAMPLER_TYPE: 'const',
+    JAEGER_SAMPLER_PARAM: 1,
+    JAEGER_TAGS: 'app=%s' % appName,
+    JAEGER_REPORTER_MAX_QUEUE_SIZE: 1000,
+  },
+
   // This function builds docker-compose declaration for Mimir service.
   local mimirService(serviceOptions) = {
     local defaultOptions = {
@@ -243,7 +260,7 @@ std.manifestYamlDoc({
         kafka_1: { condition: 'service_healthy' },
         kafka_2: { condition: 'service_healthy' },
       },
-      env: {},
+      env: jaegerEnv(self.target),
       extraArguments: [],
       debug: true,
       debugPort: self.publishedHttpPort + 3000,
diff --git a/development/mimir-ingest-storage/docker-compose.yml b/development/mimir-ingest-storage/docker-compose.yml
index 0edbdffd602..71cefefab81 100644
--- a/development/mimir-ingest-storage/docker-compose.yml
+++ b/development/mimir-ingest-storage/docker-compose.yml
@@ -23,6 +23,11 @@
       - "9091:9091"
     "volumes":
       - "./config:/etc/agent-config"
+  "jaeger":
+    "image": "jaegertracing/all-in-one"
+    "ports":
+      - "16686:16686"
+      - "14268"
   "kafka_1":
     "environment":
       - "CLUSTER_ID=zH1GDqcNTzGMDCXm5VZQdg"
@@ -112,7 +117,13 @@
         "condition": "service_healthy"
       "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=backend"
     "hostname": "mimir-backend-1"
     "image": "mimir"
     "ports":
@@ -136,7 +147,13 @@
         "condition": "service_healthy"
      "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=backend"
     "hostname": "mimir-backend-2"
     "image": "mimir"
     "ports":
@@ -160,7 +177,13 @@
         "condition": "service_healthy"
       "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=read"
     "hostname": "mimir-read-1"
     "image": "mimir"
     "ports":
@@ -184,7 +207,13 @@
         "condition": "service_healthy"
       "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=read"
     "hostname": "mimir-read-2"
     "image": "mimir"
     "ports":
@@ -208,7 +237,13 @@
         "condition": "service_healthy"
       "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=write"
     "hostname": "mimir-write-zone-a-1"
     "image": "mimir"
     "ports":
@@ -233,7 +268,13 @@
         "condition": "service_healthy"
       "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=write"
     "hostname": "mimir-write-zone-a-2"
     "image": "mimir"
     "ports":
@@ -258,7 +299,13 @@
         "condition": "service_healthy"
       "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=write"
     "hostname": "mimir-write-zone-a-3"
     "image": "mimir"
     "ports":
@@ -283,7 +330,13 @@
         "condition": "service_healthy"
       "minio":
         "condition": "service_started"
-    "environment": []
+    "environment":
+      - "JAEGER_AGENT_HOST=jaeger"
+      - "JAEGER_AGENT_PORT=6831"
+      - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
+      - "JAEGER_SAMPLER_PARAM=1"
+      - "JAEGER_SAMPLER_TYPE=const"
+      - "JAEGER_TAGS=app=ingester"
     "hostname": "mimir-write-zone-c-61"
     "image": "mimir"
     "ports":
diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go
index 6be600726dc..18872cc74cb 100644
--- a/pkg/storage/ingest/pusher.go
+++ b/pkg/storage/ingest/pusher.go
@@ -149,7 +149,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
     for r := range recordsChannel {
         if r.err != nil {
-            level.Error(c.logger).Log("msg", "failed to parse write request; skipping", "err", r.err)
+            level.Error(spanlogger.FromContext(ctx, c.logger)).Log("msg", "failed to parse write request; skipping", "err", r.err)
             continue
         }
 
@@ -337,6 +337,7 @@ func (p *shardingPusher) PushToStorage(ctx context.Context, request *mimirpb.Wri
         // TODO dimitarvdimitrov support metadata and the rest of the fields; perhaps cut a new request for different values of SkipLabelNameValidation?
         s.Timeseries = append(s.Timeseries, ts)
         s.Context = ctx // retain the last context in case we have to flush it when closing shardingPusher
+        p.unfilledShards[shard] = s
 
         if len(s.Timeseries) < p.batchSize {
             continue
diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go
index c70d16bc157..99442262687 100644
--- a/pkg/storage/ingest/pusher_test.go
+++ b/pkg/storage/ingest/pusher_test.go
@@ -225,13 +225,13 @@ func TestPusherConsumer(t *testing.T) {
     }
 }
 
-var unimportantLogFieldsPattern = regexp.MustCompile(`\scaller=\S+\.go:\d+\s`)
+var unimportantLogFieldsPattern = regexp.MustCompile(`(\s?)caller=\S+\.go:\d+\s`)
 
 func removeUnimportantLogFields(lines []string) []string {
     // The 'caller' field is not important to these tests (we just care about the message and other information),
     // and can change as we refactor code, making these tests brittle. So we remove it before making assertions about the log lines.
     for i, line := range lines {
-        lines[i] = unimportantLogFieldsPattern.ReplaceAllString(line, " ")
+        lines[i] = unimportantLogFieldsPattern.ReplaceAllString(line, "$1")
     }
 
     return lines
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index 6f0ec52e5b9..a952f9110fa 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -7,6 +7,7 @@ import (
     "context"
     "fmt"
     "math"
+    "os"
     "strconv"
     "sync"
     "time"
@@ -16,6 +17,7 @@ import (
     "github.com/grafana/dskit/backoff"
     "github.com/grafana/dskit/multierror"
     "github.com/grafana/dskit/services"
+    "github.com/opentracing/opentracing-go"
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
@@ -25,7 +27,6 @@ import (
     "github.com/twmb/franz-go/pkg/kmsg"
     "github.com/twmb/franz-go/plugin/kotel"
     "github.com/twmb/franz-go/plugin/kprom"
-    "go.opentelemetry.io/otel/propagation"
     "go.uber.org/atomic"
 
     "github.com/grafana/mimir/pkg/mimirpb"
@@ -67,7 +68,7 @@ type recordConsumer interface {
 }
 
 type fetcher interface {
-    pollFetches(context.Context) kgo.Fetches
+    pollFetches(context.Context) (kgo.Fetches, context.Context)
 }
 
 type PartitionReader struct {
@@ -259,7 +260,9 @@ func (r *PartitionReader) run(ctx context.Context) error {
 }
 
 func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error {
-    fetches := r.fetcher.pollFetches(ctx)
+    fetches, fetchCtx := r.fetcher.pollFetches(ctx)
+    // Propagate the fetching span to consuming the records.
+    ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx))
     r.recordFetchesMetrics(fetches, delayObserver)
     r.logFetchErrors(fetches)
     fetches = filterOutErrFetches(fetches)
@@ -466,6 +469,9 @@ func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
 }
 
 func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetches) error {
+    span, ctx := opentracing.StartSpanFromContext(ctx, "PartitionReader.consumeFetches")
+    defer span.Finish()
+
     if fetches.NumRecords() == 0 {
         return nil
     }
@@ -496,6 +502,8 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche
         r.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
     }(time.Now())
 
+    logger := spanlogger.FromContext(ctx, r.logger)
+
     for boff.Ongoing() {
         // We instantiate the consumer on each iteration because it is stateful, and we can't reuse it after closing.
         consumer := r.newConsumer.consumer()
@@ -507,12 +515,14 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche
         consumeCtx := context.WithoutCancel(ctx)
         err := consumer.Consume(consumeCtx, records)
         if err == nil {
+            level.Debug(logger).Log("msg", "closing consumer after successful consumption")
             err = consumer.Close(consumeCtx)
             if err == nil {
+                level.Debug(logger).Log("msg", "closed consumer")
                 break
             }
         }
-        level.Error(r.logger).Log(
+        level.Error(logger).Log(
             "msg", "encountered error while ingesting data from Kafka; should retry",
             "err", err,
             "record_min_offset", minOffset,
@@ -733,11 +743,11 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo
     return err
 }
 
-func (r *PartitionReader) pollFetches(ctx context.Context) (result kgo.Fetches) {
+func (r *PartitionReader) pollFetches(ctx context.Context) (result kgo.Fetches, ctx2 context.Context) {
     defer func(start time.Time) {
         r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
     }(time.Now())
-    return r.client.PollFetches(ctx)
+    return r.client.PollFetches(ctx), ctx
 }
 
 // fetchWant represents a range of offsets to fetch.
@@ -816,11 +826,57 @@
 
 type fetchResult struct {
     kgo.FetchPartition
+    ctx          context.Context
     fetchedBytes int
 }
 
-func newEmptyFetchResult(err error) fetchResult {
-    return fetchResult{kgo.FetchPartition{Err: err}, 0}
+func (fr fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant) {
+    var logger log.Logger = spanlogger.FromContext(fr.ctx, log.NewLogfmtLogger(os.Stderr))
+
+    msg := "fetched records"
+    if fr.Err != nil {
+        msg = "received an error while fetching records; will retry after processing received records (if any)"
+    }
+    var (
+        gotRecords   = int64(len(fr.Records))
+        askedRecords = w.endOffset - w.startOffset
+    )
+    switch {
+    case fr.Err == nil, errors.Is(fr.Err, kerr.OffsetOutOfRange):
+        logger = level.Debug(logger)
+    default:
+        logger = level.Error(logger)
+    }
+    logger.Log(
+        "msg", msg,
+        "duration", time.Since(fetchStartTime),
+        "start_offset", w.startOffset,
+        "end_offset", w.endOffset,
+        "asked_records", askedRecords,
+        "got_records", gotRecords,
+        "diff_records", askedRecords-gotRecords,
+        "asked_bytes", w.MaxBytes(),
+        "got_bytes", fr.fetchedBytes,
+        "diff_bytes", int(w.MaxBytes())-fr.fetchedBytes,
+        "hwm", fr.HighWatermark,
+        "lso", fr.LogStartOffset,
+        "err", fr.Err,
+    )
+}
+
+func (fr fetchResult) logOrderedFetch() {
+    if fr.ctx == nil {
+        return
+    }
+    spanlogger.FromContext(fr.ctx, log.NewNopLogger()).DebugLog("msg", "fetch result is enqueued for consuming")
+}
+
+func newEmptyFetchResult(ctx context.Context, err error) fetchResult {
+    return fetchResult{
+        ctx:            ctx,
+        fetchedBytes:   0,
+        FetchPartition: kgo.FetchPartition{Err: err},
+    }
 }
 
 type concurrentFetchers struct {
@@ -836,7 +892,7 @@ type concurrentFetchers struct {
     recordsPerFetch  int
     minBytesWaitTime time.Duration
 
-    orderedFetches chan kgo.FetchPartition
+    orderedFetches chan fetchResult
 
     lastReturnedRecord int64
     startOffsets       *genericOffsetReader[int64]
@@ -874,8 +930,8 @@
         lastReturnedRecord:   noReturnedRecords,
         startOffsets:         startOffsetsReader,
         trackCompressedBytes: trackCompressedBytes,
-        tracer:               kotel.NewTracer(kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}))),
-        orderedFetches:       make(chan kgo.FetchPartition),
+        tracer:               recordsTracer(),
+        orderedFetches:       make(chan fetchResult),
     }
 
     var err error
@@ -906,52 +962,68 @@
     return f, nil
 }
 
-func (r *concurrentFetchers) pollFetches(ctx context.Context) (result kgo.Fetches) {
+func (r *concurrentFetchers) pollFetches(ctx context.Context) (kgo.Fetches, context.Context) {
     waitStartTime := time.Now()
     select {
     case <-ctx.Done():
-        return kgo.Fetches{}
+        return kgo.Fetches{}, ctx
     case f := <-r.orderedFetches:
-        level.Debug(r.logger).Log("msg", "received ordered fetch", "num_records", len(f.Records), "wait_duration", time.Since(waitStartTime))
-        r.metrics.fetchWaitDuration.Observe(time.Since(waitStartTime).Seconds())
-        trimUntil := 0
-        f.EachRecord(func(record *kgo.Record) {
-            if record.Offset <= r.lastReturnedRecord {
-                trimUntil++
-                spanlogger.FromContext(record.Context, r.logger).DebugLog("msg", "skipping record because it has already been returned", "offset", record.Offset)
-            }
-            r.tracer.OnFetchRecordUnbuffered(record, true)
-        })
+        firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
+        r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)
 
         r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
-        f.Records = f.Records[trimUntil:]
+        f.Records = f.Records[firstUnreturnedRecordIdx:]
 
-        r.metrics.fetchedDiscardedRecords.Add(float64(trimUntil))
         return kgo.Fetches{{
             Topics: []kgo.FetchTopic{
                 {
                     Topic:      r.topicName,
-                    Partitions: []kgo.FetchPartition{f},
+                    Partitions: []kgo.FetchPartition{f.FetchPartition},
                 },
             },
-        }}
+        }}, f.ctx
     }
 }
 
+func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
+    for i, r := range records {
+        if r.Offset > offset {
+            return i
+        }
+    }
+    return len(records) - 1
+}
+
+func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) {
+    waitDuration := time.Since(waitStartTime)
+    level.Debug(r.logger).Log("msg", "received ordered fetch", "num_records", len(f.Records), "wait_duration", waitDuration)
+    r.metrics.fetchWaitDuration.Observe(waitDuration.Seconds())
+
+    doubleFetchedBytes := 0
+    for i, record := range f.Records {
+        if i < firstReturnedRecordIndex {
+            doubleFetchedBytes += len(record.Value)
+            spanlogger.FromContext(record.Context, r.logger).DebugLog("msg", "skipping record because it has already been returned", "offset", record.Offset)
+        }
+        r.tracer.OnFetchRecordUnbuffered(record, true)
+    }
+    r.metrics.fetchedDiscardedRecordBytes.Add(float64(doubleFetchedBytes))
+}
+
 // fetchSingle attempts to find out the leader leader Kafka broker for a partition and then sends a fetch request to the leader of the fetchWant request and parses the responses
 // fetchSingle returns a fetchResult which may or may not fulfil the entire fetchWant.
 // If ctx is cancelled, fetchSingle will return an empty fetchResult without an error.
-func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant, logger log.Logger) (fr fetchResult) {
+func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant) (fr fetchResult) {
     defer func(fetchStartTime time.Time) {
-        logCompletedFetch(logger, fr, fetchStartTime, fw)
+        fr.logCompletedFetch(fetchStartTime, fw)
     }(time.Now())
 
     leaderID, leaderEpoch, err := r.client.PartitionLeader(r.topicName, r.partitionID)
     if err != nil || (leaderID == -1 && leaderEpoch == -1) {
         if err != nil {
-            return newEmptyFetchResult(fmt.Errorf("finding leader for partition: %w", err))
+            return newEmptyFetchResult(ctx, fmt.Errorf("finding leader for partition: %w", err))
         }
-        return newEmptyFetchResult(errUnknownPartitionLeader)
+        return newEmptyFetchResult(ctx, errUnknownPartitionLeader)
     }
 
     req := r.buildFetchRequest(fw, leaderEpoch)
@@ -959,12 +1031,12 @@ func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant, logg
     resp, err := req.RequestWith(ctx, r.client.Broker(int(leaderID)))
     if err != nil {
         if errors.Is(err, context.Canceled) {
-            return newEmptyFetchResult(nil)
+            return newEmptyFetchResult(ctx, nil)
         }
-        return newEmptyFetchResult(fmt.Errorf("fetching from kafka: %w", err))
+        return newEmptyFetchResult(ctx, fmt.Errorf("fetching from kafka: %w", err))
     }
 
-    return r.parseFetchResponse(fw.startOffset, resp)
+    return r.parseFetchResponse(ctx, fw.startOffset, resp)
 }
 
 func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32) kmsg.FetchRequest {
@@ -989,7 +1061,7 @@ func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32)
     return req
 }
 
-func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.FetchResponse) fetchResult {
+func (r *concurrentFetchers) parseFetchResponse(ctx context.Context, startOffset int64, resp *kmsg.FetchResponse) fetchResult {
     // Here we ignore resp.ErrorCode. That error code was added for support for KIP-227 and is only set if we're using fetch sessions. We don't use fetch sessions.
     // We also ignore rawPartitionResp.PreferredReadReplica to keep the code simpler. We don't provide any rack in the FetchRequest, so the broker _probably_ doesn't have a recommended replica for us.
 
@@ -998,7 +1070,7 @@ func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.Fe
     // Even in case of errors we get the topic partition.
     err := assertResponseContainsPartition(resp, r.topicID, r.partitionID)
     if err != nil {
-        return newEmptyFetchResult(err)
+        return newEmptyFetchResult(ctx, err)
     }
 
     parseOptions := kgo.ProcessFetchPartitionOptions{
@@ -1023,6 +1095,7 @@ func (r *concurrentFetchers) parseFetchResponse(startOffset int64, resp *kmsg.Fe
     }
 
     return fetchResult{
+        ctx:            ctx,
         FetchPartition: partition,
         fetchedBytes:   fetchedBytes,
     }
@@ -1070,15 +1143,23 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
     })
 
     for w := range wants {
+        // Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
+        wantSpan, ctx := opentracing.StartSpanFromContext(ctx, "concurrentFetcher.fetch")
+        wantSpan.SetTag("start_offset", w.startOffset)
+        wantSpan.SetTag("end_offset", w.endOffset)
+
         for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ {
-            attemptLogger := log.With(logger, "attempt", attempt)
-            f := r.fetchSingle(ctx, w, attemptLogger)
+            attemptSpan, ctx := opentracing.StartSpanFromContext(ctx, "concurrentFetcher.fetch.attempt")
+            attemptSpan.SetTag("attempt", attempt)
+
+            f := r.fetchSingle(ctx, w)
             if f.Err != nil {
-                w = handleKafkaFetchErr(f.Err, w, errBackoff, newRecordsProducedBackoff, r.startOffsets, r.client, attemptLogger)
+                w = handleKafkaFetchErr(f.Err, w, errBackoff, newRecordsProducedBackoff, r.startOffsets, r.client, spanlogger.FromContext(ctx, logger))
             }
             if len(f.Records) == 0 {
-                // Typically if we had an error, then there wouldn't eb any records.
+                // Typically if we had an error, then there wouldn't be any records.
                 // But it's hard to verify this for all errors from the Kafka API docs, so just to be sure, we process any records we might have received.
+                attemptSpan.Finish()
                 continue
             }
             // Next attempt will be from the last record onwards.
@@ -1089,11 +1170,15 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
             errBackoff.Reset()
             newRecordsProducedBackoff.Reset()
 
+            // Propagate the span context to consuming the records.
+            f.ctx = ctx
             select {
             case w.result <- f:
             case <-ctx.Done():
             }
+            attemptSpan.Finish()
         }
+        wantSpan.Finish()
         close(w.result)
     }
 }
@@ -1116,7 +1201,7 @@ func (r *concurrentFetchers) runFetchers(ctx context.Context, startOffset int64)
         pendingResults = list.New()
 
         bufferedResult       fetchResult
-        readyBufferedResults chan kgo.FetchPartition // this is non-nil when bufferedResult is non-empty
+        readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty
     )
     nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume
 
@@ -1152,9 +1237,10 @@ func (r *concurrentFetchers) runFetchers(ctx context.Context, startOffset int64)
             }
             nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records))
             bufferedResult = result
+            bufferedResult.logOrderedFetch()
             readyBufferedResults = r.orderedFetches
 
-        case readyBufferedResults <- bufferedResult.FetchPartition:
+        case readyBufferedResults <- bufferedResult:
             readyBufferedResults = nil
             bufferedResult = fetchResult{}
         }
@@ -1197,14 +1283,17 @@ func handleKafkaFetchErr(err error, fw fetchWant, shortBackoff, longBackoff wait
         if partitionStart >= fw.endOffset {
             // The next fetch want is responsible for this range. We set startOffset=endOffset to effectively mark this fetch as complete.
             fw.startOffset = fw.endOffset
+            level.Debug(logger).Log("msg", "we're too far behind aborting fetch", "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset)
             break
         }
         // Only some of the offsets of our want are out of range, so let's fast-forward.
         fw.startOffset = partitionStart
+        level.Debug(logger).Log("msg", "part of fetch want is outside of available offsets, adjusted start offset", "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset)
     } else {
         // If the broker is behind or if we are requesting offsets which have not yet been produced, we end up here.
         // We set a MaxWaitMillis on fetch requests, but even then there may be no records for some time.
         // Wait for a short time to allow the broker to catch up or for new records to be produced.
+        level.Debug(logger).Log("msg", "offset out of range; waiting for new records to be produced")
         shortBackoff.Wait()
     }
 
 case errors.Is(err, kerr.TopicAuthorizationFailed):
@@ -1258,38 +1347,6 @@ func handleKafkaFetchErr(err error, fw fetchWant, shortBackoff, longBackoff wait
     return fw
 }
 
-func logCompletedFetch(logger log.Logger, f fetchResult, fetchStartTime time.Time, w fetchWant) {
-    msg := "fetched records"
-    if f.Err != nil {
-        msg = "received an error while fetching records; will retry after processing received records (if any)"
-    }
-    var (
-        gotRecords   = int64(len(f.Records))
-        askedRecords = w.endOffset - w.startOffset
-    )
-    switch {
-    case f.Err == nil, errors.Is(f.Err, kerr.OffsetOutOfRange):
-        logger = level.Debug(logger)
-    default:
-        logger = level.Error(logger)
-    }
-    logger.Log(
-        "msg", msg,
-        "duration", time.Since(fetchStartTime),
-        "start_offset", w.startOffset,
-        "end_offset", w.endOffset,
-        "asked_records", askedRecords,
-        "got_records", gotRecords,
-        "diff_records", askedRecords-gotRecords,
-        "asked_bytes", w.MaxBytes(),
-        "got_bytes", f.fetchedBytes,
-        "diff_bytes", int(w.MaxBytes())-f.fetchedBytes,
-        "hwm", f.HighWatermark,
-        "lso", f.LogStartOffset,
-        "err", f.Err,
-    )
-}
-
 type partitionCommitter struct {
     services.Service
 
@@ -1427,7 +1484,7 @@ type readerMetrics struct {
     fetchesErrors                     prometheus.Counter
     fetchesTotal                      prometheus.Counter
     fetchWaitDuration                 prometheus.Histogram
-    fetchedDiscardedRecords           prometheus.Counter
+    fetchedDiscardedRecordBytes       prometheus.Counter
     strongConsistencyInstrumentation  *StrongReadConsistencyInstrumentation[struct{}]
     lastConsumedOffset                prometheus.Gauge
     consumeLatency                    prometheus.Histogram
@@ -1477,9 +1534,9 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric
             Help:                        "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.",
             NativeHistogramBucketFactor: 1.1,
         }),
-        fetchedDiscardedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-            Name: "cortex_ingest_storage_reader_fetched_discarded_records_total",
-            Help: "Total number of records discarded by the consumer because they were already consumed.",
+        fetchedDiscardedRecordBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+            Name: "cortex_ingest_storage_reader_fetched_discarded_bytes_total",
+            Help: "Total number of uncompressed bytes of records discarded from because they were already consumed. A higher rate means that the concurrent fetching estimations are less accurate.",
         }),
         consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
             Name: "cortex_ingest_storage_reader_records_batch_process_duration_seconds",
diff --git a/pkg/storage/ingest/util.go b/pkg/storage/ingest/util.go
index 87d3faffc48..0a9baf96ab9 100644
--- a/pkg/storage/ingest/util.go
+++ b/pkg/storage/ingest/util.go
@@ -84,10 +84,7 @@ func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger lo
         opts = append(opts, kgo.AllowAutoTopicCreation())
     }
 
-    tracer := kotel.NewTracer(
-        kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})),
-    )
-    opts = append(opts, kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(tracer)).Hooks()...))
+    opts = append(opts, kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(recordsTracer())).Hooks()...))
 
     if metrics != nil {
         opts = append(opts, kgo.WithHooks(metrics))
@@ -96,6 +93,10 @@ func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger lo
     return opts
 }
 
+func recordsTracer() *kotel.Tracer {
+    return kotel.NewTracer(kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})))
+}
+
 // resultPromise is a simple utility to have multiple goroutines waiting for a result from another one. 
type resultPromise[T any] struct { // done is a channel used to wait the result. Once the channel is closed
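
Editor's aside: the sketches below are illustrative and are not part of the patch. This first one relates to the jaegerEnv() helper added to development/mimir-ingest-storage/docker-compose.jsonnet. The JAEGER_* variables it injects are the standard jaeger-client-go environment variables; the snippet assumes a process that initialises its tracer via jaeger-client-go's config.FromEnv(), and the service name is a placeholder.

    // Minimal sketch: picking up JAEGER_AGENT_HOST, JAEGER_SAMPLER_TYPE, JAEGER_SAMPLER_PARAM,
    // JAEGER_TAGS and JAEGER_REPORTER_MAX_QUEUE_SIZE from the environment set by docker-compose.
    package main

    import (
        "log"

        "github.com/opentracing/opentracing-go"
        jaegercfg "github.com/uber/jaeger-client-go/config"
    )

    func main() {
        cfg, err := jaegercfg.FromEnv() // reads the JAEGER_* environment variables
        if err != nil {
            log.Fatal(err)
        }
        if cfg.ServiceName == "" {
            cfg.ServiceName = "mimir-dev" // placeholder; normally set by the application or via JAEGER_SERVICE_NAME
        }
        tracer, closer, err := cfg.NewTracer()
        if err != nil {
            log.Fatal(err)
        }
        defer closer.Close()
        opentracing.SetGlobalTracer(tracer)
    }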
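
A second aside, for the pusher.go change that swaps level.Error(c.logger) for level.Error(spanlogger.FromContext(ctx, c.logger)): dskit's spanlogger writes the log line to the underlying logger and also records it on the trace span carried in the context, falling back to the plain logger when there is no span. A minimal, self-contained sketch of the same pattern (the function name consumeOne is hypothetical):

    package main

    import (
        "context"
        "errors"
        "os"

        "github.com/go-kit/log"
        "github.com/go-kit/log/level"
        "github.com/grafana/dskit/spanlogger"
        "github.com/opentracing/opentracing-go"
    )

    // consumeOne mirrors the pattern used in pusherConsumer.Consume: the error is logged
    // and, if ctx carries a span, also attached to that span.
    func consumeOne(ctx context.Context, fallback log.Logger, err error) {
        level.Error(spanlogger.FromContext(ctx, fallback)).Log("msg", "failed to parse write request; skipping", "err", err)
    }

    func main() {
        logger := log.NewLogfmtLogger(os.Stderr)
        span, ctx := opentracing.StartSpanFromContext(context.Background(), "consume") // no-op span unless a tracer is registered
        defer span.Finish()
        consumeOne(ctx, logger, errors.New("malformed record"))
    }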
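
A third aside, for the fetcher interface change (pollFetches now also returns a context) and the new line in processNextFetches: opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx)) copies only the span from the fetch context onto the consuming context, so the consume path joins the fetch trace while keeping its own cancellation and deadline. A small sketch of that behaviour:

    package main

    import (
        "context"
        "fmt"

        "github.com/opentracing/opentracing-go"
    )

    func main() {
        // Stand-in for the context returned by pollFetches: it carries the fetch span.
        fetchSpan, fetchCtx := opentracing.StartSpanFromContext(context.Background(), "fetch")
        defer fetchSpan.Finish()

        // The consumer's own context, with its own lifecycle.
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Attach only the span; cancellation still comes from ctx, not from fetchCtx.
        ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx))

        fmt.Println(opentracing.SpanFromContext(ctx) == fetchSpan) // true
    }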
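
Finally, for the util.go change that factors the kotel tracer into recordsTracer(): the tracer propagates W3C trace context in Kafka record headers and is attached to the franz-go client as a hook. A sketch of the equivalent wiring against a standalone client (the broker address is a placeholder):

    package main

    import (
        "log"

        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/plugin/kotel"
        "go.opentelemetry.io/otel/propagation"
    )

    func main() {
        // Equivalent of the new recordsTracer() helper.
        tracer := kotel.NewTracer(kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})))

        client, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder broker
            kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(tracer)).Hooks()...),
        )
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()
    }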