Skip to content

Commit

Permalink
Fixes unaligned shards between ingesters and storage. (#4087)
Browse files Browse the repository at this point in the history
* Fixes unaligned shards between ingesters and storage.

Currently shards computation are not correctly aligned between ingester and storage causing bad metric results.

- Fixed the hash caculation which was missing the metric name `logs`.
- Revert some changes from 5f0e245 which was not keeping the alignement of shards.

For example if the shard is 9 of 16 total, then it should also be 9 and 25 of 32. The previous commit was trying to use 10 and 11 of 32.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* better tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Aug 3, 2021
1 parent d6f2d4a commit 10bd165
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 36 deletions.
25 changes: 11 additions & 14 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,15 @@ func (ii *InvertedIndex) getShards(shard *astmapper.ShardAnnotation) []*indexSha
return ii.shards
}

indexFactor := int(ii.totalShards)
// calculate the start of the hash ring desired
lowerBound := shard.Shard * indexFactor / shard.Of
// calculate the end of the hash ring desired
upperBound := (shard.Shard + 1) * indexFactor / shard.Of
// see if the upper bound is cleanly doesn't align cleanly with the next shard
// which can happen when the schema sharding factor and inverted index
// sharding factor are not multiples of each other.
rem := (shard.Shard + 1) * indexFactor % shard.Of
if rem > 0 {
// there's overlap on the upper shard
upperBound = upperBound + 1
totalRequested := int(ii.totalShards) / shard.Of
result := make([]*indexShard, totalRequested)
var j int
for i := 0; i < totalRequested; i++ {
subShard := ((shard.Shard) + (i * shard.Of))
result[j] = ii.shards[subShard]
j++
}

return ii.shards[lowerBound:upperBound]
return result
}

func validateShard(totalShards uint32, shard *astmapper.ShardAnnotation) error {
Expand Down Expand Up @@ -131,6 +125,9 @@ func labelsSeriesID(ls labels.Labels, dest []byte) {

// Backwards-compatible with model.Metric.String()
func labelsString(b *bytes.Buffer, ls labels.Labels) {
// metrics name is used in the store for computing shards.
// see chunk/schema_util.go for more details. `labelsString()`
b.WriteString("logs")
b.WriteByte('{')
i := 0
for _, l := range ls {
Expand Down
51 changes: 29 additions & 22 deletions pkg/ingester/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,10 @@ func Test_GetShards(t *testing.T) {
{16, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15}},

// idx factor a larger multiple of schema factor
{32, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 1}},
{32, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{8, 9}},
{32, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{30, 31}},
{64, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{60, 61, 62, 63}},

// schema factor is a larger multiple of idx factor
{16, &astmapper.ShardAnnotation{Shard: 0, Of: 32}, []uint32{0}},
{16, &astmapper.ShardAnnotation{Shard: 4, Of: 32}, []uint32{2}},
{16, &astmapper.ShardAnnotation{Shard: 15, Of: 32}, []uint32{7}},

// idx factor smaller but not a multiple of schema factor
{4, &astmapper.ShardAnnotation{Shard: 0, Of: 5}, []uint32{0}},
{4, &astmapper.ShardAnnotation{Shard: 1, Of: 5}, []uint32{0, 1}},
{4, &astmapper.ShardAnnotation{Shard: 4, Of: 5}, []uint32{3}},

// schema factor smaller but not a multiple of idx factor
{8, &astmapper.ShardAnnotation{Shard: 0, Of: 5}, []uint32{0, 1}},
{8, &astmapper.ShardAnnotation{Shard: 2, Of: 5}, []uint32{3, 4}},
{8, &astmapper.ShardAnnotation{Shard: 3, Of: 5}, []uint32{4, 5, 6}},
{8, &astmapper.ShardAnnotation{Shard: 4, Of: 5}, []uint32{6, 7}},
{32, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 16}},
{32, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4, 20}},
{32, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31}},
{64, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31, 47, 63}},
} {
tt := tt
t.Run(tt.shard.String()+fmt.Sprintf("_total_%d", tt.total), func(t *testing.T) {
Expand Down Expand Up @@ -95,9 +79,9 @@ func TestDeleteAddLoopkup(t *testing.T) {
}
sort.Sort(cortexpb.FromLabelAdaptersToLabels(lbs))

require.Equal(t, uint32(7), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
require.Equal(t, uint32(26), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
// make sure we consistent
require.Equal(t, uint32(7), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
require.Equal(t, uint32(26), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
index.Add(lbs, model.Fingerprint((cortexpb.FromLabelAdaptersToLabels(lbs).Hash())))
index.Delete(cortexpb.FromLabelAdaptersToLabels(lbs), model.Fingerprint(cortexpb.FromLabelAdaptersToLabels(lbs).Hash()))
ids, err := index.Lookup([]*labels.Matcher{
Expand All @@ -106,3 +90,26 @@ func TestDeleteAddLoopkup(t *testing.T) {
require.NoError(t, err)
require.Len(t, ids, 0)
}

func Test_hash_mapping(t *testing.T) {
lbs := labels.Labels{
labels.Label{Name: "compose_project", Value: "loki-boltdb-storage-s3"},
labels.Label{Name: "compose_service", Value: "ingester-2"},
labels.Label{Name: "container_name", Value: "loki-boltdb-storage-s3_ingester-2_1"},
labels.Label{Name: "filename", Value: "/var/log/docker/790fef4c6a587c3b386fe85c07e03f3a1613f4929ca3abaa4880e14caadb5ad1/json.log"},
labels.Label{Name: "host", Value: "docker-desktop"},
labels.Label{Name: "source", Value: "stderr"},
}

for _, shard := range []uint32{16, 32, 64, 128} {
t.Run(fmt.Sprintf("%d", shard), func(t *testing.T) {
ii := NewWithShards(shard)
ii.Add(cortexpb.FromLabelsToLabelAdapters(lbs), 1)

res, err := ii.Lookup([]*labels.Matcher{{Type: labels.MatchEqual, Name: "compose_project", Value: "loki-boltdb-storage-s3"}}, &astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16})
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, model.Fingerprint(1), res[0])
})
}
}

0 comments on commit 10bd165

Please sign in to comment.