Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Canary: make stream configurable #2259

Merged
merged 2 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func main() {

lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector")
lVal := flag.String("labelvalue", "loki-canary", "The unique label value for this instance of loki-canary to use in the log selector")
sName := flag.String("streamname", "stream", "The stream name for this instance of loki-canary to use in the log selector")
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
tls := flag.Bool("tls", false, "Does the loki connection use TLS?")
Expand Down Expand Up @@ -69,7 +71,7 @@ func main() {
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal, *sName, *sValue)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true)
}

Expand Down
6 changes: 6 additions & 0 deletions docs/operations/loki-canary.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,16 @@ All options:
Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki (default 1m0s)
-size int
Size in bytes of each log line (default 100)
-streamname string
The stream name for this instance of loki-canary to use in the log selector (default "stream")
-streamvalue string
The unique stream value for this instance of loki-canary to use in the log selector (default "stdout")
-tls
Does the loki connection use TLS?
-user string
Loki username
-version
Print this builds version information
-wait duration
Duration to wait for log entries before reporting them lost (default 1m0s)
```
10 changes: 7 additions & 3 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Reader struct {
addr string
user string
pass string
sName string
sValue string
lName string
lVal string
conn *websocket.Conn
Expand All @@ -53,7 +55,7 @@ type Reader struct {
}

func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool,
address string, user string, pass string, labelName string, labelVal string) *Reader {
address string, user string, pass string, labelName string, labelVal string, streamName string, streamValue string) *Reader {
h := http.Header{}
if user != "" {
h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
Expand All @@ -65,6 +67,8 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool,
addr: address,
user: user,
pass: pass,
sName: streamName,
sValue: streamValue,
lName: labelName,
lVal: labelVal,
w: writer,
Expand Down Expand Up @@ -106,7 +110,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
Host: r.addr,
Path: "/api/prom/query",
RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) +
"&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) +
"&query=" + url.QueryEscape(fmt.Sprintf("{%v=\"%v\",%v=\"%v\"}", r.sName, r.sValue, r.lName, r.lVal)) +
"&limit=1000",
}
fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String())
Expand Down Expand Up @@ -206,7 +210,7 @@ func (r *Reader) closeAndReconnect() {
Scheme: scheme,
Host: r.addr,
Path: "/api/prom/tail",
RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)),
RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{%v=\"%v\",%v=\"%v\"}", r.sName, r.sValue, r.lName, r.lVal)),
}

fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)
Expand Down