Skip to content

Commit

Permalink
storage/remote: disable resharding during active retry backoffs
Browse files Browse the repository at this point in the history
Today, remote_write reshards based on pure throughput. This is
problematic if throughput has been diminished because of HTTP 429s;
increasing the number of shards due to backpressure will only exacerbate
the problem.

This commit disables resharding for twice the retry backoff, ensuring
that resharding will never occur during an active backoff, and that
resharding does not become enabled again until enough time has elapsed
to allow any pending requests to be retried.

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
  • Loading branch information
rfratto committed Feb 8, 2024
1 parent 8655fe5 commit ca185ad
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ type WriteClient interface {
type QueueManager struct {
lastSendTimestamp atomic.Int64
buildRequestLimitTimestamp atomic.Int64
reshardDisableTimestamp atomic.Int64

logger log.Logger
flushDeadline time.Duration
Expand Down Expand Up @@ -574,7 +575,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
retry := func() {
t.metrics.retriedMetadataTotal.Add(float64(len(metadata)))
}
err = sendWriteRequestWithBackoff(ctx, t.cfg, t.logger, attemptStore, retry)
err = sendWriteRequestWithBackoff(ctx, t, t.cfg, t.logger, attemptStore, retry)
if err != nil {
return err
}
Expand Down Expand Up @@ -1021,6 +1022,10 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
return false
}
if disableTimestamp := t.reshardDisableTimestamp.Load(); time.Now().Unix() < disableTimestamp {
level.Warn(t.logger).Log("msg", "Skipping resharding, resharding is disabled", "disabled_for", time.Until(time.Unix(disableTimestamp, 0)))
return false
}
return true
}

Expand Down Expand Up @@ -1622,7 +1627,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
}

err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
err = sendWriteRequestWithBackoff(ctx, s.qm, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
if errors.Is(err, context.Canceled) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
Expand All @@ -1635,7 +1640,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err
}

func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
func sendWriteRequestWithBackoff(ctx context.Context, qm *QueueManager, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff
sleepDuration := model.Duration(0)
try := 0
Expand Down Expand Up @@ -1668,6 +1673,15 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
level.Debug(l).Log("msg", "retry-after cannot be in past, retrying using default backoff mechanism")
}

// We should never reshard for a recoverable error; increasing shards could
// make the problem worse, particularly if we're getting rate limited.
//
// reshardDisableTimestamp holds the unix timestamp until which resharding
// is disabled. We'll update that timestamp if the period we were just told
// to sleep for is newer than the existing disabled timestamp.
reshardWaitPeriod := time.Now().Add(time.Duration(sleepDuration) * 2)
setAtomicToNewer(&qm.reshardDisableTimestamp, reshardWaitPeriod.Unix())

select {
case <-ctx.Done():
case <-time.After(time.Duration(sleepDuration)):
Expand All @@ -1687,6 +1701,25 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}

// setAtomicToNewer raises the int64 held in value to newValue, but only when
// newValue is strictly greater than what is currently stored. If the stored
// value is already equal to or greater than newValue, it is left untouched.
//
// The update is performed with a load / compare-and-swap loop, so a write made
// by another goroutine between the load and the swap is never overwritten with
// a smaller value.
func setAtomicToNewer(value *atomic.Int64, newValue int64) {
	// Every pass re-reads the stored value; the loop ends as soon as the
	// stored value has reached or passed newValue.
	for stored := value.Load(); stored < newValue; stored = value.Load() {
		// A failed swap means the value changed after we read it, so go
		// around again and re-evaluate against the fresh value.
		if value.CompareAndSwap(stored, newValue) {
			return
		}
	}
}

func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
var lowest int64
Expand Down

0 comments on commit ca185ad

Please sign in to comment.