Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: expanded postings fixes #126

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ require (
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.34.0
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
github.com/segmentio/fasthash v1.0.3
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771
go.opentelemetry.io/contrib/propagators/autoprop v0.54.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30 h1:yoKAVkEVwAqbGbR8n87rHQ1dulL25rKloGadb3vm770=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30/go.mod h1:sH0u6fq6x4R5M7WxkoQFY/o7UaiItec0o1LinLCJNq8=
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY=
Expand Down
23 changes: 14 additions & 9 deletions pkg/receive/expandedpostingscache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"sync"
"time"

"github.com/cespare/xxhash"
"github.com/oklog/ulid"
"github.com/segmentio/fasthash/fnv1a"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -53,8 +53,8 @@ var (

const (
// size of the seed array. Each seed is a 64bits int (8 bytes)
// totaling 8mb.
seedArraySize = 1024 * 1024
// totaling 16mb.
seedArraySize = 4 * 1024 * 1024

numOfSeedsStripes = 512
)
Expand Down Expand Up @@ -127,7 +127,7 @@ func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
return
}

h := MemHashString(metricName)
h := memHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].Lock()
Expand Down Expand Up @@ -198,7 +198,7 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
}

func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
h := MemHashString(metricName)
h := memHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].RLock()
Expand Down Expand Up @@ -249,13 +249,17 @@ func isHeadBlock(blockID ulid.ULID) bool {
}

func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) {
var metricName string
for _, m := range ms {
if m.Name == labels.MetricName && m.Type == labels.MatchEqual {
return m.Value, true
if metricName != "" {
return "", false
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention here is to return "", false if two Matchers match the given metric name?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose is to not cache if there more than two matchers on __name__. Technically such queries don't make sense and will never return results but just in case to avoid filling the cache with empty items

}
metricName = m.Value
}
}

return "", false
return metricName, metricName != ""
}

// TODO(GiedriusS): convert Thanos caching system to be promised-based
Expand Down Expand Up @@ -320,6 +324,7 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
r.ts = c.timeNow()
c.created(k, r.sizeBytes)
c.expire()
loaded = r
}

if ok {
Expand Down Expand Up @@ -409,6 +414,6 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool
return r >= ttl
}

func MemHashString(str string) uint64 {
return xxhash.Sum64String(str)
func memHashString(str string) uint64 {
return fnv1a.HashString64(str)
}
4 changes: 2 additions & 2 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable {
}

if r.expandedPostingsCache {
args["--tsdb.head.expanded-postings-cache-size"] = "1000"
args["--tsdb.block.expanded-postings-cache-size"] = "1000"
args["--tsdb.head.expanded-postings-cache-size"] = "10"
args["--tsdb.block.expanded-postings-cache-size"] = "10"
}

if r.limit != 0 && r.metaMonitoring != "" {
Expand Down
120 changes: 120 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strings"
"testing"
"time"

"github.com/cortexproject/promqlsmith"
"github.com/efficientgo/core/backoff"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
Expand All @@ -24,6 +27,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
Expand Down Expand Up @@ -1324,3 +1328,119 @@ func TestReceiveCpnp(t *testing.T) {
}, v)

}

func TestExpandedCache(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("expanded-cache")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

i1 := e2ethanos.NewReceiveBuilder(e, "ingestor1").WithIngestionEnabled().WithExpandedPostingsCache().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i1))

q1 := e2ethanos.NewQuerierBuilder(e, "1", i1.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q1))

i2 := e2ethanos.NewReceiveBuilder(e, "ingestor2").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i2))

q2 := e2ethanos.NewQuerierBuilder(e, "2", i2.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q2))

avalanche1 := e2ethanos.NewAvalanche(e, "avalanche-1",
e2ethanos.AvalancheOptions{
MetricCount: "500",
SeriesCount: "500",
MetricInterval: "3600",
SeriesInterval: "30",
ValueInterval: "30",

RemoteURL: e2ethanos.RemoteWriteEndpoint(i1.InternalEndpoint("remote-write")),
RemoteWriteInterval: "10s",
RemoteBatchSize: "1000",
RemoteRequestCount: "5000",

TenantID: "test-tenant",
})
avalanche2 := e2ethanos.NewAvalanche(e, "avalanche-2",
e2ethanos.AvalancheOptions{
MetricCount: "500",
SeriesCount: "500",
MetricInterval: "3600",
SeriesInterval: "30",
ValueInterval: "30",

RemoteURL: e2ethanos.RemoteWriteEndpoint(i2.InternalEndpoint("remote-write")),
RemoteWriteInterval: "10s",
RemoteBatchSize: "1000",
RemoteRequestCount: "5000",

TenantID: "test-tenant",
})

testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2))

ss := []labels.Labels{
labels.FromStrings(model.MetricNameLabel, "avalanche_metric_mmmmm_0_110", "cycle_id", "0"),
}

rnd := rand.New(rand.NewSource(time.Now().Unix()))
opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(false),
promqlsmith.WithEnableAtModifier(false),
}

time.Sleep(60 * time.Second)

f := make(chan error)
for i := 0; i < 10; i++ {
go func() {
cl := promclient.NewDefaultClient()
for {
ps := promqlsmith.New(rnd, ss, opts...)

qry := ps.WalkInstantQuery()

tm := time.Now()

t.Log(qry.String())

res1, _, _, err := cl.QueryInstant(context.Background(), urlParse(t, "http://"+q1.Endpoint("http")), qry.String(), tm, promclient.QueryOptions{
Deduplicate: false,
})
if err != nil && (strings.Contains(err.Error(), "unknown response type") || strings.Contains(err.Error(), "overflows int64")) {
continue
}
testutil.Ok(t, err)

res2, _, _, err := cl.QueryInstant(context.Background(), urlParse(t, "http://"+q2.Endpoint("http")), qry.String(), tm, promclient.QueryOptions{
Deduplicate: false,
})

testutil.Ok(t, err)

for _, s := range res1 {
s.Value = 0
}
for _, s := range res2 {
s.Value = 0
}

if !res1.Equal(res2) {
f <- fmt.Errorf("Results are not equal %v %v %v", qry.String(), res1, res2)
}
}
}()
}

for {
select {
case err := <-f:
t.Fatal(err)
default:
}
time.Sleep(10 * time.Second)
}

}
Loading