Skip to content

Commit

Permalink
Allow disabling of ring heartbeats by setting relevant options to zer…
Browse files Browse the repository at this point in the history
…o. (#4344)

* Allow disabling of ring heartbeats by setting relevant options to zero.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Review comments.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>
  • Loading branch information
stevesg authored Jul 9, 2021
1 parent 04a8c99 commit 6b8bd5a
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 30 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
* `-alertmanager.sharding-ring.heartbeat-timeout`
* `-compactor.ring.heartbeat-timeout`
* `-store-gateway.sharding-ring.heartbeat-timeout`
* [ENHANCEMENT] Ring: allow heartbeats to be explicitly disabled by setting the interval to zero. This is considered experimental. This applies to the following configuration options: #4344
* `-distributor.ring.heartbeat-period`
* `-ingester.heartbeat-period`
* `-ruler.ring.heartbeat-period`
* `-alertmanager.sharding-ring.heartbeat-period`
* `-compactor.ring.heartbeat-period`
* `-store-gateway.sharding-ring.heartbeat-period`
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336

Expand Down
2 changes: 1 addition & 1 deletion docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ compactor:
# CLI flag: -compactor.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# Period at which to heartbeat to the ring.
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -compactor.ring.heartbeat-period
[heartbeat_period: <duration> | default = 5s]

Expand Down
2 changes: 1 addition & 1 deletion docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# Period at which to heartbeat to the ring.
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -store-gateway.sharding-ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]

Expand Down
12 changes: 6 additions & 6 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ ring:
# CLI flag: -distributor.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Period at which to heartbeat to the ring.
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -distributor.ring.heartbeat-period
[heartbeat_period: <duration> | default = 5s]
Expand Down Expand Up @@ -679,7 +679,7 @@ lifecycler:
# CLI flag: -ingester.num-tokens
[num_tokens: <int> | default = 128]
# Period at which to heartbeat to consul.
# Period at which to heartbeat to consul. 0 = disabled.
# CLI flag: -ingester.heartbeat-period
[heartbeat_period: <duration> | default = 5s]
Expand Down Expand Up @@ -1581,7 +1581,7 @@ ring:
# CLI flag: -ruler.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# Period at which to heartbeat to the ring.
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -ruler.ring.heartbeat-period
[heartbeat_period: <duration> | default = 5s]

Expand Down Expand Up @@ -1907,7 +1907,7 @@ sharding_ring:
# CLI flag: -alertmanager.sharding-ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Period at which to heartbeat to the ring.
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -alertmanager.sharding-ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]
Expand Down Expand Up @@ -5179,7 +5179,7 @@ sharding_ring:
# CLI flag: -compactor.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Period at which to heartbeat to the ring.
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -compactor.ring.heartbeat-period
[heartbeat_period: <duration> | default = 5s]
Expand Down Expand Up @@ -5257,7 +5257,7 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Period at which to heartbeat to the ring.
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -store-gateway.sharding-ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]
Expand Down
9 changes: 8 additions & 1 deletion docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,11 @@ Currently experimental features are:
- `-ruler.ring.heartbeat-timeout=0`
- `-alertmanager.sharding-ring.heartbeat-timeout=0`
- `-compactor.ring.heartbeat-timeout=0`
- `-store-gateway.sharding-ring.heartbeat-timeout=0`
- `-store-gateway.sharding-ring.heartbeat-timeout=0`
- Disabling ring heartbeats
- `-distributor.ring.heartbeat-period=0`
- `-ingester.heartbeat-period=0`
- `-ruler.ring.heartbeat-period=0`
- `-alertmanager.sharding-ring.heartbeat-period=0`
- `-compactor.ring.heartbeat-period=0`
- `-store-gateway.sharding-ring.heartbeat-period=0`
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertmanager_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(rfprefix, "alertmanagers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring. 0 = never (timeout disabled).")
f.IntVar(&cfg.ReplicationFactor, rfprefix+"replication-factor", 3, "The replication factor to use when sharding the alertmanager.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("compactor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Wait stability flags.
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Instance flags
Expand Down
19 changes: 10 additions & 9 deletions pkg/ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
)
Expand Down Expand Up @@ -182,12 +183,12 @@ func (l *BasicLifecycler) starting(ctx context.Context) error {
}

