diff --git a/.gitignore b/.gitignore index 5bdb6748f0c2..a9a40e0fb4fe 100644 --- a/.gitignore +++ b/.gitignore @@ -11,8 +11,10 @@ cmd/promtail/promtail cmd/loki/loki-debug cmd/promtail/promtail-debug cmd/docker-driver/docker-driver +cmd/loki-canary/loki-canary /loki /promtail /logcli +/loki-canary dlv -rootfs/ \ No newline at end of file +rootfs/ diff --git a/README.md b/README.md index 03dfde547955..b6fb36c4e4be 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ Once you have promtail, Loki, and Grafana running, continue with [our usage docs - [Promtail](./docs/promtail.md) is an agent which can tail your log files and push them to Loki. - [Docker Logging Driver](./cmd/docker-driver/README.md) is a docker plugin to send logs directly to Loki from Docker containers. - [Logcli](./docs/logcli.md) on how to query your logs without Grafana. +- [Loki Canary](./docs/canary/README.md) for monitoring your Loki installation for missing logs. - [Troubleshooting](./docs/troubleshooting.md) for help around frequent error messages. - [Usage](./docs/usage.md) for how to set up a Loki datasource in Grafana and query your logs. 
diff --git a/cmd/loki-canary/Dockerfile b/cmd/loki-canary/Dockerfile new file mode 100644 index 000000000000..432be7d6ccff --- /dev/null +++ b/cmd/loki-canary/Dockerfile @@ -0,0 +1,4 @@ +FROM alpine:3.9 +RUN apk add --update --no-cache ca-certificates +ADD loki-canary /usr/bin +ENTRYPOINT [ "/usr/bin/loki-canary" ] diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go new file mode 100644 index 000000000000..a289a1631165 --- /dev/null +++ b/cmd/loki-canary/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/grafana/loki/pkg/canary/comparator" + "github.com/grafana/loki/pkg/canary/reader" + "github.com/grafana/loki/pkg/canary/writer" +) + +func main() { + + lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector") + lVal := flag.String("labelvalue", "loki-canary", "The unique label value for this instance of loki-canary to use in the log selector") + port := flag.Int("port", 3500, "Port which loki-canary should expose metrics") + addr := flag.String("addr", "", "The Loki server URL:Port, e.g. 
loki:3100") + tls := flag.Bool("tls", false, "Does the loki connection use TLS?") + user := flag.String("user", "", "Loki username") + pass := flag.String("pass", "", "Loki password") + + interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries") + size := flag.Int("size", 100, "Size in bytes of each log line") + wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost") + pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki") + buckets := flag.Int("buckets", 10, "Number of buckets in the response_latency histogram") + flag.Parse() + + if *addr == "" { + _, _ = fmt.Fprintf(os.Stderr, "Must specify a Loki address with -addr\n") + os.Exit(1) + } + + sentChan := make(chan time.Time) + receivedChan := make(chan time.Time) + + w := writer.NewWriter(os.Stdout, sentChan, *interval, *size) + r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) + c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r) + + http.Handle("/metrics", promhttp.Handler()) + go func() { + err := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + if err != nil { + panic(err) + } + }() + + interrupt := make(chan os.Signal, 1) + terminate := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + signal.Notify(terminate, syscall.SIGTERM) + + for { + select { + case <-interrupt: + _, _ = fmt.Fprintf(os.Stderr, "suspending indefinetely\n") + w.Stop() + r.Stop() + c.Stop() + case <-terminate: + _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") + w.Stop() + r.Stop() + c.Stop() + return + } + } + +} diff --git a/docs/canary/README.md b/docs/canary/README.md new file mode 100644 index 000000000000..45144399b6c8 --- /dev/null +++ b/docs/canary/README.md @@ -0,0 +1,108 @@ + +# loki-canary + +A standalone app 
to audit the log capturing performance of Loki. + +## how it works + +![block_diagram](block.png) + +loki-canary writes a log to a file and stores the timestamp in an internal array, the contents look something like this: + +```nohighlight +1557935669096040040 ppppppppppppppppppppppppppppppppppppppppppppppppppppppppppp +``` + +The relevant part is the timestamp, the `p`'s are just filler bytes to make the size of the log configurable. + +Promtail (or another agent) then reads the log file and ships it to Loki. + +Meanwhile loki-canary opens a websocket connection to loki and listens for logs it creates. + +When a log is received on the websocket, the timestamp in the log message is compared to the internal array. + +If the received log is: + + * The next in the array to be received, it is removed from the array and the (current time - log timestamp) is recorded in the `response_latency` histogram, this is the expected behavior for well behaving logs + * Not the next in the array received, it is removed from the array, the response time is recorded in the `response_latency` histogram, and the `out_of_order_entries` counter is incremented + * Not in the array at all, it is checked against a separate list of received logs to either increment the `duplicate_entries` counter or the `unexpected_entries` counter. + +In the background, loki-canary also runs a timer which iterates through all the entries in the internal array, if any are older than the duration specified by the `-wait` flag (default 60s), they are removed from the array and the `websocket_missing_entries` counter is incremented. Then an additional query is made directly to loki for these missing entries to determine if they were actually missing or just didn't make it down the websocket. If they are not found in the followup query the `missing_entries` counter is incremented. 
+ +## building and running + +`make` will run tests and build a docker image + +`make build` will create a binary `loki-canary` alongside the makefile + +To run the image, you can do something simple like: + +`kubectl run loki-canary --generator=run-pod/v1 --image=grafana/loki-canary:latest --restart=Never --image-pull-policy=Never --labels=name=loki-canary -- -addr=loki:3100` + +Or you can do something more complex like deploy it as a daemonset, there is a ksonnet setup for this in the `production` folder, you can import it using jsonnet-bundler: + +```shell +jb install github.com/grafana/loki-canary/production/ksonnet/loki-canary +``` + +Then in your ksonnet environments `main.jsonnet` you'll want something like this: + +```nohighlight +local loki_canary = import 'loki-canary/loki-canary.libsonnet'; + +loki_canary { + loki_canary_args+:: { + addr: "loki:3100", + port: 80, + labelname: "instance", + interval: "100ms", + size: 1024, + wait: "3m", + }, + _config+:: { + namespace: "default", + } +} + +``` + +## config + +It is required to pass in the Loki address with the `-addr` flag, if your server uses TLS, also pass `-tls=true` (this will create a wss:// instead of ws:// connection) + +You should also pass the `-labelname` and `-labelvalue` flags, these are used by loki-canary to filter the log stream to only process logs for this instance of loki-canary, so they must be unique per each of your loki-canary instances. The ksonnet config in this project accomplishes this by passing in the pod name as the labelvalue + +If you get a high number of `unexpected_entries` you may not be waiting long enough and should increase `-wait` from 60s to something larger. + +__Be cognizant__ of the relationship between `pruneinterval` and the `interval`. 
For example, with an interval of 10ms (100 logs per second) and a prune interval of 60s, you will write 6000 logs per minute, if those logs were not received over the websocket, the canary will attempt to query loki directly to see if they are completely lost. __However__ the query return is limited to 1000 results so you will not be able to return all the logs even if they did make it to Loki. + +__Likewise__, if you lower the `pruneinterval` you risk causing a denial of service attack as all your canaries attempt to query for missing logs at whatever your `pruneinterval` is defined at. + +All options: + +```nohighlight + -addr string + The Loki server URL:Port, e.g. loki:3100 + -buckets int + Number of buckets in the response_latency histogram (default 10) + -interval duration + Duration between log entries (default 1s) + -labelname string + The label name for this instance of loki-canary to use in the log selector (default "name") + -labelvalue string + The unique label value for this instance of loki-canary to use in the log selector (default "loki-canary") + -pass string + Loki password + -port int + Port which loki-canary should expose metrics (default 3500) + -pruneinterval duration + Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki (default 1m0s) + -size int + Size in bytes of each log line (default 100) + -tls + Does the loki connection use TLS? 
+ -user string + Loki username + -wait duration + Duration to wait for log entries before reporting them lost (default 1m0s) +``` diff --git a/docs/canary/block.png b/docs/canary/block.png new file mode 100644 index 000000000000..f7dd39047bed Binary files /dev/null and b/docs/canary/block.png differ diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go new file mode 100644 index 000000000000..caa9135674e4 --- /dev/null +++ b/pkg/canary/comparator/comparator.go @@ -0,0 +1,273 @@ +package comparator + +import ( + "fmt" + "io" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/pkg/canary/reader" +) + +const ( + ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n" + ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n" + ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n" + ErrDuplicateEntry = "received a duplicate entry for ts %v\n" + ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" +) + +var ( + totalEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "total_entries", + Help: "counts log entries written to the file", + }) + outOfOrderEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "out_of_order_entries", + Help: "counts log entries received with a timestamp more recent than the others in the queue", + }) + wsMissingEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "websocket_missing_entries", + Help: "counts log entries not received within the maxWait duration via the websocket connection", + }) + missingEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "missing_entries", + Help: "counts log entries not received within the maxWait duration via both websocket and direct query", + }) 
+ unexpectedEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "unexpected_entries", + Help: "counts a log entry received which was not expected (e.g. received after reported missing)", + }) + duplicateEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "duplicate_entries", + Help: "counts a log entry received more than one time", + }) + responseLatency prometheus.Histogram +) + +type Comparator struct { + entMtx sync.Mutex + w io.Writer + entries []*time.Time + ackdEntries []*time.Time + maxWait time.Duration + pruneInterval time.Duration + sent chan time.Time + recv chan time.Time + rdr reader.LokiReader + quit chan struct{} + done chan struct{} +} + +func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.Duration, + buckets int, sentChan chan time.Time, receivedChan chan time.Time, reader reader.LokiReader) *Comparator { + c := &Comparator{ + w: writer, + entries: []*time.Time{}, + maxWait: maxWait, + pruneInterval: pruneInterval, + sent: sentChan, + recv: receivedChan, + rdr: reader, + quit: make(chan struct{}), + done: make(chan struct{}), + } + + responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki_canary", + Name: "response_latency", + Help: "is how long it takes for log lines to be returned from Loki in seconds.", + Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets), + }) + + go c.run() + + return c +} + +func (c *Comparator) Stop() { + if c.quit != nil { + close(c.quit) + <-c.done + c.quit = nil + } +} + +func (c *Comparator) entrySent(time time.Time) { + c.entMtx.Lock() + defer c.entMtx.Unlock() + c.entries = append(c.entries, &time) + totalEntries.Inc() +} + +// entryReceived removes the received entry from the buffer if it exists, reports on out of order entries received +func (c *Comparator) entryReceived(ts time.Time) { + c.entMtx.Lock() + defer c.entMtx.Unlock() + + // Output index + k := 0 + matched := false + for 
i, e := range c.entries { + if ts.Equal(*e) { + matched = true + // If this isn't the first item in the list we received it out of order + if i != 0 { + outOfOrderEntries.Inc() + _, _ = fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i]) + } + responseLatency.Observe(time.Since(ts).Seconds()) + // Put this element in the acknowledged entries list so we can use it to check for duplicates + c.ackdEntries = append(c.ackdEntries, c.entries[i]) + // Do not increment output index, effectively causing this element to be dropped + } else { + // If the current index doesn't match the output index, update the array with the correct position + if i != k { + c.entries[k] = c.entries[i] + } + k++ + } + } + if !matched { + duplicate := false + for _, e := range c.ackdEntries { + if ts.Equal(*e) { + duplicate = true + duplicateEntries.Inc() + _, _ = fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano()) + break + } + } + if !duplicate { + _, _ = fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano()) + unexpectedEntries.Inc() + } + } + // Nil out the pointers to any trailing elements which were removed from the slice + for i := k; i < len(c.entries); i++ { + c.entries[i] = nil // or the zero value of T + } + c.entries = c.entries[:k] +} + +func (c *Comparator) Size() int { + return len(c.entries) +} + +func (c *Comparator) run() { + t := time.NewTicker(c.pruneInterval) + defer func() { + t.Stop() + close(c.done) + }() + + for { + select { + case e := <-c.recv: + c.entryReceived(e) + case e := <-c.sent: + c.entrySent(e) + case <-t.C: + c.pruneEntries() + case <-c.quit: + return + } + } +} + +func (c *Comparator) pruneEntries() { + c.entMtx.Lock() + defer c.entMtx.Unlock() + + missing := []*time.Time{} + k := 0 + for i, e := range c.entries { + // If the time is outside our range, assume the entry has been lost report and remove it + if e.Before(time.Now().Add(-c.maxWait)) { + missing = append(missing, e) + wsMissingEntries.Inc() + _, _ = fmt.Fprintf(c.w, ErrEntryNotReceivedWs, 
e.UnixNano(), c.maxWait.Seconds()) + } else { + if i != k { + c.entries[k] = c.entries[i] + } + k++ + } + } + // Nil out the pointers to any trailing elements which were removed from the slice + for i := k; i < len(c.entries); i++ { + c.entries[i] = nil // or the zero value of T + } + c.entries = c.entries[:k] + if len(missing) > 0 { + go c.confirmMissing(missing) + } + + // Prune the acknowledged list, remove anything older than our maxwait + k = 0 + for i, e := range c.ackdEntries { + if e.Before(time.Now().Add(-c.maxWait)) { + // Do nothing, if we don't increment the output index k, this will be dropped + } else { + if i != k { + c.ackdEntries[k] = c.ackdEntries[i] + } + k++ + } + } + // Nil out the pointers to any trailing elements which were removed from the slice + for i := k; i < len(c.ackdEntries); i++ { + c.ackdEntries[i] = nil // or the zero value of T + } + c.ackdEntries = c.ackdEntries[:k] +} + +func (c *Comparator) confirmMissing(missing []*time.Time) { + // Because we are querying loki timestamps vs the timestamp in the log, + // make the range +/- 10 seconds to allow for clock inaccuracies + start := *missing[0] + start = start.Add(-10 * time.Second) + end := *missing[len(missing)-1] + end = end.Add(10 * time.Second) + recvd, err := c.rdr.Query(start, end) + if err != nil { + _, _ = fmt.Fprintf(c.w, "error querying loki: %s", err) + return + } + k := 0 + for i, m := range missing { + found := false + for _, r := range recvd { + if (*m).Equal(r) { + // Entry was found in loki, this can be dropped from the list of missing + // which is done by NOT incrementing the output index k + found = true + } + } + if !found { + // Item is still missing + if i != k { + missing[k] = missing[i] + } + k++ + } + } + // Nil out the pointers to any trailing elements which were removed from the slice + for i := k; i < len(missing); i++ { + missing[i] = nil // or the zero value of T + } + missing = missing[:k] + for _, e := range missing { + missingEntries.Inc() + _, _ = 
fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds()) + } +} diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go new file mode 100644 index 000000000000..452f39b2114c --- /dev/null +++ b/pkg/canary/comparator/comparator_test.go @@ -0,0 +1,276 @@ +package comparator + +import ( + "bytes" + "fmt" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" +) + +func TestComparatorEntryReceivedOutOfOrder(t *testing.T) { + outOfOrderEntries = &mockCounter{} + wsMissingEntries = &mockCounter{} + unexpectedEntries = &mockCounter{} + duplicateEntries = &mockCounter{} + + actual := &bytes.Buffer{} + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil) + + t1 := time.Now() + t2 := t1.Add(1 * time.Second) + t3 := t2.Add(1 * time.Second) + t4 := t3.Add(1 * time.Second) + + c.entrySent(t1) + c.entrySent(t2) + c.entrySent(t3) + c.entrySent(t4) + + c.entryReceived(t1) + assert.Equal(t, 3, c.Size()) + c.entryReceived(t4) + assert.Equal(t, 2, c.Size()) + c.entryReceived(t2) + c.entryReceived(t3) + assert.Equal(t, 0, c.Size()) + + expected := fmt.Sprintf(ErrOutOfOrderEntry, t4, []time.Time{t2, t3}) + assert.Equal(t, expected, actual.String()) + + assert.Equal(t, 1, outOfOrderEntries.(*mockCounter).count) + assert.Equal(t, 0, unexpectedEntries.(*mockCounter).count) + assert.Equal(t, 0, wsMissingEntries.(*mockCounter).count) + assert.Equal(t, 0, duplicateEntries.(*mockCounter).count) + + // This avoids a panic on subsequent test execution, + // seems ugly but was easy, and multiple instantiations + // of the comparator should be an error + prometheus.Unregister(responseLatency) +} + +func TestComparatorEntryReceivedNotExpected(t *testing.T) { + outOfOrderEntries = &mockCounter{} + wsMissingEntries = &mockCounter{} + unexpectedEntries = 
&mockCounter{} + duplicateEntries = &mockCounter{} + + actual := &bytes.Buffer{} + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil) + + t1 := time.Now() + t2 := t1.Add(1 * time.Second) + t3 := t2.Add(1 * time.Second) + t4 := t3.Add(1 * time.Second) + + c.entrySent(t2) + c.entrySent(t3) + c.entrySent(t4) + + c.entryReceived(t2) + assert.Equal(t, 2, c.Size()) + c.entryReceived(t1) + assert.Equal(t, 2, c.Size()) + c.entryReceived(t3) + assert.Equal(t, 1, c.Size()) + c.entryReceived(t4) + assert.Equal(t, 0, c.Size()) + + expected := fmt.Sprintf(ErrUnexpectedEntry, t1.UnixNano()) + assert.Equal(t, expected, actual.String()) + + assert.Equal(t, 0, outOfOrderEntries.(*mockCounter).count) + assert.Equal(t, 1, unexpectedEntries.(*mockCounter).count) + assert.Equal(t, 0, wsMissingEntries.(*mockCounter).count) + assert.Equal(t, 0, duplicateEntries.(*mockCounter).count) + + // This avoids a panic on subsequent test execution, + // seems ugly but was easy, and multiple instantiations + // of the comparator should be an error + prometheus.Unregister(responseLatency) +} + +func TestComparatorEntryReceivedDuplicate(t *testing.T) { + outOfOrderEntries = &mockCounter{} + wsMissingEntries = &mockCounter{} + unexpectedEntries = &mockCounter{} + duplicateEntries = &mockCounter{} + + actual := &bytes.Buffer{} + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil) + + t1 := time.Now() + t2 := t1.Add(1 * time.Second) + t3 := t2.Add(1 * time.Second) + t4 := t3.Add(1 * time.Second) + + c.entrySent(t1) + c.entrySent(t2) + c.entrySent(t3) + c.entrySent(t4) + + c.entryReceived(t1) + assert.Equal(t, 3, c.Size()) + c.entryReceived(t2) + assert.Equal(t, 2, c.Size()) + c.entryReceived(t2) + assert.Equal(t, 2, c.Size()) + c.entryReceived(t3) + assert.Equal(t, 1, c.Size()) + c.entryReceived(t4) + assert.Equal(t, 0, c.Size()) + + expected := fmt.Sprintf(ErrDuplicateEntry, t2.UnixNano()) + 
assert.Equal(t, expected, actual.String()) + + assert.Equal(t, 0, outOfOrderEntries.(*mockCounter).count) + assert.Equal(t, 0, unexpectedEntries.(*mockCounter).count) + assert.Equal(t, 0, wsMissingEntries.(*mockCounter).count) + assert.Equal(t, 1, duplicateEntries.(*mockCounter).count) + + // This avoids a panic on subsequent test execution, + // seems ugly but was easy, and multiple instantiations + // of the comparator should be an error + prometheus.Unregister(responseLatency) +} + +func TestEntryNeverReceived(t *testing.T) { + outOfOrderEntries = &mockCounter{} + wsMissingEntries = &mockCounter{} + missingEntries = &mockCounter{} + unexpectedEntries = &mockCounter{} + duplicateEntries = &mockCounter{} + + actual := &bytes.Buffer{} + + t1 := time.Now() + t2 := t1.Add(1 * time.Millisecond) + t3 := t2.Add(1 * time.Millisecond) + t4 := t3.Add(1 * time.Millisecond) + t5 := t4.Add(1 * time.Millisecond) + + found := []time.Time{t1, t3, t4, t5} + + mr := &mockReader{found} + maxWait := 5 * time.Millisecond + c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr) + + c.entrySent(t1) + c.entrySent(t2) + c.entrySent(t3) + c.entrySent(t4) + c.entrySent(t5) + + assert.Equal(t, 5, c.Size()) + + c.entryReceived(t1) + c.entryReceived(t3) + c.entryReceived(t5) + + assert.Equal(t, 2, c.Size()) + + //Wait a few maxWait intervals just to make sure all entries are expired and the async confirmMissing has completed + <-time.After(2 * maxWait) + + expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ErrEntryNotReceivedWs+ErrEntryNotReceived+ErrEntryNotReceivedWs, + t3, []time.Time{t2}, + t5, []time.Time{t2, t4}, + t2.UnixNano(), maxWait.Seconds(), + t2.UnixNano(), maxWait.Seconds(), + t4.UnixNano(), maxWait.Seconds()) + + assert.Equal(t, expected, actual.String()) + assert.Equal(t, 0, c.Size()) + + assert.Equal(t, 2, outOfOrderEntries.(*mockCounter).count) + assert.Equal(t, 0, unexpectedEntries.(*mockCounter).count) + 
assert.Equal(t, 2, wsMissingEntries.(*mockCounter).count) + assert.Equal(t, 1, missingEntries.(*mockCounter).count) + assert.Equal(t, 0, duplicateEntries.(*mockCounter).count) + + // This avoids a panic on subsequent test execution, + // seems ugly but was easy, and multiple instantiations + // of the comparator should be an error + prometheus.Unregister(responseLatency) + +} + +func TestPruneAckdEntires(t *testing.T) { + actual := &bytes.Buffer{} + maxWait := 30 * time.Millisecond + c := NewComparator(actual, maxWait, 10*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), nil) + + t1 := time.Now() + t2 := t1.Add(1 * time.Millisecond) + t3 := t2.Add(1 * time.Millisecond) + t4 := t3.Add(100 * time.Millisecond) + + assert.Equal(t, 0, len(c.ackdEntries)) + + c.entrySent(t1) + c.entrySent(t2) + c.entrySent(t3) + c.entrySent(t4) + + assert.Equal(t, 4, c.Size()) + assert.Equal(t, 0, len(c.ackdEntries)) + + c.entryReceived(t1) + c.entryReceived(t2) + c.entryReceived(t3) + c.entryReceived(t4) + + assert.Equal(t, 0, c.Size()) + assert.Equal(t, 4, len(c.ackdEntries)) + + // Wait a couple maxWaits to make sure the first 3 timestamps get pruned from the ackdEntries, + // the fourth should still remain because it was 100ms newer than t3 + <-time.After(2 * maxWait) + + assert.Equal(t, 1, len(c.ackdEntries)) + assert.Equal(t, t4, *c.ackdEntries[0]) + +} + +type mockCounter struct { + cLck sync.Mutex + count int +} + +func (m *mockCounter) Desc() *prometheus.Desc { + panic("implement me") +} + +func (m *mockCounter) Write(*io_prometheus_client.Metric) error { + panic("implement me") +} + +func (m *mockCounter) Describe(chan<- *prometheus.Desc) { + panic("implement me") +} + +func (m *mockCounter) Collect(chan<- prometheus.Metric) { + panic("implement me") +} + +func (m *mockCounter) Add(float64) { + panic("implement me") +} + +func (m *mockCounter) Inc() { + m.cLck.Lock() + defer m.cLck.Unlock() + m.count++ +} + +type mockReader struct { + resp []time.Time +} + +func 
(r *mockReader) Query(start time.Time, end time.Time) ([]time.Time, error) { + return r.resp, nil +} diff --git a/pkg/canary/reader/reader.go b/pkg/canary/reader/reader.go new file mode 100644 index 000000000000..f02ec485764e --- /dev/null +++ b/pkg/canary/reader/reader.go @@ -0,0 +1,231 @@ +package reader + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/querier" +) + +var ( + reconnects = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "ws_reconnects", + Help: "counts every time the websocket connection has to reconnect", + }) +) + +type LokiReader interface { + Query(start time.Time, end time.Time) ([]time.Time, error) +} + +type Reader struct { + header http.Header + tls bool + addr string + user string + pass string + lName string + lVal string + conn *websocket.Conn + w io.Writer + recv chan time.Time + quit chan struct{} + shuttingDown bool + done chan struct{} +} + +func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool, + address string, user string, pass string, labelName string, labelVal string) *Reader { + h := http.Header{} + if user != "" { + h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}} + } + + rd := Reader{ + header: h, + tls: tls, + addr: address, + user: user, + pass: pass, + lName: labelName, + lVal: labelVal, + w: writer, + recv: receivedChan, + quit: make(chan struct{}), + done: make(chan struct{}), + shuttingDown: false, + } + + go rd.run() + + go func() { + <-rd.quit + if rd.conn != nil { + _, _ = fmt.Fprintf(rd.w, "shutting down reader\n") + rd.shuttingDown = true + _ = rd.conn.Close() + } + }() + + 
return &rd +} + +func (r *Reader) Stop() { + if r.quit != nil { + close(r.quit) + <-r.done + r.quit = nil + } +} + +func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { + scheme := "http" + if r.tls { + scheme = "https" + } + u := url.URL{ + Scheme: scheme, + Host: r.addr, + Path: "/api/prom/query", + RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) + + "&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) + + "&limit=1000", + } + _, _ = fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String()) + + client := &http.Client{} + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + + req.SetBasicAuth(r.user, r.pass) + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.Println("error closing body", err) + } + }() + + if resp.StatusCode/100 != 2 { + buf, _ := ioutil.ReadAll(resp.Body) + return nil, fmt.Errorf("error response from server: %s (%v)", string(buf), err) + } + var decoded logproto.QueryResponse + err = json.NewDecoder(resp.Body).Decode(&decoded) + if err != nil { + return nil, err + } + + tss := []time.Time{} + + for _, stream := range decoded.Streams { + for _, entry := range stream.Entries { + ts, err := parseResponse(&entry) + if err != nil { + _, _ = fmt.Fprint(r.w, err) + continue + } + tss = append(tss, *ts) + } + + } + + return tss, nil +} + +func (r *Reader) run() { + + r.closeAndReconnect() + + tailResponse := &querier.TailResponse{} + + for { + err := r.conn.ReadJSON(tailResponse) + if err != nil { + if r.shuttingDown { + close(r.done) + return + } + _, _ = fmt.Fprintf(r.w, "error reading websocket: %s\n", err) + r.closeAndReconnect() + continue + } + for _, stream := range tailResponse.Streams { + for _, entry := range stream.Entries { + ts, err := parseResponse(&entry) + if err != nil { + _, _ = fmt.Fprint(r.w, 
err) + continue + } + r.recv <- *ts + } + } + } +} + +func (r *Reader) closeAndReconnect() { + if r.conn != nil { + _ = r.conn.Close() + r.conn = nil + // By incrementing reconnects here we should only count a failure followed by a successful reconnect. + // Initial connections and reconnections from failed tries will not be counted. + reconnects.Inc() + } + for r.conn == nil { + scheme := "ws" + if r.tls { + scheme = "wss" + } + u := url.URL{ + Scheme: scheme, + Host: r.addr, + Path: "/api/prom/tail", + RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)), + } + + _, _ = fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal) + + c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header) + if err != nil { + _, _ = fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err) + <-time.After(5 * time.Second) + continue + } + r.conn = c + } +} + +func parseResponse(entry *logproto.Entry) (*time.Time, error) { + sp := strings.Split(entry.Line, " ") + if len(sp) != 2 { + return nil, errors.Errorf("received invalid entry: %s\n", entry.Line) + } + ts, err := strconv.ParseInt(sp[0], 10, 64) + if err != nil { + return nil, errors.Errorf("failed to parse timestamp: %s\n", sp[0]) + } + t := time.Unix(0, ts) + return &t, nil +} diff --git a/pkg/canary/writer/writer.go b/pkg/canary/writer/writer.go new file mode 100644 index 000000000000..8d9b163a808b --- /dev/null +++ b/pkg/canary/writer/writer.go @@ -0,0 +1,82 @@ +package writer + +import ( + "fmt" + "io" + "strconv" + "strings" + "time" +) + +const ( + LogEntry = "%s %s\n" +) + +type Writer struct { + w io.Writer + sent chan time.Time + interval time.Duration + size int + prevTsLen int + pad string + quit chan struct{} + done chan struct{} +} + +func NewWriter(writer io.Writer, sentChan chan time.Time, entryInterval time.Duration, entrySize int) *Writer { + + w := &Writer{ + w: writer, + sent: 
sentChan, + interval: entryInterval, + size: entrySize, + prevTsLen: 0, + quit: make(chan struct{}), + done: make(chan struct{}), + } + + go w.run() + + return w +} + +func (w *Writer) Stop() { + if w.quit != nil { + close(w.quit) + <-w.done + w.quit = nil + } +} + +func (w *Writer) run() { + t := time.NewTicker(w.interval) + defer func() { + t.Stop() + close(w.done) + }() + for { + select { + case <-t.C: + t := time.Now() + ts := strconv.FormatInt(t.UnixNano(), 10) + tsLen := len(ts) + + // I guess some day this could happen???? + if w.prevTsLen != tsLen { + var str strings.Builder + // Total line length includes timestamp, white space separator, new line char. Subtract those out + for str.Len() < w.size-tsLen-2 { + str.WriteString("p") + } + w.pad = str.String() + w.prevTsLen = tsLen + } + + _, _ = fmt.Fprintf(w.w, LogEntry, ts, w.pad) + w.sent <- t + case <-w.quit: + return + } + } + +} diff --git a/production/ksonnet/loki-canary/config.libsonnet b/production/ksonnet/loki-canary/config.libsonnet new file mode 100644 index 000000000000..d1043c450e2d --- /dev/null +++ b/production/ksonnet/loki-canary/config.libsonnet @@ -0,0 +1,5 @@ +{ + _images+:: { + loki_canary: 'grafana/loki-canary:latest', + }, +} \ No newline at end of file diff --git a/production/ksonnet/loki-canary/jsonnetfile.json b/production/ksonnet/loki-canary/jsonnetfile.json new file mode 100644 index 000000000000..c903ac17c07c --- /dev/null +++ b/production/ksonnet/loki-canary/jsonnetfile.json @@ -0,0 +1,14 @@ +{ + "dependencies": [ + { + "name": "ksonnet-util", + "source": { + "git": { + "remote": "https://github.com/grafana/jsonnet-libs", + "subdir": "ksonnet-util" + } + }, + "version": "master" + } + ] +} diff --git a/production/ksonnet/loki-canary/loki-canary.libsonnet b/production/ksonnet/loki-canary/loki-canary.libsonnet new file mode 100644 index 000000000000..a07c920f6d2a --- /dev/null +++ b/production/ksonnet/loki-canary/loki-canary.libsonnet @@ -0,0 +1,27 @@ +local k = import 
'ksonnet-util/kausal.libsonnet'; +local config = import 'config.libsonnet'; + +k + config { + namespace: $.core.v1.namespace.new($._config.namespace), + + local container = $.core.v1.container, + + loki_canary_args:: { + labelvalue: "$(POD_NAME)", + }, + + loki_canary_container:: + container.new('loki-canary', $._images.loki_canary) + + $.util.resourcesRequests('10m', '20Mi') + + container.withPorts($.core.v1.containerPort.new('http-metrics', 80)) + + container.withArgsMixin($.util.mapToFlags($.loki_canary_args)) + + container.withEnv([ + container.envType.fromFieldPath('HOSTNAME', 'spec.nodeName'), + container.envType.fromFieldPath('POD_NAME', 'metadata.name'), + ]), + + local daemonSet = $.extensions.v1beta1.daemonSet, + + loki_canary_daemonset: + daemonSet.new('loki-canary', [$.loki_canary_container]), +}