segment writer service #3498
Conversation
LGTM – I've left a few notes, but those are just my thoughts / topics for discussion. Please feel free to ignore them; we'll figure those out along the way.
f.DurationVar(&cfg.SegmentDuration, prefix+"segment.duration", 500*time.Millisecond, "Timeout when flushing segments to bucket.")
f.BoolVar(&cfg.Async, prefix+"async", false, "Enable async mode for segment writer.")
I think these should be tenant options (limits) rather than global ones
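For illustration, a minimal sketch of how a per-tenant override could be plumbed in. The Overrides interface and the fallback helper below are assumptions for this sketch, not the project's actual limits API:

package ingester

import "time"

// Overrides is a hypothetical per-tenant limits accessor; the real limits
// plumbing in the codebase may look different.
type Overrides interface {
	SegmentDuration(tenantID string) time.Duration
}

// flushTimeout resolves the segment duration for a tenant, falling back to
// the global flag value when no per-tenant override is set.
func flushTimeout(o Overrides, global time.Duration, tenantID string) time.Duration {
	if d := o.SegmentDuration(tenantID); d > 0 {
		return d
	}
	return global
}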
pkg/phlaredb/metrics.go (Outdated)
func ContextWithHeadMetrics(ctx context.Context, reg prometheus.Registerer, prefix string) context.Context {
	return contextWithHeadMetrics(ctx, newHeadMetrics2(reg, prefix))
}
Not for this PR: I saw your attempt to make the dependency on metrics explicit 👍🏻 I really hope we won't pass it via the context
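As a rough sketch of the explicit alternative (the Head and newHeadMetrics names below are illustrative, not the current code): the component owns its metrics and receives the registerer at construction time, so nothing is threaded through context.Context.

package phlaredb

import "github.com/prometheus/client_golang/prometheus"

// headMetrics is constructed once from the registerer and handed to the
// component that uses it, instead of travelling through the context.
type headMetrics struct {
	ingestedProfiles prometheus.Counter
}

func newHeadMetrics(reg prometheus.Registerer, prefix string) *headMetrics {
	m := &headMetrics{
		ingestedProfiles: prometheus.NewCounter(prometheus.CounterOpts{
			Name: prefix + "head_ingested_profiles_total",
			Help: "Total number of profiles ingested into the head block.",
		}),
	}
	if reg != nil {
		reg.MustRegister(m.ingestedProfiles)
	}
	return m
}

// Head holds its metrics as an explicit dependency.
type Head struct {
	metrics *headMetrics
}

func NewHead(reg prometheus.Registerer, prefix string) *Head {
	return &Head{metrics: newHeadMetrics(reg, prefix)}
}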
pkg/experiment/ingester/service.go (Outdated)
err = pprof.FromBytes(sample.RawProfile, func(p *profilev1.Profile, size int) error {
	if err = segment.ingest(ctx, tenantID, p, id, series.Labels); err != nil {
		reason := validation.ReasonOf(err)
		if reason != validation.Unknown {
			validation.DiscardedProfiles.WithLabelValues(string(reason), tenantID).Add(float64(1))
			validation.DiscardedBytes.WithLabelValues(string(reason), tenantID).Add(float64(size))
			switch validation.ReasonOf(err) {
			case validation.SeriesLimit:
				return connect.NewError(connect.CodeResourceExhausted, err)
			}
		}
	}
	return nil
})
if err != nil {
	return err
}
As we won't have SeriesLimit in segment writer, we can simplify this piece.
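For example, a possible simplification (a sketch only, assuming SeriesLimit is simply never returned here) keeps the discard accounting and drops the switch over validation reasons:

err = pprof.FromBytes(sample.RawProfile, func(p *profilev1.Profile, size int) error {
	if err := segment.ingest(ctx, tenantID, p, id, series.Labels); err != nil {
		if reason := validation.ReasonOf(err); reason != validation.Unknown {
			// Account for the discarded profile and keep processing the request.
			validation.DiscardedProfiles.WithLabelValues(string(reason), tenantID).Add(1)
			validation.DiscardedBytes.WithLabelValues(string(reason), tenantID).Add(float64(size))
		}
	}
	return nil
})
if err != nil {
	return err
}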
pkg/experiment/ingester/service.go (Outdated)
i.segmentWriter.metrics.segmentFlushTimeouts.WithLabelValues(tenantID).Inc()
i.segmentWriter.metrics.segmentFlushWaitDuration.WithLabelValues(tenantID).Observe(time.Since(t1).Seconds())
level.Error(i.logger).Log("msg", "flush timeout", "err", err)
We assume that the error indicates a timeout. We probably want to check the error type here (or context.Err()).
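A minimal sketch of what that check could look like, assuming the flush deadline comes from the request context (errors and context imports assumed; metric and logger calls mirror the snippet above):

if errors.Is(err, context.DeadlineExceeded) || ctx.Err() != nil {
	// Only count a timeout when the error actually indicates one.
	i.segmentWriter.metrics.segmentFlushTimeouts.WithLabelValues(tenantID).Inc()
	i.segmentWriter.metrics.segmentFlushWaitDuration.WithLabelValues(tenantID).Observe(time.Since(t1).Seconds())
	level.Error(i.logger).Log("msg", "flush timeout", "err", err)
} else {
	level.Error(i.logger).Log("msg", "flush failed", "err", err)
}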
pkg/experiment/ingester/service.go (Outdated)
var waits = make(map[segmentWaitFlushed]struct{}, len(req.Msg.Series))
for _, series := range req.Msg.Series {
	var shard = shardKey(series.Shard)
	wait, err := i.segmentWriter.ingest(shard, func(segment segmentIngest) error {
		return i.ingestToSegment(ctx, segment, series, tenantID)
	})
	if err != nil {
		return nil, err
	}
	waits[wait] = struct{}{}
}
NB: If we moved the pprof split from distributors to segment writers and restricted requests to a single profile, we would not need to wait for multiple segments to flush (which may result in 2 * segment_duration latency).
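A sketch of how the handler could look with a single profile per request, waiting on exactly one segment. Field and method names here (req.Msg.Shard, waitFlushed) are assumptions for illustration, not the final API:

// With one profile per request there is exactly one shard involved,
// so the handler waits on a single segment flush.
wait, err := i.segmentWriter.ingest(shardKey(req.Msg.Shard), func(segment segmentIngest) error {
	return i.ingestToSegment(ctx, segment, req.Msg, tenantID)
})
if err != nil {
	return nil, err
}
if err := wait.waitFlushed(ctx); err != nil {
	return nil, err
}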
api/segmentwriter/v1/push.proto (Outdated)
message PushRequest {
  // series is a set of raw pprof profiles and accompanying labels
  repeated RawProfileSeries series = 1;
}
We discussed this internally at some point, and I recall the consensus was that batching does not benefit us here. On the contrary, it introduces several issues:
- Callers have to wait for all the affected shard segment writers to flush, which badly impacts latency and may also impact resource usage on the distributor side.
- It complicates error handling. I'm not 100% sure that partial success is handled properly.
- It complicates retries on the distributor end.
I hope we'll amend the API and implementation accordingly in follow-up PRs.
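To illustrate the retry point, a generic sketch of the distributor side once requests carry a single profile (the helper below is purely illustrative): a failed push maps one-to-one to a retry, with no partial success to reconcile.

package distributor

import "context"

// pushWithRetry retries a whole push request; with one profile per request
// there is no per-series status to track. Backoff is omitted for brevity.
func pushWithRetry(ctx context.Context, push func(context.Context) error, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = push(ctx); err == nil {
			return nil
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
	return err
}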
I removed repeated series, but kept repeated samples
Co-authored-by: Anton Kolesnikov <anton.e.kolesnikov@gmail.com>
Bring back the segment writer service.
Add a push protobuf API for the segment writer.
The service is still detached - nobody is pushing to it yet.
The service is not as optimized as in the POC; this will be addressed in follow-ups.