diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index e166c91357224..cb75cf19f5175 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "strconv" + "sync" "syscall" "time" @@ -19,6 +20,14 @@ import ( "github.com/grafana/loki/pkg/canary/writer" ) +type canary struct { + lock sync.Mutex + + writer *writer.Writer + reader *reader.Reader + comparator *comparator.Comparator +} + func main() { lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector") @@ -52,10 +61,28 @@ func main() { sentChan := make(chan time.Time) receivedChan := make(chan time.Time) - w := writer.NewWriter(os.Stdout, sentChan, *interval, *size) - r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) - c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true) + c := &canary{} + startCanary := func() { + c.stop() + + c.lock.Lock() + defer c.lock.Unlock() + + c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size) + c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) + c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true) + } + + startCanary() + http.HandleFunc("/resume", func(_ http.ResponseWriter, _ *http.Request) { + _, _ = fmt.Fprintf(os.Stderr, "restarting\n") + startCanary() + }) + http.HandleFunc("/suspend", func(_ http.ResponseWriter, _ *http.Request) { + _, _ = fmt.Fprintf(os.Stderr, "suspending\n") + c.stop() + }) http.Handle("/metrics", promhttp.Handler()) go func() { err := http.ListenAndServe(":"+strconv.Itoa(*port), nil) @@ -64,25 +91,29 @@ func main() { } }() - interrupt := make(chan os.Signal, 1) terminate := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - signal.Notify(terminate, syscall.SIGTERM) - - for { - select { - case 
<-interrupt: - _, _ = fmt.Fprintf(os.Stderr, "suspending indefinitely\n") - w.Stop() - r.Stop() - c.Stop() - case <-terminate: - _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") - w.Stop() - r.Stop() - c.Stop() - return - } + signal.Notify(terminate, syscall.SIGTERM, os.Interrupt) + + for range terminate { + _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") + c.stop() + return } +} + +func (c *canary) stop() { + c.lock.Lock() + defer c.lock.Unlock() + + if c.writer == nil || c.reader == nil || c.comparator == nil { + return + } + + c.writer.Stop() + c.reader.Stop() + c.comparator.Stop() + c.writer = nil + c.reader = nil + c.comparator = nil } diff --git a/docs/operations/loki-canary.md b/docs/operations/loki-canary.md index 1ce48a3599b75..900ec297ef575 100644 --- a/docs/operations/loki-canary.md +++ b/docs/operations/loki-canary.md @@ -47,6 +47,13 @@ determine if they are truly missing or only missing from the WebSocket. If missing entries are not found in the direct query, the `missing_entries` counter is incremented. +### Control + +Loki Canary exposes two HTTP endpoints that allow the canary process to be suspended +and resumed dynamically. This can be useful if you'd like to quickly disable or re-enable the +canary. To stop or start the canary, issue an HTTP GET request against the `/suspend` or +`/resume` endpoints. 
+ ## Installation ### Binary diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index 3d118eb0c7b71..b687ce26cd60d 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -86,12 +86,14 @@ func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.D done: make(chan struct{}), } - responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: "loki_canary", - Name: "response_latency", - Help: "is how long it takes for log lines to be returned from Loki in seconds.", - Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets), - }) + if responseLatency == nil { + responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki_canary", + Name: "response_latency", + Help: "is how long it takes for log lines to be returned from Loki in seconds.", + Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets), + }) + } go c.run()