From 31c05869229d2c735df0cf6c689af1d2f2e757c6 Mon Sep 17 00:00:00 2001 From: Sean Hanson Date: Wed, 27 Jun 2018 14:52:16 -0400 Subject: [PATCH 1/5] Remove partition scheme from notifierKafka --- mdata/notifierKafka/cfg.go | 10 +--------- mdata/notifierKafka/notifierKafka.go | 12 +++--------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/mdata/notifierKafka/cfg.go b/mdata/notifierKafka/cfg.go index b45f17b00f..d3665fc34b 100644 --- a/mdata/notifierKafka/cfg.go +++ b/mdata/notifierKafka/cfg.go @@ -8,7 +8,6 @@ import ( "time" "github.com/Shopify/sarama" - part "github.com/grafana/metrictank/cluster/partitioner" "github.com/grafana/metrictank/kafka" "github.com/grafana/metrictank/stats" "github.com/raintank/worldping-api/pkg/log" @@ -26,8 +25,6 @@ var offsetDuration time.Duration var offsetCommitInterval time.Duration var partitionStr string var partitions []int32 -var partitioner *part.Kafka -var partitionScheme string var bootTimeOffsets map[int32]int64 var backlogProcessTimeout time.Duration var backlogProcessTimeoutStr string @@ -47,7 +44,6 @@ func init() { fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)") fs.StringVar(&topic, "topic", "metricpersist", "kafka topic") fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in") - fs.StringVar(&partitionScheme, "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)") fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration") fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index") fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.") @@ -79,6 +75,7 @@ func ConfigProcess(instance string) { config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message config.Producer.Compression = sarama.CompressionSnappy config.Producer.Return.Successes = true + config.Producer.Partitioner = sarama.NewManualPartitioner err = config.Validate() if err != nil { log.Fatal(2, "kafka-cluster invalid consumer config: %s", err) @@ -89,11 +86,6 @@ func ConfigProcess(instance string) { log.Fatal(4, "kafka-cluster: unable to parse backlog-process-timeout. %s", err) } - partitioner, err = part.NewKafka(partitionScheme) - if err != nil { - log.Fatal(4, "kafka-cluster: failed to initialize partitioner. %s", err) - } - if partitionStr != "*" { parts := strings.Split(partitionStr, ",") for _, part := range parts { diff --git a/mdata/notifierKafka/notifierKafka.go b/mdata/notifierKafka/notifierKafka.go index a7a5013513..ac394b18d5 100644 --- a/mdata/notifierKafka/notifierKafka.go +++ b/mdata/notifierKafka/notifierKafka.go @@ -253,15 +253,10 @@ func (c *NotifierKafka) flush() { log.Fatal(4, "kafka-cluster failed to marshal persistMessage to json.") } messagesSize.Value(buf.Len()) - key := c.bPool.Get() - key, err = partitioner.GetPartitionKey(&def, key) - if err != nil { - log.Fatal(4, "Unable to get partitionKey for metricDef with id %s. %s", def.Id, err) - } kafkaMsg := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(buf.Bytes()), - Key: sarama.ByteEncoder(key), + Topic: topic, + Value: sarama.ByteEncoder(buf.Bytes()), + Partition: def.Partition, } payload = append(payload, kafkaMsg) } @@ -283,7 +278,6 @@ func (c *NotifierKafka) flush() { messagesPublished.Add(len(payload)) // put our buffers back in the bufferPool for _, msg := range payload { - c.bPool.Put([]byte(msg.Key.(sarama.ByteEncoder))) c.bPool.Put([]byte(msg.Value.(sarama.ByteEncoder))) } }() From 233455472b476fb7deff07d501c5834307d2fe29 Mon Sep 17 00:00:00 2001 From: Sean Hanson Date: Wed, 27 Jun 2018 14:52:36 -0400 Subject: [PATCH 2/5] Remove references to partition-scheme in docs/configs --- docker/docker-chaos/metrictank.ini | 2 -- docker/docker-dev-custom-cfg-kafka/metrictank.ini | 2 -- docs/config.md | 11 ++++------- metrictank-sample.ini | 2 -- scripts/config/metrictank-docker.ini | 2 -- scripts/config/metrictank-package.ini | 2 -- 6 files changed, 4 insertions(+), 17 deletions(-) diff --git a/docker/docker-chaos/metrictank.ini b/docker/docker-chaos/metrictank.ini index 0ddae81f32..204cf05d4f 100644 --- a/docker/docker-chaos/metrictank.ini +++ b/docker/docker-chaos/metrictank.ini @@ -290,8 +290,6 @@ brokers = kafka:9092 topic = metricpersist # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. partitions = * -# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) -partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`. # Should match your kafka-mdm-in setting diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini index c5d76bbcb8..8538085de9 100644 --- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini +++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini @@ -290,8 +290,6 @@ brokers = kafka:9092 topic = metricpersist # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. partitions = * -# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) -partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`. # Should match your kafka-mdm-in setting diff --git a/docs/config.md b/docs/config.md index 41fb38402a..6024a65623 100644 --- a/docs/config.md +++ b/docs/config.md @@ -4,7 +4,7 @@ Metrictank comes with an [example main config file](https://github.com/grafana/m a [storage-schemas.conf file](https://github.com/grafana/metrictank/blob/master/scripts/config/storage-schemas.conf) and a [storage-aggregation.conf file](https://github.com/grafana/metrictank/blob/master/scripts/config/storage-aggregation.conf) -The files themselves are well documented, but for your convenience, they are replicated below. +The files themselves are well documented, but for your convenience, they are replicated below. Config values for the main ini config file can also be set, or overridden via environment variables. They require the 'MT_' prefix. Any delimiter is represented as an underscore. @@ -24,8 +24,8 @@ MT_KAFKA_MDM_IN_DATA_DIR: /your/data/dir # MT__ # Metrictank.ini -sample config for metrictank -the defaults here match the default behavior. +sample config for metrictank +the defaults here match the default behavior. ## misc ## ``` @@ -348,8 +348,6 @@ brokers = kafka:9092 topic = metricpersist # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. partitions = * -# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) -partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`. # Should match your kafka-mdm-in setting @@ -458,7 +456,7 @@ match-cache-size = 1000 # (note in particular that if you remove archives here, we will no longer read from them) # * Retentions must be specified in order of increasing interval and retention # * The reorderBuffer an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory. -# +# # A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size. # The retentions line can specify multiple retention definitions. You need one or more, space separated. # @@ -527,4 +525,3 @@ aggregationMethod = avg,min,max ``` This file is generated by [config-to-doc](https://github.com/grafana/metrictank/blob/master/scripts/dev/config-to-doc.sh) - diff --git a/metrictank-sample.ini b/metrictank-sample.ini index cc4cfed0ef..a9bb21b065 100644 --- a/metrictank-sample.ini +++ b/metrictank-sample.ini @@ -293,8 +293,6 @@ brokers = kafka:9092 topic = metricpersist # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. partitions = * -# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) -partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`. # Should match your kafka-mdm-in setting diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini index 7546fcbc50..41037343b9 100644 --- a/scripts/config/metrictank-docker.ini +++ b/scripts/config/metrictank-docker.ini @@ -290,8 +290,6 @@ brokers = kafka:9092 topic = metricpersist # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. partitions = * -# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) -partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`. # Should match your kafka-mdm-in setting diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini index b9cc3f6ebd..68f605a8fc 100644 --- a/scripts/config/metrictank-package.ini +++ b/scripts/config/metrictank-package.ini @@ -290,8 +290,6 @@ brokers = localhost:9092 topic = metricpersist # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. partitions = * -# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) -partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`. # Should match your kafka-mdm-in setting From 7f40416f383406b219b6a3de39012f3bdab53839 Mon Sep 17 00:00:00 2001 From: Sean Hanson Date: Wed, 27 Jun 2018 14:53:21 -0400 Subject: [PATCH 3/5] Fix unrelated typo that has been bothering me --- cmd/metrictank/metrictank.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/metrictank/metrictank.go b/cmd/metrictank/metrictank.go index eabb7a0098..8b809cc17e 100644 --- a/cmd/metrictank/metrictank.go +++ b/cmd/metrictank/metrictank.go @@ -305,7 +305,7 @@ func main() { } if cluster.Mode == cluster.ModeMulti && len(inputs) > 1 { - log.Warn("It is not recommended to run a mulitnode cluster with more than 1 input plugin.") + log.Warn("It is not recommended to run a multi-node cluster with more than 1 input plugin.") } /*********************************** From 26003a6ab728b7206539d764aa4a8e3fe469ccf8 Mon Sep 17 00:00:00 2001 From: Sean Hanson Date: Fri, 29 Jun 2018 16:00:42 -0400 Subject: [PATCH 4/5] Regen docs --- docs/config.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/config.md b/docs/config.md index 6024a65623..b926130717 100644 --- a/docs/config.md +++ b/docs/config.md @@ -4,7 +4,7 @@ Metrictank comes with an [example main config file](https://github.com/grafana/m a [storage-schemas.conf file](https://github.com/grafana/metrictank/blob/master/scripts/config/storage-schemas.conf) and a [storage-aggregation.conf file](https://github.com/grafana/metrictank/blob/master/scripts/config/storage-aggregation.conf) -The files themselves are well documented, but for your convenience, they are replicated below. +The files themselves are well documented, but for your convenience, they are replicated below. Config values for the main ini config file can also be set, or overridden via environment variables. They require the 'MT_' prefix. Any delimiter is represented as an underscore. @@ -24,8 +24,8 @@ MT_KAFKA_MDM_IN_DATA_DIR: /your/data/dir # MT__ # Metrictank.ini -sample config for metrictank -the defaults here match the default behavior. +sample config for metrictank +the defaults here match the default behavior. ## misc ## ``` @@ -456,7 +456,7 @@ match-cache-size = 1000 # (note in particular that if you remove archives here, we will no longer read from them) # * Retentions must be specified in order of increasing interval and retention # * The reorderBuffer an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory. -# +# # A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size. # The retentions line can specify multiple retention definitions. You need one or more, space separated. # @@ -525,3 +525,4 @@ aggregationMethod = avg,min,max ``` This file is generated by [config-to-doc](https://github.com/grafana/metrictank/blob/master/scripts/dev/config-to-doc.sh) + From d855cf7a842c7afb7a6fb3cdcabf7c102b92df2c Mon Sep 17 00:00:00 2001 From: Sean Hanson Date: Thu, 19 Jul 2018 15:06:07 -0400 Subject: [PATCH 5/5] Remove partition-scheme from ini --- docker/docker-cluster/metrictank.ini | 2 -- 1 file changed, 2 deletions(-) diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini index 0f79219808..b8a1802cae 100644 --- a/docker/docker-cluster/metrictank.ini +++ b/docker/docker-cluster/metrictank.ini @@ -290,8 +290,6 @@ brokers = kafka:9092 topic = metricpersist # kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions. partitions = * -# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) -partition-scheme = bySeries # offset to start consuming from. Can be one of newest, oldest,last or a time duration # When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`. # Should match your kafka-mdm-in setting