Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes unmarshalling of tailing responses. #3237

Merged
merged 2 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/build"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/unmarshal"
)

var (
Expand Down Expand Up @@ -303,7 +304,6 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}
tss = append(tss, *ts)
}

}
default:
return nil, fmt.Errorf("unexpected result type, expected a log stream result instead received %v", value.Type())
Expand All @@ -313,7 +313,6 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}

func (r *Reader) run() {

r.closeAndReconnect()

tailResponse := &loghttp.TailResponse{}
Expand All @@ -332,7 +331,7 @@ func (r *Reader) run() {
// Set a read timeout of 10x the interval we expect to see messages
// Ignore the error as it will get caught when we call ReadJSON
_ = r.conn.SetReadDeadline(time.Now().Add(10 * r.interval))
err := r.conn.ReadJSON(tailResponse)
err := unmarshal.ReadTailResponseJSON(tailResponse, r.conn)
if err != nil {
fmt.Fprintf(r.w, "error reading websocket, will retry in 10 seconds: %s\n", err)
// Even though we sleep between connection retries, we found it's possible to DOS Loki if the connection
Expand Down
5 changes: 2 additions & 3 deletions pkg/logcli/query/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql/unmarshal"
)

// TailQuery connects to the Loki websocket endpoint and tails logs
Expand Down Expand Up @@ -43,7 +44,7 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
}

for {
err := conn.ReadJSON(tailResponse)
err := unmarshal.ReadTailResponseJSON(tailResponse, conn)
if err != nil {
log.Println("Error reading stream:", err)
return
Expand All @@ -52,7 +53,6 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
labels := loghttp.LabelSet{}
for _, stream := range tailResponse.Streams {
if !q.NoLabels {

if len(q.IgnoreLabelsKey) > 0 || len(q.ShowLabelsKey) > 0 {

ls := stream.Labels
Expand All @@ -70,7 +70,6 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
} else {
labels = stream.Labels
}

}

for _, entry := range stream.Entries {
Expand Down
15 changes: 15 additions & 0 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"unsafe"

json "github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -42,3 +43,17 @@ func NewStream(s *loghttp.Stream) logproto.Stream {
Labels: s.Labels.String(),
}
}

// WebsocketReader knows how to read message to a websocket connection.
type WebsocketReader interface {
ReadMessage() (int, []byte, error)
}

// ReadTailResponseJSON unmarshals the loghttp.TailResponse from a websocket reader.
func ReadTailResponseJSON(r *loghttp.TailResponse, reader WebsocketReader) error {
_, data, err := reader.ReadMessage()
if err != nil {
return err
}
return jsoniter.Unmarshal(data, r)
}
42 changes: 42 additions & 0 deletions pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/loghttp"
legacy_loghttp "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/marshal"
)

// covers requests to /loki/api/v1/push
Expand Down Expand Up @@ -88,3 +91,42 @@ func Benchmark_DecodePushRequest(b *testing.B) {
require.Equal(b, 10000, len(actual.Streams[0].Entries))
}
}

type websocket struct {
buf []byte
}

func (ws *websocket) WriteMessage(t int, data []byte) error {
ws.buf = append(ws.buf, data...)
return nil
}

func (ws *websocket) ReadMessage() (int, []byte, error) {
return 0, ws.buf, nil
}

func Test_ReadTailResponse(t *testing.T) {
ws := &websocket{}
require.NoError(t, marshal.WriteTailResponseJSON(legacy_loghttp.TailResponse{
Streams: []logproto.Stream{
{Labels: `{app="bar"}`, Entries: []logproto.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}}},
},
DroppedEntries: []legacy_loghttp.DroppedEntry{
{Timestamp: time.Unix(0, 1), Labels: `{app="foo"}`},
},
}, ws))
res := &loghttp.TailResponse{}
require.NoError(t, ReadTailResponseJSON(res, ws))

