-
Notifications
You must be signed in to change notification settings - Fork 32
/
blockgen.go
163 lines (136 loc) · 3.75 KB
/
blockgen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package blockgen
import (
"context"
"fmt"
"math/rand"
"path"
"time"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanosbench/pkg/seriesgen"
)
// Writer is the interface for writing time series into Prometheus blocks.
type Writer interface {
// Appendable is the embedded Prometheus storage interface; Generate passes
// the Writer to seriesgen.Append as the destination for generated series.
storage.Appendable
// Flush writes the current block to disk and returns the ULID of that block.
// The block will contain the values accumulated so far.
// NOTE(review): the previous comment said "accumulated by `Write`", but this
// interface declares no such method; presumably it means samples added
// through the embedded storage.Appendable — confirm against the implementation.
Flush() (ulid.ULID, error)
}
// BlockSpec describes a single block to generate: the block metadata plus the
// specifications of the series the block should contain.
//
// TODO(bwplotka): Add option to create downsampled blocks.
type BlockSpec struct {
// Meta is the embedded block metadata. Generate reads the external labels
// from its Thanos section and copies that whole section into the meta file
// of the generated block.
metadata.Meta
// Series lists the series specs; each spec yields one or more generated series.
Series []SeriesSpec
}
// GenType names the kind of value generator used for a series.
type GenType string
// Supported generator types. Create maps each of them to a seriesgen constructor.
const (
// Random makes Create use seriesgen.NewValGen.
Random GenType = "RANDOM"
// Counter makes Create use seriesgen.NewCounterGen.
Counter GenType = "COUNTER"
// Gauge makes Create use seriesgen.NewGaugeGen.
Gauge GenType = "GAUGE"
)
// Create builds the series iterator that corresponds to the generator type g.
//
// random, mint, maxt and opts are forwarded unchanged to the matching
// seriesgen constructor. An error is returned when g is not one of the
// declared GenType constants.
func (g GenType) Create(random *rand.Rand, mint, maxt int64, opts seriesgen.Characteristics) (seriesgen.SeriesIterator, error) {
	var it seriesgen.SeriesIterator

	switch g {
	case Random:
		it = seriesgen.NewValGen(random, mint, maxt, opts)
	case Counter:
		it = seriesgen.NewCounterGen(random, mint, maxt, opts)
	case Gauge:
		it = seriesgen.NewGaugeGen(random, mint, maxt, opts)
	default:
		return nil, errors.Errorf("unknown type: %s", string(g))
	}
	return it, nil
}
// SeriesSpec describes one group of series to generate.
type SeriesSpec struct {
// Labels are the labels shared by every series generated from this spec.
Labels labels.Labels `yaml:"labels"`
// Targets is the number of series generated from this spec. Each generated
// series gets an additional `__blockgen_target__` label holding its target
// number, counting down from Targets to 1. A value of 0 or less still
// produces a single series, labelled with that value.
Targets int `yaml:"targets"`
// Type selects the value generator (see GenType.Create).
Type GenType `yaml:"type"`
// MinTime and MaxTime are handed to the generator as its time bounds.
// NOTE(review): presumably millisecond timestamps — confirm against seriesgen.
MinTime, MaxTime int64
// Characteristics holds the remaining generator options; its fields are
// inlined into this struct when decoded from YAML.
seriesgen.Characteristics `yaml:",inline"`
}
// durToMilis converts t to a whole number of milliseconds, truncating any
// sub-millisecond remainder toward zero.
//
// The conversion uses integer arithmetic on purpose: going through
// t.Seconds() * 1000 in float64 can land just below the exact value and then
// truncate to one millisecond less (for example 1005ms became 1004).
func durToMilis(t time.Duration) int64 {
	return t.Milliseconds()
}
// Generate builds the block described by block inside dir and returns the
// ULID of the new block.
//
// The series set derived from the spec is appended through a TSDB block
// writer by seriesgen.Append, which receives goroutines as given. Once the
// block is flushed, its meta file is read back from the block directory, the
// Thanos section is replaced with the one from the spec, and the meta file is
// written again.
//
// On any failure the zero ULID is returned together with the error.
func Generate(ctx context.Context, logger log.Logger, goroutines int, dir string, block BlockSpec) (ulid.ULID, error) {
	var noID ulid.ULID

	writer, err := NewTSDBBlockWriter(logger, dir)
	if err != nil {
		return noID, err
	}

	// Fall back to an empty map when the spec carries no external labels.
	external := block.Thanos.Labels
	if external == nil {
		external = map[string]string{}
	}
	series := &blockSeriesSet{
		config:  block,
		extLset: labels.FromMap(external),
	}

	if err := seriesgen.Append(ctx, goroutines, writer, series); err != nil {
		return noID, errors.Wrap(err, "append")
	}

	blockID, err := writer.Flush()
	if err != nil {
		return noID, errors.Wrap(err, "flush")
	}

	// The flushed block lives in a directory named after its ULID.
	blockDir := path.Join(dir, blockID.String())
	blockMeta, err := metadata.ReadFromDir(blockDir)
	if err != nil {
		return noID, errors.Wrap(err, "meta read")
	}

	blockMeta.Thanos = block.Thanos
	if err := blockMeta.WriteToDir(logger, blockDir); err != nil {
		return noID, errors.Wrap(err, "meta write")
	}
	return blockID, nil
}
// blockSeriesSet iterates over every series described by a BlockSpec,
// expanding each SeriesSpec into its individual targets.
type blockSeriesSet struct {
// config is the block spec being expanded.
config BlockSpec
// extLset holds the block's external labels; Next mixes them into the
// per-series random seed.
extLset labels.Labels
// i counts the SeriesSpec entries started so far; the spec currently being
// expanded is config.Series[i-1].
i int
// target is the target number of the current series within the current spec.
target int
// err records the error that stopped iteration, if any.
err error
// curr is the series produced by the last successful call to Next.
curr seriesgen.Series
}
// Next advances to the next series and reports whether one is available.
//
// Every SeriesSpec is expanded into its targets, counting the target number
// down from Targets; when the current spec is used up the next spec is
// started. Each series carries a `__blockgen_target__` label with its target
// number in front of the spec's own labels.
//
// Next returns false when all specs are exhausted, or when the generator for
// a series cannot be created; in the latter case the error is available
// through Err.
func (s *blockSeriesSet) Next() bool {
	if s.target > 0 {
		s.target--
	}
	if s.target <= 0 {
		// The current spec is used up (or none was started yet): move on.
		if s.i >= len(s.config.Series) {
			return false
		}
		s.target = s.config.Series[s.i].Targets
		s.i++
	}
	spec := s.config.Series[s.i-1]

	lset := make(labels.Labels, 0, len(spec.Labels)+1)
	lset = append(lset, labels.Label{Name: "__blockgen_target__", Value: fmt.Sprintf("%v", s.target)})
	lset = append(lset, spec.Labels...)

	// Build the hash input from the series labels followed by the external
	// labels, terminating every name and value with a 0xff byte.
	key := make([]byte, 0, 1024)
	for _, group := range []labels.Labels{lset, s.extLset} {
		for _, l := range group {
			key = append(key, l.Name...)
			key = append(key, '\xff')
			key = append(key, l.Value...)
			key = append(key, '\xff')
		}
	}

	// Stable random per series name.
	seed := int64(xxhash.Sum64(key))
	iter, err := spec.Type.Create(rand.New(rand.NewSource(seed)), spec.MinTime, spec.MaxTime, spec.Characteristics)
	if err != nil {
		s.err = err
		return false
	}

	s.curr = seriesgen.NewSeriesGen(lset, iter)
	return true
}
// At returns the series produced by the most recent successful call to Next.
func (s *blockSeriesSet) At() seriesgen.Series {
	return s.curr
}
// Err returns the error recorded by Next when a series generator could not be
// created, or nil if no such error occurred.
func (s *blockSeriesSet) Err() error {
	return s.err
}