-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathloki_client.go
105 lines (84 loc) · 2.77 KB
/
loki_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"bufio"
"bytes"
"context"
"fmt"
"github.com/golang/snappy"
l "github.com/nosinovacao/floki/logproto"
t "github.com/nosinovacao/floki/types"
"io"
"net/http"
"sort"
"time"
)
// logCollection is a slice of pointers to Filebeat log records; it is the
// batch type accepted by sendToLoki.
type logCollection []*t.FilebeatLog
// collection is a package-level logCollection created empty with room for
// 1000 records before the backing array has to grow.
// NOTE(review): presumably the buffer incoming records accumulate in before
// being flushed through sendToLoki — confirm against the code that appends to it.
var collection = make(logCollection, 0, 1000)
// pushToLoki serializes streams into a logproto PushRequest, compresses the
// payload with snappy, and POSTs it to lokiURL as application/x-protobuf.
// The whole HTTP exchange is bounded by a 10-second timeout.
//
// Nothing is returned: every failure (marshalling, request construction,
// transport error, non-2xx status) is written to the errorl logger and the
// function returns without retrying. A successful push is reported on the
// info logger.
func pushToLoki(streams []*l.Stream) {
	req := l.PushRequest{
		Streams: streams,
	}
	buf, err := req.Marshal()
	if err != nil {
		errorl.Log("msg", "unable to marshall PushRequest", "err", err)
		return
	}
	buf = snappy.Encode(nil, buf)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	httpreq, err := http.NewRequest(http.MethodPost, lokiURL, bytes.NewReader(buf))
	if err != nil {
		errorl.Log("msg", "unable to create http request", "err", err)
		return
	}
	httpreq = httpreq.WithContext(ctx)
	httpreq.Header.Add("Content-Type", "application/x-protobuf")

	client := http.Client{}
	resp, err := client.Do(httpreq)
	if err != nil {
		errorl.Log("msg", "http request error", "err", err)
		return
	}
	// The body must be closed on every path, otherwise the underlying
	// connection (and its file descriptor) is leaked on each push.
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		// Best effort: include only the first line of the response, reading
		// at most 1 KiB, so a large error page cannot flood the log.
		scanner := bufio.NewScanner(io.LimitReader(resp.Body, 1024))
		line := ""
		if scanner.Scan() {
			line = scanner.Text()
		}
		err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
		errorl.Log("msg", "server returned an error", "err", err)
		return
	}
	info.Log("msg", "sent streams to loki")
}
// sendToLoki converts a batch of Filebeat records into Loki streams and
// forwards them to pushToLoki.
//
// Records are bucketed by a label string built from the container name,
// namespace, node name and replicaset name of each record. Every bucket
// becomes one stream whose entries are stably sorted by timestamp.
func sendToLoki(col logCollection) {
	const labelFormat = "{name=\"%s\", namespace=\"%s\", instance=\"%s\", replicaset=\"%s\"}"

	debug.Log("msg", "sendToLoki", "len", len(col))

	// Entries grouped under their rendered label string.
	grouped := map[string][]l.Entry{}
	for _, rec := range col {
		key := fmt.Sprintf(labelFormat,
			rec.Kubernetes.Container.Name,
			rec.Kubernetes.Namespace,
			rec.Kubernetes.Node.Name,
			rec.Kubernetes.Replicaset.Name)
		// Appending to a key that is not present yet starts a new slice.
		grouped[key] = append(grouped[key], l.Entry{
			Timestamp: rec.Timestamp,
			Line:      rec.JSON.Log,
		})
	}

	streams := make([]*l.Stream, 0, len(grouped))
	for key, entries := range grouped {
		// Stable sort keeps the arrival order of entries sharing a timestamp.
		sort.SliceStable(entries, func(a, b int) bool {
			return entries[a].Timestamp.Before(entries[b].Timestamp)
		})
		streams = append(streams, &l.Stream{
			Labels:  key,
			Entries: entries,
		})
	}

	debug.Log("msg", fmt.Sprintf("sending %d streams to the server", len(streams)))
	pushToLoki(streams)
}