From 912b9611c0fa2c1178dc07fa3304e6d4ddecf4ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 9 Dec 2024 12:36:03 +0200 Subject: [PATCH] receive: expanded postings fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Increase hash seed size. Perhaps in prod we have more than 1 million unique metric names in some tenant(-s)? - Add test that fuzzes queries against two receivers. One of them has expanded postings cache enabled. - Use fnv1a instead of xxhash like in Cortex. Perhaps fnv1a has a more uniform distribution? Not really sure why that would have any impact. Also, I've tested this in prod with some ad-hoc feature flag functionality through os.Stat(). Cannot see any impact anymore. Signed-off-by: Giedrius Statkevičius --- go.mod | 1 + go.sum | 2 + pkg/receive/expandedpostingscache/cache.go | 23 ++-- test/e2e/e2ethanos/services.go | 4 +- test/e2e/receive_test.go | 120 +++++++++++++++++++++ 5 files changed, 139 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index c3aba21dbf..b856043127 100644 --- a/go.mod +++ b/go.mod @@ -119,6 +119,7 @@ require ( github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.34.0 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 + github.com/segmentio/fasthash v1.0.3 github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 go.opentelemetry.io/contrib/propagators/autoprop v0.54.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb diff --git a/go.sum b/go.sum index a2f615ae39..3dc07f2b3b 100644 --- a/go.sum +++ b/go.sum @@ -2195,6 +2195,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30 h1:yoKAVkEVwAqbGbR8n87rHQ1dulL25rKloGadb3vm770= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30/go.mod 
h1:sH0u6fq6x4R5M7WxkoQFY/o7UaiItec0o1LinLCJNq8= +github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= +github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4= github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY= diff --git a/pkg/receive/expandedpostingscache/cache.go b/pkg/receive/expandedpostingscache/cache.go index d5c7069735..e59ae8273a 100644 --- a/pkg/receive/expandedpostingscache/cache.go +++ b/pkg/receive/expandedpostingscache/cache.go @@ -14,8 +14,8 @@ import ( "sync" "time" - "github.com/cespare/xxhash" "github.com/oklog/ulid" + "github.com/segmentio/fasthash/fnv1a" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -53,8 +53,8 @@ var ( const ( // size of the seed array. Each seed is a 64bits int (8 bytes) - // totaling 8mb. - seedArraySize = 1024 * 1024 + // totaling 32mb. + seedArraySize = 4 * 1024 * 1024 numOfSeedsStripes = 512 ) @@ -127,7 +127,7 @@ func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { return } - h := MemHashString(metricName) + h := memHashString(metricName) i := h % uint64(len(c.headSeedByMetricName)) l := h % uint64(len(c.strippedLock)) c.strippedLock[l].Lock() @@ -198,7 +198,7 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage. 
} func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { - h := MemHashString(metricName) + h := memHashString(metricName) i := h % uint64(len(c.headSeedByMetricName)) l := h % uint64(len(c.strippedLock)) c.strippedLock[l].RLock() @@ -249,13 +249,17 @@ func isHeadBlock(blockID ulid.ULID) bool { } func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { + var metricName string for _, m := range ms { if m.Name == labels.MetricName && m.Type == labels.MatchEqual { - return m.Value, true + if metricName != "" { + return "", false + } + metricName = m.Value } } - return "", false + return metricName, metricName != "" } // TODO(GiedriusS): convert Thanos caching system to be promised-based @@ -320,6 +324,7 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) r.ts = c.timeNow() c.created(k, r.sizeBytes) c.expire() + loaded = r } if ok { @@ -409,6 +414,6 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool return r >= ttl } -func MemHashString(str string) uint64 { - return xxhash.Sum64String(str) +func memHashString(str string) uint64 { + return fnv1a.HashString64(str) } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index b4448aa633..b463049a0f 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -668,8 +668,8 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable { } if r.expandedPostingsCache { - args["--tsdb.head.expanded-postings-cache-size"] = "1000" - args["--tsdb.block.expanded-postings-cache-size"] = "1000" + args["--tsdb.head.expanded-postings-cache-size"] = "10" + args["--tsdb.block.expanded-postings-cache-size"] = "10" } if r.limit != 0 && r.metaMonitoring != "" { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 2d5003274f..a67737b58d 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -7,13 +7,16 @@ import ( "context" "fmt" "log" + "math/rand" "net/http" 
"net/http/httputil" "net/url" "os" + "strings" "testing" "time" + "github.com/cortexproject/promqlsmith" "github.com/efficientgo/core/backoff" "github.com/efficientgo/e2e" e2edb "github.com/efficientgo/e2e/db" @@ -24,6 +27,7 @@ import ( "github.com/golang/snappy" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" @@ -1324,3 +1328,119 @@ func TestReceiveCpnp(t *testing.T) { }, v) } + +func TestExpandedCache(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("expanded-cache") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + i1 := e2ethanos.NewReceiveBuilder(e, "ingestor1").WithIngestionEnabled().WithExpandedPostingsCache().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i1)) + + q1 := e2ethanos.NewQuerierBuilder(e, "1", i1.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q1)) + + i2 := e2ethanos.NewReceiveBuilder(e, "ingestor2").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i2)) + + q2 := e2ethanos.NewQuerierBuilder(e, "2", i2.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q2)) + + avalanche1 := e2ethanos.NewAvalanche(e, "avalanche-1", + e2ethanos.AvalancheOptions{ + MetricCount: "500", + SeriesCount: "500", + MetricInterval: "3600", + SeriesInterval: "30", + ValueInterval: "30", + + RemoteURL: e2ethanos.RemoteWriteEndpoint(i1.InternalEndpoint("remote-write")), + RemoteWriteInterval: "10s", + RemoteBatchSize: "1000", + RemoteRequestCount: "5000", + + TenantID: "test-tenant", + }) + avalanche2 := e2ethanos.NewAvalanche(e, "avalanche-2", + e2ethanos.AvalancheOptions{ + MetricCount: "500", + SeriesCount: "500", + MetricInterval: "3600", + SeriesInterval: "30", + ValueInterval: "30", + + RemoteURL: e2ethanos.RemoteWriteEndpoint(i2.InternalEndpoint("remote-write")), + RemoteWriteInterval: 
"10s", + RemoteBatchSize: "1000", + RemoteRequestCount: "5000", + + TenantID: "test-tenant", + }) + + testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2)) + + ss := []labels.Labels{ + labels.FromStrings(model.MetricNameLabel, "avalanche_metric_mmmmm_0_110", "cycle_id", "0"), + } + + rnd := rand.New(rand.NewSource(time.Now().Unix())) + opts := []promqlsmith.Option{ + promqlsmith.WithEnableOffset(false), + promqlsmith.WithEnableAtModifier(false), + } + + time.Sleep(60 * time.Second) + + f := make(chan error) + for i := 0; i < 10; i++ { + go func() { + cl := promclient.NewDefaultClient() + for { + ps := promqlsmith.New(rnd, ss, opts...) + + qry := ps.WalkInstantQuery() + + tm := time.Now() + + t.Log(qry.String()) + + res1, _, _, err := cl.QueryInstant(context.Background(), urlParse(t, "http://"+q1.Endpoint("http")), qry.String(), tm, promclient.QueryOptions{ + Deduplicate: false, + }) + if err != nil && (strings.Contains(err.Error(), "unknown response type") || strings.Contains(err.Error(), "overflows int64")) { + continue + } + testutil.Ok(t, err) + + res2, _, _, err := cl.QueryInstant(context.Background(), urlParse(t, "http://"+q2.Endpoint("http")), qry.String(), tm, promclient.QueryOptions{ + Deduplicate: false, + }) + + testutil.Ok(t, err) + + for _, s := range res1 { + s.Value = 0 + } + for _, s := range res2 { + s.Value = 0 + } + + if !res1.Equal(res2) { + f <- fmt.Errorf("Results are not equal %v %v %v", qry.String(), res1, res2) + } + } + }() + } + + for { + select { + case err := <-f: + t.Fatal(err) + default: + } + time.Sleep(10 * time.Second) + } + +}