Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 91675a2

Browse files
committedSep 8, 2017
add api to clear metrics from cache
1 parent b3c1245 commit 91675a2

27 files changed

+909
-213
lines changed
 

‎api/ccache.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"sync"
8+
9+
"github.com/raintank/metrictank/api/middleware"
10+
"github.com/raintank/metrictank/api/models"
11+
"github.com/raintank/metrictank/api/response"
12+
"github.com/raintank/metrictank/cluster"
13+
"github.com/raintank/worldping-api/pkg/log"
14+
)
15+
16+
func (s *Server) ccacheDelete(ctx *middleware.Context, req models.CCacheDelete) {
17+
res := models.CCacheDeleteResp{}
18+
19+
if req.Propagate {
20+
res.Peers = s.ccacheDeletePropagate(ctx.Req.Context(), &req)
21+
}
22+
23+
for _, pattern := range req.Patterns {
24+
nodes, err := s.MetricIndex.Find(req.OrgId, pattern, 0)
25+
if err != nil {
26+
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
27+
return
28+
}
29+
for _, node := range nodes {
30+
for _, def := range node.Defs {
31+
delResult := s.Cache.DelMetric(def.Id)
32+
res.DeletedSeries += delResult.Series
33+
res.DeletedArchives += delResult.Archives
34+
}
35+
}
36+
}
37+
response.Write(ctx, response.NewJson(200, res, ""))
38+
}
39+
40+
func (s *Server) ccacheDeletePropagate(ctx context.Context, req *models.CCacheDelete) map[string]models.CCacheDeleteResp {
41+
// we never want to propagate more than once to avoid loops
42+
req.Propagate = false
43+
44+
peers := cluster.Manager.MemberList()
45+
peerResults := make(map[string]models.CCacheDeleteResp)
46+
var mu sync.Mutex
47+
var wg sync.WaitGroup
48+
for _, peer := range peers {
49+
if peer.IsLocal() {
50+
continue
51+
}
52+
wg.Add(1)
53+
go func(peer cluster.NodeIf) {
54+
mu.Lock()
55+
defer mu.Unlock()
56+
peerResults[peer.GetName()] = s.ccacheDeleteRemote(ctx, req, peer)
57+
wg.Done()
58+
}(peer)
59+
}
60+
wg.Wait()
61+
62+
return peerResults
63+
}
64+
65+
func (s *Server) ccacheDeleteRemote(ctx context.Context, req *models.CCacheDelete, peer cluster.NodeIf) models.CCacheDeleteResp {
66+
var res models.CCacheDeleteResp
67+
68+
log.Debug("HTTP metricDelete calling %s/ccache/delete", peer.GetName())
69+
buf, err := peer.Post(ctx, "ccacheDeleteRemote", "/ccache/delete", *req)
70+
if err != nil {
71+
log.Error(4, "HTTP ccacheDelete error querying %s/ccache/delete: %q", peer.GetName(), err)
72+
res.Errors++
73+
return res
74+
}
75+
76+
err = json.Unmarshal(buf, &res)
77+
if err != nil {
78+
log.Error(4, "HTTP ccacheDelete error unmarshaling body from %s/ccache/delete: %q", peer.GetName(), err)
79+
res.Errors++
80+
return res
81+
}
82+
83+
return res
84+
}

‎api/ccache_test.go

