This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Persist partition #952

Merged: 5 commits, Jul 25, 2018
2 changes: 1 addition & 1 deletion cmd/metrictank/metrictank.go
@@ -305,7 +305,7 @@ func main() {
 	}

 	if cluster.Mode == cluster.ModeMulti && len(inputs) > 1 {
-		log.Warn("It is not recommended to run a mulitnode cluster with more than 1 input plugin.")
+		log.Warn("It is not recommended to run a multi-node cluster with more than 1 input plugin.")
 	}

 	/***********************************
2 changes: 0 additions & 2 deletions docker/docker-chaos/metrictank.ini
@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting
2 changes: 0 additions & 2 deletions docker/docker-cluster/metrictank.ini
@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting
2 changes: 0 additions & 2 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting
2 changes: 0 additions & 2 deletions docs/config.md
@@ -348,8 +348,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting
10 changes: 1 addition & 9 deletions mdata/notifierKafka/cfg.go
@@ -8,7 +8,6 @@ import (
 	"time"

 	"github.com/Shopify/sarama"
-	part "github.com/grafana/metrictank/cluster/partitioner"
 	"github.com/grafana/metrictank/kafka"
 	"github.com/grafana/metrictank/stats"
 	"github.com/raintank/worldping-api/pkg/log"
@@ -26,8 +25,6 @@ var offsetDuration time.Duration
 var offsetCommitInterval time.Duration
 var partitionStr string
 var partitions []int32
-var partitioner *part.Kafka
-var partitionScheme string
 var bootTimeOffsets map[int32]int64
 var backlogProcessTimeout time.Duration
 var backlogProcessTimeoutStr string
@@ -47,7 +44,6 @@ func init() {
 	fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
 	fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
 	fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
-	fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
 	fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
 	fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
 	fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
@@ -79,6 +75,7 @@ func ConfigProcess(instance string) {
 	config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
 	config.Producer.Compression = sarama.CompressionSnappy
 	config.Producer.Return.Successes = true
+	config.Producer.Partitioner = sarama.NewManualPartitioner
 	err = config.Validate()
 	if err != nil {
 		log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
@@ -89,11 +86,6 @@
 		log.Fatal(4, "kafka-cluster: unable to parse backlog-process-timeout. %s", err)
 	}

-	partitioner, err = part.NewKafka(partitionScheme)
-	if err != nil {
-		log.Fatal(4, "kafka-cluster: failed to initialize partitioner. %s", err)
-	}
-
 	if partitionStr != "*" {
 		parts := strings.Split(partitionStr, ",")
 		for _, part := range parts {
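The one-line addition above is what lets the notifier choose the target partition itself: with sarama's manual partitioner, the producer routes each message to whatever ProducerMessage.Partition is set to, instead of deriving a partition from the message key. A minimal standalone sketch of such a producer configuration (the broker address is a placeholder; this is not metrictank's actual setup code):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// newPersistProducer builds a SyncProducer whose messages are routed to the
// exact partition set on each ProducerMessage, mirroring the config change above.
func newPersistProducer(brokers []string) (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.Retry.Max = 10
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Return.Successes = true // required for SyncProducer
	// Honor ProducerMessage.Partition rather than hashing a partition key.
	config.Producer.Partitioner = sarama.NewManualPartitioner
	if err := config.Validate(); err != nil {
		return nil, err
	}
	return sarama.NewSyncProducer(brokers, config)
}

func main() {
	producer, err := newPersistProducer([]string{"kafka:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
	log.Println("producer ready; each message will use its Partition field for routing")
}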
12 changes: 3 additions & 9 deletions mdata/notifierKafka/notifierKafka.go
@@ -253,15 +253,10 @@ func (c *NotifierKafka) flush() {
 			log.Fatal(4, "kafka-cluster failed to marshal persistMessage to json.")
 		}
 		messagesSize.Value(buf.Len())
-		key := c.bPool.Get()
-		key, err = partitioner.GetPartitionKey(&def, key)
-		if err != nil {
-			log.Fatal(4, "Unable to get partitionKey for metricDef with id %s. %s", def.Id, err)
-		}
 		kafkaMsg := &sarama.ProducerMessage{
-			Topic: topic,
-			Value: sarama.ByteEncoder(buf.Bytes()),
-			Key:   sarama.ByteEncoder(key),
+			Topic:     topic,
+			Value:     sarama.ByteEncoder(buf.Bytes()),
+			Partition: def.Partition,
 		}
 		payload = append(payload, kafkaMsg)
 	}

A Contributor left an inline review comment on the new Partition: def.Partition line: 😍

@@ -283,7 +278,6 @@ func (c *NotifierKafka) flush() {
 		messagesPublished.Add(len(payload))
 		// put our buffers back in the bufferPool
 		for _, msg := range payload {
-			c.bPool.Put([]byte(msg.Key.(sarama.ByteEncoder)))
 			c.bPool.Put([]byte(msg.Value.(sarama.ByteEncoder)))
 		}
 	}()
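Together with the producer config change, the flush path above no longer computes a partition key at all: each persist (save) message is sent to the partition the metric itself was assigned to (def.Partition), so save notifications travel on the same Kafka partition as the corresponding metric data, and the partition-scheme setting removed from the ini files becomes unnecessary. A rough, self-contained sketch of that flow; the payload type, field names, broker address, and example values below are assumptions for illustration, not metrictank's real persistMessage format:

package main

import (
	"encoding/json"
	"log"

	"github.com/Shopify/sarama"
)

// persistPayload stands in for metrictank's persist message; the fields are
// illustrative only, not the actual wire format.
type persistPayload struct {
	Key       string `json:"key"`
	T0        uint32 `json:"t0"`
	Partition int32  `json:"partition"`
}

// flushPersist sends every payload to the partition recorded on the payload
// itself. No Key is set on the messages: with the manual partitioner,
// routing relies entirely on ProducerMessage.Partition.
func flushPersist(producer sarama.SyncProducer, topic string, payloads []persistPayload) error {
	batch := make([]*sarama.ProducerMessage, 0, len(payloads))
	for _, p := range payloads {
		body, err := json.Marshal(p)
		if err != nil {
			return err
		}
		batch = append(batch, &sarama.ProducerMessage{
			Topic:     topic,
			Value:     sarama.ByteEncoder(body),
			Partition: p.Partition,
		})
	}
	return producer.SendMessages(batch)
}

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewManualPartitioner
	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	payloads := []persistPayload{{Key: "some.metric.key", T0: 1532500000, Partition: 3}}
	if err := flushPersist(producer, "metricpersist", payloads); err != nil {
		log.Fatal(err)
	}
}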
2 changes: 0 additions & 2 deletions metrictank-sample.ini
@@ -293,8 +293,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting
2 changes: 0 additions & 2 deletions scripts/config/metrictank-docker.ini
@@ -290,8 +290,6 @@ brokers = kafka:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting
2 changes: 0 additions & 2 deletions scripts/config/metrictank-package.ini
@@ -290,8 +290,6 @@ brokers = localhost:9092
 topic = metricpersist
 # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
 partitions = *
-# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
-partition-scheme = bySeries
 # offset to start consuming from. Can be one of newest, oldest,last or a time duration
 # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
 # Should match your kafka-mdm-in setting