require.Equal(t, &loghttp.TailResponse{
Streams: []loghttp.Stream{
{
Labels: loghttp.LabelSet{"app": "bar"},
Entries: []loghttp.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}},
},
},
DroppedStreams: []loghttp.DroppedStream{
{Timestamp: time.Unix(0, 1), Labels: loghttp.LabelSet{"app": "foo"}},
},
}, res)
}
16 changes: 0 additions & 16 deletions pkg/promtail/server/ui/assets_vfsdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ var Assets = func() http.FileSystem {
name: "static",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
},
"/static/.DS_Store": &vfsgen۰CompressedFileInfo{
name: ".DS_Store",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
uncompressedSize: 6148,

compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xec\x97\x51\xcb\x93\x60\x14\xc7\xcf\xa3\xf6\xf6\xcc\x88\xbc\x08\xaa\x8b\xc0\x9b\xee\x46\x4c\x23\x6a\x17\x81\xd9\xba\xd8\x5d\x60\xd4\xc5\x8a\x4d\xa7\x6c\x86\xf3\x19\xea\x26\x65\xc6\xee\xfb\x06\xf5\x21\xfa\x1e\x7d\x81\x3e\x4e\xe1\x3c\x85\x9a\x75\xf9\x32\xc6\xf9\xc1\xf8\x39\x3c\x3e\xc3\xff\xd0\xe7\x1c\x00\x60\xf6\xce\x37\x00\x34\x00\xe0\x50\x5b\x51\xa1\x17\x8e\x9f\xbf\x90\x1a\x05\xac\x5a\x43\x2c\xbd\x48\x78\xb8\x1c\x41\x10\xa7\x4e\xf5\xec\xca\xb0\x84\x14\xd2\x69\xd4\x7a\x7e\x9f\x02\xc0\xfd\x9f\x47\x8e\x95\x32\x84\xb0\x81\x55\xa7\xea\x5b\xa7\x4a\x82\x77\xdd\x95\xd8\xdd\xbe\x1a\x2f\x4f\xb7\x58\xf3\xdd\xdb\x46\x61\x9a\x8d\x46\x3f\x98\x24\x2b\x57\x2e\xae\x72\xae\x72\xf5\xba\xfa\xd6\x59\x8b\xdc\xc9\xdc\x6c\x97\xda\x6e\x32\xab\xbe\xbd\x70\xb3\xb5\x87\xc7\x2f\x85\x88\xfe\x1c\xbb\xde\xab\x30\xc8\xe7\xda\xcd\x67\x22\xce\xdc\x30\x0e\x92\xe3\xc5\xa1\x1f\x78\x6e\xf2\xe6\x75\x18\xfb\x22\xb7\xc5\x2e\xf6\xd3\x59\xe3\x04\xe7\x03\x3e\x98\x6b\x77\x8a\xc2\x18\x9b\x43\xdd\x1c\x9b\xe5\x50\x2f\x0c\xd3\x7c\x30\xd4\x1f\x3e\x7e\x54\x96\x03\x7e\xeb\x9e\xf1\x64\xba\xd8\xbc\xff\x50\x7c\x2c\x3f\x7d\xa9\x43\x63\x0c\xd3\xbb\xd1\x49\xf3\xeb\xef\xbb\xdb\x3b\x49\x1c\x89\x78\x55\xbf\x19\xe1\x02\xf6\x10\x40\x0c\x3e\x08\x48\x3a\xe9\x7c\xee\xa4\xd3\xaa\x3d\xdb\x94\x5a\x77\xd9\x49\x8b\xb8\x1c\x30\x6b\x7e\x8d\xa2\x20\x08\xa2\xe7\xfd\xa0\xa3\x2d\xf4\xa1\x36\xc3\xf3\x12\x5a\x69\x5c\xa3\xa1\x75\xb4\x85\x3e\xd4\x66\x58\x27\xa1\x15\x34\x47\x6b\x68\x1d\x6d\xa1\x0f\xad\x4d\x85\xe1\xf0\xc1\xf0\x97\x19\x4e\x28\x0c\x47\x0f\xa6\xa3\x2d\xfa\x1f\x09\xa2\x0f\xb9\x96\x56\xed\xff\xcf\xff\x3d\xff\x13\x04\x71\xce\xfb\xbc\x32\x71\x26\xf6\x7f\x86\x2f\x09\x1b\x81\x45\xa3\x31\xe8\x6b\x04\xb0\xb6\xda\x8a\x6f\x37\x1a\x02\x6a\x04\x08\xe2\xc4\xf8\x15\x00\x00\xff\xff\xd8\xf0\xdf\x78\x04\x18\x00\x00"),
},
"/static/css": &vfsgen۰DirInfo{
name: "css",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
Expand Down Expand Up @@ -78,13 +71,6 @@ var Assets = func() http.FileSystem {
name: "vendor",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
},
"/static/vendor/.DS_Store": &vfsgen۰CompressedFileInfo{
name: ".DS_Store",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
uncompressedSize: 6148,

compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xec\x98\xc1\x8e\xd2\x40\x18\xc7\xff\x33\x8b\xeb\x80\x31\xdb\xc3\x1a\xf5\x60\xd2\x8b\x37\x20\xdb\xb2\x46\xf7\x60\x82\xb8\x1e\xf6\x66\x82\xd1\xc3\x6a\x76\x67\xb6\x64\xa9\xa9\x1d\xd2\x0e\x92\x15\x31\xdc\x7d\x03\x7d\x08\xdf\xc3\x17\xf0\x71\x34\xb4\x9f\xa6\xb0\xd5\xb3\x9a\xef\x97\xc0\xaf\xd0\x99\x3f\x81\x8f\xb6\xdf\x14\x80\x18\x4c\xa3\x00\xf0\x00\x28\x94\x96\x4d\xd4\xa2\xe8\x71\x09\x49\xde\x5e\xe5\x15\x19\x39\x1c\x32\x68\x4c\xd0\x01\xf3\x2f\xb0\xaa\xdd\x0e\x0c\x2c\x2c\xdc\x5a\xfd\xf6\xd1\x45\x80\x2e\x7a\x47\x89\x3d\x33\x89\x35\xf4\x37\x79\x04\xa0\xfb\xbd\xa0\x48\xd8\xad\x99\xdd\x43\x07\x0e\x17\x98\x60\x04\x8d\x71\xf1\x1c\x6d\xe4\x7c\xd9\xc8\xb9\x51\x93\xb3\x8f\x0e\xce\x91\x14\x49\x63\xc4\x38\x83\x45\x8a\x7c\x3d\x49\xdc\xd9\x48\x92\x78\x7d\x69\xcc\xc7\xba\x31\x66\x96\x4f\x68\xcc\x57\x33\x49\xe2\xdc\xed\xed\x7d\x13\x72\xab\x71\x65\xfb\xaa\x52\x2d\xd5\xba\xde\x7a\x35\x1c\xdb\xd9\xd0\x69\x37\xcd\x07\x3a\x3b\x5e\xbd\x7a\xaa\xdd\xd8\xd0\xf6\x33\x6b\x93\x5f\xdb\xda\x3c\x8f\x47\xb3\x13\x6f\xf7\xb1\x4d\x9d\x8e\xd3\x51\x56\x4c\x8e\xa3\x91\xd1\xd9\xcb\x17\x71\x1a\xd9\xd9\xc0\x4e\xd3\x28\x3f\xae\xec\x50\xaa\xa9\x9a\x27\xde\xed\xf9\x3c\x38\x08\xdb\x7e\x78\x10\x2e\xda\xfe\x3c\x08\xc3\x5e\xdb\xbf\xf7\xe0\xfe\x62\xd1\x54\x37\xef\x06\x0f\x8f\x4e\xdf\x5c\xbc\x9b\xbf\x5f\x7c\xf8\x54\x16\x4e\x08\xaa\xe0\xce\x46\x45\x3f\xff\xfc\x76\x6f\x87\x59\x9a\xd8\xf4\xbc\x3c\x3a\x19\x86\x61\x2a\xd0\x59\x41\x5d\xe3\x9f\x82\x61\x98\x9a\xf3\x83\x4f\xee\x93\x97\xa5\x05\xed\x97\xe4\x46\x65\x8e\x47\xf6\xc9\x7d\xf2\xb2\xb4\xa0\x71\x92\xdc\x20\x2b\xb2\x47\xf6\xc9\x7d\xf2\x72\xad\xed\x11\xb4\xf8\x10\xf4\xc9\x82\x56\x28\xc2\x23\xfb\xe4\x3e\xd7\x91\x61\xea\xd8\x2a\xe5\xad\xae\xff\x4f\x7e\xbf\xfe\x67\x18\xe6\x7f\xbe\xce\x37\x0e\x87\x87\x83\x3f\xdc\x26\x90\xd4\x08\x9c\x56\x1a\x83\x6a\x23\x50\x6d\x02\x64\x79\xb3\xf0\x56\xe5\x7d\x6e\x04\x18\xe6\x2f\xe3\x47\x00\x00\x00\xff\xff\x27\xc4\x76\x81\x04\x18\x00\x00"),
},
"/static/vendor/bootstrap-4.1.3": &vfsgen۰DirInfo{
name: "bootstrap-4.1.3",
modTime: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
Expand Down Expand Up @@ -489,7 +475,6 @@ var Assets = func() http.FileSystem {
fs["/templates"].(os.FileInfo),
}
fs["/static"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
fs["/static/.DS_Store"].(os.FileInfo),
fs["/static/css"].(os.FileInfo),
fs["/static/img"].(os.FileInfo),
fs["/static/js"].(os.FileInfo),
Expand All @@ -506,7 +491,6 @@ var Assets = func() http.FileSystem {
fs["/static/js/targets.js"].(os.FileInfo),
}
fs["/static/vendor"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
fs["/static/vendor/.DS_Store"].(os.FileInfo),
fs["/static/vendor/bootstrap-4.1.3"].(os.FileInfo),
fs["/static/vendor/bootstrap3-typeahead"].(os.FileInfo),
fs["/static/vendor/bootstrap4-glyphicons"].(os.FileInfo),
Expand Down