Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tsdbrelay: denormalization backfill app #1076

Merged
merged 1 commit into from
Jun 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions cmd/backfill/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Backfill denormalizes historic OpenTSDB data.
//
// For ongoing denormalization use the functionality in tsdbrelay.
package main

import (
"flag"
"fmt"
"log"
"net/url"
"strconv"
"time"

"bosun.org/cmd/tsdbrelay/denormalize"
"bosun.org/collect"
"bosun.org/opentsdb"
)

// Command-line flags controlling the backfill window, the single
// denormalization rule to apply, the target OpenTSDB host, and the
// write batch size.
var (
	start     = flag.String("start", "2013/01/01", "Start date to backfill.")
	end       = flag.String("end", "", "End date to backfill. Will go to now if not specified.")
	ruleFlag  = flag.String("rule", "", "A denormalization rule. ex `os.cpu__host`")
	tsdbHost  = flag.String("host", "", "OpenTSDB host")
	batchSize = flag.Int("batch", 500, "batch size to send points to OpenTSDB")
)

// main validates the flags, parses the single denormalization rule,
// learns the metric's aggregate tags from the last hour of data, and
// then walks backwards from the end date to the start date one day at
// a time: each day is queried from OpenTSDB, translated through the
// rule, and written back in batches of *batchSize points.
func main() {
	flag.Parse()
	if *tsdbHost == "" {
		flag.PrintDefaults()
		log.Fatal("host must be supplied")
	}
	putURL := (&url.URL{Scheme: "http", Host: *tsdbHost, Path: "api/put"}).String()

	if *ruleFlag == "" {
		flag.PrintDefaults()
		log.Fatal("rule must be supplied")
	}
	rules, err := denormalize.ParseDenormalizationRules(*ruleFlag)
	if err != nil {
		log.Fatal(err)
	}
	if len(rules) > 1 {
		log.Fatal("Please specify only one rule")
	}
	// Exactly one entry: extract the metric name and its rule.
	var rule *denormalize.DenormalizationRule
	var metric string
	for k, v := range rules {
		metric = k
		rule = v
	}

	query := &opentsdb.Query{Metric: metric, Aggregator: "avg"}
	query.Tags, err = queryForAggregateTags(query)
	if err != nil {
		log.Fatal(err)
	}

	startDate, err := opentsdb.ParseTime(*start)
	if err != nil {
		log.Fatal(err)
	}
	endDate := time.Now().UTC()
	if *end != "" {
		endDate, err = opentsdb.ParseTime(*end)
		if err != nil {
			log.Fatal(err)
		}
	}

	// backfill queries one [batchStart, batchEnd] window, denormalizes
	// every returned point, and relays the results to putURL. Any error
	// aborts the whole program via the deferred log.Fatalf, so callers
	// may ignore the returned error.
	backfill := func(batchStart, batchEnd time.Time) (err error) {
		startTimeString := batchStart.Format(opentsdb.TSDBTimeFormat)
		endTimeString := batchEnd.Format(opentsdb.TSDBTimeFormat)
		defer func() {
			if err != nil {
				log.Fatalf("Error on batch %s - %s. %v \n", startTimeString, endTimeString, err)
			}
		}()
		req := opentsdb.Request{Start: startTimeString, End: endTimeString, Queries: []*opentsdb.Query{query}}
		resp, err := req.Query(*tsdbHost)
		if err != nil {
			return err
		}
		dps := []*opentsdb.DataPoint{}
		for _, r := range resp {
			for t, p := range r.DPS {
				// DPS keys are unix timestamps encoded as strings.
				timeStamp, err := strconv.ParseInt(t, 10, 64)
				if err != nil {
					return err
				}
				dp := &opentsdb.DataPoint{
					Timestamp: timeStamp,
					Metric:    r.Metric,
					Tags:      r.Tags,
					Value:     p,
				}
				err = rule.Translate(dp)
				if err != nil {
					return err
				}
				dps = append(dps, dp)
			}
		}
		fmt.Printf("%s - %s: %d dps\n", startTimeString, endTimeString, len(dps))
		total := 0
		for len(dps) > 0 {
			count := len(dps)
			if len(dps) > *batchSize {
				count = *batchSize
			}
			putResp, err := collect.SendDataPoints(dps[:count], putURL)
			if err != nil {
				return err
			}
			// Close immediately instead of defer: a defer here would hold
			// every batch's response body open until backfill returns,
			// leaking connections when a day spans many batches.
			putResp.Body.Close()
			if putResp.StatusCode != 204 {
				return fmt.Errorf("non 204 status code from opentsdb: %d", putResp.StatusCode)
			}
			dps = dps[count:]
			total += count
		}
		fmt.Printf("Relayed %d data points.\n", total)
		return nil
	}

	// Walk backwards a day at a time; the first (most recent) window may
	// be clamped to startDate. Errors are handled inside backfill.
	curEnd := endDate
	for curEnd.After(startDate) {
		curStart := curEnd.Add(-24 * time.Hour)
		if curStart.Before(startDate) {
			curStart = startDate
		}
		backfill(curStart, curEnd)
		curEnd = curEnd.Add(-24 * time.Hour)
	}
}

