Skip to content

Commit

Permalink
Change tail response for backward compatibilty with future changes
Browse files Browse the repository at this point in the history
Changed it to dict with single key called "streams" and value set to list of logproto.Stream
  • Loading branch information
sandeepsukhani committed May 16, 2019
1 parent 7942493 commit 5669c2c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
40 changes: 21 additions & 19 deletions cmd/logcli/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"log"
"strings"

"github.com/fatih/color"
"github.com/grafana/loki/pkg/querier"

"github.com/grafana/loki/pkg/logproto"
"github.com/fatih/color"
)

func tailQuery() {
Expand All @@ -15,7 +15,7 @@ func tailQuery() {
log.Fatalf("Tailing logs failed: %+v", err)
}

stream := new(logproto.Stream)
tailReponse := new(querier.TailResponse)

if len(*ignoreLabelsKey) > 0 {
log.Println("Ingoring labels key:", color.RedString(strings.Join(*ignoreLabelsKey, ",")))
Expand All @@ -26,36 +26,38 @@ func tailQuery() {
}

for {
err := conn.ReadJSON(stream)
err := conn.ReadJSON(tailReponse)
if err != nil {
log.Println("Error reading stream:", err)
return
}

labels := ""
if !*noLabels {
for _, stream := range tailReponse.Streams {
if !*noLabels {

if len(*ignoreLabelsKey) > 0 || len(*showLabelsKey) > 0 {
if len(*ignoreLabelsKey) > 0 || len(*showLabelsKey) > 0 {

ls := mustParseLabels(stream.GetLabels())
ls := mustParseLabels(stream.GetLabels())

if len(*showLabelsKey) > 0 {
ls = ls.MatchLabels(true, *showLabelsKey...)
}
if len(*showLabelsKey) > 0 {
ls = ls.MatchLabels(true, *showLabelsKey...)
}

if len(*ignoreLabelsKey) > 0 {
ls = ls.MatchLabels(false, *ignoreLabelsKey...)
}
if len(*ignoreLabelsKey) > 0 {
ls = ls.MatchLabels(false, *ignoreLabelsKey...)
}

labels = ls.String()
labels = ls.String()

} else {
} else {

labels = stream.Labels
labels = stream.Labels
}
}
for _, entry := range stream.Entries {
printLogEntry(entry.Timestamp, labels, entry.Line)
}
}
for _, entry := range stream.Entries {
printLogEntry(entry.Timestamp, labels, entry.Line)
}
}
}
11 changes: 10 additions & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
defaulSince = 1 * time.Hour
)

// TailResponse represents response for tail query
type TailResponse struct {
Streams []*logproto.Stream `json:"streams"`
}

// nolint
func intParam(values url.Values, name string, def int) (int, error) {
value := values.Get(name)
Expand Down Expand Up @@ -161,13 +166,17 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
// heap until connection to websocket stays open
queryRequest := *queryRequestPtr
itr := q.tailQuery(r.Context(), &queryRequest)

stream := logproto.Stream{}
tailResponse := TailResponse{[]*logproto.Stream{
&stream,
}}

for itr.Next() {
stream.Entries = []logproto.Entry{itr.Entry()}
stream.Labels = itr.Labels()

err := conn.WriteJSON(stream)
err := conn.WriteJSON(tailResponse)
if err != nil {
level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err))
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
Expand Down

0 comments on commit 5669c2c

Please sign in to comment.