Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add healthyInstancesInZoneCount to Lifecycler; update tests #526

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,12 @@ type Lifecycler struct {
readySince time.Time

// Keeps stats updated at every heartbeat period
countersLock sync.RWMutex
healthyInstancesCount int
instancesCount int
instancesInZoneCount int
zonesCount int
countersLock sync.RWMutex
healthyInstancesCount int
instancesCount int
healthyInstancesInZoneCount int
instancesInZoneCount int
zonesCount int

tokenGenerator TokenGenerator
// The maximum time allowed to wait on the CanJoin() condition.
Expand Down Expand Up @@ -441,6 +442,15 @@ func (i *Lifecycler) InstancesCount() int {
return i.instancesCount
}

// HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) HealthyInstancesInZoneCount() int {
i.countersLock.RLock()
defer i.countersLock.RUnlock()

return i.healthyInstancesInZoneCount
}

// InstancesInZoneCount returns the number of instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) InstancesInZoneCount() int {
Expand Down Expand Up @@ -913,6 +923,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
healthyInstancesCount := 0
instancesCount := 0
zones := map[string]int{}
healthyInstancesInZone := map[string]int{}

if ringDesc != nil {
now := time.Now()
Expand All @@ -924,6 +935,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
// Count the number of healthy instances for Write operation.
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {
healthyInstancesCount++
healthyInstancesInZone[ingester.Zone]++
}
}
}
Expand All @@ -932,6 +944,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
i.countersLock.Lock()
i.healthyInstancesCount = healthyInstancesCount
i.instancesCount = instancesCount
i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone]
i.instancesInZoneCount = zones[i.cfg.Zone]
i.zonesCount = len(zones)
i.countersLock.Unlock()
Expand Down
112 changes: 106 additions & 6 deletions ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,86 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
})
}

func TestLifecycler_HealthyInstancesInZoneCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

ctx := context.Background()

// Add the first ingester to the ring
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig1.JoinAfter = 100 * time.Millisecond
lifecyclerConfig1.Zone = "zone-a"

lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler1.HealthyInstancesInZoneCount())

require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1))
defer services.StopAndAwaitTerminated(ctx, lifecycler1) // nolint:errcheck

// Assert the first ingester joined the ring
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler1.HealthyInstancesInZoneCount() == 1
})

// Add the second ingester to the ring in the same zone
lifecyclerConfig2 := testLifecyclerConfig(ringConfig, "ing2")
lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig2.JoinAfter = 100 * time.Millisecond
lifecyclerConfig2.Zone = "zone-a"

lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler2.HealthyInstancesInZoneCount())

require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler2))
defer services.StopAndAwaitTerminated(ctx, lifecycler2) // nolint:errcheck

// Assert the second ingester joined the ring
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler2.HealthyInstancesInZoneCount() == 2
})

// Assert the first ingester count is updated
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler1.HealthyInstancesInZoneCount() == 2
})

// Add the third ingester to the ring in a different zone
lifecyclerConfig3 := testLifecyclerConfig(ringConfig, "ing3")
lifecyclerConfig3.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig3.JoinAfter = 100 * time.Millisecond
lifecyclerConfig3.Zone = "zone-b"

lifecycler3, err := NewLifecycler(lifecyclerConfig3, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler3.HealthyInstancesInZoneCount())

require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler3))
defer services.StopAndAwaitTerminated(ctx, lifecycler3) // nolint:errcheck

// Assert the third ingester joined the ring
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler3.HealthyInstancesInZoneCount() == 1
})

// Assert the first ingester count is correct
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler1.HealthyInstancesInZoneCount() == 2
})

// Assert the second ingester count is correct
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler2.HealthyInstancesInZoneCount() == 2
})
}

func TestLifecycler_InstancesInZoneCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand All @@ -169,12 +249,13 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
ringConfig.KVStore.Mock = ringStore

instances := []struct {
zone string
healthy bool
expectedInstancesInZoneCount int
expectedInstancesCount int
expectedHealthyInstancesCount int
expectedZonesCount int
zone string
healthy bool
expectedInstancesInZoneCount int
expectedInstancesCount int
expectedHealthyInstancesCount int
expectedZonesCount int
expectedHealthyInstancesInZoneCount int
}{
{
zone: "zone-a",
Expand All @@ -187,6 +268,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 1,
// after adding a healthy instance in zone-a, expectedZonesCount is 1
expectedZonesCount: 1,
// after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount is 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-a",
Expand All @@ -199,6 +282,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 1,
// zone-a was already added, so expectedZonesCount remains 1
expectedZonesCount: 1,
// after adding an unhealthy instance in zone-a, expectedHealthyInstancesInZoneCount remains 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-a",
Expand All @@ -211,6 +296,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 2,
// zone-a was already added, so expectedZonesCount remains 1
expectedZonesCount: 1,
// after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount becomes 2
expectedHealthyInstancesInZoneCount: 2,
},
{
zone: "zone-b",
Expand All @@ -223,6 +310,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 3,
// after adding a healthy instance in zone-b, expectedZonesCount becomes 2
expectedZonesCount: 2,
// after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-c",
Expand All @@ -235,6 +324,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 3,
// after adding an unhealthy instance in zone-c, expectedZonesCount becomes 3
expectedZonesCount: 3,
// after adding an unhealthy instance in zone-c, expectedHealthyInstancesInZoneCount is 0
expectedHealthyInstancesInZoneCount: 0,
},
{
zone: "zone-c",
Expand All @@ -247,6 +338,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 4,
// zone-c was already added, so expectedZonesCount remains 3
expectedZonesCount: 3,
// after adding a healthy instance in zone-c, expectedHealthyInstancesInZoneCount is 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-b",
Expand All @@ -259,6 +352,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 5,
// zone-b was already added, so expectedZonesCount remains 3
expectedZonesCount: 3,
// after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 2
expectedHealthyInstancesInZoneCount: 2,
},
}

Expand Down Expand Up @@ -292,10 +387,15 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
return lifecycler.HealthyInstancesCount()
})

test.Poll(t, time.Duration(joinWaitMs)*time.Millisecond, instance.expectedHealthyInstancesInZoneCount, func() interface{} {
return lifecycler.HealthyInstancesInZoneCount()
})

require.Equal(t, instance.expectedInstancesInZoneCount, lifecycler.InstancesInZoneCount())
require.Equal(t, instance.expectedInstancesCount, lifecycler.InstancesCount())
require.Equal(t, instance.expectedHealthyInstancesCount, lifecycler.HealthyInstancesCount())
require.Equal(t, instance.expectedZonesCount, lifecycler.ZonesCount())
require.Equal(t, instance.expectedHealthyInstancesInZoneCount, lifecycler.HealthyInstancesInZoneCount())
}
}

Expand Down