Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 8b99e7f

Browse files
committed
move ArchiveRequest instantiation into importer package
the instantiation of the ArchiveRequest struct should be located together with the struct definition, this moves it there
1 parent ce12737 commit 8b99e7f

11 files changed

+479
-536
lines changed

cmd/mt-whisper-importer-reader/main.go

+1-68
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/grafana/metrictank/logger"
2222
"github.com/grafana/metrictank/mdata/importer"
2323
"github.com/kisielk/whisper-go/whisper"
24-
"github.com/raintank/schema"
2524
log "github.com/sirupsen/logrus"
2625
)
2726

@@ -160,7 +159,7 @@ func processFromChan(pos *posTracker, files chan string, wg *sync.WaitGroup) {
160159

161160
name := getMetricName(file)
162161
log.Debugf("Processing file %s (%s)", file, name)
163-
data, err := getMetric(w, file, name)
162+
data, err := importer.NewArchiveRequest(w, schemas, file, name, uint32(*importFrom), uint32(*importUntil), *writeUnfinishedChunks)
164163
if err != nil {
165164
log.Errorf("Failed to get metric: %q", err.Error())
166165
continue
@@ -239,72 +238,6 @@ func getMetricName(file string) string {
239238
return *namePrefix + strings.Replace(strings.TrimSuffix(file, ".wsp"), "/", ".", -1)
240239
}
241240

242-
func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest, error) {
243-
if len(w.Header.Archives) == 0 {
244-
return nil, fmt.Errorf("Whisper file contains no archives: %q", file)
245-
}
246-
247-
method, err := importer.ConvertWhisperMethod(w.Header.Metadata.AggregationMethod)
248-
if err != nil {
249-
return nil, err
250-
}
251-
252-
points := make(map[int][]whisper.Point)
253-
for i := range w.Header.Archives {
254-
p, err := w.DumpArchive(i)
255-
if err != nil {
256-
return nil, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file)
257-
}
258-
points[i] = p
259-
}
260-
261-
res := &importer.ArchiveRequest{
262-
MetricData: schema.MetricData{
263-
Name: name,
264-
Value: 0,
265-
Interval: int(w.Header.Archives[0].SecondsPerPoint),
266-
Unit: "unknown",
267-
Time: 0,
268-
Mtype: "gauge",
269-
Tags: []string{},
270-
},
271-
}
272-
res.MetricData.SetId()
273-
274-
_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
275-
converter := importer.NewConverter(w.Header.Archives, points, method, uint32(*importFrom), uint32(*importUntil))
276-
for retIdx, retention := range selectedSchema.Retentions {
277-
convertedPoints := converter.GetPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints))
278-
for m, p := range convertedPoints {
279-
if len(p) == 0 {
280-
continue
281-
}
282-
283-
var archive schema.Archive
284-
if retIdx > 0 {
285-
archive = schema.NewArchive(m, retention.ChunkSpan)
286-
}
287-
288-
encodedChunks := importer.EncodeChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan, *writeUnfinishedChunks)
289-
for _, chunk := range encodedChunks {
290-
res.ChunkWriteRequests = append(res.ChunkWriteRequests, importer.NewChunkWriteRequest(
291-
archive,
292-
uint32(retention.MaxRetention()),
293-
chunk.Series.T0,
294-
chunk.Encode(retention.ChunkSpan),
295-
time.Now(),
296-
))
297-
}
298-
299-
if res.MetricData.Time < int64(p[len(p)-1].Timestamp) {
300-
res.MetricData.Time = int64(p[len(p)-1].Timestamp)
301-
}
302-
}
303-
}
304-
305-
return res, nil
306-
}
307-
308241
// scan a directory and feed the list of whisper files relative to base into the given channel
309242
func getFileListIntoChan(pos *posTracker, fileChan chan string) {
310243
filepath.Walk(

mdata/importer/archive_request.go

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package importer
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"compress/gzip"
7+
"fmt"
8+
"github.com/tinylib/msgp/msgp"
9+
"io"
10+
"time"
11+
12+
"github.com/grafana/metrictank/conf"
13+
"github.com/kisielk/whisper-go/whisper"
14+
"github.com/raintank/schema"
15+
)
16+
17+
//go:generate msgp
18+
19+
// ArchiveRequest is a complete representation of a Metric together with some
20+
// chunk write requests containing data that shall be written into this metric
21+
type ArchiveRequest struct {
22+
MetricData schema.MetricData
23+
ChunkWriteRequests []ChunkWriteRequest
24+
}
25+
26+
func NewArchiveRequest(w *whisper.Whisper, schemas conf.Schemas, file, name string, from, until uint32, writeUnfinishedChunks bool) (*ArchiveRequest, error) {
27+
if len(w.Header.Archives) == 0 {
28+
return nil, fmt.Errorf("Whisper file contains no archives: %q", file)
29+
}
30+
31+
method, err := convertWhisperMethod(w.Header.Metadata.AggregationMethod)
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
points := make(map[int][]whisper.Point)
37+
for i := range w.Header.Archives {
38+
p, err := w.DumpArchive(i)
39+
if err != nil {
40+
return nil, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file)
41+
}
42+
points[i] = p
43+
}
44+
45+
res := &ArchiveRequest{
46+
MetricData: schema.MetricData{
47+
Name: name,
48+
Value: 0,
49+
Interval: int(w.Header.Archives[0].SecondsPerPoint),
50+
Unit: "unknown",
51+
Time: 0,
52+
Mtype: "gauge",
53+
Tags: []string{},
54+
},
55+
}
56+
res.MetricData.SetId()
57+
58+
_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
59+
converter := newConverter(w.Header.Archives, points, method, from, until)
60+
for retIdx, retention := range selectedSchema.Retentions {
61+
convertedPoints := converter.getPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints))
62+
for m, p := range convertedPoints {
63+
if len(p) == 0 {
64+
continue
65+
}
66+
67+
var archive schema.Archive
68+
if retIdx > 0 {
69+
archive = schema.NewArchive(m, retention.ChunkSpan)
70+
}
71+
72+
encodedChunks := encodeChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan, writeUnfinishedChunks)
73+
for _, chunk := range encodedChunks {
74+
res.ChunkWriteRequests = append(res.ChunkWriteRequests, NewChunkWriteRequest(
75+
archive,
76+
uint32(retention.MaxRetention()),
77+
chunk.Series.T0,
78+
chunk.Encode(retention.ChunkSpan),
79+
time.Now(),
80+
))
81+
}
82+
83+
if res.MetricData.Time < int64(p[len(p)-1].Timestamp) {
84+
res.MetricData.Time = int64(p[len(p)-1].Timestamp)
85+
}
86+
}
87+
}
88+
89+
return res, nil
90+
}
91+
92+
func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) {
93+
var buf bytes.Buffer
94+
95+
buf.WriteByte(byte(uint8(1)))
96+
97+
g := gzip.NewWriter(&buf)
98+
err := msgp.Encode(g, a)
99+
if err != nil {
100+
return &buf, fmt.Errorf("ERROR: Encoding MGSP data: %q", err)
101+
}
102+
103+
err = g.Close()
104+
if err != nil {
105+
return &buf, fmt.Errorf("ERROR: Compressing MSGP data: %q", err)
106+
}
107+
108+
return &buf, nil
109+
}
110+
111+
func (a *ArchiveRequest) UnmarshalCompressed(b io.Reader) error {
112+
versionBuf := make([]byte, 1)
113+
readBytes, err := b.Read(versionBuf)
114+
if err != nil || readBytes != 1 {
115+
return fmt.Errorf("ERROR: Failed to read one byte: %s", err)
116+
}
117+
118+
version := uint8(versionBuf[0])
119+
if version != 1 {
120+
return fmt.Errorf("ERROR: Only version 1 is supported, received version %d", version)
121+
}
122+
123+
gzipReader, err := gzip.NewReader(b)
124+
if err != nil {
125+
return fmt.Errorf("ERROR: Creating Gzip reader: %q", err)
126+
}
127+
128+
err = msgp.Decode(bufio.NewReader(gzipReader), a)
129+
if err != nil {
130+
return fmt.Errorf("ERROR: Unmarshaling Raw: %q", err)
131+
}
132+
gzipReader.Close()
133+
134+
return nil
135+
}

0 commit comments

Comments
 (0)