// queryForAggregateTags runs query against the last hour of data on
// *tsdbHost and returns a tag set containing a "*" wildcard for each
// aggregate tag reported by OpenTSDB. It errors if no data points
// exist in the last hour to learn the tags from.
func queryForAggregateTags(query *opentsdb.Query) (opentsdb.TagSet, error) {
	req := opentsdb.Request{}
	req.Queries = []*opentsdb.Query{query}
	req.Start = "1h-ago"
	resp, err := req.Query(*tsdbHost)
	if err != nil {
		return nil, err
	}
	if len(resp) < 1 {
		return nil, fmt.Errorf("no points in last hour to learn aggregate tags")
	}
	tagset := make(opentsdb.TagSet)
	for _, t := range resp[0].AggregateTags {
		tagset[t] = "*"
	}
	return tagset, nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package denormalize

import (
"fmt"
Expand Down Expand Up @@ -28,9 +28,7 @@ func (d *DenormalizationRule) String() string {
return fmt.Sprintf("%s{%s} -> __%s.%s", d.Metric, inputTags, outputTags, d.Metric)
}

var denormalizationRules map[string]*DenormalizationRule

func parseDenormalizationRules(config string) (map[string]*DenormalizationRule, error) {
func ParseDenormalizationRules(config string) (map[string]*DenormalizationRule, error) {
m := make(map[string]*DenormalizationRule)
rules := strings.Split(config, ",")
for _, r := range rules {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package denormalize

import (
"testing"
Expand Down
17 changes: 10 additions & 7 deletions cmd/tsdbrelay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,23 @@ import (
"net/http/httputil"
"net/url"

"bosun.org/cmd/tsdbrelay/denormalize"
"bosun.org/opentsdb"
)

var (
listenAddr = flag.String("l", ":4242", "Listen address.")
bosunServer = flag.String("b", "bosun", "Target Bosun server. Can specify port with host:port.")
tsdbServer = flag.String("t", "", "Target OpenTSDB server. Can specify port with host:port.")
logVerbose = flag.Bool("v", false, "enable verbose logging")
denormalize = flag.String("denormalize", "", "List of metrics to denormalize. Comma seperated list of `metric__tagname__tagname` rules. Will be translated to `___metric__tagvalue__tagvalue`")
listenAddr = flag.String("l", ":4242", "Listen address.")
bosunServer = flag.String("b", "bosun", "Target Bosun server. Can specify port with host:port.")
tsdbServer = flag.String("t", "", "Target OpenTSDB server. Can specify port with host:port.")
logVerbose = flag.Bool("v", false, "enable verbose logging")
toDenormalize = flag.String("denormalize", "", "List of metrics to denormalize. Comma seperated list of `metric__tagname__tagname` rules. Will be translated to `___metric__tagvalue__tagvalue`")
)

var (
tsdbPutURL string
bosunIndexURL string

denormalizationRules map[string]*denormalize.DenormalizationRule
)

func main() {
Expand All @@ -36,9 +39,9 @@ func main() {
log.Println("listen on", *listenAddr)
log.Println("relay to bosun at", *bosunServer)
log.Println("relay to tsdb at", *tsdbServer)
if *denormalize != "" {
if *toDenormalize != "" {
var err error
denormalizationRules, err = parseDenormalizationRules(*denormalize)
denormalizationRules, err = denormalize.ParseDenormalizationRules(*toDenormalize)
if err != nil {
log.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package collect // import "bosun.org/collect"

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -53,7 +52,7 @@ var (
tsdbURL string
osHostname string
metricRoot string
queue []json.RawMessage
queue []*opentsdb.DataPoint
qlock, mlock, slock sync.Mutex // Locks for queues, maps, stats.
counters = make(map[string]*addMetric)
sets = make(map[string]*setMetric)
Expand Down
63 changes: 30 additions & 33 deletions collect/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ func queuer() {
slock.Unlock()
break
}
m, err := json.Marshal(dp)
if err != nil {
slog.Error(err)
} else {
queue = append(queue, m)
}
queue = append(queue, dp)
select {
case dp = <-tchan:
continue
Expand Down Expand Up @@ -60,37 +55,24 @@ func send() {
}
}

func sendBatch(batch []json.RawMessage) {
func sendBatch(batch []*opentsdb.DataPoint) {
if Print {
for _, d := range batch {
slog.Info(string(d))
j, err := d.MarshalJSON()
if err != nil {
slog.Error(err)
}
slog.Info(string(j))
}
recordSent(len(batch))
return
}
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
if err := json.NewEncoder(g).Encode(batch); err != nil {
slog.Error(err)
return
}
if err := g.Close(); err != nil {
slog.Error(err)
return
}
req, err := http.NewRequest("POST", tsdbURL, &buf)
if err != nil {
slog.Error(err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
now := time.Now()
resp, err := client.Do(req)
d := time.Since(now).Nanoseconds() / 1e6
resp, err := SendDataPoints(batch, tsdbURL)
if err == nil {
defer resp.Body.Close()
}
d := time.Since(now).Nanoseconds() / 1e6
Add("collect.post.total_duration", Tags, d)
Add("collect.post.count", Tags, 1)
// Some problem with connecting to the server; retry later.
Expand All @@ -111,13 +93,8 @@ func sendBatch(batch []json.RawMessage) {
}
restored := 0
for _, msg := range batch {
var dp opentsdb.DataPoint
if err := json.Unmarshal(msg, &dp); err != nil {
slog.Error(err)
continue
}
restored++
tchan <- &dp
tchan <- msg
}
d := time.Second * 5
Add("collect.post.restore", Tags, int64(restored))
Expand All @@ -136,3 +113,23 @@ func recordSent(num int) {
sent += int64(num)
slock.Unlock()
}

// SendDataPoints JSON-encodes and gzip-compresses dps, then POSTs the
// result to the tsdb URL. On success the caller is responsible for
// closing the returned response's Body and for checking its status
// code (OpenTSDB replies 204 on success).
func SendDataPoints(dps []*opentsdb.DataPoint, tsdb string) (*http.Response, error) {
	var buf bytes.Buffer
	g := gzip.NewWriter(&buf)
	if err := json.NewEncoder(g).Encode(dps); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed bytes into buf.
	if err := g.Close(); err != nil {
		return nil, err
	}
	req, err := http.NewRequest("POST", tsdb, &buf)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Content-Encoding", "gzip")
	return client.Do(req)
}