Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit d2aaaf2

Browse files
committed
fixes according to comments
1 parent 84217de commit d2aaaf2

File tree

3 files changed

+35
-51
lines changed

3 files changed

+35
-51
lines changed

cmd/mt-whisper-importer-reader/main.go

+22-24
Original file line numberDiff line numberDiff line change
@@ -269,40 +269,43 @@ func convertWhisperMethod(whisperMethod whisper.AggregationMethod) (schema.Metho
269269
}
270270
}
271271

272-
func getMetric(w *whisper.Whisper, file, name string) (mdata.ArchiveRequest, error) {
273-
res := mdata.ArchiveRequest{
274-
MetricData: schema.MetricData{
275-
Name: name,
276-
Value: 0,
277-
Unit: "unknown",
278-
Time: 0,
279-
Mtype: "gauge",
280-
Tags: []string{},
281-
OrgId: *orgId,
282-
},
283-
}
284-
272+
func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, error) {
285273
if len(w.Header.Archives) == 0 {
286-
return res, fmt.Errorf("Whisper file contains no archives: %q", file)
274+
return nil, fmt.Errorf("Whisper file contains no archives: %q", file)
287275
}
288276

289277
method, err := convertWhisperMethod(w.Header.Metadata.AggregationMethod)
290278
if err != nil {
291-
return res, err
279+
return nil, err
292280
}
293281

294-
res.MetricData.Interval = int(w.Header.Archives[0].SecondsPerPoint)
295-
res.MetricData.SetId()
296-
297282
points := make(map[int][]whisper.Point)
298283
for i := range w.Header.Archives {
299284
p, err := w.DumpArchive(i)
300285
if err != nil {
301-
return res, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file)
286+
return nil, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file)
302287
}
303288
points[i] = p
304289
}
305290

291+
res := &mdata.ArchiveRequest{
292+
MetricData: schema.MetricData{
293+
Name: name,
294+
Value: 0,
295+
Interval: int(w.Header.Archives[0].SecondsPerPoint),
296+
Unit: "unknown",
297+
Time: 0,
298+
Mtype: "gauge",
299+
Tags: []string{},
300+
OrgId: *orgId,
301+
},
302+
}
303+
res.MetricData.SetId()
304+
mkey, err := schema.MKeyFromString(res.MetricData.Id)
305+
if err != nil {
306+
panic(err)
307+
}
308+
306309
_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
307310
conversion := newConversion(w.Header.Archives, points, method)
308311
for retIdx, retention := range selectedSchema.Retentions {
@@ -312,11 +315,6 @@ func getMetric(w *whisper.Whisper, file, name string) (mdata.ArchiveRequest, err
312315
continue
313316
}
314317

315-
mkey, err := schema.MKeyFromString(res.MetricData.Id)
316-
if err != nil {
317-
panic(err)
318-
}
319-
320318
encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan)
321319
for _, chunk := range encodedChunks {
322320
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequest(

mdata/cwr.go

+11-24
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import (
77
"errors"
88
"fmt"
99
"io"
10-
"io/ioutil"
1110
"time"
1211

12+
"github.com/tinylib/msgp/msgp"
13+
1314
"github.com/raintank/schema"
1415
)
1516

@@ -42,21 +43,12 @@ type ArchiveRequest struct {
4243
func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) {
4344
var buf bytes.Buffer
4445

45-
// prefix the buffer with version number 1
4646
buf.WriteByte(byte(uint8(1)))
4747

48-
b, err := a.MarshalMsg(nil)
49-
if err != nil {
50-
return &buf, errors.New(fmt.Sprintf("ERROR: Marshalling metric: %q", err))
51-
}
52-
5348
g := gzip.NewWriter(&buf)
54-
_, err = g.Write(b)
55-
if err != nil {
56-
return &buf, errors.New(fmt.Sprintf("ERROR: Compressing MSGP data: %q", err))
57-
}
49+
msgp.Encode(g, a)
5850

59-
err = g.Close()
51+
err := g.Close()
6052
if err != nil {
6153
return &buf, errors.New(fmt.Sprintf("ERROR: Compressing MSGP data: %q", err))
6254
}
@@ -65,28 +57,23 @@ func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) {
6557
}
6658

6759
func (a *ArchiveRequest) UnmarshalCompressed(b io.Reader) error {
68-
reader := bufio.NewReader(b)
69-
versionBuf, err := reader.ReadByte()
70-
if err != nil {
71-
return errors.New(fmt.Sprintf("ERROR: Failed to read received data: %q", err))
60+
versionBuf := make([]byte, 1)
61+
readBytes, err := b.Read(versionBuf)
62+
if err != nil || readBytes != 1 {
63+
return errors.New(fmt.Sprintf("ERROR: Failed to read one byte: %s", err))
7264
}
7365

74-
version := uint8(versionBuf)
66+
version := uint8(versionBuf[0])
7567
if version != 1 {
7668
return errors.New(fmt.Sprintf("ERROR: Only version 1 is supported, received version %d", version))
7769
}
7870

79-
gzipReader, err := gzip.NewReader(reader)
71+
gzipReader, err := gzip.NewReader(b)
8072
if err != nil {
8173
return errors.New(fmt.Sprintf("ERROR: Creating Gzip reader: %q", err))
8274
}
8375

84-
raw, err := ioutil.ReadAll(gzipReader)
85-
if err != nil {
86-
return errors.New(fmt.Sprintf("ERROR: Decompressing Gzip: %q", err))
87-
}
88-
89-
_, err = a.UnmarshalMsg(raw)
76+
err = msgp.Decode(bufio.NewReader(gzipReader), a)
9077
if err != nil {
9178
return errors.New(fmt.Sprintf("ERROR: Unmarshaling Raw: %q", err))
9279
}

mdata/cwr_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ func TestArchiveRequestEncodingDecoding(t *testing.T) {
2121
}
2222

2323
encoded, err := originalRequest.MarshalCompressed()
24-
reader := bytes.NewReader(encoded.Bytes())
2524
if err != nil {
26-
t.Fatalf("Expected no error when encoding request, got %q", err)
25+
t.Fatalf("Failed when encoding the request: %s", err)
2726
}
2827

2928
decodedRequest := ArchiveRequest{}
30-
err = decodedRequest.UnmarshalCompressed(reader)
29+
err = decodedRequest.UnmarshalCompressed(encoded)
3130
if err != nil {
3231
t.Fatalf("Expected no error when decoding request, got %q", err)
3332
}

0 commit comments

Comments
 (0)