Skip to content

Commit

Permalink
Ring: Log unhealthy replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Oct 19, 2021
1 parent 3a65fbd commit e190bd5
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 30 deletions.
29 changes: 23 additions & 6 deletions ring/replication_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
)

Expand All @@ -14,10 +16,12 @@ type ReplicationStrategy interface {
Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error)
}

type defaultReplicationStrategy struct{}
type defaultReplicationStrategy struct {
logger log.Logger
}

func NewDefaultReplicationStrategy() ReplicationStrategy {
return &defaultReplicationStrategy{}
func NewDefaultReplicationStrategy(logger log.Logger) ReplicationStrategy {
return &defaultReplicationStrategy{logger: logger}
}

// Filter decides, given the set of instances eligible for a key,
Expand All @@ -40,10 +44,12 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so if too many failed instances
// will cause the whole write to fail.
var skipped []string
for i := 0; i < len(instances); {
if instances[i].IsHealthy(op, heartbeatTimeout, now) {
i++
} else {
skipped = append(skipped, instances[i].Addr)
instances = append(instances[:i], instances[i+1:]...)
}
}
Expand All @@ -54,8 +60,14 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
var err error

if zoneAwarenessEnabled {
level.Error(s.logger).Log("msg",
fmt.Sprintf("at least %d live replicas required across different availability zones, could only find %d",
minSuccess, len(instances)), "unhealthy", skipped)
err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(instances))
} else {
level.Error(s.logger).Log("msg",
fmt.Sprintf("at least %d live replicas required, could only find %d",
minSuccess, len(instances)), "unhealthy", skipped)
err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(instances))
}

Expand All @@ -65,25 +77,30 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
return instances, len(instances) - minSuccess, nil
}

type ignoreUnhealthyInstancesReplicationStrategy struct{}
type ignoreUnhealthyInstancesReplicationStrategy struct {
logger log.Logger
}

func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy {
return &ignoreUnhealthyInstancesReplicationStrategy{}
func NewIgnoreUnhealthyInstancesReplicationStrategy(logger log.Logger) ReplicationStrategy {
return &ignoreUnhealthyInstancesReplicationStrategy{logger: logger}
}

func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []InstanceDesc, op Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []InstanceDesc, maxFailures int, err error) {
now := time.Now()
// Filter out unhealthy instances.
var skipped []string
for i := 0; i < len(instances); {
if instances[i].IsHealthy(op, heartbeatTimeout, now) {
i++
} else {
skipped = append(skipped, instances[i].Addr)
instances = append(instances[:i], instances[i+1:]...)
}
}

// We need at least 1 healthy instance no matter what is the replication factor set to.
if len(instances) == 0 {
level.Error(r.logger).Log("msg", "failed to find any healthy ring replicas", "unhealthy", skipped)
return nil, 0, errors.New("at least 1 healthy replica required, could only find 0")
}

Expand Down
5 changes: 3 additions & 2 deletions ring/replication_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -89,7 +90,7 @@ func TestRingReplicationStrategy(t *testing.T) {
}

t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
strategy := NewDefaultReplicationStrategy()
strategy := NewDefaultReplicationStrategy(log.NewNopLogger())
liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.replicationFactor, 100*time.Second, false)
if tc.expectedError == "" {
assert.NoError(t, err)
Expand Down Expand Up @@ -151,7 +152,7 @@ func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) {
}

t.Run(tc.name, func(t *testing.T) {
strategy := NewIgnoreUnhealthyInstancesReplicationStrategy()
strategy := NewIgnoreUnhealthyInstancesReplicationStrategy(log.NewNopLogger())
liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, 3, 100*time.Second, false)
if tc.expectedError == "" {
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registe
return nil, err
}

return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), reg, logger)
return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(logger), reg, logger)
}

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error) {
Expand Down
36 changes: 18 additions & 18 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func benchmarkBatch(b *testing.B, numInstances, numKeys int) {
r := Ring{
cfg: cfg,
ringDesc: desc,
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

ctx := context.Background()
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestDoBatchZeroInstances(t *testing.T) {
r := Ring{
cfg: Config{},
ringDesc: desc,
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}
require.Error(t, DoBatch(ctx, Write, &r, keys, callback, cleanup))
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestRing_Get_ZoneAwarenessWithIngesterLeaving(t *testing.T) {
ringTokensByZone: r.getTokensByZone(),
ringInstanceByToken: r.getTokensInfo(),
ringZones: getZones(r.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

_, bufHosts, bufZones := MakeBuffersForGet()
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) {
ringTokensByZone: r.getTokensByZone(),
ringInstanceByToken: r.getTokensInfo(),
ringZones: getZones(r.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

instances := make([]InstanceDesc, 0, len(r.GetIngesters()))
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestRing_GetAllHealthy(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

set, err := ring.GetAllHealthy(Read)
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

set, err := ring.GetReplicationSetForOperation(Read)
Expand Down Expand Up @@ -821,7 +821,7 @@ func TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled(t *testing.
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Check the replication set has the correct settings
Expand Down Expand Up @@ -957,7 +957,7 @@ func TestRing_ShuffleShard(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

shardRing := ring.ShuffleShard("tenant-id", testData.shardSize)
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func TestRing_ShuffleShard_Stability(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

for i := 1; i <= numTenants; i++ {
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Compute the shard for each tenant.
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Compute the initial shard for each tenant.
Expand Down Expand Up @@ -1240,7 +1240,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Get the replication set with shard size = 3.
Expand Down Expand Up @@ -1317,7 +1317,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Get the replication set with shard size = 2.
Expand Down Expand Up @@ -1576,7 +1576,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Replay the events on the timeline.
Expand Down Expand Up @@ -1641,7 +1641,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// The simulation starts with the minimum shard size. Random events can later increase it.
Expand Down Expand Up @@ -1794,7 +1794,7 @@ func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, s
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
lastTopologyChange: time.Now(),
}

Expand Down Expand Up @@ -1822,7 +1822,7 @@ func BenchmarkRing_Get(b *testing.B) {
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
lastTopologyChange: time.Now(),
}

Expand Down Expand Up @@ -1850,7 +1850,7 @@ func TestRing_Get_NoMemoryAllocations(t *testing.T) {
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
lastTopologyChange: time.Now(),
}

Expand Down
7 changes: 4 additions & 3 deletions ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -120,7 +121,7 @@ func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

startTime := time.Now()
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing.
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Add 1 new instance after some time.
Expand Down Expand Up @@ -206,7 +207,7 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
}

// Keep changing the ring.
Expand Down

0 comments on commit e190bd5

Please sign in to comment.