Skip to content

Commit

Permalink
Uses custom json-iter decoder for log entries.
Browse files Browse the repository at this point in the history
Previously we were using json.Unmarshal for each line. However, json-iter uses a pool for each call, and I believe this can increase memory usage.

For each line we would put the iterator back into a pool to re-use it. Once an iterator is put in the pool, the last data it read is retained; since we handle millions of lines, this can cause problems. By using a custom extension we keep using a pool, but only at the root object, not for each line.

On top of that we're going to process that json payload 50% faster.

```
❯ benchcmp  before.txt after.txt2
benchmark                          old ns/op     new ns/op     delta
Benchmark_DecodePushRequest-16     13509236      6677037       -50.57%
benchmark                          old allocs     new allocs     delta
Benchmark_DecodePushRequest-16     106149         38719          -63.52%
benchmark                          old bytes     new bytes     delta
Benchmark_DecodePushRequest-16     10350362      5222989       -49.54%
```

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Jan 12, 2021
1 parent c9b85b3 commit 7385735
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 65 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/klauspost/compress v1.9.5
github.com/mitchellh/mapstructure v1.3.3
github.com/moby/term v0.0.0-20200915141129-7f0af18e79f2 // indirect
github.com/modern-go/reflect2 v1.0.1
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/opentracing/opentracing-go v1.2.0
// github.com/pierrec/lz4 v2.0.5+incompatible
Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const applicationJSON = "application/json"

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

req, err := ParseRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -48,6 +47,8 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
case applicationJSON:
var err error

// TODO: once https://github.com/weaveworks/common/commit/73225442af7da93ec8f6a6e2f7c8aafaee3f8840 is in Loki,
// we can try to pass the body as a bytes.Buffer instead, to avoid reading into another buffer.
if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(r.Body, &req)
} else {
Expand Down
108 changes: 108 additions & 0 deletions pkg/loghttp/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package loghttp

import (
"strconv"
"time"
"unsafe"

jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"
)

func init() {
jsoniter.RegisterExtension(&jsonExtension{})
}

// Entry represents a log entry. It includes a log message and the time it occurred at.
//
// NOTE(review): unmarshal.NewStream reinterprets []Entry as []logproto.Entry
// through unsafe.Pointer, so the field order and types here presumably must
// stay identical to logproto.Entry — confirm before changing this struct.
type Entry struct {
// Timestamp is the time the entry occurred at; the custom encoder writes it
// as a quoted string of Unix nanoseconds.
Timestamp time.Time
// Line is the log message.
Line string
}

// jsonExtension is the jsoniter extension registered by this package's init.
// It embeds jsoniter.DummyExtension and defines CreateDecoder and
// CreateEncoder to supply the custom codecs for []Entry and Entry.
type jsonExtension struct {
jsoniter.DummyExtension
}

// sliceEntryDecoder is the stateless jsoniter decoder returned by
// jsonExtension.CreateDecoder for []Entry values; see its Decode method.
type sliceEntryDecoder struct {
}

// Decode implements jsoniter.ValDecoder for []Entry.
//
// ptr must point to a []Entry. The JSON value is expected to be an array of
// two-element arrays, each of the form ["<unix nanoseconds>", "<log line>"].
// Every pair that is read successfully is appended to the destination slice.
// Decoding stops at the first malformed pair and the failure is recorded on
// iter; pairs appended before the failure are left in the slice.
func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
	entries := (*[]Entry)(ptr)
	iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
		var (
			read int // number of values consumed from the current pair
			ts   time.Time
			line string
		)
		ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
			switch read {
			case 0:
				var ok bool
				ts, ok = readTimestamp(iter)
				read++
				return ok
			case 1:
				line = iter.ReadString()
				read++
				return iter.Error == nil
			default:
				iter.ReportError("error reading entry", "array must contains 2 values")
				return false
			}
		})
		if !ok {
			return false
		}
		// The default case above only fires on a third value, so a pair with
		// fewer than two values would otherwise be accepted as an entry with
		// a zero timestamp and/or an empty line. Reject it explicitly.
		if read != 2 {
			iter.ReportError("error reading entry", "array must contains 2 values")
			return false
		}
		*entries = append(*entries, Entry{
			Timestamp: ts,
			Line:      line,
		})
		return true
	})
}

// readTimestamp reads the next JSON string from iter and parses it as a
// base-10 count of nanoseconds since the Unix epoch.
//
// It returns the parsed time and true on success. If the string cannot be
// read it returns the zero time and false; if the string is not a valid
// 64-bit integer it also reports the parse error on iter.
func readTimestamp(iter *jsoniter.Iterator) (time.Time, bool) {
	raw := iter.ReadString()
	if iter.Error != nil {
		return time.Time{}, false
	}
	nanos, err := strconv.ParseInt(raw, 10, 64)
	if err == nil {
		return time.Unix(0, nanos), true
	}
	iter.ReportError("error reading entry timestamp", err.Error())
	return time.Time{}, false
}

// entryEncoder is the stateless jsoniter encoder returned by
// jsonExtension.CreateEncoder for Entry values; see its Encode method.
type entryEncoder struct{}

