Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compactor auto-forget from ring on unhealthy #6563

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* [ENHANCEMENT] Added metric name in limiter per-metric exceeded errors. #6416
* [ENHANCEMENT] StoreGateway: Added `cortex_bucket_store_indexheader_load_duration_seconds` and `cortex_bucket_store_indexheader_download_duration_seconds` metrics for time of downloading and loading index header files. #6445
* [ENHANCEMENT] Blocks Storage: Allow use of non-dualstack endpoints for S3 blocks storage via `-blocks-storage.s3.disable-dualstack`. #6522
* [ENHANCEMENT] Add `auto_forget_unhealthy_periods` for ring to auto forget unhealthy compactors. #6533
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be added under ## master / unreleased

* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271
Expand Down
3 changes: 2 additions & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,8 @@ func (c *Compactor) starting(ctx context.Context) error {
// Initialize the compactors ring if sharding is enabled.
if c.compactorCfg.ShardingEnabled {
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
delegate := ring.LifecyclerDelegate(c)
c.ringLifecycler, err = ring.NewLifecyclerWithDelegate(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer), delegate)
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/compactor/compactor_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
AutoForgetUnhealthyPeriods int `yaml:"auto_forget_unhealthy_periods"`

// Wait ring stability.
WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
Expand Down Expand Up @@ -54,6 +55,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
cfg.KVStore.RegisterFlagsWithPrefix("compactor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")
f.IntVar(&cfg.AutoForgetUnhealthyPeriods, "compactor.auto-forget-unhealthy-periods", -1, "The number of heartbeat periods to occur after an ingester becomes unhealthy before it is forgotten from the ring. -1 to disable")

// Wait stability flags.
f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
Expand Down Expand Up @@ -87,6 +89,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ReplicationFactor = 1
rc.AutoForgetUnhealthyPeriods = cfg.AutoForgetUnhealthyPeriods

// Configure lifecycler
lc.RingConfig = rc
Expand Down
3 changes: 3 additions & 0 deletions pkg/compactor/compactor_ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestRingConfig_DefaultConfigToLifecyclerConfig(t *testing.T) {
expected.NumTokens = 512
expected.MinReadyDuration = 0
expected.FinalSleep = 0
expected.RingConfig.AutoForgetUnhealthyPeriods = -1

assert.Equal(t, expected, cfg.ToLifecyclerConfig())
}
Expand All @@ -42,6 +43,7 @@ func TestRingConfig_CustomConfigToLifecyclerConfig(t *testing.T) {
cfg.InstanceAddr = "1.2.3.4"
cfg.ListenPort = 10
cfg.TokensFilePath = "testFilePath"
cfg.AutoForgetUnhealthyPeriods = 5

// The lifecycler config should be generated based upon the compactor
// ring config
Expand All @@ -54,6 +56,7 @@ func TestRingConfig_CustomConfigToLifecyclerConfig(t *testing.T) {
expected.Addr = cfg.InstanceAddr
expected.ListenPort = cfg.ListenPort
expected.TokensFilePath = cfg.TokensFilePath
expected.RingConfig.AutoForgetUnhealthyPeriods = cfg.AutoForgetUnhealthyPeriods

// Hardcoded config
expected.RingConfig.ReplicationFactor = 1
Expand Down
11 changes: 11 additions & 0 deletions pkg/compactor/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package compactor

import (
"context"

"github.com/cortexproject/cortex/pkg/ring"
)

func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.Lifecycler, ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do similar to the basicLifecycler.

Create the generic delegate for the ring and make compactor use it.

c.ring.AutoForgetUnhealthy(ctx)
}
26 changes: 26 additions & 0 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ var (
errInvalidTokensGeneratorStrategy = errors.New("invalid token generator strategy")
)

type LifecyclerDelegate interface {
// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
// in the ring.
OnRingInstanceHeartbeat(lifecycler *Lifecycler, ctx context.Context)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to see the same function parameters reused. The current interface is a bit weird, and we are not using the lifecycler at all in the compactor implementation.

}

// LifecyclerConfig is the config to build a Lifecycler.
type LifecyclerConfig struct {
RingConfig Config `yaml:"ring"`
Expand Down Expand Up @@ -108,6 +114,7 @@ type Lifecycler struct {
cfg LifecyclerConfig
flushTransferer FlushTransferer
KVStore kv.Client
delegate LifecyclerDelegate

actorChan chan func()
autojoinChan chan struct{}
Expand Down Expand Up @@ -150,6 +157,22 @@ type Lifecycler struct {
tg TokenGenerator
}

// NewLifecyclerWithDelegate builds a Lifecycler through NewLifecycler and then
// attaches the given delegate to it. All arguments other than delegate are
// forwarded to NewLifecycler unchanged. Whatever NewLifecycler returns (the
// lifecycler and the error) is handed back to the caller as-is; the delegate
// is only attached when a non-nil lifecycler was produced.
func NewLifecyclerWithDelegate(
	cfg LifecyclerConfig,
	flushTransferer FlushTransferer,
	ringName, ringKey string,
	autoJoinOnStartup, flushOnShutdown bool,
	logger log.Logger,
	reg prometheus.Registerer,
	delegate LifecyclerDelegate,
) (*Lifecycler, error) {
	lifecycler, err := NewLifecycler(
		cfg,
		flushTransferer,
		ringName,
		ringKey,
		autoJoinOnStartup,
		flushOnShutdown,
		logger,
		reg,
	)
	if lifecycler == nil {
		// Nothing to attach the delegate to; surface the constructor's result.
		return nil, err
	}

	lifecycler.delegate = delegate
	return lifecycler, err
}

// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
func NewLifecycler(
cfg LifecyclerConfig,
Expand Down Expand Up @@ -601,6 +624,9 @@ func (i *Lifecycler) heartbeat(ctx context.Context) {
i.lifecyclerMetrics.consulHeartbeats.Inc()
ctx, cancel := context.WithTimeout(ctx, i.cfg.HeartbeatPeriod)
defer cancel()
if i.delegate != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of explicitly checking i.delegate != nil, can we define a noopDelegate as the default which does nothing?

i.delegate.OnRingInstanceHeartbeat(i, ctx)
}
if err := i.updateConsul(ctx); err != nil {
level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
}
Expand Down
55 changes: 48 additions & 7 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,13 @@ var (

// Config for a Ring
type Config struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
ExcludedZones flagext.StringSliceCSV `yaml:"excluded_zones"`
DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"`
KVStore kv.Config `yaml:"kvstore"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
ExcludedZones flagext.StringSliceCSV `yaml:"excluded_zones"`
DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"`
AutoForgetUnhealthyPeriods int `yaml:"auto_forget_unhealthy_periods"`

// Whether the shuffle-sharding subring cache is disabled. This option is set
// internally and never exposed to the user.
Expand All @@ -168,6 +169,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
f.Var(&cfg.ExcludedZones, prefix+"distributor.excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring.")
f.IntVar(&cfg.AutoForgetUnhealthyPeriods, prefix+"ring.auto-forget-unhealthy-periods", -1, "The number of heartbeat periods to occur after an ingester becomes unhealthy before it is forgotten from the ring. -1 to disable")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this flag needed? Is it used to auto forget Ingesters? I don't know if this is what we want to do in this PR at least

}

type instanceInfo struct {
Expand Down Expand Up @@ -213,6 +215,7 @@ type Ring struct {
numTokensGaugeVec *prometheus.GaugeVec
oldestTimestampGaugeVec *prometheus.GaugeVec
reportedOwners map[string]struct{}
unhealthyPeriodsCount map[string]int

logger log.Logger
}
Expand Down Expand Up @@ -277,7 +280,8 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client
Help: "Timestamp of the oldest member in the ring.",
ConstLabels: map[string]string{"name": name}},
[]string{"state"}),
logger: logger,
logger: logger,
unhealthyPeriodsCount: make(map[string]int),
}

r.Service = services.NewBasicService(r.starting, r.loop, nil).WithName(fmt.Sprintf("%s ring client", name))
Expand Down Expand Up @@ -655,6 +659,43 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]int64) {
return numTokens, owned
}

// AutoForgetUnhealthy forgets instances from the ring once they have been unhealthy for more than the configured number of periods.
func (r *Ring) AutoForgetUnhealthy(ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding AutoForgetUnhealthy here I prefer to reuse the existing NewAutoForgetDelegate.
To make that work, we need to call i.delegate.OnRingInstanceHeartbeat(i, ctx) inside i.updateConsul and pass the same parameters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

it should be similar to how

func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) {

works

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After talking to @eeldaly offline I realized that the parameter is a bit different so probably hard to add the same handler.

This seems cleaner to me. We can switch AutoForgetDelegate to call this function instead.

// AutoForget removes from ringDesc every instance whose last heartbeat is
// older than forgetPeriod, logging a warning for each removed instance.
//
// ringDesc is modified in place. The logger is the one passed in as a
// parameter; this is a free function, so there is no receiver to read a
// logger from.
func AutoForget(ringDesc *Desc, forgetPeriod time.Duration, logger log.Logger) {
	for id, instance := range ringDesc.Ingesters {
		lastHeartbeat := time.Unix(instance.GetTimestamp(), 0)

		if time.Since(lastHeartbeat) > forgetPeriod {
			level.Warn(logger).Log("msg", "auto-forgetting instance from the ring because it is unhealthy for a long time", "instance", id, "last_heartbeat", lastHeartbeat.String(), "forget_period", forgetPeriod)
			ringDesc.RemoveIngester(id)
		}
	}
}

if r.cfg.AutoForgetUnhealthyPeriods < 0 {
return
}

//remove counts for ingesters no longer in the ring
for id := range r.unhealthyPeriodsCount {
if _, ok := r.ringDesc.Ingesters[id]; !ok {
delete(r.unhealthyPeriodsCount, id)
}
}

//update counts for ingesters in the ring and forget ingesters
for id, instance := range r.ringDesc.Ingesters {
if r.IsHealthy(&instance, Reporting, r.KVClient.LastUpdateTime(r.key)) {
delete(r.unhealthyPeriodsCount, id)
} else {
if _, ok := r.unhealthyPeriodsCount[id]; !ok {
r.unhealthyPeriodsCount[id] = 0
}
r.unhealthyPeriodsCount[id] += 1

if r.unhealthyPeriodsCount[id] > r.cfg.AutoForgetUnhealthyPeriods {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := in.(*Desc)
ringDesc.RemoveIngester(id)
level.Info(r.logger).Log("msg", "Forgetting from ring", id)
return ringDesc, true, nil
}
r.KVClient.CAS(ctx, r.key, unregister)
delete(r.unhealthyPeriodsCount, id)
}
}
}
}

// updateRingMetrics updates ring metrics. Caller must be holding the Write lock!
func (r *Ring) updateRingMetrics(compareResult CompareResult) {
if compareResult == Equal {
Expand Down
Loading