Skip to content

Commit

Permalink
storage/remote: disable resharding during active retry backoffs
Browse files Browse the repository at this point in the history
Today, remote_write reshards based on pure throughput. This is
problematic if throughput has been diminished because of HTTP 429s;
increasing the number of shards due to backpressure will only exacerbate
the problem.

This commit disables resharding for twice the retry backoff, ensuring
that resharding will never occur during an active backoff, and that
resharding does not become enabled again until enough time has elapsed
to allow any pending requests to be retried.

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
  • Loading branch information
rfratto committed Feb 8, 2024
1 parent 8655fe5 commit ae3f6a6
Showing 1 changed file with 31 additions and 3 deletions.
34 changes: 31 additions & 3 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ type WriteClient interface {
type QueueManager struct {
lastSendTimestamp atomic.Int64
buildRequestLimitTimestamp atomic.Int64
reshardDisableTimestamp atomic.Int64

logger log.Logger
flushDeadline time.Duration
Expand Down Expand Up @@ -574,7 +575,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
retry := func() {
t.metrics.retriedMetadataTotal.Add(float64(len(metadata)))
}
err = sendWriteRequestWithBackoff(ctx, t.cfg, t.logger, attemptStore, retry)
err = sendWriteRequestWithBackoff(ctx, t, t.cfg, t.logger, attemptStore, retry)
if err != nil {
return err
}
Expand Down Expand Up @@ -1021,6 +1022,10 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
return false
}
if disableTimestamp := t.reshardDisableTimestamp.Load(); time.Now().Unix() < disableTimestamp {
level.Warn(t.logger).Log("msg", "Skipping resharding, resharding is disabled", "disabled_for", time.Until(time.Unix(disableTimestamp, 0)))
return false
}
return true
}

Expand Down Expand Up @@ -1622,7 +1627,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
}

err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
err = sendWriteRequestWithBackoff(ctx, s.qm, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
if errors.Is(err, context.Canceled) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
Expand All @@ -1635,7 +1640,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err
}

func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
func sendWriteRequestWithBackoff(ctx context.Context, qm *QueueManager, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff
sleepDuration := model.Duration(0)
try := 0
Expand Down Expand Up @@ -1668,6 +1673,15 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
level.Debug(l).Log("msg", "retry-after cannot be in past, retrying using default backoff mechanism")
}

// We should never reshard for a recoverable error; increasing shards could
// make the problem worse, particularly if we're getting rate limited.
//
// reshardDisableTimestamp holds the unix timestamp until which resharding
// is disabled. We'll update that timestamp if the period we were just told
// to sleep for is newer than the existing disabled timestamp.
reshardWaitPeriod := time.Now().Add(time.Duration(sleepDuration) * 2)
setAtomicToNewer(&qm.reshardDisableTimestamp, reshardWaitPeriod.Unix())

select {
case <-ctx.Done():
case <-time.After(time.Duration(sleepDuration)):
Expand All @@ -1687,6 +1701,20 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}

// setAtomicToNewer atomically sets a value to the newer uint64 between itself
// and the provided newValue argument.
func setAtomicToNewer(value *atomic.Int64, newValue int64) {
for {
current := value.Load()
if current >= newValue {
return
}
if value.CompareAndSwap(current, newValue) {
return
}
}
}

func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
var lowest int64
Expand Down

0 comments on commit ae3f6a6

Please sign in to comment.