Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add protection to store-gateway to not drop all blocks if unhealthy in the ring #1806

Merged
merged 3 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* [ENHANCEMENT] Admin: Admin API now has some styling. #1482 #1549
* [ENHANCEMENT] Alertmanager: added `insight=true` field to alertmanager dispatch logs. #1379
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run index header operations in a dedicated thread pool. This feature can be configured using `-blocks-storage.bucket-store.index-header-thread-pool-size` and is disabled by default. #1660
* [ENHANCEMENT] Store-gateway: don't drop all blocks if instance finds itself as unhealthy in the ring. #1806
* [ENHANCEMENT] Querier: wait until inflight queries are completed when shutting down queriers. #1756 #1767
* [BUGFIX] Query-frontend: do not shard queries with a subquery unless the subquery is inside a shardable aggregation function call. #1542
* [BUGFIX] Query-frontend: added `component=query-frontend` label to results cache memcached metrics to fix a panic when Mimir is running in single binary mode and results cache is enabled. #1704
Expand Down
33 changes: 25 additions & 8 deletions pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,27 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin

// FilterBlocks implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error {
subRing := GetShuffleShardingSubring(s.r, userID, s.limits)
filterBlocksByRingSharding(subRing, s.instanceAddr, metas, loaded, synced, s.logger)
return nil
}
// As a protection, ensure the store-gateway instance is healthy in the ring. If it's unhealthy because it's failing
// to heartbeat or get updates from the ring, or even removed from the ring because of the auto-forget feature, then
// keep the previously loaded blocks.
// TODO test
if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil || !set.Includes(s.instanceAddr) {
for blockID := range metas {
if _, ok := loaded[blockID]; ok {
level.Warn(s.logger).Log("msg", "store-gateway is unhealthy in the ring but block is kept because was previously loaded", "block", blockID.String(), "err", err)
} else {
level.Warn(s.logger).Log("msg", "store-gateway is unhealthy in the ring and block has been excluded because was not previously loaded", "block", blockID.String(), "err", err)

func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec, logger log.Logger) {
// Skip the block.
synced.WithLabelValues(shardExcludedMeta).Inc()
delete(metas, blockID)
}
}

return nil
}

r := GetShuffleShardingSubring(s.r, userID, s.limits)
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

for blockID := range metas {
Expand All @@ -97,9 +112,9 @@ func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[
// If an error occurs while checking the ring, we keep the previously loaded blocks.
if err != nil {
if _, ok := loaded[blockID]; ok {
level.Warn(logger).Log("msg", "failed to check block owner but block is kept because was previously loaded", "block", blockID.String(), "err", err)
level.Warn(s.logger).Log("msg", "failed to check block owner but block is kept because was previously loaded", "block", blockID.String(), "err", err)
} else {
level.Warn(logger).Log("msg", "failed to check block owner and block has been excluded because was not previously loaded", "block", blockID.String(), "err", err)
level.Warn(s.logger).Log("msg", "failed to check block owner and block has been excluded because was not previously loaded", "block", blockID.String(), "err", err)

// Skip the block.
synced.WithLabelValues(shardExcludedMeta).Inc()
Expand All @@ -110,7 +125,7 @@ func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[
}

// Keep the block if it is owned by the store-gateway.
if set.Includes(instanceAddr) {
if set.Includes(s.instanceAddr) {
continue
}

Expand All @@ -131,6 +146,8 @@ func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[
synced.WithLabelValues(shardExcludedMeta).Inc()
delete(metas, blockID)
}

return nil
}

// GetShuffleShardingSubring returns the subring to be used for a given user. This function
Expand Down
37 changes: 33 additions & 4 deletions pkg/storegateway/sharding_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
replicationFactor int
limits ShardingLimits
setupRing func(*ring.Desc)
prevLoadedBlocks map[string]map[ulid.ULID]struct{}
expectedUsers []usersExpectation
expectedBlocks []blocksExpectation
}{
Expand Down Expand Up @@ -189,7 +190,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4 /* replicated: */, block3}},
},
},
"one unhealthy instance in the ring with RF = 1 and SS = 3": {
"one unhealthy instance in the ring with RF = 1, SS = 3 and NO previously loaded blocks": {
replicationFactor: 1,
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3},
setupRing: func(r *ring.Desc) {
Expand All @@ -215,7 +216,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{}},
},
},
"one unhealthy instance in the ring with RF = 2 and SS = 3": {
"one unhealthy instance in the ring with RF = 2, SS = 3 and NO previously loaded blocks": {
replicationFactor: 2,
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3},
setupRing: func(r *ring.Desc) {
Expand All @@ -240,7 +241,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{}},
},
},
"one unhealthy instance in the ring with RF = 2 and SS = 2": {
"one unhealthy instance in the ring with RF = 2, SS = 2 and NO previously loaded blocks": {
replicationFactor: 2,
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2},
setupRing: func(r *ring.Desc) {
Expand All @@ -265,6 +266,34 @@ func TestShuffleShardingStrategy(t *testing.T) {
{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because unhealthy */ }},
},
},
"one unhealthy instance in the ring with RF = 2, SS = 2 and some previously loaded blocks": {
replicationFactor: 2,
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2},
setupRing: func(r *ring.Desc) {
r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt)
r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)

r.Ingesters["instance-3"] = ring.InstanceDesc{
Addr: "127.0.0.3",
Timestamp: time.Now().Add(-time.Hour).Unix(),
State: ring.ACTIVE,
Tokens: []uint32{block3Hash + 1},
}
},
prevLoadedBlocks: map[string]map[ulid.ULID]struct{}{
"instance-3": {block2: struct{}{}, block4: struct{}{}},
},
expectedUsers: []usersExpectation{
{instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}},
{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil},
{instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}},
},
expectedBlocks: []blocksExpectation{
{instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}},
{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }},
{instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block2, block4 /* keeping the previously loaded blocks */}},
},
},
"LEAVING instance in the ring should continue to keep its shard blocks and they should NOT be replicated to another instance": {
replicationFactor: 1,
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2},
Expand Down Expand Up @@ -372,7 +401,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
block4: {},
}

err = filter.FilterBlocks(ctx, userID, metas, map[ulid.ULID]struct{}{}, synced)
err = filter.FilterBlocks(ctx, userID, metas, testData.prevLoadedBlocks[expected.instanceID], synced)
require.NoError(t, err)

var actualBlocks []ulid.ULID
Expand Down