Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit ceb7da0

Browse files
committed
simplify partitioner code
- Define a partitionFunc when creating a new partitioner instead of comparing the partitionScheme string on every call. - In fakemetrics/kafkamdm, use a Partitioner interface instead of using partition.Kafka directly.
1 parent 8d8900b commit ceb7da0

File tree

2 files changed

+23
-61
lines changed

2 files changed

+23
-61
lines changed

cluster/partitioner/partitioner.go

+11-24
Original file line numberDiff line numberDiff line change
@@ -11,44 +11,31 @@ type Partitioner interface {
1111
}
1212

1313
type Kafka struct {
14-
PartitionBy string
14+
partitionFunc func(schema.PartitionedMetric, int32) (int32, error)
1515
}
1616

1717
func NewKafka(partitionBy string) (*Kafka, error) {
18+
var method schema.PartitionByMethod
1819
switch partitionBy {
1920
case "byOrg":
21+
method = schema.PartitionByOrg
2022
case "bySeries":
23+
method = schema.PartitionBySeries
2124
case "bySeriesWithTags":
25+
method = schema.PartitionBySeriesWithTags
2226
default:
2327
return nil, fmt.Errorf("partitionBy must be one of 'byOrg|bySeries|bySeriesWithTags'. got %s", partitionBy)
2428
}
2529
return &Kafka{
26-
PartitionBy: partitionBy,
30+
partitionFunc: func(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
31+
return m.PartitionID(method, numPartitions)
32+
},
2733
}, nil
2834
}
2935

3036
func (k *Kafka) Partition(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
31-
partition, err := k.GetPartition(m, numPartitions)
32-
if err != nil {
33-
return 0, err
37+
if k.partitionFunc == nil {
38+
return -1, fmt.Errorf("unknown partitionBy setting.")
3439
}
35-
return partition, nil
36-
}
37-
38-
func (k *Kafka) GetPartition(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
39-
switch k.PartitionBy {
40-
case "byOrg":
41-
// partition by organisation: metrics for the same org should go to the same
42-
// partition/MetricTank (optimize for locality~performance)
43-
return m.PartitionID(schema.PartitionByOrg, numPartitions)
44-
case "bySeries":
45-
// partition by series: metrics are distributed across all metrictank instances
46-
// to allow horizontal scalability
47-
return m.PartitionID(schema.PartitionBySeries, numPartitions)
48-
case "bySeriesWithTags":
49-
// partition by series with tags: metrics are distributed across all metrictank instances
50-
// to allow horizontal scalability
51-
return m.PartitionID(schema.PartitionBySeriesWithTags, numPartitions)
52-
}
53-
return -1, fmt.Errorf("unknown partitionBy setting.")
40+
return k.partitionFunc(m, numPartitions)
5441
}

stacktest/fakemetrics/out/kafkamdm/kafkamdm.go

+12-37
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ type KafkaMdm struct {
2323
config *sarama.Config
2424
client sarama.SyncProducer
2525
hash hash.Hash32
26-
part *p.Kafka
27-
lmPart LastNumPartitioner
28-
partScheme string
26+
part p.Partitioner
2927
numPartitions int32
3028
}
3129

@@ -46,11 +44,6 @@ func (p *LastNumPartitioner) Partition(m schema.PartitionedMetric, numPartitions
4644
return int32(part), nil
4745
}
4846

49-
// key is by metric name, but won't be used for partition setting
50-
func (p *LastNumPartitioner) GetPartition(m schema.PartitionedMetric, numPartitions int32) (int32, error) {
51-
return m.PartitionID(schema.PartitionBySeries, numPartitions)
52-
}
53-
5447
func New(topic string, brokers []string, codec string, stats met.Backend, partitionScheme string, numPartitions int32) (*KafkaMdm, error) {
5548
// We are looking for strong consistency semantics.
5649
// Because we don't change the flush settings, sarama will try to produce messages
@@ -81,8 +74,7 @@ func New(topic string, brokers []string, codec string, stats met.Backend, partit
8174
if err != nil {
8275
return nil, err
8376
}
84-
var part *p.Kafka
85-
var lmPart LastNumPartitioner
77+
var part p.Partitioner
8678
switch partitionScheme {
8779
case "byOrg":
8880
part, err = p.NewKafka("byOrg")
@@ -91,11 +83,9 @@ func New(topic string, brokers []string, codec string, stats met.Backend, partit
9183
case "bySeriesWithTags":
9284
part, err = p.NewKafka("bySeriesWithTags")
9385
case "lastNum":
94-
lmPart = LastNumPartitioner{}
95-
// sets partition based on message partition field
96-
config.Producer.Partitioner = sarama.NewManualPartitioner
86+
part = &LastNumPartitioner{}
9787
default:
98-
err = fmt.Errorf("partitionScheme must be one of 'byOrg|bySeries|lastNum'. got %s", partitionScheme)
88+
err = fmt.Errorf("partitionScheme must be one of 'byOrg|bySeries|bySeriesWithTags|lastNum'. got %s", partitionScheme)
9989
}
10090
if err != nil {
10191
return nil, err
@@ -109,8 +99,6 @@ func New(topic string, brokers []string, codec string, stats met.Backend, partit
10999
client: client,
110100
hash: fnv.New32a(),
111101
part: part,
112-
lmPart: lmPart,
113-
partScheme: partitionScheme,
114102
numPartitions: numPartitions,
115103
}, nil
116104
}
@@ -139,28 +127,15 @@ func (k *KafkaMdm) Flush(metrics []*schema.MetricData) error {
139127

140128
k.MessageBytes.Value(int64(len(data)))
141129

142-
if k.partScheme == "lastNum" {
143-
partition, err := k.lmPart.Partition(metric, 0)
144-
if err != nil {
145-
return fmt.Errorf("Failed to get partition for metric. %s", err)
146-
}
147-
148-
payload[i] = &sarama.ProducerMessage{
149-
Partition: partition,
150-
Topic: k.topic,
151-
Value: sarama.ByteEncoder(data),
152-
}
153-
} else {
154-
partition, err := k.part.GetPartition(metric, k.numPartitions)
155-
if err != nil {
156-
return fmt.Errorf("Failed to get partition for metric. %s", err)
157-
}
130+
partition, err := k.part.Partition(metric, k.numPartitions)
131+
if err != nil {
132+
return fmt.Errorf("Failed to get partition for metric. %s", err)
133+
}
158134

159-
payload[i] = &sarama.ProducerMessage{
160-
Partition: partition,
161-
Topic: k.topic,
162-
Value: sarama.ByteEncoder(data),
163-
}
135+
payload[i] = &sarama.ProducerMessage{
136+
Partition: partition,
137+
Topic: k.topic,
138+
Value: sarama.ByteEncoder(data),
164139
}
165140

166141
}

0 commit comments

Comments
 (0)