Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use schema from metrictank #375

Merged
merged 5 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
name = "github.com/taylorchu/toki"

[[constraint]]
name = "gopkg.in/raintank/schema.v1"
version = "1.4.0"
name = "github.com/grafana/metrictank"
branch = "master"

[[constraint]]
name = "github.com/sirupsen/logrus"
Expand Down
4 changes: 2 additions & 2 deletions route/grafananet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"

"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/schema/msg"
)

type GrafanaNet struct {
Expand Down
62 changes: 45 additions & 17 deletions route/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@ import (
log "github.com/sirupsen/logrus"

"github.com/Shopify/sarama"
"github.com/grafana/metrictank/cluster/partitioner"
"github.com/grafana/metrictank/schema"
"github.com/graphite-ng/carbon-relay-ng/persister"
"github.com/raintank/metrictank/cluster/partitioner"
"gopkg.in/raintank/schema.v1"
)

type KafkaMdm struct {
baseRoute
saramaCfg *sarama.Config
producer sarama.SyncProducer
topic string
brokers []string
buf chan []byte
partitioner *partitioner.Kafka
schemas persister.WhisperSchemas
blocking bool
dispatch func(chan []byte, []byte, metrics.Gauge, metrics.Counter)
saramaCfg *sarama.Config
producer sarama.SyncProducer
topic string
numPartitions int32
brokers []string
buf chan []byte
partitioner *partitioner.Kafka
schemas persister.WhisperSchemas
blocking bool
dispatch func(chan []byte, []byte, metrics.Gauge, metrics.Counter)

orgId int // organisation to publish data under

Expand Down Expand Up @@ -124,18 +125,45 @@ func NewKafkaMdm(key, prefix, sub, regex, topic, codec, schemasFile, partitionBy
func (r *KafkaMdm) run() {
metrics := make([]*schema.MetricData, 0, r.flushMaxNum)
ticker := time.NewTicker(r.flushMaxWait)
var client sarama.Client
var err error
attempts := 0

for r.producer == nil {
r.producer, err = sarama.NewSyncProducer(r.brokers, r.saramaCfg)
client, err = sarama.NewClient(r.brokers, r.saramaCfg)
if err == sarama.ErrOutOfBrokers {
log.Warnf("kafkaMdm %q: %s", r.key, err)
// sleep before trying to connect again.
time.Sleep(time.Second)
attempts++
// fail after 300 attempts
if attempts > 300 {
log.Fatalf("kafkaMdm %q: no kafka brokers available.", r.key)
}
continue
} else if err != nil {
log.Fatalf("kafkaMdm %q: failed to initialize kafka producer. %s", r.key, err)
}

partitions, err := client.Partitions(r.topic)
if err != nil {
log.Fatalf("kafkaMdm %q: failed to get partitions for topic %s - %s", r.key, r.topic, err)
}
if len(partitions) < 1 {
log.Fatalf("kafkaMdm %q: retrieved 0 partitions for topic %s\nThis might indicate that kafka is not in a ready state.", r.key, r.topic)
}

r.numPartitions = int32(len(partitions))

r.producer, err = sarama.NewSyncProducerFromClient(client)
if err != nil {
log.Fatalf("kafkaMdm %q: failed to initialize kafka producer. %s", r.key, err)
}
}
// sarama documentation states that we need to call Close() on the client
// used to create the SyncProducer
defer client.Close()

log.Infof("kafkaMdm %q: now connected to kafka", r.key)

// flushes the data to kafka and resets buffer. blocks until it succeeds
Expand All @@ -154,14 +182,14 @@ func (r *KafkaMdm) run() {
}
size += len(data)

key, err := r.partitioner.GetPartitionKey(metric, nil)
partition, err := r.partitioner.Partition(metric, r.numPartitions)
if err != nil {
panic(err)
}
payload[i] = &sarama.ProducerMessage{
Key: sarama.ByteEncoder(key),
Topic: r.topic,
Value: sarama.ByteEncoder(data),
Partition: partition,
Topic: r.topic,
Value: sarama.ByteEncoder(data),
}
}
err = r.producer.SendMessages(payload)
Expand Down Expand Up @@ -202,7 +230,7 @@ func (r *KafkaMdm) run() {
r.numBuffered.Dec(1)
md, err := parseMetric(buf, r.schemas, r.orgId)
if err != nil {
log.Errorf("KafkaMdm %q: %s", r.key, err)
log.Errorf("KafkaMdm %q: parseMetric failed, skipping metric: %s", r.key, err)
continue
}
md.SetId()
Expand Down
11 changes: 9 additions & 2 deletions route/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"strconv"
"strings"

"github.com/grafana/metrictank/schema"
"github.com/graphite-ng/carbon-relay-ng/persister"
"gopkg.in/raintank/schema.v1"
)

func getSchemas(file string) (persister.WhisperSchemas, error) {
Expand Down Expand Up @@ -76,7 +76,6 @@ func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) (*sche

md := schema.MetricData{
Name: name,
Metric: name,
Interval: s.Retentions[0].SecondsPerPoint(),
Value: val,
Unit: "unknown",
Expand All @@ -85,5 +84,13 @@ func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) (*sche
Tags: tags,
OrgId: orgId,
}

// ensure MetricData is valid
// this will check for an invalid name and tags
err = md.Validate()
if err != nil {
return nil, err
}

return &md, nil
}
4 changes: 1 addition & 3 deletions route/schemas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"strings"
"testing"

"github.com/grafana/metrictank/schema"
"github.com/graphite-ng/carbon-relay-ng/persister"
"gopkg.in/raintank/schema.v1"
)

func getMatchEverythingSchemas() persister.WhisperSchemas {
Expand Down Expand Up @@ -37,7 +37,6 @@ func TestParseMetricWithTags(t *testing.T) {
sort.Strings(tags)
expectedMd := &schema.MetricData{
Name: name,
Metric: name,
Interval: 10,
Value: value,
Unit: "unknown",
Expand All @@ -60,7 +59,6 @@ func TestParseMetricWithoutTags(t *testing.T) {
md, _ := parseMetric(line, schemas, 1)
expectedMd := &schema.MetricData{
Name: name,
Metric: name,
Interval: 10,
Value: value,
Unit: "unknown",
Expand Down
22 changes: 22 additions & 0 deletions vendor/github.com/cespare/xxhash/LICENSE.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading