Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addition of a /suspend endpoint to Loki Canary #1891

Merged
merged 7 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 54 additions & 21 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

Expand All @@ -19,6 +20,14 @@ import (
"github.com/grafana/loki/pkg/canary/writer"
)

type canary struct {
lock sync.Mutex

writer *writer.Writer
reader *reader.Reader
comparator *comparator.Comparator
}

func main() {

lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector")
Expand Down Expand Up @@ -52,10 +61,30 @@ func main() {
sentChan := make(chan time.Time)
receivedChan := make(chan time.Time)

w := writer.NewWriter(os.Stdout, sentChan, *interval, *size)
r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true)
c := &canary{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could make start a function of c too no ? like stop ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering it but decided against it b/c it would have to take like 10 parameters. This seemed less gross.

startCanary := func() *canary {
c.stop()

c.lock.Lock()
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true)

return c
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
}

startCanary()

http.HandleFunc("/resume", func(_ http.ResponseWriter, _ *http.Request) {
_, _ = fmt.Fprintf(os.Stderr, "restarting\n")
startCanary()
})
http.HandleFunc("/suspend", func(_ http.ResponseWriter, _ *http.Request) {
_, _ = fmt.Fprintf(os.Stderr, "suspending\n")
c.stop()
})
http.Handle("/metrics", promhttp.Handler())
go func() {
err := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
Expand All @@ -64,25 +93,29 @@ func main() {
}
}()

interrupt := make(chan os.Signal, 1)
terminate := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
signal.Notify(terminate, syscall.SIGTERM)

for {
select {
case <-interrupt:
_, _ = fmt.Fprintf(os.Stderr, "suspending indefinitely\n")
w.Stop()
r.Stop()
c.Stop()
case <-terminate:
_, _ = fmt.Fprintf(os.Stderr, "shutting down\n")
w.Stop()
r.Stop()
c.Stop()
return
}
signal.Notify(terminate, syscall.SIGTERM, os.Interrupt)

for range terminate {
_, _ = fmt.Fprintf(os.Stderr, "shutting down\n")
c.stop()
return
}
}

func (c *canary) stop() {
c.lock.Lock()
defer c.lock.Unlock()

if c.writer == nil || c.reader == nil || c.comparator == nil {
return
}

c.writer.Stop()
c.reader.Stop()
c.comparator.Stop()

c.writer = nil
c.reader = nil
c.comparator = nil
}
7 changes: 7 additions & 0 deletions docs/operations/loki-canary.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ determine if they are truly missing or only missing from the WebSocket. If
missing entries are not found in the direct query, the `missing_entries` counter
is incremented.

### Control

Loki Canary responds to two endpoints to allow dynamic suspending/resuming of the
canary process. This can be useful if you'd like to quickly disable or reenable the
canary. To stop or start the canary issue an HTTP GET request against the `/suspend` or
`/resume` endpoints.

## Installation

### Binary
Expand Down
14 changes: 8 additions & 6 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.D
done: make(chan struct{}),
}

responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "response_latency",
Help: "is how long it takes for log lines to be returned from Loki in seconds.",
Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets),
})
if responseLatency == nil {
responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "response_latency",
Help: "is how long it takes for log lines to be returned from Loki in seconds.",
Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets),
})
}

go c.run()

Expand Down