Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata to push payload #9694

Merged
merged 39 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b799556
Add metadata to push payload
salvacorts Jun 13, 2023
b0810c8
Add metadata to tests
salvacorts Jun 13, 2023
b7ae131
update vendor push pkg
salvacorts Jun 13, 2023
9bec73a
WIP fix tests
salvacorts Jun 13, 2023
d876c21
Fix more tests
salvacorts Jun 13, 2023
9c6fdef
Revent some unwanted changes
salvacorts Jun 13, 2023
b4ed160
Remove more unwanted changes
salvacorts Jun 13, 2023
8c6f22e
Update vendor files
salvacorts Jun 13, 2023
c854d06
use json format for metadata labels in v1 push api
sandeepsukhani Jun 16, 2023
95ba8a2
rename metadataLabels to labels
sandeepsukhani Jun 16, 2023
7db3e54
Merge branch 'main' into salvacorts/metadata-push-payload
sandeepsukhani Jun 16, 2023
ea8f3f6
lint and some test fixes
sandeepsukhani Jun 16, 2023
73cf5dc
lint
sandeepsukhani Jun 16, 2023
7b77c69
lint lint lin...
sandeepsukhani Jun 16, 2023
44b0932
use map object instead of string representation for log labels in jso…
sandeepsukhani Jun 19, 2023
617e864
revert some accidental chagnes
sandeepsukhani Jun 19, 2023
f43dfab
Make labels in loghttp.Entry of type LabelSet
salvacorts Jun 29, 2023
384d556
Apply CR feedback
salvacorts Jun 29, 2023
4088112
Avoid allocations for empty labels
salvacorts Jun 29, 2023
0b0af55
Fix json encoding and add tests
salvacorts Jun 29, 2023
d4e311c
Change error message
salvacorts Jun 29, 2023
10d404e
Add more labels to tests cases
salvacorts Jun 29, 2023
fcd1e66
Fix encode and add more tests
salvacorts Jun 29, 2023
64d3d6c
Do not use struct
salvacorts Jun 30, 2023
96c804d
Merge branch 'main' into salvacorts/metadata-push-payload
sandeepsukhani Jul 5, 2023
7f25bb8
remove a TODO
sandeepsukhani Jul 5, 2023
752ae6b
rename to NonIndexedLabels
salvacorts Jul 10, 2023
4b67145
Add missing protos updates
salvacorts Jul 10, 2023
aa58916
Update vendor
salvacorts Jul 10, 2023
4c60a9e
Update after mod vendor
salvacorts Jul 10, 2023
4028758
Use slice of labels for json and proto structures
salvacorts Jul 11, 2023
8c76150
Fix format
salvacorts Jul 11, 2023
fb505b4
Move LabelsAdapter to push pkg
salvacorts Jul 12, 2023
ccaf192
Use same prometheus version in push pkg
salvacorts Jul 12, 2023
fb42e9c
Merge branch 'main' into salvacorts/metadata-push-payload
salvacorts Jul 12, 2023
eaf235c
Update vendor/github.com/grafana/loki/pkg/push/types.go
salvacorts Jul 12, 2023
7438809
Fix tests
salvacorts Jul 12, 2023
f6e6918
Don't use yolostring when deserializing push request
salvacorts Jul 12, 2023
f8c33aa
Remove yolostring
salvacorts Jul 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
)

Expand Down Expand Up @@ -86,13 +87,28 @@ func New(instanceID, token, baseURL string, opts ...Option) *Client {

// PushLogLine creates a new logline with the current time as timestamp
func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) error {
return c.pushLogLine(line, c.Now, extraLabels...)
return c.pushLogLine(line, c.Now, nil, extraLabels...)
}

func (c *Client) PushLogLineWithMetadata(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
return c.PushLogLineWithTimestampAndMetadata(line, c.Now, logLabels, extraLabels...)
}

// PushLogLineWithTimestamp creates a new logline at the given timestamp
// The timestamp has to be a Unix timestamp (epoch seconds)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
return c.pushLogLine(line, timestamp, extraLabelList...)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabels ...map[string]string) error {
return c.pushLogLine(line, timestamp, nil, extraLabels...)
}

func (c *Client) PushLogLineWithTimestampAndMetadata(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
// If the logLabels map is empty, labels.FromMap will allocate some empty slices.
// Since this code is executed for every log line we receive, as an optimization
// to avoid those allocations we'll call labels.FromMap only if the map is not empty.
var lbls labels.Labels
if len(logLabels) > 0 {
lbls = labels.FromMap(logLabels)
}
return c.pushLogLine(line, timestamp, lbls, extraLabelList...)
}

func formatTS(ts time.Time) string {
Expand All @@ -101,21 +117,22 @@ func formatTS(ts time.Time) string {

type stream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"`
Values [][]any `json:"values"`
}

// pushLogLine creates a new logline
func (c *Client) pushLogLine(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
func (c *Client) pushLogLine(line string, timestamp time.Time, logLabels labels.Labels, extraLabelList ...map[string]string) error {
apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL)

s := stream{
Stream: map[string]string{
"job": "varlog",
},
Values: [][]string{
Values: [][]any{
{
formatTS(timestamp),
line,
logLabels,
},
},
}
Expand Down
1 change: 0 additions & 1 deletion integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/storage"
)

Expand Down
10 changes: 5 additions & 5 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,13 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
// ingest logs to the previous period
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-36*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))

// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))

})

