feat: Introduce shardable probabilistic topk for instant queries. (backport k227) #14765

Merged (1 commit) on Nov 4, 2024
13 changes: 13 additions & 0 deletions docs/sources/shared/configuration.md
@@ -130,6 +130,11 @@ querier_rf1:
# CLI flag: -querier-rf1.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]

# The maximum number of labels the heap of a topk query using a count min
# sketch can track.
# CLI flag: -querier-rf1.engine.max-count-min-sketch-heap-size
[max_count_min_sketch_heap_size: <int> | default = 10000]

# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier-rf1.max-concurrent
@@ -3841,6 +3846,9 @@ otlp_config:
# CLI flag: -limits.ingestion-partition-tenant-shard-size
[ingestion_partitions_tenant_shard_size: <int> | default = 0]

# List of LogQL vector and range aggregations that should be sharded.
[shard_aggregations: <list of strings>]

# Enable metric aggregation. When enabled, pushed streams will be sampled for
# bytes and count, and these metrics will be written back into Loki as a special
# __aggregated_metric__ stream, which can be queried for faster histogram
@@ -4245,6 +4253,11 @@ engine:
# CLI flag: -querier.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]

# The maximum number of labels the heap of a topk query using a count min
# sketch can track.
# CLI flag: -querier.engine.max-count-min-sketch-heap-size
[max_count_min_sketch_heap_size: <int> | default = 10000]

# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier.max-concurrent
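
For context on the two new options above: approx_topk keeps approximate per-series counts in a count-min sketch and tracks the current best candidates in a heap, and max_count_min_sketch_heap_size caps how many label sets that heap may hold; sharding for the aggregation is enabled by listing approx_topk in shard_aggregations (or -querier.shard-aggregations, as the integration test below does). The following is a minimal, self-contained Go sketch of the general count-min-sketch-plus-bounded-heap technique, not Loki's implementation; the type names, the depth/width values, and the eviction details are invented for illustration. Because sketches with identical dimensions merge by element-wise addition of their counters, each shard can build its own sketch and the query frontend can combine them before reading out the top results.

package main

import (
	"container/heap"
	"fmt"
	"hash/fnv"
)

// cms is a toy count-min sketch: depth rows of width counters each. Two
// sketches with identical dimensions merge by adding their counters
// element-wise, which is what lets every shard build its own sketch.
type cms struct {
	depth, width uint32
	counts       [][]uint32
}

func newCMS(depth, width uint32) *cms {
	c := &cms{depth: depth, width: width, counts: make([][]uint32, depth)}
	for i := range c.counts {
		c.counts[i] = make([]uint32, width)
	}
	return c
}

func (c *cms) pos(row uint32, s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte{byte(row)}) // cheap per-row salt
	h.Write([]byte(s))
	return h.Sum32() % c.width
}

// Add bumps one counter per row; Count takes the minimum over rows, so it
// can over-estimate on hash collisions but never under-estimates.
func (c *cms) Add(s string, n uint32) {
	for row := uint32(0); row < c.depth; row++ {
		c.counts[row][c.pos(row, s)] += n
	}
}

func (c *cms) Count(s string) uint32 {
	est := c.counts[0][c.pos(0, s)]
	for row := uint32(1); row < c.depth; row++ {
		if v := c.counts[row][c.pos(row, s)]; v < est {
			est = v
		}
	}
	return est
}

// topk keeps candidate labels in a min-heap whose size is capped; that cap
// is the quantity a limit like max_count_min_sketch_heap_size bounds.
type topk struct {
	sketch  *cms
	heap    entryHeap
	maxSize int
}

type entry struct {
	label string
	est   uint32
}

type entryHeap []entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].est < h[j].est }
func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)        { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() any          { e := (*h)[len(*h)-1]; *h = (*h)[:len(*h)-1]; return e }

func (t *topk) Observe(label string) {
	t.sketch.Add(label, 1)
	est := t.sketch.Count(label)
	for i := range t.heap { // already a candidate: just refresh its estimate
		if t.heap[i].label == label {
			t.heap[i].est = est
			heap.Fix(&t.heap, i)
			return
		}
	}
	switch {
	case len(t.heap) < t.maxSize:
		heap.Push(&t.heap, entry{label, est})
	case est > t.heap[0].est: // beats the current minimum: evict it
		heap.Pop(&t.heap)
		heap.Push(&t.heap, entry{label, est})
	}
}

func main() {
	t := &topk{sketch: newCMS(4, 1024), maxSize: 2}
	for _, label := range []string{"one", "one", "one", "two", "three"} {
		t.Observe(label)
	}
	for _, e := range t.heap {
		fmt.Printf("%s ~ %d\n", e.label, e.est)
	}
}
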
111 changes: 110 additions & 1 deletion integration/loki_micro_services_test.go
@@ -94,7 +94,7 @@ func TestMicroServicesIngestQuery(t *testing.T) {
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.encoding=protobuf",
"-querier.shard-aggregations=quantile_over_time",
"-querier.shard-aggregations=quantile_over_time,approx_topk",
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
)
)
@@ -784,6 +784,115 @@ func TestOTLPLogsIngestQuery(t *testing.T) {
})
}

func TestProbabilisticQuery(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()

// initially, run the compactor, index gateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-compactor.compaction-interval=1s",
"-compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())

// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

// then, run the querier.
var (
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.encoding=protobuf",
"-querier.shard-aggregations=quantile_over_time,approx_topk",
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
)
)
require.NoError(t, clu.Run())

tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLine("lineA", now.Add(-45*time.Minute), nil, map[string]string{"job": "one"}))
require.NoError(t, cliDistributor.PushLogLine("lineB", now.Add(-45*time.Minute), nil, map[string]string{"job": "one"}))

require.NoError(t, cliDistributor.PushLogLine("lineC", now, nil, map[string]string{"job": "one"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", now, nil, map[string]string{"job": "two"}))
})

t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunQuery(context.Background(), `approx_topk(1, count_over_time({job=~".+"}[1h]))`)
require.NoError(t, err)
assert.Equal(t, "vector", resp.Data.ResultType)

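// job="one" has three lines inside the [1h] window (lineA and lineB at -45m, lineC at now)
// while job="two" has only one, so approx_topk(1, ...) is expected to return just {job="one"} with value 3.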
var values []string
var labels []string
for _, value := range resp.Data.Vector {
values = append(values, value.Value)
labels = append(labels, value.Metric["job"])
}
assert.ElementsMatch(t, []string{"3"}, values)
assert.ElementsMatch(t, []string{"one"}, labels)
})
}

func TestCategorizedLabels(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
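
For reference, the same instant query that the test issues through the integration client can be sent to a running Loki instance over its HTTP query API. A minimal sketch, assuming a local instance on localhost:3100 and a tenant passed via the X-Scope-OrgID header (both are placeholders, not values from this PR):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Address and tenant ID are placeholders for whatever your deployment uses.
	endpoint := "http://localhost:3100/loki/api/v1/query"
	params := url.Values{}
	params.Set("query", `approx_topk(1, count_over_time({job=~".+"}[1h]))`)

	req, err := http.NewRequest(http.MethodGet, endpoint+"?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	// Multi-tenant deployments expect the tenant in the X-Scope-OrgID header.
	req.Header.Set("X-Scope-OrgID", "tenant-1")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}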