Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: Adds an interval paramater to query_range queries allowing a sampling of events to be returned based on the provided interval #1965

Merged
merged 6 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
cmd.Flag("since", "Lookback window.").Default("1h").DurationVar(&since)
cmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").StringVar(&from)
cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to)
cmd.Flag("step", "Query resolution step width").DurationVar(&q.Step)
cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step)
cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval)
}

cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward)
Expand Down
18 changes: 12 additions & 6 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,22 @@ accepts the following query parameters in the URL:
- `limit`: The max number of entries to return
- `start`: The start time for the query as a nanosecond Unix epoch. Defaults to one hour ago.
- `end`: The end time for the query as a nanosecond Unix epoch. Defaults to now.
- `step`: Query resolution step width in `duration` format or float number of seconds. `duration` refers to Prometheus duration strings of the form `[0-9]+[smhdwy]`. For example, 5m refers to a duration of 5 minutes. Defaults to a dynamic value based on `start` and `end`.
- `step`: Query resolution step width in `duration` format or float number of seconds. `duration` refers to Prometheus duration strings of the form `[0-9]+[smhdwy]`. For example, 5m refers to a duration of 5 minutes. Defaults to a dynamic value based on `start` and `end`. Only applies to query types which produce a matrix response.
- `interval`: **Experimental, See Below** Only return entries at (or greater than) the specified interval, can be a `duration` format or float number of seconds. Only applies to queries which produce a stream response.
- `direction`: Determines the sort order of logs. Supported values are `forward` or `backward`. Defaults to `backward.`

Requests against this endpoint require Loki to query the index store in order to
find log streams for particular labels. Because the index store is spread out by
time, the time span covered by `start` and `end`, if large, may cause additional
load against the index server and result in a slow query.

In microservices mode, `/loki/api/v1/query_range` is exposed by the querier and the frontend.

##### Step vs Interval

Use the `step` parameter when making metric queries to Loki, or queries which return a matrix response. It is evaluated in exactly the same way Prometheus evaluates `step`. First the query will be evaluated at `start` and then evaluated again at `start + step` and again at `start + step + step` until `end` is reached. The result will be a matrix of the query result evaluated at each step.

Use the `interval` parameter when making log queries to Loki, or queries which return a stream response. It is evaluated by returning a log entry at `start`, then the next entry will be returned an entry with timestampe >= `start + interval`, and again at `start + interval + interval` and so on until `end` is reached. It does not fill missing entries.

**Note about the experimental nature of interval** This flag may be removed in the future, if so it will likely be in favor of a LogQL expression to perform similar behavior, however that is uncertain at this time. [Issue 1779](https://github.com/grafana/loki/issues/1779) was created to track the discussion, if you are using `interval` please go add your use case and thoughts to that issue.



Response:

```
Expand Down
6 changes: 5 additions & 1 deletion pkg/logcli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *Client) Query(queryStr string, limit int, time time.Time, direction log
// QueryRange uses the /api/v1/query_range endpoint to execute a range query
// excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method
// nolint:interfacer
func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
params := util.NewQueryStringBuilder()
params.SetString("query", queryStr)
params.SetInt32("limit", limit)
Expand All @@ -68,6 +68,10 @@ func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time,
params.SetInt("step", int64(step.Seconds()))
}

if interval != 0 {
params.SetInt("interval", int64(interval.Seconds()))
}

return c.doQuery(params.EncodeWithPath(queryRangePath), quiet)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Query struct {
Limit int
Forward bool
Step time.Duration
Interval time.Duration
Quiet bool
NoLabels bool
IgnoreLabelsKey []string
Expand Down Expand Up @@ -70,7 +71,7 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool)
if q.isInstant() {
resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet)
} else {
resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Quiet)
resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Interval, q.Quiet)
}

if err != nil {
Expand Down Expand Up @@ -121,7 +122,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
if q.isInstant() {
query = eng.NewInstantQuery(q.QueryString, q.Start, q.resultsDirection(), uint32(q.Limit))
} else {
query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.resultsDirection(), uint32(q.Limit))
query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.Interval, q.resultsDirection(), uint32(q.Limit))
}

// execute the query
Expand Down
31 changes: 21 additions & 10 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,15 @@ func step(r *http.Request, start, end time.Time) (time.Duration, error) {
if value == "" {
return time.Duration(defaultQueryRangeStep(start, end)) * time.Second, nil
}
return parseSecondsOrDuration(value)
}

if d, err := strconv.ParseFloat(value, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", value)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(value); err == nil {
return time.Duration(d), nil
func interval(r *http.Request) (time.Duration, error) {
value := r.Form.Get("interval")
if value == "" {
return 0, nil
}
return 0, errors.Errorf("cannot parse %q to a valid duration", value)
return parseSecondsOrDuration(value)
}

// Match extracts and parses multiple matcher groups from a slice of strings
Expand Down Expand Up @@ -160,3 +157,17 @@ func parseDirection(value string, def logproto.Direction) (logproto.Direction, e
}
return logproto.Direction(d), nil
}

func parseSecondsOrDuration(value string) (time.Duration, error) {
if d, err := strconv.ParseFloat(value, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", value)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(value); err == nil {
return time.Duration(d), nil
}
return 0, errors.Errorf("cannot parse %q to a valid duration", value)
}
55 changes: 55 additions & 0 deletions pkg/loghttp/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,61 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
}
}

func Test_interval(t *testing.T) {
tests := []struct {
name string
reqPath string
expected time.Duration
wantErr bool
}{
{
"valid_step_int",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=5",
5 * time.Second,
false,
},
{
"valid_step_duration",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=5m",
5 * time.Minute,
false,
},
{
"invalid",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=a",
0,
true,
},
{
"valid_0",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=0",
0,
false,
},
{
"valid_not_included",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000",
0,
false,
},
}
for _, testData := range tests {
testData := testData

t.Run(testData.name, func(t *testing.T) {
req := httptest.NewRequest("GET", testData.reqPath, nil)
err := req.ParseForm()
require.Nil(t, err)
actual, err := interval(req)
if testData.wantErr {
require.Error(t, err)
} else {
assert.Equal(t, testData.expected, actual)
}
})
}
}

func Test_parseTimestamp(t *testing.T) {

now := time.Now()
Expand Down
17 changes: 14 additions & 3 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

var (
errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time")
errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time")
errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
errNegativeInterval = errors.New("interval must be >= 0")
)

// QueryStatus holds the status of a query
Expand Down Expand Up @@ -239,6 +240,7 @@ type RangeQuery struct {
Start time.Time
End time.Time
Step time.Duration
Interval time.Duration
Query string
Direction logproto.Direction
Limit uint32
Expand Down Expand Up @@ -284,5 +286,14 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
return nil, errStepTooSmall
}

result.Interval, err = interval(r)
if err != nil {
return nil, err
}

if result.Interval < 0 {
return nil, errNegativeInterval
}

return &result, nil
}
4 changes: 4 additions & 0 deletions pkg/loghttp/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func TestParseRangeQuery(t *testing.T) {
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1`),
}, nil, true},
{"negative interval",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1,interval=-1`),
}, nil, true},
{"good",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z&limit=1000&direction=BACKWARD&step=3600`),
Expand Down
39 changes: 28 additions & 11 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
Help: "LogQL query timings",
Buckets: prometheus.DefBuckets,
}, []string{"query_type"})
lastEntryMinTime = time.Unix(-100, 0)
)

