Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: aggregated metric volume queries #14412

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
93cc007
feat: aggregated metric volume queries
trevorwhitney Oct 2, 2024
c78c5fa
feat: add range support to agg metric volume queries
trevorwhitney Oct 7, 2024
55b0aa1
chore: lint and format
trevorwhitney Oct 7, 2024
4ea549a
chore: fix tests, add more requestion validation
trevorwhitney Oct 8, 2024
2135959
chore: Make metric for dequeued tasks in bloom-gateway a Histogram (#…
chaudum Oct 8, 2024
103e020
fix(storage/chunk/client/aws): have GetObject check for canceled cont…
rfratto Oct 8, 2024
d5ce63e
fix(kafka): Set namespace for Loki kafka metrics (#14426)
benclive Oct 8, 2024
0592591
docs: Updated Promtail to Alloy (#14404)
Jayclifford345 Oct 8, 2024
0ee464f
feat(kafka): Enable querier to optionally query partition ingesters (…
benclive Oct 9, 2024
7e589db
chore: Log errors when processing a download task fails (#14436)
chaudum Oct 9, 2024
35bca10
fix: Revert "fix(deps): update module github.com/shirou/gopsutil/v4 t…
trevorwhitney Oct 9, 2024
8963b0e
chore: Rename new querier flag to use dashes (#14438)
benclive Oct 9, 2024
964928d
chore(operator): Update build and runtime deps (#14416)
periklis Oct 9, 2024
4a4fe50
feat(Helm): Update Loki Helm chart for restricted environments (#14440)
davidham Oct 10, 2024
3d500b8
chore: Add new field to "stats-report" log line in bloom gateway (#14…
chaudum Oct 10, 2024
92bae79
docs: remove reference to Agent Flow (#14449)
JStickler Oct 10, 2024
8e94ee6
docs: Revise the LogQL Analyzer topic (#14374)
JStickler Oct 10, 2024
b4d2567
fix(ci): updated helm diff rendering workflow (#14424)
vlad-diachenko Oct 10, 2024
5dadb6d
docs: Update alloy-otel-logs.md to correct a typo (#13827)
wcall Oct 10, 2024
887db47
fix: level detection for warning level (#14444)
trevorwhitney Oct 10, 2024
edcd09a
ci: speciy golangci-lint build tags at runtime (#14456)
trevorwhitney Oct 10, 2024
f9213a2
fix: nix build, downgrade toolchain to go1.23.1 (#14442)
trevorwhitney Oct 10, 2024
9d29b15
fix: always do a range query
trevorwhitney Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,12 +566,13 @@ func NewVolumeInstantQueryWithDefaults(matchers string) *logproto.VolumeRequest
}

type VolumeInstantQuery struct {
Start time.Time
End time.Time
Query string
Limit uint32
TargetLabels []string
AggregateBy string
Start time.Time
End time.Time
Query string
Limit uint32
TargetLabels []string
AggregateBy string
AggregatedMetrics bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I tripped over the naming here. It might be better to use a name like UseAggregatedMetrics or FromAggregatedMetrics

}

func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
Expand All @@ -590,11 +591,14 @@ func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
return nil, err
}

aggregatedMetrics := volumeAggregatedMetrics(r)

svInstantQuery := VolumeInstantQuery{
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
AggregatedMetrics: aggregatedMetrics,
}

svInstantQuery.Start, svInstantQuery.End, err = bounds(r)
Expand All @@ -610,13 +614,14 @@ func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
}

type VolumeRangeQuery struct {
Start time.Time
End time.Time
Step time.Duration
Query string
Limit uint32
TargetLabels []string
AggregateBy string
Start time.Time
End time.Time
Step time.Duration
Query string
Limit uint32
TargetLabels []string
AggregateBy string
AggregatedMetrics bool
}

func ParseVolumeRangeQuery(r *http.Request) (*VolumeRangeQuery, error) {
Expand All @@ -635,14 +640,17 @@ func ParseVolumeRangeQuery(r *http.Request) (*VolumeRangeQuery, error) {
return nil, err
}

aggregatedMetrics := volumeAggregatedMetrics(r)

return &VolumeRangeQuery{
Start: result.Start,
End: result.End,
Step: result.Step,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
Start: result.Start,
End: result.End,
Step: result.Step,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
AggregatedMetrics: aggregatedMetrics,
}, nil
}

Expand Down Expand Up @@ -734,3 +742,8 @@ func volumeAggregateBy(r *http.Request) (string, error) {

return "", errors.New("invalid aggregation option")
}

func volumeAggregatedMetrics(r *http.Request) bool {
l := r.Form.Get("aggregatedMetrics")
return l == "true"
}
95 changes: 74 additions & 21 deletions pkg/loghttp/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,13 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
}

func Test_ParseVolumeInstantQuery(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&targetLabels=foo,bar`
req := &http.Request{
URL: mustParseURL(`?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&targetLabels=foo,bar`,
),
URL: mustParseURL(url),
}

err := req.ParseForm()
Expand All @@ -318,7 +318,7 @@ func Test_ParseVolumeInstantQuery(t *testing.T) {
require.Equal(t, expected, actual)

t.Run("aggregate by", func(t *testing.T) {
url := `?query={foo="bar"}` +
url = `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
Expand Down Expand Up @@ -347,17 +347,47 @@ func Test_ParseVolumeInstantQuery(t *testing.T) {
require.EqualError(t, err, "invalid aggregation option")
})
})

t.Run("aggregated metrics", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url)}

err := req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=true`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.True(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=false`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)
})
}

func Test_ParseVolumeRangeQuery(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`
req := &http.Request{
URL: mustParseURL(`?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`,
),
URL: mustParseURL(url),
}

err := req.ParseForm()
Expand All @@ -378,13 +408,6 @@ func Test_ParseVolumeRangeQuery(t *testing.T) {
require.Equal(t, expected, actual)

t.Run("aggregate by", func(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`

t.Run("labels", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url + `&aggregateBy=labels`)}

Expand All @@ -406,5 +429,35 @@ func Test_ParseVolumeRangeQuery(t *testing.T) {
_, err = ParseVolumeRangeQuery(req)
require.EqualError(t, err, "invalid aggregation option")
})

t.Run("aggregated metrics", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url)}

err := req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=true`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.True(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=false`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)
})
})
}
Loading
Loading