Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit a4bc9e1

Browse files
authored
Merge pull request #1335 from grafana/limit_writer_org
Make the importer utilities rely on TSDB-GW for authentication and org-association
2 parents b42707f + d966645 commit a4bc9e1

22 files changed

+1430
-944
lines changed

cmd/mt-whisper-importer-reader/main.go

+9-163
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,22 @@ import (
1212
"os"
1313
"path/filepath"
1414
"regexp"
15-
"sort"
1615
"strings"
1716
"sync"
1817
"sync/atomic"
1918
"time"
2019

2120
"github.com/grafana/metrictank/conf"
2221
"github.com/grafana/metrictank/logger"
23-
"github.com/grafana/metrictank/mdata"
24-
"github.com/grafana/metrictank/mdata/chunk"
22+
"github.com/grafana/metrictank/mdata/importer"
2523
"github.com/kisielk/whisper-go/whisper"
26-
"github.com/raintank/schema"
2724
log "github.com/sirupsen/logrus"
2825
)
2926

3027
var (
3128
httpEndpoint = flag.String(
3229
"http-endpoint",
33-
"http://127.0.0.1:8080/chunks",
30+
"http://127.0.0.1:8080/metrics/import",
3431
"The http endpoint to send the data to",
3532
)
3633
namePrefix = flag.String(
@@ -48,11 +45,6 @@ var (
4845
false,
4946
"Defines if chunks that have not completed their chunk span should be written",
5047
)
51-
orgId = flag.Int(
52-
"orgid",
53-
1,
54-
"Organization ID the data belongs to ",
55-
)
5648
insecureSSL = flag.Bool(
5749
"insecure-ssl",
5850
false,
@@ -78,15 +70,15 @@ var (
7870
"",
7971
"A regex pattern to be applied to all metric names, only matching ones will be imported",
8072
)
81-
importUpTo = flag.Uint(
82-
"import-up-to",
73+
importUntil = flag.Uint(
74+
"import-until",
8375
math.MaxUint32,
84-
"Only import up to the specified timestamp",
76+
"Only import up to, but not including, the specified timestamp",
8577
)
86-
importAfter = flag.Uint(
87-
"import-after",
78+
importFrom = flag.Uint(
79+
"import-from",
8880
0,
89-
"Only import after the specified timestamp",
81+
"Only import starting from the specified timestamp",
9082
)
9183
positionFile = flag.String(
9284
"position-file",
@@ -167,7 +159,7 @@ func processFromChan(pos *posTracker, files chan string, wg *sync.WaitGroup) {
167159

168160
name := getMetricName(file)
169161
log.Debugf("Processing file %s (%s)", file, name)
170-
data, err := getMetric(w, file, name)
162+
data, err := importer.NewArchiveRequest(w, schemas, file, name, uint32(*importFrom), uint32(*importUntil), *writeUnfinishedChunks)
171163
if err != nil {
172164
log.Errorf("Failed to get metric: %q", err.Error())
173165
continue
@@ -246,152 +238,6 @@ func getMetricName(file string) string {
246238
return *namePrefix + strings.Replace(strings.TrimSuffix(file, ".wsp"), "/", ".", -1)
247239
}
248240

249-
// pointSorter sorts points by timestamp
250-
type pointSorter []whisper.Point
251-
252-
func (a pointSorter) Len() int { return len(a) }
253-
func (a pointSorter) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
254-
func (a pointSorter) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp }
255-
256-
// the whisper archives are organized like a ringbuffer. since we need to
257-
// insert the points into the chunks in order we first need to sort them
258-
func sortPoints(points pointSorter) pointSorter {
259-
sort.Sort(points)
260-
return points
261-
}
262-
263-
func convertWhisperMethod(whisperMethod whisper.AggregationMethod) (schema.Method, error) {
264-
switch whisperMethod {
265-
case whisper.AggregationAverage:
266-
return schema.Avg, nil
267-
case whisper.AggregationSum:
268-
return schema.Sum, nil
269-
case whisper.AggregationLast:
270-
return schema.Lst, nil
271-
case whisper.AggregationMax:
272-
return schema.Max, nil
273-
case whisper.AggregationMin:
274-
return schema.Min, nil
275-
default:
276-
return 0, fmt.Errorf("Unknown whisper method: %d", whisperMethod)
277-
}
278-
}
279-
280-
func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, error) {
281-
if len(w.Header.Archives) == 0 {
282-
return nil, fmt.Errorf("Whisper file contains no archives: %q", file)
283-
}
284-
285-
method, err := convertWhisperMethod(w.Header.Metadata.AggregationMethod)
286-
if err != nil {
287-
return nil, err
288-
}
289-
290-
points := make(map[int][]whisper.Point)
291-
for i := range w.Header.Archives {
292-
p, err := w.DumpArchive(i)
293-
if err != nil {
294-
return nil, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file)
295-
}
296-
points[i] = p
297-
}
298-
299-
res := &mdata.ArchiveRequest{
300-
MetricData: schema.MetricData{
301-
Name: name,
302-
Value: 0,
303-
Interval: int(w.Header.Archives[0].SecondsPerPoint),
304-
Unit: "unknown",
305-
Time: 0,
306-
Mtype: "gauge",
307-
Tags: []string{},
308-
OrgId: *orgId,
309-
},
310-
}
311-
res.MetricData.SetId()
312-
mkey, err := schema.MKeyFromString(res.MetricData.Id)
313-
if err != nil {
314-
panic(err)
315-
}
316-
317-
_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
318-
conversion := newConversion(w.Header.Archives, points, method)
319-
for retIdx, retention := range selectedSchema.Retentions {
320-
convertedPoints := conversion.getPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints))
321-
for m, p := range convertedPoints {
322-
if len(p) == 0 {
323-
continue
324-
}
325-
326-
var amkey schema.AMKey
327-
if retIdx == 0 {
328-
amkey = schema.AMKey{MKey: mkey}
329-
} else {
330-
amkey = schema.GetAMKey(mkey, m, retention.ChunkSpan)
331-
}
332-
333-
encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan)
334-
for _, chunk := range encodedChunks {
335-
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequest(
336-
nil,
337-
amkey,
338-
uint32(retention.MaxRetention()),
339-
chunk.Series.T0,
340-
chunk.Encode(retention.ChunkSpan),
341-
time.Now(),
342-
))
343-
}
344-
345-
if res.MetricData.Time < int64(p[len(p)-1].Timestamp) {
346-
res.MetricData.Time = int64(p[len(p)-1].Timestamp)
347-
}
348-
}
349-
}
350-
351-
return res, nil
352-
}
353-
354-
func encodedChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint32) []*chunk.Chunk {
355-
var point whisper.Point
356-
var t0, prevT0 uint32
357-
var c *chunk.Chunk
358-
var encodedChunks []*chunk.Chunk
359-
360-
for _, point = range points {
361-
// this shouldn't happen, but if it would we better catch it here because Metrictank wouldn't handle it well:
362-
// https://github.com/grafana/metrictank/blob/f1868cccfb92fc82cd853914af958f6d187c5f74/mdata/aggmetric.go#L378
363-
if point.Timestamp == 0 {
364-
continue
365-
}
366-
367-
t0 = point.Timestamp - (point.Timestamp % chunkSpan)
368-
if prevT0 == 0 {
369-
c = chunk.New(t0)
370-
prevT0 = t0
371-
} else if prevT0 != t0 {
372-
c.Finish()
373-
encodedChunks = append(encodedChunks, c)
374-
375-
c = chunk.New(t0)
376-
prevT0 = t0
377-
}
378-
379-
err := c.Push(point.Timestamp, point.Value)
380-
if err != nil {
381-
panic(fmt.Sprintf("ERROR: Failed to push value into chunk at t0 %d: %q", t0, err))
382-
}
383-
}
384-
385-
// if the last written point was also the last one of the current chunk,
386-
// or if writeUnfinishedChunks is on, we close the chunk and push it
387-
if point.Timestamp == t0+chunkSpan-intervalIn || *writeUnfinishedChunks {
388-
c.Finish()
389-
encodedChunks = append(encodedChunks, c)
390-
}
391-
392-
return encodedChunks
393-
}
394-
395241
// scan a directory and feed the list of whisper files relative to base into the given channel
396242
func getFileListIntoChan(pos *posTracker, fileChan chan string) {
397243
filepath.Walk(

cmd/mt-whisper-importer-writer/main.go

+31-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package main
22

33
import (
4+
"errors"
45
"flag"
56
"fmt"
67
"net/http"
78
"os"
9+
"strconv"
810
"strings"
911
"time"
1012

@@ -20,17 +22,18 @@ import (
2022
"github.com/grafana/metrictank/idx/cassandra"
2123
"github.com/grafana/metrictank/logger"
2224
"github.com/grafana/metrictank/mdata"
25+
"github.com/grafana/metrictank/mdata/importer"
2326
bigTableStore "github.com/grafana/metrictank/store/bigtable"
2427
cassandraStore "github.com/grafana/metrictank/store/cassandra"
2528
)
2629

2730
var (
2831
confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")
2932
exitOnError = flag.Bool("exit-on-error", false, "Exit with a message when there's an error")
30-
httpEndpoint = flag.String("http-endpoint", "127.0.0.1:8080", "The http endpoint to listen on")
33+
httpEndpoint = flag.String("http-endpoint", "0.0.0.0:8080", "The http endpoint to listen on")
3134
ttlsStr = flag.String("ttls", "35d", "list of ttl strings used by MT separated by ','")
3235
partitionScheme = flag.String("partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
33-
uriPath = flag.String("uri-path", "/chunks", "the URI on which we expect chunks to get posted")
36+
uriPath = flag.String("uri-path", "/metrics/import", "the URI on which we expect chunks to get posted")
3437
numPartitions = flag.Int("num-partitions", 1, "Number of Partitions")
3538
logLevel = flag.String("log-level", "info", "log level. panic|fatal|error|warning|info|debug")
3639

@@ -173,8 +176,15 @@ func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) {
173176
}
174177

175178
func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
176-
data := mdata.ArchiveRequest{}
177-
err := data.UnmarshalCompressed(req.Body)
179+
orgId, err := getOrgId(req)
180+
if err != nil {
181+
w.WriteHeader(http.StatusForbidden)
182+
w.Write([]byte(err.Error()))
183+
return
184+
}
185+
186+
data := importer.ArchiveRequest{}
187+
err = data.UnmarshalCompressed(req.Body)
178188
if err != nil {
179189
throwError(w, fmt.Sprintf("Error decoding cwr stream: %q", err))
180190
return
@@ -189,13 +199,15 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
189199
"Received %d cwrs for metric %s. The first has Key: %s, T0: %d, TTL: %d. The last has Key: %s, T0: %d, TTL: %d",
190200
len(data.ChunkWriteRequests),
191201
data.MetricData.Name,
192-
data.ChunkWriteRequests[0].Key.String(),
202+
data.ChunkWriteRequests[0].Archive.String(),
193203
data.ChunkWriteRequests[0].T0,
194204
data.ChunkWriteRequests[0].TTL,
195-
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Key.String(),
205+
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Archive.String(),
196206
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].T0,
197207
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].TTL)
198208

209+
data.MetricData.OrgId = orgId
210+
data.MetricData.SetId()
199211
partition, err := s.partitioner.Partition(&data.MetricData, int32(*numPartitions))
200212
if err != nil {
201213
throwError(w, fmt.Sprintf("Error partitioning: %q", err))
@@ -210,7 +222,18 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
210222

211223
s.index.AddOrUpdate(mkey, &data.MetricData, partition)
212224
for _, cwr := range data.ChunkWriteRequests {
213-
cwr := cwr // important because we pass by reference and this var will get overwritten in the next loop
214-
s.store.Add(&cwr)
225+
cwrWithOrg := cwr.GetChunkWriteRequest(nil, mkey)
226+
s.store.Add(&cwrWithOrg)
227+
}
228+
}
229+
230+
func getOrgId(req *http.Request) (int, error) {
231+
if orgIdStr := req.Header.Get("X-Org-Id"); len(orgIdStr) > 0 {
232+
if orgId, err := strconv.Atoi(orgIdStr); err == nil {
233+
return orgId, nil
234+
} else {
235+
return 0, fmt.Errorf("Invalid value in X-Org-Id header (%s): %s", orgIdStr, err)
236+
}
215237
}
238+
return 0, errors.New("Missing X-Org-Id header")
216239
}

docs/tools.md

+7-9
Original file line numberDiff line numberDiff line change
@@ -673,19 +673,17 @@ Usage of ./mt-whisper-importer-reader:
673673
-http-auth string
674674
The credentials used to authenticate in the format "user:password"
675675
-http-endpoint string
676-
The http endpoint to send the data to (default "http://127.0.0.1:8080/chunks")
677-
-import-after uint
678-
Only import after the specified timestamp
679-
-import-up-to uint
680-
Only import up to the specified timestamp (default 4294967295)
676+
The http endpoint to send the data to (default "http://127.0.0.1:8080/metrics/import")
677+
-import-from uint
678+
Only import starting from the specified timestamp
679+
-import-until uint
680+
Only import up to, but not including, the specified timestamp (default 4294967295)
681681
-insecure-ssl
682682
Disables ssl certificate verification
683683
-name-filter string
684684
A regex pattern to be applied to all metric names, only matching ones will be imported
685685
-name-prefix string
686686
Prefix to prepend before every metric name, should include the '.' if necessary
687-
-orgid int
688-
Organization ID the data belongs to (default 1)
689687
-position-file string
690688
file to store position and load position from
691689
-threads int
@@ -708,7 +706,7 @@ Usage of ./mt-whisper-importer-writer:
708706
-exit-on-error
709707
Exit with a message when there's an error
710708
-http-endpoint string
711-
The http endpoint to listen on (default "127.0.0.1:8080")
709+
The http endpoint to listen on (default "0.0.0.0:8080")
712710
-log-level string
713711
log level. panic|fatal|error|warning|info|debug (default "info")
714712
-num-partitions int
@@ -718,6 +716,6 @@ Usage of ./mt-whisper-importer-writer:
718716
-ttls string
719717
list of ttl strings used by MT separated by ',' (default "35d")
720718
-uri-path string
721-
the URI on which we expect chunks to get posted (default "/chunks")
719+
the URI on which we expect chunks to get posted (default "/metrics/import")
722720
```
723721

0 commit comments

Comments (0)