This repository was archived by the owner on Aug 23, 2023. It is now read-only.

mt-gateway: support ingesting data into kafka #1608

Merged
merged 24 commits on Jan 23, 2020
24 commits
f748bf3
initial import of "publish" package from raintank/tsdb-gw (doesn't co…
fitzoh Jan 15, 2020
66a9481
update imports from raintank/tsdb-gw -> grafana/metrictank (still no …
fitzoh Jan 15, 2020
6b76e24
copy-paste metrics_client package from raintank/tsdb_gw to fix compil…
fitzoh Jan 15, 2020
e89cc6e
pull in util/flags package from raintank/tsdb-gw to fix compilation i…
fitzoh Jan 15, 2020
85e9c4f
import BufferPool33 struct from raintank/tsdb-gw to fix compilation e…
fitzoh Jan 15, 2020
d955422
fix compilation error
fitzoh Jan 15, 2020
e37ea38
add missing publisher test dependency (github.com/Shopify/sarama/mocks)
fitzoh Jan 15, 2020
0b5c3fc
go fmt
fitzoh Jan 15, 2020
c2ccac6
get rid of unused variable and the dependency it pulled in
fitzoh Jan 16, 2020
3f7baea
remove unnecessary function/file
fitzoh Jan 16, 2020
8674c57
initial ingest of metrics ingestion files
fitzoh Jan 21, 2020
d50c88e
update imports
fitzoh Jan 21, 2020
3a270f8
drop rate limiting
fitzoh Jan 21, 2020
d4d0b7d
Replace macaron ctx object with base http objects
fitzoh Jan 21, 2020
60a6788
There's no auth in this version, so we should assume admin = true
fitzoh Jan 21, 2020
7adf01d
Wire up new ingest handler
fitzoh Jan 21, 2020
82e377e
go fmt
fitzoh Jan 21, 2020
94c2112
configure kafka for ingestor endpoint
fitzoh Jan 21, 2020
b6887d9
sync tool docs
fitzoh Jan 21, 2020
87cb16c
fix message + make more descriptive
fitzoh Jan 22, 2020
186fab2
update imports
fitzoh Jan 22, 2020
1abc592
Refactor responses/logging for failed ingest requests
fitzoh Jan 22, 2020
c90e7a7
remove unused function
fitzoh Jan 22, 2020
3237276
Fprintf -> Fprint
fitzoh Jan 22, 2020
8 changes: 6 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

28 changes: 21 additions & 7 deletions cmd/mt-gateway/api.go
@@ -2,10 +2,15 @@ package main

import (
"fmt"
log "github.com/sirupsen/logrus"
"net/http"
"net/http/httputil"
"strconv"
"strings"

"github.com/grafana/metrictank/cmd/mt-gateway/ingest"
"github.com/grafana/metrictank/publish"
"github.com/grafana/metrictank/publish/kafka"
log "github.com/sirupsen/logrus"
)

//Maintains a set of `http.Handlers` for the different API endpoints.
@@ -20,18 +25,27 @@ type Api struct {
//Constructs a new Api based on the passed in URLS
func NewApi(urls Urls) Api {
api := Api{}
api.ingestHandler = withMiddleware("ingest", ingestHandlerStub)
api.ingestHandler = withMiddleware("ingest", ingestHandler(urls))
api.graphiteHandler = withMiddleware("graphite", httputil.NewSingleHostReverseProxy(urls.graphite))
api.metrictankHandler = withMiddleware("metrictank", httputil.NewSingleHostReverseProxy(urls.metrictank))
api.bulkImportHandler = withMiddleware("bulk-importer", bulkImportHandler(urls))
return api
}

//TODO replace this with an actual implementation
var ingestHandlerStub = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotImplemented)
_, _ = fmt.Fprintln(w, "http ingest not yet implemented")
})
func ingestHandler(urls Urls) http.Handler {
publisher := kafka.New(strings.Split(urls.kafkaBrokers, ","), true)
if publisher == nil {
log.Info("metrics ingestion not enabled (no kafka brokers configured)")
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = fmt.Fprintln(w, "metrics ingestion not enabled (no kafka brokers configured)")
})
} else {
publish.Init(publisher)
return http.HandlerFunc(ingest.Metrics)
}

}

//Returns a proxy to the bulk importer if one is configured, otherwise a handler that always returns a 503
func bulkImportHandler(urls Urls) http.Handler {
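For reference, a minimal client sketch for the new JSON ingest path (not part of this PR). The gateway address and the ingest route are assumptions, since the routing is not shown in this diff; the payload is a JSON array of `schema.MetricData`.

```go
package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
	"time"

	"github.com/grafana/metrictank/schema"
)

func main() {
	metrics := []*schema.MetricData{
		{
			Name:     "service.requests.count",
			OrgId:    1,
			Interval: 10,
			Value:    42,
			Time:     time.Now().Unix(),
			Mtype:    "gauge",
			Tags:     []string{"env=prod"},
		},
	}
	body, err := json.Marshal(metrics)
	if err != nil {
		log.Fatal(err)
	}
	// "http://localhost/metrics" is an assumed gateway address and ingest route;
	// adjust to wherever the mt-gateway ingest handler is actually mounted.
	resp, err := http.Post("http://localhost/metrics", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("ingest status:", resp.Status)
}
```

The same endpoint also accepts `rt-metric-binary` / `rt-metric-binary-snappy` payloads, handled in `ingest/metrics.go` below.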
216 changes: 216 additions & 0 deletions cmd/mt-gateway/ingest/metrics.go
@@ -0,0 +1,216 @@
package ingest

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"

"github.com/golang/snappy"
"github.com/grafana/metrictank/publish"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/schema/msg"
"github.com/grafana/metrictank/stats"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
)

var (
metricsValid = stats.NewCounterRate32("metrics.http.valid") // valid metrics received (not necessarily published)
metricsRejected = stats.NewCounterRate32("metrics.http.rejected") // invalid metrics received

metricsTSLock = &sync.Mutex{}
metricsTimestamp = make(map[int]*stats.Range32)

discardedSamples = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "gateway",
Name: "invalid_samples_total",
Help: "The total number of samples that were discarded because they are invalid.",
},
[]string{"reason", "org"},
)
)

func getMetricsTimestampStat(org int) *stats.Range32 {
metricsTSLock.Lock()
metricTimestamp, ok := metricsTimestamp[org]
if !ok {
metricTimestamp = stats.NewRange32(fmt.Sprintf("metrics.timestamp.http.%d", org)) // min/max timestamps seen in each interval
metricsTimestamp[org] = metricTimestamp
}
metricsTSLock.Unlock()
return metricTimestamp
}

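// Metrics is the http handler for the ingest endpoint: it dispatches on the Content-Type header
// to the binary MetricData decoder (optionally snappy-compressed) or the JSON decoder,
// rejecting anything else with a 400.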
func Metrics(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("Content-Type")
switch contentType {
case "rt-metric-binary":
metricsBinary(w, r, false)
case "rt-metric-binary-snappy":
metricsBinary(w, r, true)
case "application/json":
metricsJson(w, r)
default:
writeErrorResponse(w, 400, "unknown content-type: %s", contentType)
}
}

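// discardsByOrg tracks, per org id, how many samples were discarded for each rejection reason,
// so they can be exported via the discardedSamples prometheus counter.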
type discardsByReason map[string]int
type discardsByOrg map[int]discardsByReason

func (dbo discardsByOrg) Add(org int, reason string) {
dbr, ok := dbo[org]
if !ok {
dbr = make(discardsByReason)
}
dbr[reason]++
dbo[org] = dbr
}

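// prepareIngest validates the incoming metrics (defaulting an empty Mtype to gauge), updates the
// valid/rejected and per-org timestamp stats, and appends the valid metrics to toPublish.
// It returns the publishable slice and a MetricsResponse describing any rejects.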
func prepareIngest(in []*schema.MetricData, toPublish []*schema.MetricData) ([]*schema.MetricData, MetricsResponse) {
resp := NewMetricsResponse()
promDiscards := make(discardsByOrg)

var metricTimestamp *stats.Range32

for i, m := range in {
if m.Mtype == "" {
m.Mtype = "gauge"
}
if err := m.Validate(); err != nil {
log.Debugf("received invalid metric: %v %v %v", m.Name, m.OrgId, m.Tags)
resp.AddInvalid(err, i)
promDiscards.Add(m.OrgId, err.Error())
continue
}
metricTimestamp = getMetricsTimestampStat(m.OrgId)
metricTimestamp.ValueUint32(uint32(m.Time))
toPublish = append(toPublish, m)
}

// track invalid/discards in graphite and prometheus
metricsRejected.Add(resp.Invalid)
metricsValid.Add(len(toPublish))
for org, promDiscardsByOrg := range promDiscards {
for reason, cnt := range promDiscardsByOrg {
discardedSamples.WithLabelValues(reason, strconv.Itoa(org)).Add(float64(cnt))
}
}
return toPublish, resp
}

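// metricsJson decodes a JSON array of MetricData from the request body and publishes the valid entries.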
func metricsJson(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
writeErrorResponse(w, 400, "no data included in request.")
return
}
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
select {
case <-r.Context().Done():
writeErrorResponse(w, 499, "request canceled")
default:
writeErrorResponse(w, 500, "unable to read request body. %s", err)
}
return
}
metrics := make([]*schema.MetricData, 0)
err = json.Unmarshal(body, &metrics)
if err != nil {
writeErrorResponse(w, 400, "unable to parse request body. %s", err)
return
}

toPublish := make([]*schema.MetricData, 0, len(metrics))
toPublish, resp := prepareIngest(metrics, toPublish)

select {
case <-r.Context().Done():
writeErrorResponse(w, 499, "request canceled")
return
default:
}

err = publish.Publish(toPublish)
if err != nil {
writeErrorResponse(w, 500, "failed to publish metrics. %s", err)
return
}

// track published in the response (it already has discards)
resp.Published = len(toPublish)
w.WriteHeader(200)
json.NewEncoder(w).Encode(resp)
}

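// metricsBinary decodes a binary MetricData batch (msg format, optionally snappy-compressed)
// from the request body and publishes the valid entries.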
func metricsBinary(w http.ResponseWriter, r *http.Request, compressed bool) {
if r.Body == nil {
writeErrorResponse(w, 400, "no data included in request.")
return
}
var bodyReadCloser io.ReadCloser
if compressed {
bodyReadCloser = ioutil.NopCloser(snappy.NewReader(r.Body))
} else {
bodyReadCloser = r.Body
}
defer bodyReadCloser.Close()

body, err := ioutil.ReadAll(bodyReadCloser)
if err != nil {
select {
case <-r.Context().Done():
writeErrorResponse(w, 499, "request canceled")
default:
writeErrorResponse(w, 500, "unable to read request body. %s", err)
}
return
}
metricData := new(msg.MetricData)
err = metricData.InitFromMsg(body)
if err != nil {
writeErrorResponse(w, 400, "payload not metricData. %s", err)
return
}

err = metricData.DecodeMetricData()
if err != nil {
writeErrorResponse(w, 400, "failed to unmarshal metricData. %s", err)
return
}

toPublish := make([]*schema.MetricData, 0, len(metricData.Metrics))
toPublish, resp := prepareIngest(metricData.Metrics, toPublish)

select {
case <-r.Context().Done():
writeErrorResponse(w, 499, "request canceled")
return
default:
}

err = publish.Publish(toPublish)
if err != nil {
writeErrorResponse(w, 500, "failed to publish metrics. %s", err)
return
}

// track published in the response (it already has discards)
resp.Published = len(toPublish)
w.WriteHeader(200)
json.NewEncoder(w).Encode(resp)
}

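// writeErrorResponse logs the formatted error message and writes it, along with the given status code, to the client.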
func writeErrorResponse(w http.ResponseWriter, status int, msg string, fmtArgs ...interface{}) {
w.WriteHeader(status)
formatted := fmt.Sprintf(msg, fmtArgs...)
log.Error(formatted)
fmt.Fprint(w, formatted)
}
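A quick sketch of how the Content-Type dispatch in `Metrics` behaves for an unrecognized type: the request is rejected with a 400 before anything is read or published. This is an illustrative test, not part of the PR.

```go
package ingest

import (
	"net/http/httptest"
	"strings"
	"testing"
)

func TestMetricsRejectsUnknownContentType(t *testing.T) {
	req := httptest.NewRequest("POST", "/metrics", strings.NewReader("{}"))
	req.Header.Set("Content-Type", "text/plain")

	rec := httptest.NewRecorder()
	Metrics(rec, req)

	if rec.Code != 400 {
		t.Fatalf("expected 400 for unknown content-type, got %d", rec.Code)
	}
}
```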
32 changes: 32 additions & 0 deletions cmd/mt-gateway/ingest/metrics_response.go
@@ -0,0 +1,32 @@
package ingest

type MetricsResponse struct {
Invalid int
Published int
ValidationErrors map[string]ValidationError
}

func NewMetricsResponse() MetricsResponse {
return MetricsResponse{
ValidationErrors: make(map[string]ValidationError),
}
}

type ValidationError struct {
Count int
ExampleIds []int
}

// AddInvalid updates the counts and records example indices for the first few occurrences of each error
func (m *MetricsResponse) AddInvalid(err error, index int) {
key := err.Error()
m.Invalid++
vErr := m.ValidationErrors[key]
vErr.Count++

if vErr.Count < 10 {
vErr.ExampleIds = append(vErr.ExampleIds, index)
}

m.ValidationErrors[key] = vErr
}
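To illustrate the response shape produced by this type: validation errors are grouped by error message, each with a count and a bounded list of example indices. A rough sketch (the error text here is made up, not a real validation error):

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"

	"github.com/grafana/metrictank/cmd/mt-gateway/ingest"
)

func main() {
	resp := ingest.NewMetricsResponse()
	// pretend the metrics at index 0 and 3 failed validation with the same (made-up) error
	resp.AddInvalid(errors.New("time is 0"), 0)
	resp.AddInvalid(errors.New("time is 0"), 3)
	resp.Published = 98

	out, _ := json.MarshalIndent(resp, "", "  ")
	// prints roughly: {"Invalid": 2, "Published": 98,
	//   "ValidationErrors": {"time is 0": {"Count": 2, "ExampleIds": [0, 3]}}}
	fmt.Println(string(out))
}
```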
3 changes: 3 additions & 0 deletions cmd/mt-gateway/main.go
@@ -18,12 +18,14 @@ var (
importerURL = flag.String("importer-url", "", "mt-whisper-importer-writer address")
addr = flag.String("addr", ":80", "http service address")
defaultOrgId = flag.Int("default-org-id", -1, "default org ID to send to downstream services if none is provided")
brokers = flag.String("kafka-tcp-addr", "localhost:9092", "kafka tcp address(es) for metrics, in csv host[:port] format")
)

type Urls struct {
metrictank *url.URL
graphite *url.URL
bulkImporter *url.URL
kafkaBrokers string
}

func init() {
@@ -54,6 +56,7 @@ func main()

var err error
urls := Urls{}
urls.kafkaBrokers = *brokers

urls.metrictank, err = url.Parse(*metrictankUrl)
if err != nil {
28 changes: 28 additions & 0 deletions docs/tools.md
@@ -59,12 +59,40 @@ Flags:
http service address (default ":80")
-default-org-id int
default org ID to send to downstream services if none is provided (default -1)
-discard-prefixes string
discard data points starting with one of the given prefixes separated by | (may be given multiple times, once per topic, as a comma-separated list)
-graphite-url string
graphite-api address (default "http://localhost:8080")
-importer-url string
mt-whisper-importer-writer address
-kafka-tcp-addr string
kafka tcp address(es) for metrics, in csv host[:port] format (default "localhost:9092")
-kafka-version string
Kafka version in semver format. All brokers must be this version or newer. (default "0.10.0.0")
-metrics-flush-freq duration
The best-effort frequency of flushes to kafka (default 50ms)
-metrics-kafka-comp string
compression: none|gzip|snappy (default "snappy")
-metrics-max-messages int
The maximum number of messages the producer will send in a single request (default 5000)
-metrics-partition-scheme string
method used for partitioning metrics. (byOrg|bySeries|bySeriesWithTags|bySeriesWithTagsFnv) (may be given multiple times, once per topic, as a comma-separated list) (default "bySeries")
-metrics-publish
enable metric publishing
-metrics-topic string
topic for metrics (may be given multiple times as a comma-separated list) (default "mdm")
-metrictank-url string
metrictank address (default "http://localhost:6060")
-only-org-id value
restrict publishing data belonging to org id; 0 means no restriction (may be given multiple times, once per topic, as a comma-separated list)
-schemas-file string
path to carbon storage-schemas.conf file (default "/etc/gw/storage-schemas.conf")
-v2
enable optimized MetricPoint payload (default true)
-v2-clear-interval duration
interval after which we always resend a full MetricData (default 1h0m0s)
-v2-org
encode org-id in messages (default true)
-version
print version string
```