From 490ac3be1ccaa52fdf97569576b1492078336b27 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 16 May 2019 16:04:56 +0530 Subject: [PATCH] Change tail response for backward compatibilty with future changes (#590) Changed it to dict with single key called "streams" and value set to list of logproto.Stream --- cmd/logcli/tail.go | 40 +++++++++++++++++++++------------------- pkg/querier/http.go | 11 ++++++++++- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/cmd/logcli/tail.go b/cmd/logcli/tail.go index 27331aa58a10..3f9a6806251a 100644 --- a/cmd/logcli/tail.go +++ b/cmd/logcli/tail.go @@ -4,9 +4,9 @@ import ( "log" "strings" - "github.com/fatih/color" + "github.com/grafana/loki/pkg/querier" - "github.com/grafana/loki/pkg/logproto" + "github.com/fatih/color" ) func tailQuery() { @@ -15,7 +15,7 @@ func tailQuery() { log.Fatalf("Tailing logs failed: %+v", err) } - stream := new(logproto.Stream) + tailReponse := new(querier.TailResponse) if len(*ignoreLabelsKey) > 0 { log.Println("Ingoring labels key:", color.RedString(strings.Join(*ignoreLabelsKey, ","))) @@ -26,36 +26,38 @@ func tailQuery() { } for { - err := conn.ReadJSON(stream) + err := conn.ReadJSON(tailReponse) if err != nil { log.Println("Error reading stream:", err) return } labels := "" - if !*noLabels { + for _, stream := range tailReponse.Streams { + if !*noLabels { - if len(*ignoreLabelsKey) > 0 || len(*showLabelsKey) > 0 { + if len(*ignoreLabelsKey) > 0 || len(*showLabelsKey) > 0 { - ls := mustParseLabels(stream.GetLabels()) + ls := mustParseLabels(stream.GetLabels()) - if len(*showLabelsKey) > 0 { - ls = ls.MatchLabels(true, *showLabelsKey...) - } + if len(*showLabelsKey) > 0 { + ls = ls.MatchLabels(true, *showLabelsKey...) + } - if len(*ignoreLabelsKey) > 0 { - ls = ls.MatchLabels(false, *ignoreLabelsKey...) - } + if len(*ignoreLabelsKey) > 0 { + ls = ls.MatchLabels(false, *ignoreLabelsKey...) + } - labels = ls.String() + labels = ls.String() - } else { + } else { - labels = stream.Labels + labels = stream.Labels + } + } + for _, entry := range stream.Entries { + printLogEntry(entry.Timestamp, labels, entry.Line) } - } - for _, entry := range stream.Entries { - printLogEntry(entry.Timestamp, labels, entry.Line) } } } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index c57b8e979659..3161ff6f619d 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -25,6 +25,11 @@ const ( defaulSince = 1 * time.Hour ) +// TailResponse represents response for tail query +type TailResponse struct { + Streams []*logproto.Stream `json:"streams"` +} + // nolint func intParam(values url.Values, name string, def int) (int, error) { value := values.Get(name) @@ -161,13 +166,17 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { // heap until connection to websocket stays open queryRequest := *queryRequestPtr itr := q.tailQuery(r.Context(), &queryRequest) + stream := logproto.Stream{} + tailResponse := TailResponse{[]*logproto.Stream{ + &stream, + }} for itr.Next() { stream.Entries = []logproto.Entry{itr.Entry()} stream.Labels = itr.Labels() - err := conn.WriteJSON(stream) + err := conn.WriteJSON(tailResponse) if err != nil { level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err)) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {