Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 61673e0

Browse files
committed
importer transfers cwrs without org
1 parent 450b376 commit 61673e0

File tree

11 files changed

+794
-155
lines changed

11 files changed

+794
-155
lines changed

cmd/mt-whisper-importer-reader/main.go

+6-19
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
var (
3131
httpEndpoint = flag.String(
3232
"http-endpoint",
33-
"http://127.0.0.1:8080/chunks",
33+
"http://127.0.0.1:8080/metrics/import",
3434
"The http endpoint to send the data to",
3535
)
3636
namePrefix = flag.String(
@@ -48,11 +48,6 @@ var (
4848
false,
4949
"Defines if chunks that have not completed their chunk span should be written",
5050
)
51-
orgId = flag.Int(
52-
"orgid",
53-
1,
54-
"Organization ID the data belongs to ",
55-
)
5651
insecureSSL = flag.Bool(
5752
"insecure-ssl",
5853
false,
@@ -305,14 +300,9 @@ func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, er
305300
Time: 0,
306301
Mtype: "gauge",
307302
Tags: []string{},
308-
OrgId: *orgId,
309303
},
310304
}
311305
res.MetricData.SetId()
312-
mkey, err := schema.MKeyFromString(res.MetricData.Id)
313-
if err != nil {
314-
panic(err)
315-
}
316306

317307
_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
318308
conversion := newConversion(w.Header.Archives, points, method)
@@ -323,18 +313,15 @@ func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, er
323313
continue
324314
}
325315

