Skip to content

Commit

Permalink
Fixes unmarshalling of tailing responses. (#3237)
Browse files Browse the repository at this point in the history
* Fixes unmarshalling of tailing responses.

Some websocket were still using old json package.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes missing update of generated files.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jan 26, 2021
1 parent 6856209 commit a5cb38d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 22 deletions.
5 changes: 2 additions & 3 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/build"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/unmarshal"
)

var (
Expand Down Expand Up @@ -303,7 +304,6 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}
tss = append(tss, *ts)
}

}
default:
return nil, fmt.Errorf("unexpected result type, expected a log stream result instead received %v", value.Type())
Expand All @@ -313,7 +313,6 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}

func (r *Reader) run() {

r.closeAndReconnect()

tailResponse := &loghttp.TailResponse{}
Expand All @@ -332,7 +331,7 @@ func (r *Reader) run() {
// Set a read timeout of 10x the interval we expect to see messages
// Ignore the error as it will get caught when we call ReadJSON
_ = r.conn.SetReadDeadline(time.Now().Add(10 * r.interval))
err := r.conn.ReadJSON(tailResponse)
err := unmarshal.ReadTailResponseJSON(tailResponse, r.conn)
if err != nil {
fmt.Fprintf(r.w, "error reading websocket, will retry in 10 seconds: %s\n", err)
// Even though we sleep between connection retries, we found it's possible to DOS Loki if the connection
Expand Down
5 changes: 2 additions & 3 deletions pkg/logcli/query/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql/unmarshal"
)

// TailQuery connects to the Loki websocket endpoint and tails logs
Expand Down Expand Up @@ -43,7 +44,7 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
}

for {
err := conn.ReadJSON(tailResponse)
err := unmarshal.ReadTailResponseJSON(tailResponse, conn)
if err != nil {
log.Println("Error reading stream:", err)
return
Expand All @@ -52,7 +53,6 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
labels := loghttp.LabelSet{}
for _, stream := range tailResponse.Streams {
if !q.NoLabels {

if len(q.IgnoreLabelsKey) > 0 || len(q.ShowLabelsKey) > 0 {

ls := stream.Labels
Expand All @@ -70,7 +70,6 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
} else {
labels = stream.Labels
}

}

for _, entry := range stream.Entries {
Expand Down
15 changes: 15 additions & 0 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"unsafe"

json "github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -42,3 +43,17 @@ func NewStream(s *loghttp.Stream) logproto.Stream {
Labels: s.Labels.String(),
}
}

// WebsocketReader knows how to read message to a websocket connection.
type WebsocketReader interface {
ReadMessage() (int, []byte, error)
}

// ReadTailResponseJSON unmarshals the loghttp.TailResponse from a websocket reader.
func ReadTailResponseJSON(r *loghttp.TailResponse, reader WebsocketReader) error {
_, data, err := reader.ReadMessage()
if err != nil {
return err
}
return jsoniter.Unmarshal(data, r)
}
42 changes: 42 additions & 0 deletions pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/loghttp"
legacy_loghttp "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/marshal"
)

// covers requests to /loki/api/v1/push
Expand Down Expand Up @@ -88,3 +91,42 @@ func Benchmark_DecodePushRequest(b *testing.B) {
require.Equal(b, 10000, len(actual.Streams[0].Entries))
}
}

type websocket struct {
buf []byte
}

func (ws *websocket) WriteMessage(t int, data []byte) error {
ws.buf = append(ws.buf, data...)
return nil
}

func (ws *websocket) ReadMessage() (int, []byte, error) {
return 0, ws.buf, nil
}

func Test_ReadTailResponse(t *testing.T) {
ws := &websocket{}
require.NoError(t, marshal.WriteTailResponseJSON(legacy_loghttp.TailResponse{
Streams: []logproto.Stream{
{Labels: `{app="bar"}`, Entries: []logproto.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}}},
},
DroppedEntries: []legacy_loghttp.DroppedEntry{
{Timestamp: time.Unix(0, 1), Labels: `{app="foo"}`},
},
}, ws))
res := &loghttp.TailResponse{}
require.NoError(t, ReadTailResponseJSON(res, ws))