// ValueTypeStreams promql.ValueType for log streams
Expand Down Expand Up @@ -68,7 +69,7 @@ func (opts *EngineOpts) applyDefault() {

// Engine interface used to construct queries
type Engine interface {
NewRangeQuery(qs string, start, end time.Time, step time.Duration, direction logproto.Direction, limit uint32) Query
NewRangeQuery(qs string, start, end time.Time, step, interval time.Duration, direction logproto.Direction, limit uint32) Query
NewInstantQuery(qs string, ts time.Time, direction logproto.Direction, limit uint32) Query
}

Expand Down Expand Up @@ -144,14 +145,15 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
// NewRangeQuery creates a new LogQL range query.
func (ng *engine) NewRangeQuery(
qs string,
start, end time.Time, step time.Duration,
start, end time.Time, step time.Duration, interval time.Duration,
direction logproto.Direction, limit uint32) Query {
return &query{
LiteralParams: LiteralParams{
qs: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
},
Expand All @@ -170,6 +172,7 @@ func (ng *engine) NewInstantQuery(
start: ts,
end: ts,
step: 0,
interval: 0,
direction: direction,
limit: limit,
},
Expand Down Expand Up @@ -199,7 +202,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
return nil, err
}
defer helpers.LogError("closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit)
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
return streams, err
}

Expand Down Expand Up @@ -297,19 +300,33 @@ func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.M
return promql.Matrix{series}
}

func readStreams(i iter.EntryIterator, size uint32) (Streams, error) {
func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, interval time.Duration) (Streams, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
for ; respSize < size && i.Next(); respSize++ {
// lastEntry should be a really old time so that the first comparison is always true, we use a negative
// value here because many unit tests start at time.Unix(0,0)
lastEntry := lastEntryMinTime
for respSize < size && i.Next() {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
forwardShouldOutput := dir == logproto.FORWARD &&
(i.Entry().Timestamp.Equal(lastEntry.Add(interval)) || i.Entry().Timestamp.After(lastEntry.Add(interval)))
backwardShouldOutput := dir == logproto.BACKWARD &&
(i.Entry().Timestamp.Equal(lastEntry.Add(-interval)) || i.Entry().Timestamp.Before(lastEntry.Add(-interval)))
// If step == 0 output every line.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If step == 0 output every line.
// If interval == 0 output every line.

// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
// Then check to see if the entry is equal to, or past a forward or reverse step
if interval == 0 || lastEntry.Unix() < 0 || forwardShouldOutput || backwardShouldOutput {
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
streams[labels] = stream
stream.Entries = append(stream.Entries, entry)
lastEntry = i.Entry().Timestamp
respSize++
}
stream.Entries = append(stream.Entries, entry)
}

result := make([]*logproto.Stream, 0, len(streams))
Expand Down
Loading