func (l *BasicLifecycler) running(ctx context.Context) error {
heartbeatTicker := time.NewTicker(l.cfg.HeartbeatPeriod)
defer heartbeatTicker.Stop()
heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(l.cfg.HeartbeatPeriod)
defer heartbeatTickerStop()

for {
select {
case <-heartbeatTicker.C:
case <-heartbeatTickerChan:
l.heartbeat(ctx)

case f := <-l.actorChan:
Expand All @@ -214,13 +215,13 @@ func (l *BasicLifecycler) stopping(runningError error) error {
}()

// Heartbeat while the stopping delegate function is running.
heartbeatTicker := time.NewTicker(l.cfg.HeartbeatPeriod)
defer heartbeatTicker.Stop()
heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(l.cfg.HeartbeatPeriod)
defer heartbeatTickerStop()

heartbeatLoop:
for {
select {
case <-heartbeatTicker.C:
case <-heartbeatTickerChan:
l.heartbeat(context.Background())
case <-done:
break heartbeatLoop
Expand Down Expand Up @@ -292,8 +293,8 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error {
}

func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Duration) error {
heartbeatTicker := time.NewTicker(l.cfg.HeartbeatPeriod)
defer heartbeatTicker.Stop()
heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(l.cfg.HeartbeatPeriod)
defer heartbeatTickerStop()

// The first observation will occur after the specified period.
level.Info(l.logger).Log("msg", "waiting stable tokens", "ring", l.ringName)
Expand All @@ -312,7 +313,7 @@ func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Dura
level.Info(l.logger).Log("msg", "tokens verification succeeded", "ring", l.ringName)
return nil

case <-heartbeatTicker.C:
case <-heartbeatTickerChan:
l.heartbeat(ctx)

case <-ctx.Done():
Expand Down
15 changes: 8 additions & 7 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
}

f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.")
f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.")
f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul. 0 = disabled.")
f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.")
f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.")
f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
Expand Down Expand Up @@ -392,8 +393,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
autoJoinAfter := time.After(i.cfg.JoinAfter)
var observeChan <-chan time.Time = nil

heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
defer heartbeatTicker.Stop()
heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(i.cfg.HeartbeatPeriod)
defer heartbeatTickerStop()

for {
select {
Expand Down Expand Up @@ -442,7 +443,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
observeChan = time.After(i.cfg.ObservePeriod)
}

case <-heartbeatTicker.C:
case <-heartbeatTickerChan:
consulHeartbeats.WithLabelValues(i.RingName).Inc()
if err := i.updateConsul(context.Background()); err != nil {
level.Error(log.Logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
Expand All @@ -469,8 +470,8 @@ func (i *Lifecycler) stopping(runningError error) error {
return nil
}

heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
defer heartbeatTicker.Stop()
heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(i.cfg.HeartbeatPeriod)
defer heartbeatTickerStop()

// Mark ourselved as Leaving so no more samples are send to us.
err := i.changeState(context.Background(), LEAVING)
Expand All @@ -489,7 +490,7 @@ func (i *Lifecycler) stopping(runningError error) error {
heartbeatLoop:
for {
select {
case <-heartbeatTicker.C:
case <-heartbeatTickerChan:
consulHeartbeats.WithLabelValues(i.RingName).Inc()
if err := i.updateConsul(context.Background()); err != nil {
level.Error(log.Logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/ruler_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("ruler.ring.", "rulers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Instance flags
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(ringFlagsPrefix, "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring. 0 = never (timeout disabled)."+sharedOptionWithQuerier)
f.IntVar(&cfg.ReplicationFactor, ringFlagsPrefix+"replication-factor", 3, "The replication factor to use when sharding blocks."+sharedOptionWithQuerier)
f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,14 @@ func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.

return input + time.Duration(jitter)
}

// NewDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing
// zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel.
func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {
if interval == 0 {
return func() {}, nil
}

tick := time.NewTicker(interval)
return func() { tick.Stop() }, tick.C
}
28 changes: 28 additions & 0 deletions pkg/util/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,31 @@ func TestParseTime(t *testing.T) {
assert.Equal(t, TimeToMillis(test.result), ts)
}
}

func TestNewDisableableTicker_Enabled(t *testing.T) {
stop, ch := NewDisableableTicker(10 * time.Millisecond)
defer stop()

time.Sleep(100 * time.Millisecond)

select {
case <-ch:
break
default:
t.Error("ticker should have ticked when enabled")
}
}

func TestNewDisableableTicker_Disabled(t *testing.T) {
stop, ch := NewDisableableTicker(0)
defer stop()

time.Sleep(100 * time.Millisecond)

select {
case <-ch:
t.Error("ticker should not have ticked when disabled")
default:
break
}
}

0 comments on commit 6b8bd5a

Please sign in to comment.