diff --git a/cmd/backfill/main.go b/cmd/backfill/main.go
new file mode 100644
index 0000000000..880fc56248
--- /dev/null
+++ b/cmd/backfill/main.go
@@ -0,0 +1,156 @@
+// Backfill denormalizes historic OpenTSDB data.
+//
+// For ongoing denormalization use the functionality in tsdbrelay.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"net/url"
+	"strconv"
+	"time"
+
+	"bosun.org/cmd/tsdbrelay/denormalize"
+	"bosun.org/collect"
+	"bosun.org/opentsdb"
+)
+
+var (
+	start     = flag.String("start", "2013/01/01", "Start date to backfill.")
+	end       = flag.String("end", "", "End date to backfill. Will go to now if not specified.")
+	ruleFlag  = flag.String("rule", "", "A denormalization rule. ex `os.cpu__host`")
+	tsdbHost  = flag.String("host", "", "OpenTSDB host")
+	batchSize = flag.Int("batch", 500, "batch size to send points to OpenTSDB")
+)
+
+func main() {
+	flag.Parse()
+	if *tsdbHost == "" {
+		flag.PrintDefaults()
+		log.Fatal("host must be supplied")
+	}
+	putUrl := (&url.URL{Scheme: "http", Host: *tsdbHost, Path: "api/put"}).String()
+
+	if *ruleFlag == "" {
+		flag.PrintDefaults()
+		log.Fatal("rule must be supplied")
+	}
+	rules, err := denormalize.ParseDenormalizationRules(*ruleFlag)
+	if err != nil {
+		log.Fatal(err)
+	}
+	if len(rules) > 1 {
+		log.Fatal("Please specify only one rule")
+	}
+	var rule *denormalize.DenormalizationRule
+	var metric string
+	for k, v := range rules {
+		metric = k
+		rule = v
+	}
+
+	query := &opentsdb.Query{Metric: metric, Aggregator: "avg"}
+	query.Tags, err = queryForAggregateTags(query)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	startDate, err := opentsdb.ParseTime(*start)
+	if err != nil {
+		log.Fatal(err)
+	}
+	endDate := time.Now().UTC()
+	if *end != "" {
+		endDate, err = opentsdb.ParseTime(*end)
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	backfill := func(batchStart, batchEnd time.Time) (err error) {
+		startTimeString := batchStart.Format(opentsdb.TSDBTimeFormat)
+		endTimeString := batchEnd.Format(opentsdb.TSDBTimeFormat)
+		defer func() {
+			if err != nil {
+				log.Fatalf("Error on batch %s - %s. %v \n", startTimeString, endTimeString, err)
+			}
+		}()
+		req := opentsdb.Request{Start: startTimeString, End: endTimeString, Queries: []*opentsdb.Query{query}}
+		resp, err := req.Query(*tsdbHost)
+		if err != nil {
+			return err
+		}
+		dps := []*opentsdb.DataPoint{}
+		for _, r := range resp {
+			for t, p := range r.DPS {
+
+				timeStamp, err := strconv.ParseInt(t, 10, 64)
+				if err != nil {
+					return err
+				}
+				dp := &opentsdb.DataPoint{
+					Timestamp: timeStamp,
+					Metric:    r.Metric,
+					Tags:      r.Tags,
+					Value:     p,
+				}
+				err = rule.Translate(dp)
+				if err != nil {
+					return err
+				}
+				dps = append(dps, dp)
+			}
+		}
+		fmt.Printf("%s - %s: %d dps\n", startTimeString, endTimeString, len(dps))
+		total := 0
+		for len(dps) > 0 {
+			count := len(dps)
+			if len(dps) > *batchSize {
+				count = *batchSize
+			}
+			putResp, err := collect.SendDataPoints(dps[:count], putUrl)
+			if err != nil {
+				return err
+			}
+			defer putResp.Body.Close()
+
+			if putResp.StatusCode != 204 {
+				return fmt.Errorf("Non 204 status code from opentsdb: %d", putResp.StatusCode)
+			}
+			dps = dps[count:]
+			total += count
+		}
+		fmt.Printf("Relayed %d data points.\n", total)
+		return nil
+	}
+
+	// walk backwards a day at a time
+	curEnd := endDate
+	for curEnd.After(startDate) {
+		curStart := curEnd.Add(-24 * time.Hour)
+		if curStart.Before(startDate) {
+			curStart = startDate
+		}
+		backfill(curStart, curEnd)
+		curEnd = curEnd.Add(-24 * time.Hour)
+	}
+}
+
+func queryForAggregateTags(query *opentsdb.Query) (opentsdb.TagSet, error) {
+	req := opentsdb.Request{}
+	req.Queries = []*opentsdb.Query{query}
+	req.Start = "1h-ago"
+	resp, err := req.Query(*tsdbHost)
+	if err != nil {
+		return nil, err
+	}
+	if len(resp) < 1 {
+		return nil, fmt.Errorf("No points in last hour to learn aggregate tags")
+	}
+	tagset := make(opentsdb.TagSet)
+	for _, t := range resp[0].AggregateTags {
+		tagset[t] = "*"
+	}
+	return tagset, nil
+}
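A minimal, self-contained sketch of the per-point translation the backfill loop above performs, with the query and batching machinery stripped away. The metric, tag, timestamp, and value are hypothetical; the only APIs assumed are the ones this diff already uses (denormalize.ParseDenormalizationRules, DenormalizationRule.Translate, opentsdb.DataPoint, opentsdb.TagSet):

package main

import (
	"fmt"
	"log"

	"bosun.org/cmd/tsdbrelay/denormalize"
	"bosun.org/opentsdb"
)

func main() {
	// Parse a single rule, exactly as cmd/backfill does with -rule.
	rules, err := denormalize.ParseDenormalizationRules("os.cpu__host")
	if err != nil {
		log.Fatal(err)
	}
	rule := rules["os.cpu"] // the returned map is keyed by metric name

	// A hypothetical point as it might come back from the OpenTSDB query.
	dp := &opentsdb.DataPoint{
		Timestamp: 1420070400,
		Metric:    "os.cpu",
		Tags:      opentsdb.TagSet{"host": "web01"},
		Value:     42.0,
	}
	// Translate rewrites the point in place to its denormalized form,
	// deriving the new metric name from the rule's tag values.
	if err := rule.Translate(dp); err != nil {
		log.Fatal(err)
	}
	fmt.Println(dp.Metric, dp.Tags)
}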
%v \n", startTimeString, endTimeString, err) + } + }() + req := opentsdb.Request{Start: startTimeString, End: endTimeString, Queries: []*opentsdb.Query{query}} + resp, err := req.Query(*tsdbHost) + if err != nil { + return err + } + dps := []*opentsdb.DataPoint{} + for _, r := range resp { + for t, p := range r.DPS { + + timeStamp, err := strconv.ParseInt(t, 10, 64) + if err != nil { + return err + } + dp := &opentsdb.DataPoint{ + Timestamp: timeStamp, + Metric: r.Metric, + Tags: r.Tags, + Value: p, + } + err = rule.Translate(dp) + if err != nil { + return err + } + dps = append(dps, dp) + } + } + fmt.Printf("%s - %s: %d dps\n", startTimeString, endTimeString, len(dps)) + total := 0 + for len(dps) > 0 { + count := len(dps) + if len(dps) > *batchSize { + count = *batchSize + } + putResp, err := collect.SendDataPoints(dps[:count], putUrl) + if err != nil { + return err + } + defer putResp.Body.Close() + + if putResp.StatusCode != 204 { + return fmt.Errorf("Non 204 status code from opentsdb: %d", putResp.StatusCode) + } + dps = dps[count:] + total += count + } + fmt.Printf("Relayed %d data points.\n", total) + return nil + } + + // walk backwards a day at a time + curEnd := endDate + for curEnd.After(startDate) { + curStart := curEnd.Add(-24 * time.Hour) + if curStart.Before(startDate) { + curStart = startDate + } + backfill(curStart, curEnd) + curEnd = curEnd.Add(-24 * time.Hour) + } +} + +func queryForAggregateTags(query *opentsdb.Query) (opentsdb.TagSet, error) { + req := opentsdb.Request{} + req.Queries = []*opentsdb.Query{query} + req.Start = "1h-ago" + resp, err := req.Query(*tsdbHost) + if err != nil { + return nil, err + } + if len(resp) < 1 { + return nil, fmt.Errorf("No points in last hour to learn aggregate tags") + } + tagset := make(opentsdb.TagSet) + for _, t := range resp[0].AggregateTags { + tagset[t] = "*" + } + return tagset, nil +} diff --git a/cmd/tsdbrelay/denormalization.go b/cmd/tsdbrelay/denormalize/denormalization.go similarity index 91% rename from cmd/tsdbrelay/denormalization.go rename to cmd/tsdbrelay/denormalize/denormalization.go index 724f58ae38..f5b614bf79 100644 --- a/cmd/tsdbrelay/denormalization.go +++ b/cmd/tsdbrelay/denormalize/denormalization.go @@ -1,4 +1,4 @@ -package main +package denormalize import ( "fmt" @@ -28,9 +28,7 @@ func (d *DenormalizationRule) String() string { return fmt.Sprintf("%s{%s} -> __%s.%s", d.Metric, inputTags, outputTags, d.Metric) } -var denormalizationRules map[string]*DenormalizationRule - -func parseDenormalizationRules(config string) (map[string]*DenormalizationRule, error) { +func ParseDenormalizationRules(config string) (map[string]*DenormalizationRule, error) { m := make(map[string]*DenormalizationRule) rules := strings.Split(config, ",") for _, r := range rules { diff --git a/cmd/tsdbrelay/denormalization_test.go b/cmd/tsdbrelay/denormalize/denormalization_test.go similarity index 98% rename from cmd/tsdbrelay/denormalization_test.go rename to cmd/tsdbrelay/denormalize/denormalization_test.go index c3fd54fd02..00aef60355 100644 --- a/cmd/tsdbrelay/denormalization_test.go +++ b/cmd/tsdbrelay/denormalize/denormalization_test.go @@ -1,4 +1,4 @@ -package main +package denormalize import ( "testing" diff --git a/cmd/tsdbrelay/main.go b/cmd/tsdbrelay/main.go index 8d13aadaf2..773c7cf6b0 100644 --- a/cmd/tsdbrelay/main.go +++ b/cmd/tsdbrelay/main.go @@ -12,20 +12,23 @@ import ( "net/http/httputil" "net/url" + "bosun.org/cmd/tsdbrelay/denormalize" "bosun.org/opentsdb" ) var ( - listenAddr = flag.String("l", ":4242", "Listen 
address.") - bosunServer = flag.String("b", "bosun", "Target Bosun server. Can specify port with host:port.") - tsdbServer = flag.String("t", "", "Target OpenTSDB server. Can specify port with host:port.") - logVerbose = flag.Bool("v", false, "enable verbose logging") - denormalize = flag.String("denormalize", "", "List of metrics to denormalize. Comma seperated list of `metric__tagname__tagname` rules. Will be translated to `___metric__tagvalue__tagvalue`") + listenAddr = flag.String("l", ":4242", "Listen address.") + bosunServer = flag.String("b", "bosun", "Target Bosun server. Can specify port with host:port.") + tsdbServer = flag.String("t", "", "Target OpenTSDB server. Can specify port with host:port.") + logVerbose = flag.Bool("v", false, "enable verbose logging") + toDenormalize = flag.String("denormalize", "", "List of metrics to denormalize. Comma seperated list of `metric__tagname__tagname` rules. Will be translated to `___metric__tagvalue__tagvalue`") ) var ( tsdbPutURL string bosunIndexURL string + + denormalizationRules map[string]*denormalize.DenormalizationRule ) func main() { @@ -36,9 +39,9 @@ func main() { log.Println("listen on", *listenAddr) log.Println("relay to bosun at", *bosunServer) log.Println("relay to tsdb at", *tsdbServer) - if *denormalize != "" { + if *toDenormalize != "" { var err error - denormalizationRules, err = parseDenormalizationRules(*denormalize) + denormalizationRules, err = denormalize.ParseDenormalizationRules(*toDenormalize) if err != nil { log.Fatal(err) } diff --git a/collect/collect.go b/collect/collect.go index f29f5fa34b..2487fad490 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -5,7 +5,6 @@ package collect // import "bosun.org/collect" import ( - "encoding/json" "fmt" "net/http" "net/url" @@ -53,7 +52,7 @@ var ( tsdbURL string osHostname string metricRoot string - queue []json.RawMessage + queue []*opentsdb.DataPoint qlock, mlock, slock sync.Mutex // Locks for queues, maps, stats. counters = make(map[string]*addMetric) sets = make(map[string]*setMetric) diff --git a/collect/queue.go b/collect/queue.go index 8c4c69b619..aff2cd7da9 100644 --- a/collect/queue.go +++ b/collect/queue.go @@ -22,12 +22,7 @@ func queuer() { slock.Unlock() break } - m, err := json.Marshal(dp) - if err != nil { - slog.Error(err) - } else { - queue = append(queue, m) - } + queue = append(queue, dp) select { case dp = <-tchan: continue @@ -60,37 +55,24 @@ func send() { } } -func sendBatch(batch []json.RawMessage) { +func sendBatch(batch []*opentsdb.DataPoint) { if Print { for _, d := range batch { - slog.Info(string(d)) + j, err := d.MarshalJSON() + if err != nil { + slog.Error(err) + } + slog.Info(string(j)) } recordSent(len(batch)) return } - var buf bytes.Buffer - g := gzip.NewWriter(&buf) - if err := json.NewEncoder(g).Encode(batch); err != nil { - slog.Error(err) - return - } - if err := g.Close(); err != nil { - slog.Error(err) - return - } - req, err := http.NewRequest("POST", tsdbURL, &buf) - if err != nil { - slog.Error(err) - return - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Content-Encoding", "gzip") now := time.Now() - resp, err := client.Do(req) - d := time.Since(now).Nanoseconds() / 1e6 + resp, err := SendDataPoints(batch, tsdbURL) if err == nil { defer resp.Body.Close() } + d := time.Since(now).Nanoseconds() / 1e6 Add("collect.post.total_duration", Tags, d) Add("collect.post.count", Tags, 1) // Some problem with connecting to the server; retry later. 
@@ -111,13 +93,8 @@ func sendBatch(batch []json.RawMessage) {
 		}
 		restored := 0
 		for _, msg := range batch {
-			var dp opentsdb.DataPoint
-			if err := json.Unmarshal(msg, &dp); err != nil {
-				slog.Error(err)
-				continue
-			}
 			restored++
-			tchan <- &dp
+			tchan <- msg
 		}
 		d := time.Second * 5
 		Add("collect.post.restore", Tags, int64(restored))
@@ -136,3 +113,23 @@ func recordSent(num int) {
 	sent += int64(num)
 	slock.Unlock()
 }
+
+func SendDataPoints(dps []*opentsdb.DataPoint, tsdb string) (*http.Response, error) {
+	var buf bytes.Buffer
+	g := gzip.NewWriter(&buf)
+	if err := json.NewEncoder(g).Encode(dps); err != nil {
+		return nil, err
+	}
+	if err := g.Close(); err != nil {
+		return nil, err
+	}
+	req, err := http.NewRequest("POST", tsdb, &buf)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Content-Encoding", "gzip")
+
+	resp, err := client.Do(req)
+	return resp, err
+}
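With SendDataPoints exported from collect, other programs (the new cmd/backfill above being the first consumer) can reuse the gzipped-JSON POST plumbing without going through the package's background queue. A minimal sketch of such a caller; the host, metric, tags, and values are hypothetical, and the 204 check mirrors what cmd/backfill does with the returned response:

package main

import (
	"log"

	"bosun.org/collect"
	"bosun.org/opentsdb"
)

func main() {
	// A single hypothetical point; a real caller would batch many.
	dps := []*opentsdb.DataPoint{
		{
			Metric:    "os.cpu",
			Timestamp: 1420070400,
			Value:     42.0,
			Tags:      opentsdb.TagSet{"host": "web01"},
		},
	}
	// POST the batch as gzipped JSON to a (hypothetical) OpenTSDB put endpoint.
	resp, err := collect.SendDataPoints(dps, "http://tsdb.example.com:4242/api/put")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	// OpenTSDB replies 204 No Content on success; anything else is treated as a
	// failure here, the same way cmd/backfill handles it.
	if resp.StatusCode != 204 {
		log.Fatalf("unexpected status from OpenTSDB: %d", resp.StatusCode)
	}
}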