Skip to content

Commit 4bc1853

Browse files
authored
Merge pull request #375 from graphite-ng/use-schema-from-metrictank
use schema from metrictank
2 parents 6932529 + e2d1016 commit 4bc1853

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+2849
-1772
lines changed

Gopkg.lock

+30-28
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@
5959
name = "github.com/taylorchu/toki"
6060

6161
[[constraint]]
62-
name = "gopkg.in/raintank/schema.v1"
63-
version = "1.4.0"
62+
name = "github.com/grafana/metrictank"
63+
branch = "master"
6464

6565
[[constraint]]
6666
name = "github.com/sirupsen/logrus"

route/grafananet.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424
"github.com/jpillora/backoff"
2525
log "github.com/sirupsen/logrus"
2626

27-
"gopkg.in/raintank/schema.v1"
28-
"gopkg.in/raintank/schema.v1/msg"
27+
"github.com/grafana/metrictank/schema"
28+
"github.com/grafana/metrictank/schema/msg"
2929
)
3030

3131
type GrafanaNet struct {

route/kafkamdm.go

+45-17
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,23 @@ import (
1414
log "github.com/sirupsen/logrus"
1515

1616
"github.com/Shopify/sarama"
17+
"github.com/grafana/metrictank/cluster/partitioner"
18+
"github.com/grafana/metrictank/schema"
1719
"github.com/graphite-ng/carbon-relay-ng/persister"
18-
"github.com/raintank/metrictank/cluster/partitioner"
19-
"gopkg.in/raintank/schema.v1"
2020
)
2121

2222
type KafkaMdm struct {
2323
baseRoute
24-
saramaCfg *sarama.Config
25-
producer sarama.SyncProducer
26-
topic string
27-
brokers []string
28-
buf chan []byte
29-
partitioner *partitioner.Kafka
30-
schemas persister.WhisperSchemas
31-
blocking bool
32-
dispatch func(chan []byte, []byte, metrics.Gauge, metrics.Counter)
24+
saramaCfg *sarama.Config
25+
producer sarama.SyncProducer
26+
topic string
27+
numPartitions int32
28+
brokers []string
29+
buf chan []byte
30+
partitioner *partitioner.Kafka
31+
schemas persister.WhisperSchemas
32+
blocking bool
33+
dispatch func(chan []byte, []byte, metrics.Gauge, metrics.Counter)
3334

3435
orgId int // organisation to publish data under
3536

@@ -124,18 +125,45 @@ func NewKafkaMdm(key, prefix, sub, regex, topic, codec, schemasFile, partitionBy
124125
func (r *KafkaMdm) run() {
125126
metrics := make([]*schema.MetricData, 0, r.flushMaxNum)
126127
ticker := time.NewTicker(r.flushMaxWait)
128+
var client sarama.Client
127129
var err error
130+
attempts := 0
128131

129132
for r.producer == nil {
130-
r.producer, err = sarama.NewSyncProducer(r.brokers, r.saramaCfg)
133+
client, err = sarama.NewClient(r.brokers, r.saramaCfg)
131134
if err == sarama.ErrOutOfBrokers {
132135
log.Warnf("kafkaMdm %q: %s", r.key, err)
133136
// sleep before trying to connect again.
134137
time.Sleep(time.Second)
138+
attempts++
139+
// fail after 300 attempts
140+
if attempts > 300 {
141+
log.Fatalf("kafkaMdm %q: no kafka brokers available.", r.key)
142+
}
143+
continue
135144
} else if err != nil {
136145
log.Fatalf("kafkaMdm %q: failed to initialize kafka producer. %s", r.key, err)
137146
}
147+
148+
partitions, err := client.Partitions(r.topic)
149+
if err != nil {
150+
log.Fatalf("kafkaMdm %q: failed to get partitions for topic %s - %s", r.key, r.topic, err)
151+
}
152+
if len(partitions) < 1 {
153+
log.Fatalf("kafkaMdm %q: retrieved 0 partitions for topic %s\nThis might indicate that kafka is not in a ready state.", r.key, r.topic)
154+
}
155+
156+
r.numPartitions = int32(len(partitions))
157+
158+
r.producer, err = sarama.NewSyncProducerFromClient(client)
159+
if err != nil {
160+
log.Fatalf("kafkaMdm %q: failed to initialize kafka producer. %s", r.key, err)
161+
}
138162
}
163+
// sarama documentation states that we need to call Close() on the client
164+
// used to create the SyncProducer
165+
defer client.Close()
166+
139167
log.Infof("kafkaMdm %q: now connected to kafka", r.key)
140168

141169
// flushes the data to kafka and resets buffer. blocks until it succeeds
@@ -154,14 +182,14 @@ func (r *KafkaMdm) run() {
154182
}
155183
size += len(data)
156184

157-
key, err := r.partitioner.GetPartitionKey(metric, nil)
185+
partition, err := r.partitioner.Partition(metric, r.numPartitions)
158186
if err != nil {
159187
panic(err)
160188
}
161189
payload[i] = &sarama.ProducerMessage{
162-
Key: sarama.ByteEncoder(key),
163-
Topic: r.topic,
164-
Value: sarama.ByteEncoder(data),
190+
Partition: partition,
191+
Topic: r.topic,
192+
Value: sarama.ByteEncoder(data),
165193
}
166194
}
167195
err = r.producer.SendMessages(payload)
@@ -202,7 +230,7 @@ func (r *KafkaMdm) run() {
202230
r.numBuffered.Dec(1)
203231
md, err := parseMetric(buf, r.schemas, r.orgId)
204232
if err != nil {
205-
log.Errorf("KafkaMdm %q: %s", r.key, err)
233+
log.Errorf("KafkaMdm %q: parseMetric failed, skipping metric: %s", r.key, err)
206234
continue
207235
}
208236
md.SetId()

route/schemas.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"strconv"
88
"strings"
99

10+
"github.com/grafana/metrictank/schema"
1011
"github.com/graphite-ng/carbon-relay-ng/persister"
11-
"gopkg.in/raintank/schema.v1"
1212
)
1313

1414
func getSchemas(file string) (persister.WhisperSchemas, error) {
@@ -76,7 +76,6 @@ func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) (*sche
7676

7777
md := schema.MetricData{
7878
Name: name,
79-
Metric: name,
8079
Interval: s.Retentions[0].SecondsPerPoint(),
8180
Value: val,
8281
Unit: "unknown",
@@ -85,5 +84,13 @@ func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) (*sche
8584
Tags: tags,
8685
OrgId: orgId,
8786
}
87+
88+
// ensure MetricData is valid
89+
// this will check for an invalid name and tags
90+
err = md.Validate()
91+
if err != nil {
92+
return nil, err
93+
}
94+
8895
return &md, nil
8996
}

route/schemas_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"strings"
99
"testing"
1010

11+
"github.com/grafana/metrictank/schema"
1112
"github.com/graphite-ng/carbon-relay-ng/persister"
12-
"gopkg.in/raintank/schema.v1"
1313
)
1414

1515
func getMatchEverythingSchemas() persister.WhisperSchemas {
@@ -37,7 +37,6 @@ func TestParseMetricWithTags(t *testing.T) {
3737
sort.Strings(tags)
3838
expectedMd := &schema.MetricData{
3939
Name: name,
40-
Metric: name,
4140
Interval: 10,
4241
Value: value,
4342
Unit: "unknown",
@@ -60,7 +59,6 @@ func TestParseMetricWithoutTags(t *testing.T) {
6059
md, _ := parseMetric(line, schemas, 1)
6160
expectedMd := &schema.MetricData{
6261
Name: name,
63-
Metric: name,
6462
Interval: 10,
6563
Value: value,
6664
Unit: "unknown",

vendor/github.com/cespare/xxhash/LICENSE.txt

+22
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)