t.Run("query-lookback-default", func(t *testing.T) {
Expand Down
72 changes: 63 additions & 9 deletions pkg/loghttp/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"
"github.com/prometheus/prometheus/model/labels"
)

func init() {
Expand All @@ -16,8 +17,9 @@ func init() {

// Entry represents a log entry. It includes a log message and the time it occurred at.
type Entry struct {
Timestamp time.Time
Line string
Timestamp time.Time
Line string
NonIndexedLabels labels.Labels
}

func (e *Entry) UnmarshalJSON(data []byte) error {
Expand All @@ -27,25 +29,49 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
switch i {
case 0: // timestamp
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
return
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // labels
if t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
var nonIndexedLabels labels.Labels
if err := jsonparser.ObjectEach(value, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: yoloString(key),
Value: yoloString(value),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have doubts about using yoloString here.
if underlying array of value is reused , the values here will be also updated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove yoloString here. I had used it earlier because the labels were anyways converted to string immediately. The references to string are now being retained longer so it is not safe to use yoloString here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in Slack, removed.

})
return nil
}); err != nil {
parseError = err
return
}
e.NonIndexedLabels = nonIndexedLabels
}
i++
})
Expand All @@ -67,6 +93,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
i := 0
var ts time.Time
var line string
var nonIndexedLabels labels.Labels
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
Expand All @@ -81,15 +108,30 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
return false
}
return true
case 2:
iter.ReadMapCB(func(iter *jsoniter.Iterator, labelName string) bool {
labelValue := iter.ReadString()
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: labelName,
Value: labelValue,
})
return true
})
i++
if iter.Error != nil {
return false
}
return true
default:
iter.ReportError("error reading entry", "array must contains 2 values")
iter.ReportError("error reading entry", "array must have at least 2 and up to 3 values")
return false
}
})
if ok {
*((*[]Entry)(ptr)) = append(*((*[]Entry)(ptr)), Entry{
Timestamp: ts,
Line: line,
Timestamp: ts,
Line: line,
NonIndexedLabels: nonIndexedLabels,
})
return true
}
Expand Down Expand Up @@ -126,6 +168,18 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteRaw(`"`)
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(e.Line)
if len(e.NonIndexedLabels) > 0 {
stream.WriteMore()
stream.WriteObjectStart()
for i, lbl := range e.NonIndexedLabels {
if i > 0 {
stream.WriteMore()
}
stream.WriteObjectField(lbl.Name)
stream.WriteString(lbl.Value)
}
stream.WriteObjectEnd()
}
stream.WriteArrayEnd()
}

Expand Down
132 changes: 128 additions & 4 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"time"
"unsafe"

"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"

"github.com/buger/jsonparser"
json "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
)

var (
Expand Down Expand Up @@ -57,9 +57,129 @@ func (q *QueryResponse) UnmarshalJSON(data []byte) error {
})
}

// PushRequest models a log stream push
// PushRequest models a log stream push but is unmarshalled to proto push format.
type PushRequest struct {
Streams []*Stream `json:"streams"`
Streams []LogProtoStream `json:"streams"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously this stored a pointer, now it stores a struct. Not sure we want this (would need to be tested independently for performance)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes from this PR:
#1145

The only reason why I think this is a pointer is so we don't make a copy of the stream when calling NewStream which receives a ptr to loghttp.Stream
https://github.com/grafana/loki/pull/1145/files#diff-71619e1c80a73b34eade235a55d012a0ddbb3375b8d4ac89c1f4fd672145b915R34

With Sandeep's changes, we no longer use that since now we decode from json to loghttp.PushRequest to then cast it to logproto.PushRequest

Streams: *(*[]logproto.Stream)(unsafe.Pointer(&request.Streams)),

So having that as a ptr or not shouldn't make much difference.

}

// LogProtoStream helps with unmarshalling of each log stream for push request.
// This might look un-necessary but without it the CPU usage in benchmarks was increasing by ~25% :shrug:
type LogProtoStream logproto.Stream

func (s *LogProtoStream) UnmarshalJSON(data []byte) error {
err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error {
switch string(key) {
case "stream":
labels := make(LabelSet)
err := jsonparser.ObjectEach(val, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
labels[yoloString(key)] = yoloString(val)
return nil
})
if err != nil {
return err
}
s.Labels = labels.String()
case "values":
if ty == jsonparser.Null {
return nil
}
entries, err := unmarshalHTTPToLogProtoEntries(val)
if err != nil {
return err
}
s.Entries = entries
}
return nil
})
return err
}

func unmarshalHTTPToLogProtoEntries(data []byte) ([]logproto.Entry, error) {
var (
entries []logproto.Entry
parseError error
)
if _, err := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) {
if err != nil || parseError != nil {
return
}
if ty == jsonparser.Null {
return
}
e, err := unmarshalHTTPToLogProtoEntry(value)
if err != nil {
parseError = err
return
}
entries = append(entries, e)
}); err != nil {
parseError = err
}

if parseError != nil {
return nil, parseError
}

return entries, nil
}

func unmarshalHTTPToLogProtoEntry(data []byte) (logproto.Entry, error) {
var (
i int
parseError error
e logproto.Entry
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if (i == 0 || i == 1) && t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
} else if i == 2 && t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
switch i {
case 0: // timestamp
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
return
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // nonIndexedLabels
var nonIndexedLabels labels.Labels
err := jsonparser.ObjectEach(value, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: yoloString(key),
Value: yoloString(val),
salvacorts marked this conversation as resolved.
Show resolved Hide resolved
})
return nil
})
if err != nil {
parseError = err
return
}
e.NonIndexedLabels = nonIndexedLabels
}
i++
})
if parseError != nil {
return e, parseError
}
return e, err
}

// ResultType holds the type of the result
Expand Down Expand Up @@ -441,3 +561,7 @@ func labelVolumeLimit(r *http.Request) error {

return nil
}

func yoloString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the scariest thing in the PR ) we should use it only if we check that the buffer is not updated, however, a lot of functions in stack trace mean that somebody can do some optimization to reuse the slice and it will break all our implementation. Why do we need it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in Slack, removed.

Loading