
Commit 9986b3e

Merge pull request #952 from bloomberg/persistPartition
Persist partition
2 parents: 1bc5b4c + d855cf7

File tree: 10 files changed, +5 -33 lines

cmd/metrictank/metrictank.go (+1 -1)

@@ -305,7 +305,7 @@ func main() {
 	}
 
 	if cluster.Mode == cluster.ModeMulti && len(inputs) > 1 {
-		log.Warn("It is not recommended to run a mulitnode cluster with more than 1 input plugin.")
+		log.Warn("It is not recommended to run a multi-node cluster with more than 1 input plugin.")
 	}
 
 	/***********************************

docker/docker-chaos/metrictank.ini (-2)

@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting

docker/docker-cluster/metrictank.ini (-2)

@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting

docker/docker-dev-custom-cfg-kafka/metrictank.ini (-2)

@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting

docs/config.md (-2)

@@ -348,8 +348,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting

mdata/notifierKafka/cfg.go (+1 -9)

@@ -8,7 +8,6 @@ import (
 	"time"
 
 	"github.com/Shopify/sarama"
-	part "github.com/grafana/metrictank/cluster/partitioner"
 	"github.com/grafana/metrictank/kafka"
 	"github.com/grafana/metrictank/stats"
 	"github.com/raintank/worldping-api/pkg/log"
@@ -26,8 +25,6 @@ var offsetDuration time.Duration
 var offsetCommitInterval time.Duration
 var partitionStr string
 var partitions []int32
-var partitioner *part.Kafka
-var partitionScheme string
 var bootTimeOffsets map[int32]int64
 var backlogProcessTimeout time.Duration
 var backlogProcessTimeoutStr string
@@ -47,7 +44,6 @@ func init() {
 	fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
 	fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
 	fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
-	fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
 	fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
 	fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
 	fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
@@ -79,6 +75,7 @@ func ConfigProcess(instance string) {
 	config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
 	config.Producer.Compression = sarama.CompressionSnappy
 	config.Producer.Return.Successes = true
+	config.Producer.Partitioner = sarama.NewManualPartitioner
 	err = config.Validate()
 	if err != nil {
 		log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
@@ -89,11 +86,6 @@ func ConfigProcess(instance string) {
 		log.Fatal(4, "kafka-cluster: unable to parse backlog-process-timeout. %s", err)
 	}
 
-	partitioner, err = part.NewKafka(partitionScheme)
-	if err != nil {
-		log.Fatal(4, "kafka-cluster: failed to initialize partitioner. %s", err)
-	}
-
	if partitionStr != "*" {
 		parts := strings.Split(partitionStr, ",")
 		for _, part := range parts {
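
The one functional addition in this file is the manual partitioner: with sarama.NewManualPartitioner, the producer stops deriving the target partition from a message key and sends each message to whatever Partition is set on the ProducerMessage. Below is a minimal, self-contained sketch of that producer setup; the broker address, topic, partition number, and payload are illustrative placeholders, not values taken from this commit.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Mirror the producer settings from the diff above.
	config := sarama.NewConfig()
	config.Producer.Retry.Max = 10
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Return.Successes = true
	// Manual partitioning: sarama uses ProducerMessage.Partition as-is
	// instead of hashing ProducerMessage.Key.
	config.Producer.Partitioner = sarama.NewManualPartitioner

	if err := config.Validate(); err != nil {
		log.Fatalf("invalid producer config: %s", err)
	}

	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, config)
	if err != nil {
		log.Fatalf("failed to create producer: %s", err)
	}
	defer producer.Close()

	// Send to an explicit partition; no Key is required.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic:     "metricpersist",
		Partition: 3,
		Value:     sarama.ByteEncoder([]byte(`{"sketch":true}`)),
	})
	if err != nil {
		log.Fatalf("send failed: %s", err)
	}
	log.Printf("delivered to partition %d at offset %d", partition, offset)
}

Note that Producer.Return.Successes = true is also what a SyncProducer requires, so the Validate call fails fast at startup if the combination is wrong.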

mdata/notifierKafka/notifierKafka.go (+3 -9)

@@ -253,15 +253,10 @@ func (c *NotifierKafka) flush() {
 			log.Fatal(4, "kafka-cluster failed to marshal persistMessage to json.")
 		}
 		messagesSize.Value(buf.Len())
-		key := c.bPool.Get()
-		key, err = partitioner.GetPartitionKey(&def, key)
-		if err != nil {
-			log.Fatal(4, "Unable to get partitionKey for metricDef with id %s. %s", def.Id, err)
-		}
 		kafkaMsg := &sarama.ProducerMessage{
-			Topic: topic,
-			Value: sarama.ByteEncoder(buf.Bytes()),
-			Key:   sarama.ByteEncoder(key),
+			Topic:     topic,
+			Value:     sarama.ByteEncoder(buf.Bytes()),
+			Partition: def.Partition,
 		}
 		payload = append(payload, kafkaMsg)
 	}
@@ -283,7 +278,6 @@ func (c *NotifierKafka) flush() {
 		messagesPublished.Add(len(payload))
 		// put our buffers back in the bufferPool
 		for _, msg := range payload {
-			c.bPool.Put([]byte(msg.Key.(sarama.ByteEncoder)))
 			c.bPool.Put([]byte(msg.Value.(sarama.ByteEncoder)))
 		}
 	}()
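
In flush(), the partition is no longer recomputed per message: the partition assigned when the metric was ingested is persisted on its definition, and the notifier simply copies def.Partition onto the outgoing message. A simplified sketch of that pattern follows; metricDef and buildPersistMessages are hypothetical stand-ins for illustration only, not types from this commit.

package sketch

import "github.com/Shopify/sarama"

// metricDef is a stand-in for the project's metric definition type; the
// relevant point is that the partition assigned at ingest time travels
// with the definition.
type metricDef struct {
	Id        string
	Partition int32
}

// buildPersistMessages mirrors the pattern in flush() above: instead of
// computing a partition key per definition, address each message directly
// to the partition stored on the definition. The producer must be
// configured with sarama.NewManualPartitioner for this to take effect.
// Assumes len(payloads) == len(defs).
func buildPersistMessages(topic string, defs []metricDef, payloads [][]byte) []*sarama.ProducerMessage {
	msgs := make([]*sarama.ProducerMessage, 0, len(defs))
	for i, def := range defs {
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic:     topic,
			Value:     sarama.ByteEncoder(payloads[i]),
			Partition: def.Partition,
		})
	}
	return msgs
}

Because no key is produced anymore, the key buffers are no longer drawn from the buffer pool, which is why the c.bPool.Put on msg.Key disappears in the second hunk.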

metrictank-sample.ini (-2)

@@ -293,8 +293,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting

scripts/config/metrictank-docker.ini (-2)

@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting

scripts/config/metrictank-package.ini (-2)

@@ -290,8 +290,6 @@ brokers = localhost:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting
