Skip to content

Commit

Permalink
Change tail response for backward compatibilty with future changes (g…
Browse files Browse the repository at this point in the history
…rafana#590)

Changed it to dict with single key called "streams" and value set to list of logproto.Stream
  • Loading branch information
sandeepsukhani authored and slim-bean committed May 31, 2019
1 parent 2e962ce commit 490ac3b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
40 changes: 21 additions & 19 deletions cmd/logcli/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"log"
"strings"

"github.com/fatih/color"
"github.com/grafana/loki/pkg/querier"

"github.com/grafana/loki/pkg/logproto"
"github.com/fatih/color"
)

func tailQuery() {
Expand All @@ -15,7 +15,7 @@ func tailQuery() {
log.Fatalf("Tailing logs failed: %+v", err)
}

stream := new(logproto.Stream)
tailReponse := new(querier.TailResponse)

if len(*ignoreLabelsKey) > 0 {
log.Println("Ingoring labels key:", color.RedString(strings.Join(*ignoreLabelsKey, ",")))
Expand All @@ -26,36 +26,38 @@ func tailQuery() {
}

for {
err := conn.ReadJSON(stream)
err := conn.ReadJSON(tailReponse)
if err != nil {
log.Println("Error reading stream:", err)
return
}

labels := ""
if !*noLabels {
for _, stream := range tailReponse.Streams {
if !*noLabels {

if len(*ignoreLabelsKey) > 0 || len(*showLabelsKey) > 0 {
if len(*ignoreLabelsKey) > 0 || len(*showLabelsKey) > 0 {

ls := mustParseLabels(stream.GetLabels())
ls := mustParseLabels(stream.GetLabels())

if len(*showLabelsKey) > 0 {
ls = ls.MatchLabels(true, *showLabelsKey...)
}
if len(*showLabelsKey) > 0 {
ls = ls.MatchLabels(true, *showLabelsKey...)
}

if len(*ignoreLabelsKey) > 0 {
ls = ls.MatchLabels(false, *ignoreLabelsKey...)
}
if len(*ignoreLabelsKey) > 0 {
ls = ls.MatchLabels(false, *ignoreLabelsKey...)
}

labels = ls.String()
labels = ls.String()

} else {
} else {

labels = stream.Labels
labels = stream.Labels
}
}
for _, entry := range stream.Entries {
printLogEntry(entry.Timestamp, labels, entry.Line)
}
}
for _, entry := range stream.Entries {
printLogEntry(entry.Timestamp, labels, entry.Line)
}
}
}
11 changes: 10 additions & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
defaulSince = 1 * time.Hour
)

// TailResponse represents response for tail query
type TailResponse struct {
Streams []*logproto.Stream `json:"streams"`
}

// nolint
func intParam(values url.Values, name string, def int) (int, error) {
value := values.Get(name)
Expand Down Expand Up @@ -161,13 +166,17 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
// heap until connection to websocket stays open
queryRequest := *queryRequestPtr
itr := q.tailQuery(r.Context(), &queryRequest)

stream := logproto.Stream{}
tailResponse := TailResponse{[]*logproto.Stream{
&stream,
}}

for itr.Next() {
stream.Entries = []logproto.Entry{itr.Entry()}
stream.Labels = itr.Labels()

err := conn.WriteJSON(stream)
err := conn.WriteJSON(tailResponse)
if err != nil {
level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err))
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
Expand Down

0 comments on commit 490ac3b

Please sign in to comment.