kafka replay speed: improve consuming pipeline telemetry (#9269)
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
dimitarvdimitrov authored Sep 19, 2024
1 parent e297470 commit bbe4666
Showing 8 changed files with 235 additions and 95 deletions.
6 changes: 6 additions & 0 deletions development/mimir-ingest-storage/config/datasource-mimir.yaml
@@ -10,3 +10,9 @@ datasources:
jsonData:
prometheusType: Mimir
timeInterval: 5s
- name: Jaeger
type: jaeger
access: proxy
uid: jaeger
orgID: 1
url: http://jaeger:16686/
7 changes: 6 additions & 1 deletion development/mimir-ingest-storage/config/mimir.yaml
@@ -15,12 +15,14 @@ ingest_storage:
address: kafka_1:9092
topic: mimir-ingest
last_produced_offset_poll_interval: 500ms
replay_concurrency: 3
replay_shards: 8

ingester:
track_ingester_owned_series: false # suppress log messages in c-61 about empty ring; doesn't affect testing

partition_ring:
min_partition_owners_count: 2
min_partition_owners_count: 1
min_partition_owners_duration: 10s
delete_inactive_partition_after: 1m

@@ -98,3 +100,6 @@ limits:

runtime_config:
file: ./config/runtime.yaml

server:
log_level: debug
19 changes: 18 additions & 1 deletion development/mimir-ingest-storage/docker-compose.jsonnet
@@ -12,6 +12,7 @@ std.manifestYamlDoc({
self.kafka_1 +
self.kafka_2 +
self.kafka_3 +
self.jaeger +
{},

write:: {
@@ -231,6 +232,22 @@ std.manifestYamlDoc({
},
},

jaeger:: {
jaeger: {
image: 'jaegertracing/all-in-one',
ports: ['16686:16686', '14268'],
},
},

local jaegerEnv(appName) = {
JAEGER_AGENT_HOST: 'jaeger',
JAEGER_AGENT_PORT: 6831,
JAEGER_SAMPLER_TYPE: 'const',
JAEGER_SAMPLER_PARAM: 1,
JAEGER_TAGS: 'app=%s' % appName,
JAEGER_REPORTER_MAX_QUEUE_SIZE: 1000,
},

// This function builds docker-compose declaration for Mimir service.
local mimirService(serviceOptions) = {
local defaultOptions = {
@@ -243,7 +260,7 @@
kafka_1: { condition: 'service_healthy' },
kafka_2: { condition: 'service_healthy' },
},
env: {},
env: jaegerEnv(self.target),
extraArguments: [],
debug: true,
debugPort: self.publishedHttpPort + 3000,
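
The jaegerEnv() helper above injects the standard JAEGER_* variables read by the Jaeger Go client into every Mimir container, so each component reports spans to the jaeger service defined earlier in this file. As a rough, hedged sketch (not the exact wiring Mimir uses), this is how the Jaeger Go client builds a tracer from those variables; the explicit service-name fallback is an assumption here, since jaegerEnv() does not set JAEGER_SERVICE_NAME:

package main

import (
	"io"
	"log"

	"github.com/opentracing/opentracing-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// initTracing reads JAEGER_AGENT_HOST, JAEGER_AGENT_PORT, JAEGER_SAMPLER_TYPE,
// JAEGER_SAMPLER_PARAM, JAEGER_TAGS and JAEGER_REPORTER_MAX_QUEUE_SIZE from the
// environment -- the variables jaegerEnv() sets for each container.
func initTracing(fallbackServiceName string) (io.Closer, error) {
	cfg, err := jaegercfg.FromEnv()
	if err != nil {
		return nil, err
	}
	if cfg.ServiceName == "" {
		// Assumption for this sketch: jaegerEnv() sets no JAEGER_SERVICE_NAME,
		// so a name has to come from somewhere else.
		cfg.ServiceName = fallbackServiceName
	}
	tracer, closer, err := cfg.NewTracer()
	if err != nil {
		return nil, err
	}
	opentracing.SetGlobalTracer(tracer)
	return closer, nil
}

func main() {
	closer, err := initTracing("mimir")
	if err != nil {
		log.Fatalf("failed to initialise tracing: %v", err)
	}
	defer closer.Close()
}
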
69 changes: 61 additions & 8 deletions development/mimir-ingest-storage/docker-compose.yml
@@ -23,6 +23,11 @@
- "9091:9091"
"volumes":
- "./config:/etc/agent-config"
"jaeger":
"image": "jaegertracing/all-in-one"
"ports":
- "16686:16686"
- "14268"
"kafka_1":
"environment":
- "CLUSTER_ID=zH1GDqcNTzGMDCXm5VZQdg"
@@ -112,7 +117,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=backend"
"hostname": "mimir-backend-1"
"image": "mimir"
"ports":
@@ -136,7 +147,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=backend"
"hostname": "mimir-backend-2"
"image": "mimir"
"ports":
@@ -160,7 +177,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=read"
"hostname": "mimir-read-1"
"image": "mimir"
"ports":
@@ -184,7 +207,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=read"
"hostname": "mimir-read-2"
"image": "mimir"
"ports":
@@ -208,7 +237,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=write"
"hostname": "mimir-write-zone-a-1"
"image": "mimir"
"ports":
@@ -233,7 +268,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=write"
"hostname": "mimir-write-zone-a-2"
"image": "mimir"
"ports":
@@ -258,7 +299,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=write"
"hostname": "mimir-write-zone-a-3"
"image": "mimir"
"ports":
@@ -283,7 +330,13 @@
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment": []
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=ingester"
"hostname": "mimir-write-zone-c-61"
"image": "mimir"
"ports":
3 changes: 2 additions & 1 deletion pkg/storage/ingest/pusher.go
@@ -149,7 +149,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {

for r := range recordsChannel {
if r.err != nil {
level.Error(c.logger).Log("msg", "failed to parse write request; skipping", "err", r.err)
level.Error(spanlogger.FromContext(ctx, c.logger)).Log("msg", "failed to parse write request; skipping", "err", r.err)
continue
}

@@ -337,6 +337,7 @@ func (p *shardingPusher) PushToStorage(ctx context.Context, request *mimirpb.Wri
// TODO dimitarvdimitrov support metadata and the rest of the fields; perhaps cut a new request for different values of SkipLabelNameValidation?
s.Timeseries = append(s.Timeseries, ts)
s.Context = ctx // retain the last context in case we have to flush it when closing shardingPusher
p.unfilledShards[shard] = s

if len(s.Timeseries) < p.batchSize {
continue
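
The logging change in the first hunk is the telemetry-relevant part of this file: routing the "failed to parse write request" error through spanlogger attaches the message to the active consume span (and therefore to the traces viewable in the Jaeger UI added by this commit), in addition to the regular log output. A minimal, self-contained sketch of that pattern with dskit's spanlogger; the logger setup and error value are illustrative, not taken from the Mimir code:

package main

import (
	"context"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/spanlogger"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)
	ctx := context.Background() // in pusherConsumer.Consume this context carries the tracing span

	// FromContext returns a logger bound to the span in ctx, falling back to the
	// plain logger when no span is present, so the error is both written to the
	// logs and recorded as a span log on the consume trace.
	spanLog := spanlogger.FromContext(ctx, logger)
	level.Error(spanLog).Log("msg", "failed to parse write request; skipping", "err", "example parse error")
}
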
4 changes: 2 additions & 2 deletions pkg/storage/ingest/pusher_test.go
@@ -225,13 +225,13 @@ func TestPusherConsumer(t *testing.T) {
}
}

var unimportantLogFieldsPattern = regexp.MustCompile(`\scaller=\S+\.go:\d+\s`)
var unimportantLogFieldsPattern = regexp.MustCompile(`(\s?)caller=\S+\.go:\d+\s`)

func removeUnimportantLogFields(lines []string) []string {
// The 'caller' field is not important to these tests (we just care about the message and other information),
// and can change as we refactor code, making these tests brittle. So we remove it before making assertions about the log lines.
for i, line := range lines {
lines[i] = unimportantLogFieldsPattern.ReplaceAllString(line, " ")
lines[i] = unimportantLogFieldsPattern.ReplaceAllString(line, "$1")
}

return lines
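
The regexp tweak above changes how the caller=... field is stripped from log lines before assertions: the leading whitespace is now optional and captured, and the replacement "$1" restores exactly what was captured instead of always inserting a space. As a result, lines where caller= is the very first field are also cleaned, and no stray leading space is introduced. A small standalone illustration with made-up log lines:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	oldPattern := regexp.MustCompile(`\scaller=\S+\.go:\d+\s`)
	newPattern := regexp.MustCompile(`(\s?)caller=\S+\.go:\d+\s`)

	// caller= in the middle of the line: both patterns strip it and keep a single space.
	mid := `level=error caller=pusher.go:152 msg="failed to parse write request; skipping"`
	fmt.Println(oldPattern.ReplaceAllString(mid, " "))  // level=error msg="failed to parse write request; skipping"
	fmt.Println(newPattern.ReplaceAllString(mid, "$1")) // level=error msg="failed to parse write request; skipping"

	// caller= as the first field: the old pattern requires a leading space and leaves
	// the line untouched; the new one strips the field without adding a space.
	start := `caller=pusher.go:152 msg="failed to parse write request; skipping"`
	fmt.Println(oldPattern.ReplaceAllString(start, " "))  // unchanged
	fmt.Println(newPattern.ReplaceAllString(start, "$1")) // msg="failed to parse write request; skipping"
}
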