+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package api
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
"github.com/raintank/metrictank/api/models"
12+
"github.com/raintank/metrictank/api/response"
13+
"github.com/raintank/metrictank/cluster"
14+
"github.com/raintank/metrictank/conf"
15+
"github.com/raintank/metrictank/idx/memory"
16+
"github.com/raintank/metrictank/mdata"
17+
"github.com/raintank/metrictank/mdata/cache"
18+
"gopkg.in/raintank/schema.v1"
19+
)
20+
21+
func newSrv(delSeries, delArchives int, key string) (*Server, *cache.MockCache) {
22+
srv, _ := NewServer()
23+
srv.RegisterRoutes()
24+
25+
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
26+
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
27+
28+
store := mdata.NewDevnullStore()
29+
srv.BindBackendStore(store)
30+
31+
mockCache := cache.NewMockCache()
32+
mockCache.DelMetricRes = cache.CCDelMetricResult{Series: delSeries, Archives: delArchives}
33+
metrics := mdata.NewAggMetrics(store, mockCache, false, 0, 0, 0)
34+
srv.BindMemoryStore(metrics)
35+
srv.BindCache(mockCache)
36+
37+
metricIndex := memory.New()
38+
metricIndex.AddOrUpdate(
39+
&schema.MetricData{
40+
Id: key,
41+
OrgId: 1,
42+
Name: "test.key",
43+
Metric: "test.key",
44+
Interval: 10,
45+
Value: 1,
46+
},
47+
0,
48+
)
49+
srv.BindMetricIndex(metricIndex)
50+
return srv, mockCache
51+
}
52+
53+
func TestMetricDelete(t *testing.T) {
54+
cluster.Init("default", "test", time.Now(), "http", 6060)
55+
56+
delSeries := 3
57+
delArchives := 10
58+
testKey := "12345"
59+
60+
srv, cache := newSrv(delSeries, delArchives, testKey)
61+
req, _ := json.Marshal(models.CCacheDelete{
62+
Patterns: []string{"test.*"},
63+
OrgId: 1,
64+
Propagate: false,
65+
})
66+
67+
ts := httptest.NewServer(srv.Macaron)
68+
defer ts.Close()
69+
70+
res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
71+
if err != nil {
72+
t.Fatalf("There was an error in the request: %s", err)
73+
}
74+
75+
respParsed := models.CCacheDeleteResp{}
76+
buf := new(bytes.Buffer)
77+
buf.ReadFrom(res.Body)
78+
json.Unmarshal(buf.Bytes(), &respParsed)
79+
80+
if len(cache.DelMetricKeys) != 1 || cache.DelMetricKeys[0] != testKey {
81+
t.Fatalf("Expected that key %s has been deleted, but it has not", testKey)
82+
}
83+
84+
if respParsed.DeletedSeries != delSeries || respParsed.DeletedArchives != delArchives {
85+
t.Fatalf("Expected %d series and %d archives to get deleted, but got %d and %d", delSeries, delArchives, respParsed.DeletedSeries, respParsed.DeletedArchives)
86+
}
87+
}
88+
89+
func TestMetricDeletePropagation(t *testing.T) {
90+
manager := cluster.InitMock()
91+
92+
expectedDeletedSeries, expectedDeletedArchives := 0, 0
93+
94+
// define how many series/archives are getting deleted by peer 0
95+
resp := models.CCacheDeleteResp{
96+
DeletedSeries: 2,
97+
DeletedArchives: 5,
98+
}
99+
expectedDeletedSeries += resp.DeletedSeries
100+
expectedDeletedArchives += resp.DeletedArchives
101+
respEncoded := response.NewJson(200, resp, "")
102+
buf, _ := respEncoded.Body()
103+
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, "1", buf))
104+
105+
// define how many series/archives are getting deleted by peer 1
106+
resp = models.CCacheDeleteResp{
107+
Peers: map[string]models.CCacheDeleteResp{"2": {Errors: 1}},
108+
DeletedSeries: 1,
109+
DeletedArchives: 1,
110+
}
111+
112+
respEncoded = response.NewJson(200, resp, "")
113+
buf, _ = respEncoded.Body()
114+
// should be ignored because peer.IsLocal() is true
115+
manager.Peers = append(manager.Peers, cluster.NewMockNode(true, "2", buf))
116+
117+
// define how many series/archives are getting deleted by peer 2
118+
resp = models.CCacheDeleteResp{
119+
Peers: map[string]models.CCacheDeleteResp{"3": {Errors: 1}},
120+
DeletedSeries: 1,
121+
DeletedArchives: 3,
122+
}
123+
expectedDeletedSeries += resp.DeletedSeries
124+
expectedDeletedArchives += resp.DeletedArchives
125+
respEncoded = response.NewJson(200, resp, "")
126+
buf, _ = respEncoded.Body()
127+
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, "3", buf))
128+
129+
// define how many series/archives are going to get deleted by this server
130+
delSeries := 3
131+
delArchives := 10
132+
testKey := "12345"
133+
134+
// add up how many series/archives are expected to be deleted
135+
expectedDeletedSeries += delSeries
136+
expectedDeletedArchives += delArchives
137+
138+
srv, cache := newSrv(delSeries, delArchives, testKey)
139+
req, err := json.Marshal(models.CCacheDelete{
140+
Patterns: []string{"test.*"},
141+
OrgId: 1,
142+
Propagate: true,
143+
})
144+
145+
ts := httptest.NewServer(srv.Macaron)
146+
defer ts.Close()
147+
148+
res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
149+
if err != nil {
150+
t.Fatalf("There was an error in the request: %s", err)
151+
}
152+
153+
buf2 := new(bytes.Buffer)
154+
buf2.ReadFrom(res.Body)
155+
respParsed := models.CCacheDeleteResp{}
156+
json.Unmarshal(buf2.Bytes(), &respParsed)
157+
158+
if len(cache.DelMetricKeys) != 1 || cache.DelMetricKeys[0] != testKey {
159+
t.Fatalf("Expected that key %s has been deleted, but it has not", testKey)
160+
}
161+
162+
deletedArchives := respParsed.DeletedArchives
163+
deletedSeries := respParsed.DeletedSeries
164+
for _, peer := range respParsed.Peers {
165+
deletedArchives += peer.DeletedArchives
166+
deletedSeries += peer.DeletedSeries
167+
}
168+
169+
if deletedSeries != expectedDeletedSeries || deletedArchives != expectedDeletedArchives {
170+
t.Fatalf(
171+
"Expected %d series and %d archives to get deleted, but got %d and %d",
172+
expectedDeletedSeries, expectedDeletedArchives, respParsed.DeletedSeries, respParsed.DeletedArchives,
173+
)
174+
}
175+
}

