Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 25f1457

Browse files
committed
updated format of cwrs to transfer in importer
1 parent 450b376 commit 25f1457

File tree

10 files changed

+775
-134
lines changed

10 files changed

+775
-134
lines changed

cmd/mt-whisper-importer-reader/main.go

+2-14
Original file line numberDiff line numberDiff line change
@@ -309,10 +309,6 @@ func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, er
309309
},
310310
}
311311
res.MetricData.SetId()
312-
mkey, err := schema.MKeyFromString(res.MetricData.Id)
313-
if err != nil {
314-
panic(err)
315-
}
316312

317313
_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
318314
conversion := newConversion(w.Header.Archives, points, method)
@@ -323,18 +319,10 @@ func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, er
323319
continue
324320
}
325321

326-
var amkey schema.AMKey
327-
if retIdx == 0 {
328-
amkey = schema.AMKey{MKey: mkey}
329-
} else {
330-
amkey = schema.GetAMKey(mkey, m, retention.ChunkSpan)
331-
}
332-
333322
encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan)
334323
for _, chunk := range encodedChunks {
335-
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequest(
336-
nil,
337-
amkey,
324+
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequestWithoutOrg(
325+
schema.NewArchive(m, retention.ChunkSpan),
338326
uint32(retention.MaxRetention()),
339327
chunk.Series.T0,
340328
chunk.Encode(retention.ChunkSpan),

cmd/mt-whisper-importer-writer/main.go

+27-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package main
22

33
import (
4+
"errors"
45
"flag"
56
"fmt"
67
"net/http"
78
"os"
9+
"strconv"
810
"strings"
911
"time"
1012

@@ -173,8 +175,15 @@ func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) {
173175
}
174176

175177
func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
178+
orgId, err := getOrgId(req)
179+
if err != nil {
180+
w.WriteHeader(http.StatusForbidden)
181+
w.Write([]byte(err.Error()))
182+
return
183+
}
184+
176185
data := mdata.ArchiveRequest{}
177-
err := data.UnmarshalCompressed(req.Body)
186+
err = data.UnmarshalCompressed(req.Body)
178187
if err != nil {
179188
throwError(w, fmt.Sprintf("Error decoding cwr stream: %q", err))
180189
return
@@ -189,13 +198,15 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
189198
"Received %d cwrs for metric %s. The first has Key: %s, T0: %d, TTL: %d. The last has Key: %s, T0: %d, TTL: %d",
190199
len(data.ChunkWriteRequests),
191200
data.MetricData.Name,
192-
data.ChunkWriteRequests[0].Key.String(),
201+
data.ChunkWriteRequests[0].Archive.String(),
193202
data.ChunkWriteRequests[0].T0,
194203
data.ChunkWriteRequests[0].TTL,
195-
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Key.String(),
204+
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Archive.String(),
196205
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].T0,
197206
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].TTL)
198207

208+
data.MetricData.OrgId = orgId
209+
data.MetricData.SetId()
199210
partition, err := s.partitioner.Partition(&data.MetricData, int32(*numPartitions))
200211
if err != nil {
201212
throwError(w, fmt.Sprintf("Error partitioning: %q", err))
@@ -210,7 +221,18 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
210221

211222
s.index.AddOrUpdate(mkey, &data.MetricData, partition)
212223
for _, cwr := range data.ChunkWriteRequests {
213-
cwr := cwr // important because we pass by reference and this var will get overwritten in the next loop
214-
s.store.Add(&cwr)
224+
cwrWithOrg := cwr.GetChunkWriteRequest(nil, mkey)
225+
s.store.Add(&cwrWithOrg)
226+
}
227+
}
228+
229+
func getOrgId(req *http.Request) (int, error) {
230+
if orgIdStr := req.Header.Get("X-Org-Id"); len(orgIdStr) > 0 {
231+
if orgId, err := strconv.Atoi(orgIdStr); err == nil {
232+
return orgId, nil
233+
} else {
234+
return 0, fmt.Errorf("Invalid value in X-Org-Id header (%s): %s", orgIdStr, err)
235+
}
215236
}
237+
return 0, errors.New("Missing X-Org-Id header")
216238
}

mdata/cwr.go

+28-10
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,48 @@ import (
1313
"github.com/raintank/schema"
1414
)
1515

16+
//go:generate msgp
17+
//msgp:ignore ChunkWriteRequest
18+
1619
type ChunkSaveCallback func()
1720

1821
// ChunkWriteRequest is a request to write a chunk into a store
19-
//go:generate msgp
2022
type ChunkWriteRequest struct {
21-
Callback ChunkSaveCallback `msg:"-"`
22-
Key schema.AMKey `msg:"key,extension"`
23+
ChunkWriteRequestPayload
24+
Callback ChunkSaveCallback
25+
Key schema.AMKey
26+
}
27+
28+
func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest {
29+
return ChunkWriteRequest{ChunkWriteRequestPayload{ttl, t0, data, ts}, callback, key}
30+
}
31+
32+
// ChunkWriteRequestWithoutOrg is used by the importer utility to send cwrs over the network
33+
type ChunkWriteRequestWithoutOrg struct {
34+
ChunkWriteRequestPayload
35+
Archive schema.Archive
36+
}
37+
38+
func NewChunkWriteRequestWithoutOrg(archive schema.Archive, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequestWithoutOrg {
39+
return ChunkWriteRequestWithoutOrg{ChunkWriteRequestPayload{ttl, t0, data, ts}, archive}
40+
}
41+
42+
func (c *ChunkWriteRequestWithoutOrg) GetChunkWriteRequest(callback ChunkSaveCallback, key schema.MKey) ChunkWriteRequest {
43+
return ChunkWriteRequest{c.ChunkWriteRequestPayload, callback, schema.AMKey{MKey: key, Archive: c.Archive}}
44+
}
45+
46+
type ChunkWriteRequestPayload struct {
2347
TTL uint32
2448
T0 uint32
2549
Data []byte
2650
Timestamp time.Time
2751
}
2852

29-
// NewChunkWriteRequest creates a new ChunkWriteRequest
30-
func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest {
31-
return ChunkWriteRequest{callback, key, ttl, t0, data, ts}
32-
}
33-
3453
// ArchiveRequest is a complete representation of a Metric together with some
3554
// chunk write requests containing data that shall be written into this metric
36-
//go:generate msgp
3755
type ArchiveRequest struct {
3856
MetricData schema.MetricData
39-
ChunkWriteRequests []ChunkWriteRequest
57+
ChunkWriteRequests []ChunkWriteRequestWithoutOrg
4058
}
4159

4260
func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) {

0 commit comments

Comments
 (0)