require.Equal(t, &loghttp.TailResponse{
Streams: []loghttp.Stream{
{
Labels: loghttp.LabelSet{"app": "bar"},
Entries: []loghttp.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}},
},
},
DroppedStreams: []loghttp.DroppedStream{
{Timestamp: time.Unix(0, 1), Labels: loghttp.LabelSet{"app": "foo"}},
},
}, res)
}
16 changes: 0 additions & 16 deletions pkg/promtail/server/ui/assets_vfsdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ var Assets = func() http.FileSystem {
name: "static",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
},
"/static/.DS_Store": &vfsgen۰CompressedFileInfo{
name: ".DS_Store",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
uncompressedSize: 6148,

compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xec\x97\x51\xcb\x93\x60\x14\xc7\xcf\xa3\xf6\xf6\xcc\x88\xbc\x08\xaa\x8b\xc0\x9b\xee\x46\x4c\x23\x6a\x17\x81\xd9\xba\xd8\x5d\x60\xd4\xc5\x8a\x4d\xa7\x6c\x86\xf3\x19\xea\x26\x65\xc6\xee\xfb\x06\xf5\x21\xfa\x1e\x7d\x81\x3e\x4e\xe1\x3c\x85\x9a\x75\xf9\x32\xc6\xf9\xc1\xf8\x39\x3c\x3e\xc3\xff\xd0\xe7\x1c\x00\x60\xf6\xce\x37\x00\x34\x00\xe0\x50\x5b\x51\xa1\x17\x8e\x9f\xbf\x90\x1a\x05\xac\x5a\x43\x2c\xbd\x48\x78\xb8\x1c\x41\x10\xa7\x4e\xf5\xec\xca\xb0\x84\x14\xd2\x69\xd4\x7a\x7e\x9f\x02\xc0\xfd\x9f\x47\x8e\x95\x32\x84\xb0\x81\x55\xa7\xea\x5b\xa7\x4a\x82\x77\xdd\x95\xd8\xdd\xbe\x1a\x2f\x4f\xb7\x58\xf3\xdd\xdb\x46\x61\x9a\x8d\x46\x3f\x98\x24\x2b\x57\x2e\xae\x72\xae\x72\xf5\xba\xfa\xd6\x59\x8b\xdc\xc9\xdc\x6c\x97\xda\x6e\x32\xab\xbe\xbd\x70\xb3\xb5\x87\xc7\x2f\x85\x88\xfe\x1c\xbb\xde\xab\x30\xc8\xe7\xda\xcd\x67\x22\xce\xdc\x30\x0e\x92\xe3\xc5\xa1\x1f\x78\x6e\xf2\xe6\x75\x18\xfb\x22\xb7\xc5\x2e\xf6\xd3\x59\xe3\x04\xe7\x03\x3e\x98\x6b\x77\x8a\xc2\x18\x9b\x43\xdd\x1c\x9b\xe5\x50\x2f\x0c\xd3\x7c\x30\xd4\x1f\x3e\x7e\x54\x96\x03\x7e\xeb\x9e\xf1\x64\xba\xd8\xbc\xff\x50\x7c\x2c\x3f\x7d\xa9\x43\x63\x0c\xd3\xbb\xd1\x49\xf3\xeb\xef\xbb\xdb\x3b\x49\x1c\x89\x78\x55\xbf\x19\xe1\x02\xf6\x10\x40\x0c\x3e\x08\x48\x3a\xe9\x7c\xee\xa4\xd3\xaa\x3d\xdb\x94\x5a\x77\xd9\x49\x8b\xb8\x1c\x30\x6b\x7e\x8d\xa2\x20\x08\xa2\xe7\xfd\xa0\xa3\x2d\xf4\xa1\x36\xc3\xf3\x12\x5a\x69\x5c\xa3\xa1\x75\xb4\x85\x3e\xd4\x66\x58\x27\xa1\x15\x34\x47\x6b\x68\x1d\x6d\xa1\x0f\xad\x4d\x85\xe1\xf0\xc1\xf0\x97\x19\x4e\x28\x0c\x47\x0f\xa6\xa3\x2d\xfa\x1f\x09\xa2\x0f\xb9\x96\x56\xed\xff\xcf\xff\x3d\xff\x13\x04\x71\xce\xfb\xbc\x32\x71\x26\xf6\x7f\x86\x2f\x09\x1b\x81\x45\xa3\x31\xe8\x6b\x04\xb0\xb6\xda\x8a\x6f\x37\x1a\x02\x6a\x04\x08\xe2\xc4\xf8\x15\x00\x00\xff\xff\xd8\xf0\xdf\x78\x04\x18\x00\x00"),
},
"/static/css": &vfsgen۰DirInfo{
name: "css",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
Expand Down Expand Up @@ -78,13 +71,6 @@ var Assets = func() http.FileSystem {
name: "vendor",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
},
"/static/vendor/.DS_Store": &vfsgen۰CompressedFileInfo{
name: ".DS_Store",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
uncompressedSize: 6148,

compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xec\x98\xc1\x8e\xd2\x40\x18\xc7\xff\x33\x8b\xeb\x80\x31\xdb\xc3\x1a\xf5\x60\xd2\x8b\x37\x20\xdb\xb2\x46\xf7\x60\x82\xb8\x1e\xf6\x66\x82\xd1\xc3\x6a\x76\x67\xb6\x64\xa9\xa9\x1d\xd2\x0e\x92\x15\x31\xdc\x7d\x03\x7d\x08\xdf\xc3\x17\xf0\x71\x34\xb4\x9f\xa6\xb0\xd5\xb3\x9a\xef\x97\xc0\xaf\xd0\x99\x3f\x81\x8f\xb6\xdf\x14\x80\x18\x4c\xa3\x00\xf0\x00\x28\x94\x96\x4d\xd4\xa2\xe8\x71\x09\x49\xde\x5e\xe5\x15\x19\x39\x1c\x32\x68\x4c\xd0\x01\xf3\x2f\xb0\xaa\xdd\x0e\x0c\x2c\x2c\xdc\x5a\xfd\xf6\xd1\x45\x80\x2e\x7a\x47\x89\x3d\x33\x89\x35\xf4\x37\x79\x04\xa0\xfb\xbd\xa0\x48\xd8\xad\x99\xdd\x43\x07\x0e\x17\x98\x60\x04\x8d\x71\xf1\x1c\x6d\xe4\x7c\xd9\xc8\xb9\x51\x93\xb3\x8f\x0e\xce\x91\x14\x49\x63\xc4\x38\x83\x45\x8a\x7c\x3d\x49\xdc\xd9\x48\x92\x78\x7d\x69\xcc\xc7\xba\x31\x66\x96\x4f\x68\xcc\x57\x33\x49\xe2\xdc\xed\xed\x7d\x13\x72\xab\x71\x65\xfb\xaa\x52\x2d\xd5\xba\xde\x7a\x35\x1c\xdb\xd9\xd0\x69\x37\xcd\x07\x3a\x3b\x5e\xbd\x7a\xaa\xdd\xd8\xd0\xf6\x33\x6b\x93\x5f\xdb\xda\x3c\x8f\x47\xb3\x13\x6f\xf7\xb1\x4d\x9d\x8e\xd3\x51\x56\x4c\x8e\xa3\x91\xd1\xd9\xcb\x17\x71\x1a\xd9\xd9\xc0\x4e\xd3\x28\x3f\xae\xec\x50\xaa\xa9\x9a\x27\xde\xed\xf9\x3c\x38\x08\xdb\x7e\x78\x10\x2e\xda\xfe\x3c\x08\xc3\x5e\xdb\xbf\xf7\xe0\xfe\x62\xd1\x54\x37\xef\x06\x0f\x8f\x4e\xdf\x5c\xbc\x9b\xbf\x5f\x7c\xf8\x54\x16\x4e\x08\xaa\xe0\xce\x46\x45\x3f\xff\xfc\x76\x6f\x87\x59\x9a\xd8\xf4\xbc\x3c\x3a\x19\x86\x61\x2a\xd0\x59\x41\x5d\xe3\x9f\x82\x61\x98\x9a\xf3\x83\x4f\xee\x93\x97\xa5\x05\xed\x97\xe4\x46\x65\x8e\x47\xf6\xc9\x7d\xf2\xb2\xb4\xa0\x71\x92\xdc\x20\x2b\xb2\x47\xf6\xc9\x7d\xf2\x72\xad\xed\x11\xb4\xf8\x10\xf4\xc9\x82\x56\x28\xc2\x23\xfb\xe4\x3e\xd7\x91\x61\xea\xd8\x2a\xe5\xad\xae\xff\x4f\x7e\xbf\xfe\x67\x18\xe6\x7f\xbe\xce\x37\x0e\x87\x87\x83\x3f\xdc\x26\x90\xd4\x08\x9c\x56\x1a\x83\x6a\x23\x50\x6d\x02\x64\x79\xb3\xf0\x56\xe5\x7d\x6e\x04\x18\xe6\x2f\xe3\x47\x00\x00\x00\xff\xff\x27\xc4\x76\x81\x04\x18\x00\x00"),
},
"/static/vendor/bootstrap-4.1.3": &vfsgen۰DirInfo{
name: "bootstrap-4.1.3",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
Expand Down Expand Up @@ -489,7 +475,6 @@ var Assets = func() http.FileSystem {
fs["/templates"].(os.FileInfo),
}
fs["/static"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
fs["/static/.DS_Store"].(os.FileInfo),
fs["/static/css"].(os.FileInfo),
fs["/static/img"].(os.FileInfo),
fs["/static/js"].(os.FileInfo),
Expand All @@ -506,7 +491,6 @@ var Assets = func() http.FileSystem {
fs["/static/js/targets.js"].(os.FileInfo),
}
fs["/static/vendor"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
fs["/static/vendor/.DS_Store"].(os.FileInfo),
fs["/static/vendor/bootstrap-4.1.3"].(os.FileInfo),
fs["/static/vendor/bootstrap3-typeahead"].(os.FileInfo),
fs["/static/vendor/bootstrap4-glyphicons"].(os.FileInfo),
Expand Down

0 comments on commit a5cb38d

Please sign in to comment.