feat(v2): revive grpc health checks (#3643)
* feat(v2): revive grpc health checks

* fix: segment-writer flushes data twice
kolesnikovae authored Oct 25, 2024
1 parent a373272 commit 1b107d4
Showing 14 changed files with 333 additions and 225 deletions.
30 changes: 20 additions & 10 deletions pkg/experiment/ingester/client/client.go
@@ -37,7 +37,7 @@ const (
cbMinSuccess = 5
cbMaxFailures = 3
cbClosedInterval = 0
cbOpenTimeout = 5 * time.Second
cbOpenTimeout = time.Second

poolCleanupPeriod = 15 * time.Second
)
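
The cb-prefixed constants configure a per-instance circuit breaker, and this commit shortens the open-state timeout from 5 seconds to 1 second so a tripped segment writer is probed again sooner. The sketch below shows one plausible way constants like these could map onto a sony/gobreaker breaker; the library choice and the mapping (cbMinSuccess as the half-open request budget, cbMaxFailures as the trip threshold) are assumptions for illustration, not taken from this diff.

package main

import (
	"time"

	"github.com/sony/gobreaker"
)

const (
	cbMinSuccess     = 5
	cbMaxFailures    = 3
	cbClosedInterval = 0
	cbOpenTimeout    = time.Second
)

// newBreaker is a hypothetical helper wiring the constants above into
// gobreaker.Settings.
func newBreaker(addr string) *gobreaker.CircuitBreaker {
	return gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name: addr,
		// Allow up to cbMinSuccess probe requests in the half-open state
		// before the breaker may close again.
		MaxRequests: cbMinSuccess,
		// cbClosedInterval == 0: failure counts are never reset while closed.
		Interval: cbClosedInterval,
		// Stay open for cbOpenTimeout (now 1s instead of 5s) before probing.
		Timeout: cbOpenTimeout,
		// Trip after cbMaxFailures consecutive failures.
		ReadyToTrip: func(c gobreaker.Counts) bool {
			return c.ConsecutiveFailures >= cbMaxFailures
		},
	})
}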
@@ -137,7 +137,10 @@ const grpcServiceConfig = `{
"methodConfig": [{
"name": [{"service": ""}],
"retryPolicy": {}
}]
}],
"healthCheckConfig": {
"serviceName": "pyroscope.segment-writer"
}
}`

type Client struct {
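
The healthCheckConfig block enables gRPC client-side health checking: each connection subscribes to the standard grpc.health.v1 service for "pyroscope.segment-writer" and is removed from the balancer's ready set while the server reports NOT_SERVING. This only works if the segment-writer server registers the stock health service under the same name. A minimal server-side sketch using grpc-go's health package follows; the listener address and the registration site are assumptions, not part of this commit.

package main

import (
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	lis, err := net.Listen("tcp", ":9095") // address is an assumption
	if err != nil {
		panic(err)
	}
	srv := grpc.NewServer()

	// Register the standard health service and mark the segment-writer
	// service as SERVING. Flipping it to NOT_SERVING (e.g. while shutting
	// down) steers health-checking clients away before the listener closes.
	hs := health.NewServer()
	healthpb.RegisterHealthServer(srv, hs)
	hs.SetServingStatus("pyroscope.segment-writer", healthpb.HealthCheckResponse_SERVING)

	if err := srv.Serve(lis); err != nil {
		panic(err)
	}
}

Note that on the client side the health-check config only takes effect when grpc-go's health machinery is linked into the binary, conventionally via a blank import: import _ "google.golang.org/grpc/health".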
@@ -218,14 +221,14 @@ func (c *Client) Push(
// At most 5 attempts to push the data to the segment writer.
instances := placement.ActiveInstances(p.Instances)
req.Shard = p.Shard
for attempts := 5; attempts >= 0 && instances.Next() && ctx.Err() == nil; attempts-- {
for attempts := 5; attempts >= 0 && instances.Next(); attempts-- {
instance := instances.At()
logger := log.With(c.logger,
"tenant", req.TenantId,
"shard", req.Shard,
"instance_addr", instance.Addr,
"instance_id", instance.Id,
"attempts", attempts,
"attempts_left", attempts,
)
_ = level.Debug(logger).Log("msg", "sending request")
resp, err = c.pushToInstance(ctx, req, instance.Addr)
@@ -240,6 +243,9 @@ func (c *Client) Push(
return nil, status.Error(codes.Unavailable, errServiceUnavailableMsg)
}
_ = level.Warn(logger).Log("msg", "failed attempt to push data to segment writer", "err", err)
if err = ctx.Err(); err != nil {
return nil, err
}
}

_ = level.Error(c.logger).Log(
@@ -248,6 +254,7 @@
"shard", req.Shard,
"last_err", err,
)

return nil, status.Error(codes.Unavailable, errServiceUnavailableMsg)
}

@@ -260,10 +267,13 @@ func (c *Client) pushToInstance(
if err != nil {
return nil, err
}
c.metrics.sentBytes.
WithLabelValues(strconv.Itoa(int(req.Shard)), req.TenantId, addr).
Observe(float64(len(req.Profile)))
return segmentwriterv1.NewSegmentWriterServiceClient(conn).Push(ctx, req)
resp, err := segmentwriterv1.NewSegmentWriterServiceClient(conn).Push(ctx, req)
if err == nil {
c.metrics.sentBytes.
WithLabelValues(strconv.Itoa(int(req.Shard)), req.TenantId, addr).
Observe(float64(len(req.Profile)))
}
return resp, err
}

func newConnPool(
@@ -294,8 +304,8 @@ func newConnPool(
"segment-writer",
ring_client.PoolConfig{
CheckInterval: poolCleanupPeriod,
// Note that no health checks are performed: it's caller
// responsibility to pick a healthy instance.
// Note that pool-level health checks are not used:
// gRPC health checking is done at the connection level.
HealthCheckEnabled: false,
HealthCheckTimeout: 0,
MaxConcurrentHealthChecks: 0,
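
For the service config above to take effect, it has to be attached when the pool dials each instance. Below is a minimal sketch of such a dial helper, assuming package-internal access to grpcServiceConfig; the credentials choice is illustrative and the actual factory in this package may differ.

package client

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialSegmentWriter is a hypothetical dial helper. Passing grpcServiceConfig
// (the JSON document shown earlier in this file) as the default service
// config enables the retry policy and per-connection health checking on
// every connection the pool creates.
func dialSegmentWriter(addr string) (*grpc.ClientConn, error) {
	return grpc.Dial(addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()), // assumption: plaintext in-cluster traffic
		grpc.WithDefaultServiceConfig(grpcServiceConfig),
	)
}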
110 changes: 52 additions & 58 deletions pkg/experiment/ingester/segment.go
@@ -34,17 +34,12 @@ var ErrMetastoreDLQFailed = fmt.Errorf("failed to store block metadata in DLQ")

type shardKey uint32

type segmentWriterConfig struct {
segmentDuration time.Duration
}

type segmentsWriter struct {
segmentDuration time.Duration
limits Limits

l log.Logger
bucket objstore.Bucket
metastoreClient metastorev1.MetastoreServiceClient
config Config
limits Limits
logger log.Logger
bucket objstore.Bucket
metastore metastorev1.MetastoreServiceClient

shards map[shardKey]*shard
shardsLock sync.RWMutex
@@ -57,26 +52,26 @@ type segmentsWriter struct {
}

type shard struct {
sw *segmentsWriter
current *segment
currentLock sync.RWMutex
wg sync.WaitGroup
l log.Logger
concatBuf []byte
wg sync.WaitGroup
logger log.Logger
concatBuf []byte
sw *segmentsWriter
mu sync.RWMutex
segment *segment
}

func (sh *shard) ingest(fn func(head segmentIngest)) segmentWaitFlushed {
sh.currentLock.RLock()
s := sh.current
sh.mu.RLock()
s := sh.segment
s.inFlightProfiles.Add(1)
sh.currentLock.RUnlock()
sh.mu.RUnlock()
defer s.inFlightProfiles.Done()
fn(s)
return s
}

func (sh *shard) loop(ctx context.Context) {
ticker := time.NewTicker(sh.sw.segmentDuration)
ticker := time.NewTicker(sh.sw.config.SegmentDuration)
defer ticker.Stop()
for {
select {
@@ -90,10 +85,10 @@ func (sh *shard) loop(ctx context.Context) {
}

func (sh *shard) flushSegment(ctx context.Context) {
sh.currentLock.Lock()
s := sh.current
sh.current = sh.sw.newSegment(sh, s.shard, sh.l)
sh.currentLock.Unlock()
sh.mu.Lock()
s := sh.segment
sh.segment = sh.sw.newSegment(sh, s.shard, sh.logger)
sh.mu.Unlock()

go func() { // not blocking next ticks in case metastore/s3 latency is high
t1 := time.Now()
@@ -102,10 +97,10 @@

err := s.flush(ctx)
if err != nil {
_ = level.Error(sh.sw.l).Log("msg", "failed to flush segment", "err", err)
_ = level.Error(sh.sw.logger).Log("msg", "failed to flush segment", "err", err)
}
if s.debuginfo.movedHeads > 0 {
_ = level.Debug(s.l).Log("msg",
_ = level.Debug(s.logger).Log("msg",
"writing segment block done",
"heads-count", len(s.heads),
"heads-moved-count", s.debuginfo.movedHeads,
@@ -118,19 +113,19 @@
}()
}

func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, cfg segmentWriterConfig, limits Limits, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter {
func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, config Config, limits Limits, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter {
ctx, cancelFunc := context.WithCancel(context.Background())
sw := &segmentsWriter{
limits: limits,
metrics: metrics,
headMetrics: hm,
segmentDuration: cfg.segmentDuration,
l: l,
bucket: bucket,
shards: make(map[shardKey]*shard),
metastoreClient: metastoreClient,
cancel: cancelFunc,
cancelCtx: ctx,
limits: limits,
metrics: metrics,
headMetrics: hm,
config: config,
logger: l,
bucket: bucket,
shards: make(map[shardKey]*shard),
metastore: metastoreClient,
cancel: cancelFunc,
cancelCtx: ctx,
}

return sw
@@ -158,26 +153,26 @@ func (sw *segmentsWriter) ingest(shard shardKey, fn func(head segmentIngest)) (a
}

func (sw *segmentsWriter) Stop() error {
sw.l.Log("msg", "stopping segments writer")
sw.logger.Log("msg", "stopping segments writer")
sw.cancel()
sw.shardsLock.Lock()
defer sw.shardsLock.Unlock()
for _, s := range sw.shards {
s.wg.Wait()
}
sw.l.Log("msg", "segments writer stopped")
sw.logger.Log("msg", "segments writer stopped")

return nil
}

func (sw *segmentsWriter) newShard(sk shardKey) *shard {
sl := log.With(sw.l, "shard", fmt.Sprintf("%d", sk))
sl := log.With(sw.logger, "shard", fmt.Sprintf("%d", sk))
sh := &shard{
sw: sw,
l: sl,
logger: sl,
concatBuf: make([]byte, 4*0x1000),
}
sh.current = sw.newSegment(sh, sk, sl)
sh.segment = sw.newSegment(sh, sk, sl)
sh.wg.Add(1)
go func() {
defer sh.wg.Done()
@@ -189,14 +184,14 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
sshard := fmt.Sprintf("%d", sk)
s := &segment{
l: log.With(sl, "segment-id", id.String()),
logger: log.With(sl, "segment-id", id.String()),
ulid: id,
heads: make(map[serviceKey]serviceHead),
sw: sw,
sh: sh,
shard: sk,
sshard: sshard,
doneChan: make(chan struct{}, 0),
doneChan: make(chan struct{}),
}
return s
}
@@ -226,17 +221,17 @@ func (s *segment) flush(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("failed to flush block %s: %w", s.ulid.String(), err)
}
// TODO(kolesnikovae): Add sane timeouts to all the operations.
if err = s.sw.uploadBlock(ctx, blockData, blockMeta, s); err != nil {
return fmt.Errorf("failed to upload block %s: %w", s.ulid.String(), err)
}
if err = s.sw.storeMeta(ctx, blockMeta, s); err != nil {
level.Error(s.l).Log("msg", "failed to store meta in metastore", "err", err)
level.Error(s.logger).Log("msg", "failed to store meta in metastore", "err", err)
if dlqErr := s.sw.storeMetaDLQ(ctx, blockMeta, s); dlqErr != nil {
level.Error(s.l).Log("msg", "metastore fallback failed", "err", dlqErr)
level.Error(s.logger).Log("msg", "metastore fallback failed", "err", dlqErr)
return fmt.Errorf("failed to store meta %s: %w", s.ulid.String(), dlqErr)
}
}

return nil
}

@@ -263,7 +258,7 @@ func (s *segment) flushBlock(heads []flushedServiceHead) ([]byte, *metastorev1.B
for i, e := range heads {
svc, err := concatSegmentHead(e, w)
if err != nil {
_ = level.Error(s.l).Log("msg", "failed to concat segment head", "err", err)
_ = level.Error(s.logger).Log("msg", "failed to concat segment head", "err", err)
continue
}
if i == 0 {
@@ -331,11 +326,11 @@ func (s *segment) flushHeads(ctx context.Context) (moved []flushedServiceHead) {
eFlushed, err := s.flushHead(ctx, e)

if err != nil {
level.Error(s.l).Log("msg", "failed to flush head", "err", err)
level.Error(s.logger).Log("msg", "failed to flush head", "err", err)
}
if eFlushed != nil {
if eFlushed.Meta.NumSamples == 0 {
_ = level.Debug(s.l).Log("msg", "skipping empty head")
_ = level.Debug(s.logger).Log("msg", "skipping empty head")
return
} else {
mutex.Lock()
Expand Down Expand Up @@ -366,7 +361,7 @@ func (s *segment) flushHead(ctx context.Context, e serviceHead) (*memdb.FlushedH
return nil, fmt.Errorf("failed to flush head : %w", err)
}
s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds())
level.Debug(s.l).Log(
level.Debug(s.logger).Log(
"msg", "flushed head",
"tenant", e.key.tenant,
"service", e.key.service,
Expand Down Expand Up @@ -404,7 +399,7 @@ type segment struct {
doneChan chan struct{}
flushErr error
flushErrMutex sync.Mutex
l log.Logger
logger log.Logger

debuginfo struct {
movedHeads int
@@ -418,8 +413,7 @@ type segment struct {
}

type segmentIngest interface {
// TODO(kolesnikovae): Remove context from the signature.
ingest(ctx context.Context, tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair)
ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair)
}

type segmentWaitFlushed interface {
@@ -432,13 +426,13 @@ func (s *segment) waitFlushed(ctx context.Context) error {
return fmt.Errorf("waitFlushed: %s %w", s.ulid.String(), ctx.Err())
case <-s.doneChan:
s.flushErrMutex.Lock()
defer s.flushErrMutex.Unlock()
res := s.flushErr
s.flushErrMutex.Unlock()
return res
}
}

func (s *segment) ingest(ctx context.Context, tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair) {
func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair) {
k := serviceKey{
tenant: tenantID,
service: model.Labels(labels).Get(model.LabelNameServiceName),
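
The doneChan/flushErr pair gives callers a synchronous durability barrier: waitFlushed blocks until the flush goroutine closes doneChan, then returns whatever error the flush recorded. A hypothetical caller sketch follows; handlePush and its parameter plumbing are illustrative, and only sh.ingest, segmentIngest.ingest, and waitFlushed come from this file (package imports assumed).

// handlePush sketches how a request handler could use the shard API: the
// profile is written into the current segment, and the handler acks only
// after that segment has been flushed (or the context is canceled).
func handlePush(ctx context.Context, sh *shard, tenant string,
	p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair) error {
	wait := sh.ingest(func(head segmentIngest) {
		head.ingest(tenant, p, id, labels)
	})
	// Blocks until the segment's doneChan is closed, then surfaces flushErr.
	return wait.waitFlushed(ctx)
}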
@@ -523,7 +517,7 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
if err := sw.bucket.Upload(ctx, blockPath, bytes.NewReader(blockData)); err != nil {
return err
}
sw.l.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1))
sw.logger.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1))
return nil
}

@@ -533,7 +527,7 @@ func (sw *segmentsWriter) storeMeta(ctx context.Context, meta *metastorev1.Block
sw.metrics.storeMetaDuration.WithLabelValues(s.sshard).Observe(time.Since(t1).Seconds())
s.debuginfo.storeMetaDuration = time.Since(t1)
}()
_, err := sw.metastoreClient.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta})
_, err := sw.metastore.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta})
if err != nil {
sw.metrics.storeMetaErrors.WithLabelValues(s.sshard).Inc()
}