Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compactor auto-forget from ring on unhealthy #6563

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

eeldaly
Copy link
Contributor

@eeldaly eeldaly commented Jan 28, 2025

What this PR does:

  • Add auto_forget_unhealthy_periods for ring to auto forget unhealthy compactors.
  • Add LifecyclerDelegate interface to normal Lifecycler with OnRingInstanceHeartbeat

Which issue(s) this PR fixes:
Fixes #6533

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

eeldaly and others added 6 commits December 20, 2024 13:45
Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
Signed-off-by: Ben Ye <benye@amazon.com>
Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
pkg/ring/ring.go Outdated
@@ -168,6 +169,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
f.Var(&cfg.ExcludedZones, prefix+"distributor.excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring.")
f.IntVar(&cfg.AutoForgetUnhealthyPeriods, prefix+"ring.auto-forget-unhealthy-periods", -1, "The number of heartbeat periods to occur after an ingester becomes unhealthy before it is forgotten from the ring. -1 to disable")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this flag needed? Is it used to auto forget Ingesters? I don't know if this is what we want to do in this PR at least

pkg/ring/ring.go Outdated
@@ -655,6 +659,43 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]int64) {
return numTokens, owned
}

// autoForgetUnhealthy forgets unhealthy ingesters from the ring.
func (r *Ring) AutoForgetUnhealthy(ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding AutoForgetUnhealthy here I prefer to reuse the existing NewAutoForgetDelegate.
To make that work, we need to call i.delegate.OnRingInstanceHeartbeat(i, ctx) inside i.updateConsul and pass the same parameters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

it should be similar to how

func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) {

works

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After talking to @eeldaly offline I realized that the parameter is a bit different so probably hard to add the same handler.

This seems cleaner to me. We can switch AutoForgetDelegate to call this function instead.

func AutoForget(ringDesc *Desc, forgetPeriod time.Duration, logger log.Logger) {
	for id, instance := range ringDesc.Ingesters {
		lastHeartbeat := time.Unix(instance.GetTimestamp(), 0)

		if time.Since(lastHeartbeat) > forgetPeriod {
			level.Warn(d.logger).Log("msg", "auto-forgetting instance from the ring because it is unhealthy for a long time", "instance", id, "last_heartbeat", lastHeartbeat.String(), "forget_period", forgetPeriod)
			ringDesc.RemoveIngester(id)
		}
	}
}

@@ -601,6 +624,9 @@ func (i *Lifecycler) heartbeat(ctx context.Context) {
i.lifecyclerMetrics.consulHeartbeats.Inc()
ctx, cancel := context.WithTimeout(ctx, i.cfg.HeartbeatPeriod)
defer cancel()
if i.delegate != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of explicitly checking i.deletegate != nil, can we define a noopDelegate as the default which does nothing?

type LifecyclerDelegate interface {
// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
// in the ring.
OnRingInstanceHeartbeat(lifecycler *Lifecycler, ctx context.Context)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to see that the same function parameter to be reused. The current interface is a bit weird and we are not using the lifecycler at all in the compactor implementation

Copy link
Contributor

@yeya24 yeya24 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw I think you need a rebase. It contains some commits from your previous changes

CHANGELOG.md Outdated
@@ -67,6 +67,7 @@
* [ENHANCEMENT] Added metric name in limiter per-metric exceeded errors. #6416
* [ENHANCEMENT] StoreGateway: Added `cortex_bucket_store_indexheader_load_duration_seconds` and `cortex_bucket_store_indexheader_download_duration_seconds` metrics for time of downloading and loading index header files. #6445
* [ENHANCEMENT] Blocks Storage: Allow use of non-dualstack endpoints for S3 blocks storage via `-blocks-storage.s3.disable-dualstack`. #6522
* [ENHANCEMENT] Add `auto_forget_unhealthy_periods` for ring to auto forget unhealthy compactors. #6533
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to add under ## master / unreleased

"github.com/cortexproject/cortex/pkg/ring"
)

func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.Lifecycler, ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do similar to the basicLifecycler.

Create the generic delegate for the ring and make compactor use it.

pkg/ring/ring.go Outdated
@@ -655,6 +659,43 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]int64) {
return numTokens, owned
}

// autoForgetUnhealthy forgets unhealthy ingesters from the ring.
func (r *Ring) AutoForgetUnhealthy(ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

it should be similar to how

func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) {

works

…LifecyclerDelegate

Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
@@ -972,6 +1001,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
instanceDesc.Zone = i.Zone
instanceDesc.RegisteredTimestamp = i.getRegisteredAt().Unix()
ringDesc.Ingesters[i.ID] = instanceDesc
i.delegate.OnRingInstanceHeartbeat(i, ringDesc)
Copy link
Contributor

@yeya24 yeya24 Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move this line outside the else block. We should call OnRingInstanceHeartbeat no matter the instance exists in the ring or not. Same behavior as UpdateInstance in the basic lifecycler


func (d *LifecyclerAutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *Lifecycler, ringDesc *Desc) {
AutoForgetFromRing(ringDesc, d.forgetPeriod, d.logger)
d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we know there is not really a next delegate doing anything, I feel we can safely remove next here. I don't see a usecase to really chain this here for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave this next here @yeya24
It will be painful if someone add a new delegate and try to debug why it is not working. Easier just follow the pattern. We can refactor all these delegates when we are joining both lifecycler into to one

Copy link
Contributor

@danielblando danielblando left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test in compactor for it?

@@ -333,6 +334,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction.")
f.IntVar(&cfg.BlockFilesConcurrency, "compactor.block-files-concurrency", 10, "Number of goroutines to use when fetching/uploading block files from object storage.")
f.IntVar(&cfg.BlocksFetchConcurrency, "compactor.blocks-fetch-concurrency", 3, "Number of goroutines to use when fetching blocks from object storage when compacting.")
f.DurationVar(&cfg.AutoForgetDelay, "compactor.auto-forget-delay", 15*time.Minute, "Time since last heartbeat before compactor will be removed from ring. 0 to disable")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ruler, alertmanager and sg, we dont have a config for this. We usually just do 2x heartbeat timeout.

I am fine adding a new config, but lets add the default as 2*heartbeat_timeout.


func (d *LifecyclerAutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *Lifecycler, ringDesc *Desc) {
AutoForgetFromRing(ringDesc, d.forgetPeriod, d.logger)
d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave this next here @yeya24
It will be painful if someone add a new delegate and try to debug why it is not working. Easier just follow the pattern. We can refactor all these delegates when we are joining both lifecycler into to one

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Compactor becomes unhealthy in the ring when stopped during startup
3 participants