Skip to content

Commit

Permalink
Canary: make stream configurable (#2259)
Browse files Browse the repository at this point in the history
* tried to fix canary in non kubernetes env

* update doc
  • Loading branch information
ombre8 authored Jun 26, 2020
1 parent 1e19aa4 commit 17390fd
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
4 changes: 3 additions & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func main() {

lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector")
lVal := flag.String("labelvalue", "loki-canary", "The unique label value for this instance of loki-canary to use in the log selector")
sName := flag.String("streamname", "stream", "The stream name for this instance of loki-canary to use in the log selector")
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
tls := flag.Bool("tls", false, "Does the loki connection use TLS?")
Expand Down Expand Up @@ -69,7 +71,7 @@ func main() {
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal, *sName, *sValue)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true)
}

Expand Down
6 changes: 6 additions & 0 deletions docs/operations/loki-canary.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,16 @@ All options:
Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki (default 1m0s)
-size int
Size in bytes of each log line (default 100)
-streamname string
The stream name for this instance of loki-canary to use in the log selector (default "stream")
-streamvalue string
The unique stream value for this instance of loki-canary to use in the log selector (default "stdout")
-tls
Does the loki connection use TLS?
-user string
Loki username
-version
Print this builds version information
-wait duration
Duration to wait for log entries before reporting them lost (default 1m0s)
```
10 changes: 7 additions & 3 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Reader struct {
addr string
user string
pass string
sName string
sValue string
lName string
lVal string
conn *websocket.Conn
Expand All @@ -53,7 +55,7 @@ type Reader struct {
}

func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool,
address string, user string, pass string, labelName string, labelVal string) *Reader {
address string, user string, pass string, labelName string, labelVal string, streamName string, streamValue string) *Reader {
h := http.Header{}
if user != "" {
h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
Expand All @@ -65,6 +67,8 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool,
addr: address,
user: user,
pass: pass,
sName: streamName,
sValue: streamValue,
lName: labelName,
lVal: labelVal,
w: writer,
Expand Down Expand Up @@ -106,7 +110,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
Host: r.addr,
Path: "/api/prom/query",
RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) +
"&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) +
"&query=" + url.QueryEscape(fmt.Sprintf("{%v=\"%v\",%v=\"%v\"}", r.sName, r.sValue, r.lName, r.lVal)) +
"&limit=1000",
}
fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String())
Expand Down Expand Up @@ -206,7 +210,7 @@ func (r *Reader) closeAndReconnect() {
Scheme: scheme,
Host: r.addr,
Path: "/api/prom/tail",
RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)),
RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{%v=\"%v\",%v=\"%v\"}", r.sName, r.sValue, r.lName, r.lVal)),
}

fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)
Expand Down

0 comments on commit 17390fd

Please sign in to comment.