‎api/cluster.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (s *Server) appStatus(ctx *middleware.Context) {
4545
func (s *Server) getClusterStatus(ctx *middleware.Context) {
4646
status := models.ClusterStatus{
4747
ClusterName: cluster.ClusterName,
48-
NodeName: cluster.Manager.ThisNode().Name,
48+
NodeName: cluster.Manager.ThisNode().GetName(),
4949
Members: cluster.Manager.MemberList(),
5050
}
5151
response.Write(ctx, response.NewJson(200, status, ""))
@@ -56,7 +56,7 @@ func (s *Server) postClusterMembers(ctx *middleware.Context, req models.ClusterM
5656
var toJoin []string
5757

5858
for _, memberNode := range cluster.Manager.MemberList() {
59-
memberNames[memberNode.Name] = struct{}{}
59+
memberNames[memberNode.GetName()] = struct{}{}
6060
}
6161

6262
for _, peerName := range req.Members {

‎api/dataprocessor.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
120120
if req.Node.IsLocal() {
121121
localReqs = append(localReqs, req)
122122
} else {
123-
remoteReqs[req.Node.Name] = append(remoteReqs[req.Node.Name], req)
123+
remoteReqs[req.Node.GetName()] = append(remoteReqs[req.Node.GetName()], req)
124124
}
125125
}
126126

@@ -178,7 +178,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
178178
wg := sync.WaitGroup{}
179179
wg.Add(len(remoteReqs))
180180
for _, nodeReqs := range remoteReqs {
181-
log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.Name)
181+
log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.GetName())
182182
go func(ctx context.Context, reqs []models.Req) {
183183
defer wg.Done()
184184
node := reqs[0].Node
@@ -190,11 +190,11 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
190190
var resp models.GetDataResp
191191
_, err = resp.UnmarshalMsg(buf)
192192
if err != nil {
193-
log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err)
193+
log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.GetName(), err)
194194
errorsChan <- err
195195
return
196196
}
197-
log.Debug("DP getTargetsRemote: %s returned %d series", node.Name, len(resp.Series))
197+
log.Debug("DP getTargetsRemote: %s returned %d series", node.GetName(), len(resp.Series))
198198
seriesChan <- resp.Series
199199
}(ctx, nodeReqs)
200200
}
@@ -446,7 +446,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) []chunk
446446
}
447447
// it's important that the itgens get added in chronological order,
448448
// currently we rely on cassandra returning results in order
449-
go s.Cache.Add(key, prevts, itgen)
449+
go s.Cache.Add(key, ctx.Key, ctx.Cons, prevts, itgen)
450450
prevts = itgen.Ts
451451
iters = append(iters, *it)
452452
}

‎api/dataprocessor_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func compareReqEqual(a, b models.Req) bool {
416416
if a.Consolidator != b.Consolidator {
417417
return false
418418
}
419-
if a.Node.Name != b.Node.Name {
419+
if a.Node.GetName() != b.Node.GetName() {
420420
return false
421421
}
422422
if a.Archive != b.Archive {
@@ -624,7 +624,7 @@ func TestGetSeriesCachedStore(t *testing.T) {
624624
for i := 0; i < len(tc.Pattern); i++ {
625625
itgen = chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span)
626626
if pattern[i] == 'c' || pattern[i] == 'b' {
627-
c.Add(metric, prevts, *itgen)
627+
c.Add(metric, metric, 0, prevts, *itgen)
628628
}
629629
if pattern[i] == 's' || pattern[i] == 'b' {
630630
cwr := mdata.NewChunkWriteRequest(nil, metric, &chunks[i], 0, span, time.Now())

0 commit comments

Comments
 (0)
This repository has been archived.