Skip to content

Commit d44495b

Browse files
authored
Merge pull request #26 from raintank/partitionBy
add partitioning of MetricData/MetricDefinitions
2 parents 01adbd1 + 36f89ce commit d44495b

File tree

3 files changed

+391
-63
lines changed

3 files changed

+391
-63
lines changed

metric.go

+46-63
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package schema
33
import (
44
"bytes"
55
"crypto/md5"
6-
"encoding/binary"
76
"errors"
87
"fmt"
8+
"io"
99
"sort"
1010
"strings"
1111
)
@@ -15,18 +15,13 @@ var ErrInvalidOrgIdzero = errors.New("org-id cannot be 0")
1515
var ErrInvalidEmptyName = errors.New("name cannot be empty")
1616
var ErrInvalidMtype = errors.New("invalid mtype")
1717
var ErrInvalidTagFormat = errors.New("invalid tag format")
18+
var ErrUnknownPartitionMethod = errors.New("unknown partition method")
1819

1920
type PartitionedMetric interface {
2021
Validate() error
2122
SetId()
22-
// return a []byte key comprised of the metric's OrgId
23-
// accepts an input []byte to allow callers to re-use
24-
// buffers to reduce memory allocations
25-
KeyByOrgId([]byte) []byte
26-
// return a []byte key comprised of the metric's Name
27-
// accepts an input []byte to allow callers to re-use
28-
// buffers to reduce memory allocations
29-
KeyBySeries([]byte) []byte
23+
// PartitionID returns the partition id that should be used for this metric.
24+
PartitionID(method PartitionByMethod, partitions int32) (int32, error)
3025
}
3126

3227
//go:generate msgp
@@ -63,25 +58,6 @@ func (m *MetricData) Validate() error {
6358
return nil
6459
}
6560

66-
func (m *MetricData) KeyByOrgId(b []byte) []byte {
67-
if cap(b)-len(b) < 4 {
68-
// not enough unused space in the slice so we need to grow it.
69-
newBuf := make([]byte, len(b), len(b)+4)
70-
copy(newBuf, b)
71-
b = newBuf
72-
}
73-
// PutUint32 writes directly to the slice rather then appending.
74-
// so we need to set the length to 4 more bytes then it currently is.
75-
b = b[:len(b)+4]
76-
binary.LittleEndian.PutUint32(b[len(b)-4:], uint32(m.OrgId))
77-
return b
78-
}
79-
80-
func (m *MetricData) KeyBySeries(b []byte) []byte {
81-
b = append(b, []byte(m.Name)...)
82-
return b
83-
}
84-
8561
// returns an id (hash key) in the format OrgId.md5Sum
8662
// the md5sum is a hash of the concatenation of the
8763
// metric + each tag key:value pair (in metrics2.0 sense, so also fields), sorted alphabetically.
@@ -124,7 +100,7 @@ type MetricDefinition struct {
124100

125101
// this is a special attribute that does not need to be set, it is only used
126102
// to keep the state of NameWithTags()
127-
nameWithTags string `json:"-"`
103+
nameWithTags string
128104
}
129105

130106
// NameWithTags deduplicates the name and tags strings by storing their content
@@ -137,27 +113,26 @@ func (m *MetricDefinition) NameWithTags() string {
137113
return m.nameWithTags
138114
}
139115

140-
sort.Strings(m.Tags)
116+
nameWithTagsBuffer := &bytes.Buffer{}
117+
_ = writeSortedTagString(nameWithTagsBuffer, m.Name, m.Tags)
118+
m.nameWithTags = nameWithTagsBuffer.String()
141119