326-
var amkey schema.AMKey
327-
if retIdx == 0 {
328-
amkey = schema.AMKey{MKey: mkey}
329-
} else {
330-
amkey = schema.GetAMKey(mkey, m, retention.ChunkSpan)
316+
var archive schema.Archive
317+
if retIdx > 0 {
318+
archive = schema.NewArchive(m, retention.ChunkSpan)
331319
}
332320

333321
encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan)
334322
for _, chunk := range encodedChunks {
335-
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequest(
336-
nil,
337-
amkey,
323+
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequestWithoutOrg(
324+
archive,
338325
uint32(retention.MaxRetention()),
339326
chunk.Series.T0,
340327
chunk.Encode(retention.ChunkSpan),

cmd/mt-whisper-importer-writer/main.go

+33-9
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package main
22

33
import (
4+
"errors"
45
"flag"
56
"fmt"
67
"net/http"
78
"os"
9+
"strconv"
810
"strings"
911
"time"
1012

@@ -27,7 +29,8 @@ import (
2729
var (
2830
confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")
2931
exitOnError = flag.Bool("exit-on-error", false, "Exit with a message when there's an error")
30-
httpEndpoint = flag.String("http-endpoint", "127.0.0.1:8080", "The http endpoint to listen on")
32+
listenAddress = flag.String("listen-address", "127.0.0.1", "The address to listen on")
33+
listenPort = flag.Int("listen-port", 8080, "The port to listen on")
3134
ttlsStr = flag.String("ttls", "35d", "list of ttl strings used by MT separated by ','")
3235
partitionScheme = flag.String("partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
3336
uriPath = flag.String("uri-path", "/chunks", "the URI on which we expect chunks to get posted")
@@ -137,21 +140,22 @@ func main() {
137140

138141
index.Init()
139142

143+
httpEndpoint := fmt.Sprintf("%s:%d", *listenAddress, *listenPort)
140144
server := &Server{
141145
partitioner: p,
142146
index: index,
143147
store: store,
144148
HTTPServer: &http.Server{
145-
Addr: *httpEndpoint,
149+
Addr: httpEndpoint,
146150
ReadTimeout: 10 * time.Minute,
147151
},
148152
}
149153

150154
http.HandleFunc(*uriPath, server.chunksHandler)
151155
http.HandleFunc("/healthz", server.healthzHandler)
152156

153-
log.Infof("Listening on %q", *httpEndpoint)
154-
err = http.ListenAndServe(*httpEndpoint, nil)
157+
log.Infof("Listening on %q", httpEndpoint)
158+
err = http.ListenAndServe(httpEndpoint, nil)
155159
if err != nil {
156160
panic(fmt.Sprintf("Error creating listener: %q", err))
157161
}
@@ -173,8 +177,15 @@ func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) {
173177
}
174178

175179
func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
180+
orgId, err := getOrgId(req)
181+
if err != nil {
182+
w.WriteHeader(http.StatusForbidden)
183+
w.Write([]byte(err.Error()))
184+
return
185+
}
186+
176187
data := mdata.ArchiveRequest{}
177-
err := data.UnmarshalCompressed(req.Body)
188+
err = data.UnmarshalCompressed(req.Body)
178189
if err != nil {
179190
throwError(w, fmt.Sprintf("Error decoding cwr stream: %q", err))
180191
return
@@ -189,13 +200,15 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
189200
"Received %d cwrs for metric %s. The first has Key: %s, T0: %d, TTL: %d. The last has Key: %s, T0: %d, TTL: %d",
190201
len(data.ChunkWriteRequests),
191202
data.MetricData.Name,
192-
data.ChunkWriteRequests[0].Key.String(),
203+
data.ChunkWriteRequests[0].Archive.String(),
193204
data.ChunkWriteRequests[0].T0,
194205
data.ChunkWriteRequests[0].TTL,
195-
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Key.String(),
206+
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Archive.String(),
196207
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].T0,
197208
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].TTL)
198209

210+
data.MetricData.OrgId = orgId
211+
data.MetricData.SetId()
199212
partition, err := s.partitioner.Partition(&data.MetricData, int32(*numPartitions))
200213
if err != nil {
201214
throwError(w, fmt.Sprintf("Error partitioning: %q", err))
@@ -210,7 +223,18 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
210223

211224
s.index.AddOrUpdate(mkey, &data.MetricData, partition)
212225
for _, cwr := range data.ChunkWriteRequests {
213-
cwr := cwr // important because we pass by reference and this var will get overwritten in the next loop
214-
s.store.Add(&cwr)
226+
cwrWithOrg := cwr.GetChunkWriteRequest(nil, mkey)
227+
s.store.Add(&cwrWithOrg)
228+
}
229+
}
230+
231+
func getOrgId(req *http.Request) (int, error) {
232+
if orgIdStr := req.Header.Get("X-Org-Id"); len(orgIdStr) > 0 {
233+
if orgId, err := strconv.Atoi(orgIdStr); err == nil {
234+
return orgId, nil
235+
} else {
236+
return 0, fmt.Errorf("Invalid value in X-Org-Id header (%s): %s", orgIdStr, err)
237+
}
215238
}
239+
return 0, errors.New("Missing X-Org-Id header")
216240
}

mdata/cwr.go

+28-10
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,48 @@ import (
1313
"github.com/raintank/schema"
1414
)
1515

16+
//go:generate msgp
17+
//msgp:ignore ChunkWriteRequest
18+
1619
type ChunkSaveCallback func()
1720

1821
// ChunkWriteRequest is a request to write a chunk into a store
19-
//go:generate msgp
2022
type ChunkWriteRequest struct {
21-
Callback ChunkSaveCallback `msg:"-"`
22-
Key schema.AMKey `msg:"key,extension"`
23+
ChunkWriteRequestPayload
24+
Callback ChunkSaveCallback
25+
Key schema.AMKey
26+
}
27+
28+
func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest {
29+
return ChunkWriteRequest{ChunkWriteRequestPayload{ttl, t0, data, ts}, callback, key}
30+
}
31+
32+
// ChunkWriteRequestWithoutOrg is used by the importer utility to send cwrs over the network
33+
type ChunkWriteRequestWithoutOrg struct {
34+
ChunkWriteRequestPayload
35+
Archive schema.Archive
36+
}
37+
38+
func NewChunkWriteRequestWithoutOrg(archive schema.Archive, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequestWithoutOrg {
39+
return ChunkWriteRequestWithoutOrg{ChunkWriteRequestPayload{ttl, t0, data, ts}, archive}
40+
}
41+
42+
func (c *ChunkWriteRequestWithoutOrg) GetChunkWriteRequest(callback ChunkSaveCallback, key schema.MKey) ChunkWriteRequest {
43+
return ChunkWriteRequest{c.ChunkWriteRequestPayload, callback, schema.AMKey{MKey: key, Archive: c.Archive}}
44+
}
45+
46+
type ChunkWriteRequestPayload struct {
2347
TTL uint32
2448
T0 uint32
2549
Data []byte
2650
Timestamp time.Time
2751
}
2852

29-
// NewChunkWriteRequest creates a new ChunkWriteRequest
30-
func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest {
31-
return ChunkWriteRequest{callback, key, ttl, t0, data, ts}
32-
}
33-
3453
// ArchiveRequest is a complete representation of a Metric together with some
3554
// chunk write requests containing data that shall be written into this metric
36-
//go:generate msgp
3755
type ArchiveRequest struct {
3856
MetricData schema.MetricData
39-
ChunkWriteRequests []ChunkWriteRequest
57+
ChunkWriteRequests []ChunkWriteRequestWithoutOrg
4058
}
4159

4260
func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) {

0 commit comments

Comments
 (0)