// IsEmpty always reports false, whatever ptr points to: log entries are never
// treated as empty, so omit-empty handling does not drop them.
func (entryEncoder) IsEmpty(ptr unsafe.Pointer) bool {
	return false
}

// Encode writes the Entry that ptr points to as a two-element JSON array:
// the timestamp as a quoted string of Unix nanoseconds, followed by the log
// line written with HTML escaping.
func (entryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
	entry := (*Entry)(ptr)
	nanos := strconv.FormatInt(entry.Timestamp.UnixNano(), 10)

	stream.WriteArrayStart()
	// The timestamp is emitted as a JSON string, so the surrounding quotes
	// are written by hand around the raw digits.
	stream.WriteRaw(`"`)
	stream.WriteRaw(nanos)
	stream.WriteRaw(`"`)
	stream.WriteMore()
	stream.WriteStringWithHTMLEscaped(entry.Line)
	stream.WriteArrayEnd()
}

// CreateDecoder returns the custom sliceEntryDecoder when typ is []Entry and
// nil for every other type.
func (e *jsonExtension) CreateDecoder(typ reflect2.Type) jsoniter.ValDecoder {
	entriesType := reflect2.TypeOf([]Entry{})
	if typ != entriesType {
		return nil
	}
	return sliceEntryDecoder{}
}

// CreateEncoder returns the custom entryEncoder when typ is Entry and nil for
// every other type.
func (e *jsonExtension) CreateEncoder(typ reflect2.Type) jsoniter.ValEncoder {
	entryType := reflect2.TypeOf(Entry{})
	if typ != entryType {
		return nil
	}
	return entryEncoder{}
}
42 changes: 3 additions & 39 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"time"
"unsafe"

Expand All @@ -31,7 +30,7 @@ const (
QueryStatusFail = "fail"
)