142-
nameWithTagsBuffer := bytes.NewBufferString(m.Name)
143-
tagPositions := make([]int, 0, len(m.Tags)*2)
120+
var i int
121+
cursor := len(m.Name)
122+
m.Name = m.nameWithTags[:cursor]
144123
for _, t := range m.Tags {
145-
if len(t) >= 5 && t[:5] == "name=" {
124+
if len(t) > 5 && t[:5] == "name=" {
146125
continue
147126
}
148-
149-
nameWithTagsBuffer.WriteString(";")
150-
tagPositions = append(tagPositions, nameWithTagsBuffer.Len())
151-
nameWithTagsBuffer.WriteString(t)
152-
tagPositions = append(tagPositions, nameWithTagsBuffer.Len())
127+
m.Tags[i] = m.nameWithTags[cursor+1 : cursor+1+len(t)]
128+
cursor += len(t) + 1
129+
i++
153130
}
154131

155-
m.nameWithTags = nameWithTagsBuffer.String()
156-
m.Tags = make([]string, len(tagPositions)/2)
157-
for i := 0; i < len(m.Tags); i++ {
158-
m.Tags[i] = m.nameWithTags[tagPositions[i*2]:tagPositions[i*2+1]]
132+
// if a "name" tag existed, then we have to shorten the slice
133+
if i < len(m.Tags) {
134+
m.Tags = m.Tags[:i]
159135
}
160-
m.Name = m.nameWithTags[:len(m.Name)]
161136

162137
return m.nameWithTags
163138
}
@@ -178,7 +153,7 @@ func (m *MetricDefinition) SetId() {
178153
fmt.Fprintf(buffer, "%d", m.Interval)
179154

180155
for _, t := range m.Tags {
181-
if len(t) >= 5 && t[:5] == "name=" {
156+
if len(t) > 5 && t[:5] == "name=" {
182157
continue
183158
}
184159

@@ -211,25 +186,6 @@ func (m *MetricDefinition) Validate() error {
211186
return nil
212187
}
213188

214-
func (m *MetricDefinition) KeyByOrgId(b []byte) []byte {
215-
if cap(b)-len(b) < 4 {
216-
// not enough unused space in the slice so we need to grow it.
217-
newBuf := make([]byte, len(b), len(b)+4)
218-
copy(newBuf, b)
219-
b = newBuf
220-
}
221-
// PutUint32 writes directly to the slice rather then appending.
222-
// so we need to set the length to 4 more bytes then it currently is.
223-
b = b[:len(b)+4]
224-
binary.LittleEndian.PutUint32(b[len(b)-4:], uint32(m.OrgId))
225-
return b
226-
}
227-
228-
func (m *MetricDefinition) KeyBySeries(b []byte) []byte {
229-
b = append(b, []byte(m.Name)...)
230-
return b
231-
}
232-
233189
// MetricDefinitionFromMetricData yields a MetricDefinition that has no references
234190
// to the original MetricData
235191
func MetricDefinitionFromMetricData(d *MetricData) *MetricDefinition {
@@ -327,3 +283,30 @@ func ValidateTagValue(value string) bool {
327283

328284
return !strings.ContainsRune(value, ';')
329285
}
286+
287+
// writeSortedTagString writes name followed by every tag, each tag prefixed
// with ";", to w. Tags are emitted in sorted order.
//
// tags is sorted in place, so the caller's slice is reordered as a side
// effect.
//
// Tags of the form "name=<value>" with a non-empty value are skipped. A bare
// "name=" (exactly five bytes) is not skipped and is written like any other
// tag.
//
// The first error returned by w aborts the write and is returned unchanged;
// output already written to w is not rolled back.
func writeSortedTagString(w io.Writer, name string, tags []string) error {
	sort.Strings(tags)

	if _, err := io.WriteString(w, name); err != nil {
		return err
	}

	for _, tag := range tags {
		// Only a "name=" prefix followed by at least one more byte counts as
		// a name tag.
		if len(tag) > 5 && strings.HasPrefix(tag, "name=") {
			continue
		}

		if _, err := io.WriteString(w, ";"); err != nil {
			return err
		}
		if _, err := io.WriteString(w, tag); err != nil {
			return err
		}
	}

	return nil
}

partition.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package schema
2+
3+
import (
4+
"encoding/binary"
5+
"hash/fnv"
6+
7+
"github.com/cespare/xxhash"
8+
jump "github.com/dgryski/go-jump"
9+
)
10+
11+
// PartitionByMethod identifies the strategy used to assign a metric to a
// partition.
type PartitionByMethod uint8

const (
	// PartitionByOrg partitions by organization id only.
	PartitionByOrg PartitionByMethod = iota

	// PartitionBySeries partitions by the metric name only.
	PartitionBySeries

	// PartitionBySeriesWithTags partitions by metric name and tags, with the
	// best distribution. Recommended for new deployments.
	PartitionBySeriesWithTags

	// PartitionBySeriesWithTagsFnv partitions by metric name and tags, with a
	// sub-optimal distribution when using tags. It is compatible with
	// PartitionBySeries if a metric has no tags, making it possible to adopt
	// tags for existing PartitionBySeries deployments without a migration.
	PartitionBySeriesWithTagsFnv
)
29+
30+
func (m *MetricData) PartitionID(method PartitionByMethod, partitions int32) (int32, error) {
31+
var partition int32
32+
33+
switch method {
34+
case PartitionByOrg:
35+
h := fnv.New32a()
36+
err := binary.Write(h, binary.LittleEndian, uint32(m.OrgId))
37+
if err != nil {
38+
return 0, err
39+
}
40+
partition = int32(h.Sum32()) % partitions
41+
if partition < 0 {
42+
partition = -partition
43+
}
44+
case PartitionBySeries:
45+
h := fnv.New32a()
46+
h.Write([]byte(m.Name))
47+
partition = int32(h.Sum32()) % partitions
48+
if partition < 0 {
49+
partition = -partition
50+
}
51+
case PartitionBySeriesWithTags:
52+
h := xxhash.New()
53+
if err := writeSortedTagString(h, m.Name, m.Tags); err != nil {
54+
return 0, err
55+
}
56+
partition = jump.Hash(h.Sum64(), int(partitions))
57+
case PartitionBySeriesWithTagsFnv:
58+
h := fnv.New32a()
59+
if err := writeSortedTagString(h, m.Name, m.Tags); err != nil {
60+
return 0, err
61+
}
62+
partition = int32(h.Sum32()) % partitions
63+
if partition < 0 {
64+
partition = -partition
65+
}
66+
default:
67+
return 0, ErrUnknownPartitionMethod
68+
}
69+
70+
return partition, nil
71+
}
72+
73+
func (m *MetricDefinition) PartitionID(method PartitionByMethod, partitions int32) (int32, error) {
74+
var partition int32
75+
76+
switch method {
77+
case PartitionByOrg:
78+
h := fnv.New32a()
79+
err := binary.Write(h, binary.LittleEndian, uint32(m.OrgId))
80+
if err != nil {
81+
return 0, err
82+
}
83+
partition = int32(h.Sum32()) % partitions
84+
if partition < 0 {
85+
partition = -partition
86+
}
87+
case PartitionBySeries:
88+
h := fnv.New32a()
89+
h.Write([]byte(m.Name))
90+
partition = int32(h.Sum32()) % partitions
91+
if partition < 0 {
92+
partition = -partition
93+
}
94+
case PartitionBySeriesWithTags:
95+
h := xxhash.New()
96+
h.WriteString(m.NameWithTags())
97+
partition = jump.Hash(h.Sum64(), int(partitions))
98+
case PartitionBySeriesWithTagsFnv:
99+
h := fnv.New32a()
100+
if len(m.nameWithTags) > 0 {
101+
h.Write([]byte(m.nameWithTags))
102+
} else {
103+
if err := writeSortedTagString(h, m.Name, m.Tags); err != nil {
104+
return 0, err
105+
}
106+
}
107+
partition = int32(h.Sum32()) % partitions
108+
if partition < 0 {
109+
partition = -partition
110+
}
111+
default:
112+
return 0, ErrUnknownPartitionMethod
113+
}
114+
115+
return partition, nil
116+
}

0 commit comments

Comments
 (0)