//QueryResponse represents the http json response to a label query
// QueryResponse represents the http json response to a Loki range and instant query
type QueryResponse struct {
Status string `json:"status"`
Data QueryResponseData `json:"data"`
Expand All @@ -58,7 +57,7 @@ type ResultValue interface {
Type() ResultType
}

//QueryResponseData represents the http json response to a label query
// QueryResponseData represents the http json response to a label query
type QueryResponseData struct {
ResultType ResultType `json:"resultType"`
Result ResultValue `json:"result"`
Expand Down Expand Up @@ -92,18 +91,12 @@ func (s Streams) ToProto() []logproto.Stream {
return result
}

//Stream represents a log stream. It includes a set of log entries and their labels.
// Stream represents a log stream. It includes a set of log entries and their labels.
type Stream struct {
// Labels is the stream's label set, carried under the "stream" JSON key.
Labels LabelSet `json:"stream"`
// Entries are the stream's log entries, carried under the "values" JSON key.
Entries []Entry `json:"values"`
}

// Entry represents a log entry. It includes a log message and the time it occurred at.
type Entry struct {
// Timestamp is the time the entry occurred at.
Timestamp time.Time
// Line is the log message.
Line string
}

// UnmarshalJSON implements the json.Unmarshaler interface.
func (q *QueryResponseData) UnmarshalJSON(data []byte) error {
unmarshal := struct {
Expand Down Expand Up @@ -152,35 +145,6 @@ func (q *QueryResponseData) UnmarshalJSON(data []byte) error {
return nil
}

// MarshalJSON implements the json.Marshaler interface. The entry is rendered
// as a two-element JSON array: the timestamp as a quoted string of Unix
// nanoseconds, followed by the JSON-encoded log line.
func (e *Entry) MarshalJSON() ([]byte, error) {
	encodedLine, err := json.Marshal(e.Line)
	if err != nil {
		return nil, err
	}
	nanos := e.Timestamp.UnixNano()
	rendered := fmt.Sprintf("[\"%d\",%s]", nanos, encodedLine)
	return []byte(rendered), nil
}

// UnmarshalJSON implements the json.Unmarshaler interface.
//
// data is expected to be a JSON array of strings whose first element is the
// timestamp as a base-10 count of Unix nanoseconds and whose second element
// is the log line. An error is returned if data is not an array of strings,
// holds fewer than two elements, or the timestamp is not a valid 64-bit
// integer. e is only modified on success.
func (e *Entry) UnmarshalJSON(data []byte) error {
	var unmarshal []string

	err := json.Unmarshal(data, &unmarshal)
	if err != nil {
		return err
	}

	// Guard the indexing below: inputs such as `[]`, `["1"]` or `null` decode
	// without error but would otherwise panic with an index out of range.
	if len(unmarshal) < 2 {
		return fmt.Errorf("entry must be a [timestamp, line] array, got %d values", len(unmarshal))
	}

	t, err := strconv.ParseInt(unmarshal[0], 10, 64)
	if err != nil {
		return err
	}

	e.Timestamp = time.Unix(0, t)
	e.Line = unmarshal[1]

	return nil
}

// Scalar is a single timestamp/float with no labels
type Scalar model.Scalar

Expand Down
3 changes: 0 additions & 3 deletions pkg/logql/marshal/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
// WriteQueryResponseJSON marshals the promql.Value to v1 loghttp JSON and then
// writes it to the provided io.Writer.
func WriteQueryResponseJSON(v logql.Result, w io.Writer) error {

value, err := NewResultValue(v.Data)

if err != nil {
return err
}
Expand Down Expand Up @@ -52,7 +50,6 @@ func WriteLabelResponseJSON(l logproto.LabelResponse, w io.Writer) error {
// then writes it to the provided connection.
func WriteTailResponseJSON(r legacy.TailResponse, c *websocket.Conn) error {
v1Response, err := NewTailResponse(r)

if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/logql/marshal/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ func Test_TailResponseMarshalLoop(t *testing.T) {
}

func Test_WriteSeriesResponseJSON(t *testing.T) {

for i, tc := range []struct {
input logproto.SeriesResponse
expected string
Expand Down Expand Up @@ -512,3 +511,13 @@ func testJSONBytesEqual(t *testing.T, expected []byte, actual []byte, msg string

require.Equalf(t, expectedValue, actualValue, msg, args)
}

// Benchmark_Encode measures WriteQueryResponseJSON over every case in
// queryTests, writing into a reusable in-memory buffer.
func Benchmark_Encode(b *testing.B) {
	buf := bytes.NewBuffer(nil)
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		// Drop the previous iteration's output while keeping the buffer's
		// capacity; without this the buffer grows with b.N and its
		// reallocations distort the measurement.
		buf.Reset()
		for _, queryTest := range queryTests {
			require.NoError(b, WriteQueryResponseJSON(logql.Result{Data: queryTest.actual}, buf))
		}
	}
}
24 changes: 4 additions & 20 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package unmarshal

import (
"io"
"unsafe"

json "github.com/json-iterator/go"

Expand All @@ -13,12 +14,9 @@ import (
func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
var request loghttp.PushRequest

err := json.NewDecoder(b).Decode(&request)

if err != nil {
if err := json.NewDecoder(b).Decode(&request); err != nil {
return err
}

*r = NewPushRequest(request)

return nil
Expand All @@ -39,22 +37,8 @@ func NewPushRequest(r loghttp.PushRequest) logproto.PushRequest {

// NewStream constructs a logproto.Stream from a Stream
//
// NOTE(review): this listing appears to interleave the removed and the added
// lines of a diff (an element-by-element copy through NewEntry versus a single
// unsafe cast); the statements below are left exactly as shown.
func NewStream(s *loghttp.Stream) logproto.Stream {
ret := logproto.Stream{
Entries: make([]logproto.Entry, len(s.Entries)),
return logproto.Stream{
// Reinterprets the []loghttp.Entry slice as []logproto.Entry without copying,
// so the result shares its backing array with s.Entries. This presumably
// relies on both Entry types having an identical memory layout — TODO confirm.
Entries: *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)),
Labels: s.Labels.String(),
}

for i, e := range s.Entries {
ret.Entries[i] = NewEntry(e)
}

return ret
}

// NewEntry constructs a logproto.Entry from an Entry by copying its timestamp
// and line.
func NewEntry(e loghttp.Entry) logproto.Entry {
	var out logproto.Entry
	out.Timestamp = e.Timestamp
	out.Line = e.Line
	return out
}
35 changes: 34 additions & 1 deletion pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package unmarshal

import (
"fmt"
"io/ioutil"
"strings"
"testing"
Expand Down Expand Up @@ -44,7 +45,6 @@ var pushTests = []struct {
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))
Expand All @@ -55,3 +55,36 @@ func Test_DecodePushRequest(t *testing.T) {
require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}

// Benchmark_DecodePushRequest measures DecodePushRequest on a single-stream
// push payload that holds 10000 identical log entries.
func Benchmark_DecodePushRequest(b *testing.B) {
	const entryCount = 10000

	payloadFmt := `{
"streams": [
{
"stream": {
"test": "test",
"foo" : "bar"
},
"values":[
%s
]
}
]
}`
	entry := `[ "123456789012345", "WARN [CompactionExecutor:61771] 2021-01-12 09:40:23,192 TimeWindowCompactionController.java:41 - You are running with sstables overlapping checks disabled, it can result in loss of data" ],`
	// Repeat the entry and drop the final trailing comma so that the values
	// array is valid JSON.
	values := strings.TrimSuffix(strings.Repeat(entry, entryCount), ",")
	payload := fmt.Sprintf(payloadFmt, values)
	reader := strings.NewReader("")

	b.ResetTimer()
	b.ReportAllocs()

	for i := 0; i < b.N; i++ {
		var decoded logproto.PushRequest
		// Rewind the shared reader onto the payload instead of allocating a new one.
		reader.Reset(payload)
		err := DecodePushRequest(reader, &decoded)
		require.NoError(b, err)
		require.Equal(b, entryCount, len(decoded.Streams[0].Entries))
	}
}
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ github.com/moby/term/windows
# github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
## explicit
github.com/modern-go/reflect2
# github.com/morikuni/aec v1.0.0
github.com/morikuni/aec
Expand Down

0 comments on commit 7385735

Please sign in to comment.