Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit a0a00c4

Browse files
authored
Merge pull request #555 from grafana/clear_cache_api
Clear cache api
2 parents d5ca2bc + b168558 commit a0a00c4

25 files changed

+1008
-164
lines changed

api/ccache.go

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"sync"
7+
8+
"github.com/grafana/metrictank/api/middleware"
9+
"github.com/grafana/metrictank/api/models"
10+
"github.com/grafana/metrictank/api/response"
11+
"github.com/grafana/metrictank/cluster"
12+
"github.com/raintank/worldping-api/pkg/log"
13+
)
14+
15+
func (s *Server) ccacheDelete(ctx *middleware.Context, req models.CCacheDelete) {
16+
res := models.CCacheDeleteResp{}
17+
code := 200
18+
19+
if req.Propagate {
20+
res.Peers = s.ccacheDeletePropagate(ctx.Req.Context(), &req)
21+
for _, peer := range res.Peers {
22+
if peer.Errors > 0 {
23+
code = 500
24+
}
25+
}
26+
}
27+
28+
fullFlush := false
29+
for _, pattern := range req.Patterns {
30+
if pattern == "**" {
31+
fullFlush = true
32+
}
33+
}
34+
35+
if fullFlush {
36+
delSeries, delArchives := s.Cache.Reset()
37+
res.DeletedSeries += delSeries
38+
res.DeletedArchives += delArchives
39+
} else {
40+
for _, pattern := range req.Patterns {
41+
nodes, err := s.MetricIndex.Find(req.OrgId, pattern, 0)
42+
if err != nil {
43+
if res.Errors == 0 {
44+
res.FirstError = err.Error()
45+
}
46+
res.Errors += 1
47+
code = 500
48+
} else {
49+
for _, node := range nodes {
50+
for _, def := range node.Defs {
51+
delSeries, delArchives := s.Cache.DelMetric(def.NameWithTags())
52+
res.DeletedSeries += delSeries
53+
res.DeletedArchives += delArchives
54+
}
55+
}
56+
}
57+
}
58+
}
59+
response.Write(ctx, response.NewJson(code, res, ""))
60+
}
61+
62+
func (s *Server) ccacheDeletePropagate(ctx context.Context, req *models.CCacheDelete) map[string]models.CCacheDeleteResp {
63+
// we never want to propagate more than once to avoid loops
64+
req.Propagate = false
65+
66+
peers := cluster.Manager.MemberList()
67+
peerResults := make(map[string]models.CCacheDeleteResp)
68+
var mu sync.Mutex
69+
var wg sync.WaitGroup
70+
for _, peer := range peers {
71+
if peer.IsLocal() {
72+
continue
73+
}
74+
wg.Add(1)
75+
go func(peer cluster.Node) {
76+
mu.Lock()
77+
defer mu.Unlock()
78+
peerResults[peer.GetName()] = s.ccacheDeleteRemote(ctx, req, peer)
79+
wg.Done()
80+
}(peer)
81+
}
82+
wg.Wait()
83+
84+
return peerResults
85+
}
86+
87+
func (s *Server) ccacheDeleteRemote(ctx context.Context, req *models.CCacheDelete, peer cluster.Node) models.CCacheDeleteResp {
88+
var res models.CCacheDeleteResp
89+
90+
log.Debug("HTTP metricDelete calling %s/ccache/delete", peer.GetName())
91+
buf, err := peer.Post(ctx, "ccacheDeleteRemote", "/ccache/delete", *req)
92+
if err != nil {
93+
log.Error(4, "HTTP ccacheDelete error querying %s/ccache/delete: %q", peer.GetName(), err)
94+
res.FirstError = err.Error()
95+
res.Errors++
96+
return res
97+
}
98+
99+
err = json.Unmarshal(buf, &res)
100+
if err != nil {
101+
log.Error(4, "HTTP ccacheDelete error unmarshaling body from %s/ccache/delete: %q", peer.GetName(), err)
102+
res.FirstError = err.Error()
103+
res.Errors++
104+
}
105+
106+
return res
107+
}

api/ccache_test.go

+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package api
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
"github.com/grafana/metrictank/api/models"
12+
"github.com/grafana/metrictank/api/response"
13+
"github.com/grafana/metrictank/cluster"
14+
"github.com/grafana/metrictank/conf"
15+
"github.com/grafana/metrictank/idx/memory"
16+
"github.com/grafana/metrictank/mdata"
17+
"github.com/grafana/metrictank/mdata/cache"
18+
"gopkg.in/raintank/schema.v1"
19+
)
20+
21+
func newSrv(delSeries, delArchives int, key string) (*Server, *cache.MockCache) {
22+
srv, _ := NewServer()
23+
srv.RegisterRoutes()
24+
25+
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
26+
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
27+
28+
store := mdata.NewDevnullStore()
29+
srv.BindBackendStore(store)
30+
31+
mockCache := cache.NewMockCache()
32+
mockCache.DelMetricSeries = delSeries
33+
mockCache.DelMetricArchives = delArchives
34+
metrics := mdata.NewAggMetrics(store, mockCache, false, 0, 0, 0)
35+
srv.BindMemoryStore(metrics)
36+
srv.BindCache(mockCache)
37+
38+
metricIndex := memory.New()
39+
metricIndex.AddOrUpdate(
40+
&schema.MetricData{
41+
Id: "123",
42+
OrgId: 1,
43+
Name: key,
44+
Metric: key,
45+
Interval: 10,
46+
Value: 1,
47+
},
48+
0,
49+
)
50+
srv.BindMetricIndex(metricIndex)
51+
return srv, mockCache
52+
}
53+
54+
func TestMetricDelete(t *testing.T) {
55+
cluster.Init("default", "test", time.Now(), "http", 6060)
56+
57+
delSeries := 1
58+
delArchives := 3
59+
testKey := "test.key"
60+
61+
srv, cache := newSrv(delSeries, delArchives, testKey)
62+
req, _ := json.Marshal(models.CCacheDelete{
63+
Patterns: []string{"test.*"},
64+
OrgId: 1,
65+
Propagate: false,
66+
})
67+
68+
ts := httptest.NewServer(srv.Macaron)
69+
defer ts.Close()
70+
71+
res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
72+
if err != nil {
73+
t.Fatalf("There was an error in the request: %s", err)
74+
}
75+
76+
respParsed := models.CCacheDeleteResp{}
77+
buf := new(bytes.Buffer)
78+
buf.ReadFrom(res.Body)
79+
json.Unmarshal(buf.Bytes(), &respParsed)
80+
81+
if len(cache.DelMetricKeys) != 1 || cache.DelMetricKeys[0] != testKey {
82+
t.Fatalf("Expected that key %s has been deleted, but it has not", testKey)
83+
}
84+
85+
if respParsed.DeletedSeries != delSeries || respParsed.DeletedArchives != delArchives {
86+
t.Fatalf("Expected %d series and %d archives to get deleted, but got %d and %d", delSeries, delArchives, respParsed.DeletedSeries, respParsed.DeletedArchives)
87+
}
88+
}
89+
90+
func TestMetricDeleteWithErrorInPropagation(t *testing.T) {
91+
manager := cluster.InitMock()
92+
93+
// define how many series/archives are getting deleted by peer 0
94+
resp := models.CCacheDeleteResp{
95+
DeletedSeries: 1,
96+
DeletedArchives: 1,
97+
Errors: 1,
98+
}
99+
100+
respEncoded := response.NewJson(500, resp, "")
101+
buf, _ := respEncoded.Body()
102+
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, "0", buf))
103+
104+
// define how many series/archives are going to get deleted by this server
105+
delSeries := 1
106+
delArchives := 3
107+
testKey := "test.key"
108+
109+
srv, _ := newSrv(delSeries, delArchives, testKey)
110+
req, err := json.Marshal(models.CCacheDelete{
111+
Patterns: []string{"test.*"},
112+
OrgId: 1,
113+
Propagate: true,
114+
})
115+
if err != nil {
116+
t.Fatalf("Unexpected error when marshaling json: %s", err)
117+
}
118+
119+
ts := httptest.NewServer(srv.Macaron)
120+
defer ts.Close()
121+
122+
res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
123+
if err != nil {
124+
t.Fatalf("There was an error in the request: %s", err)
125+
}
126+
127+
expectedCode := 500
128+
if res.StatusCode != expectedCode {
129+
buf2 := new(bytes.Buffer)
130+
buf2.ReadFrom(res.Body)
131+
respParsed := models.CCacheDeleteResp{}
132+
json.Unmarshal(buf2.Bytes(), &respParsed)
133+
t.Fatalf("Expected status code %d, but got %d:\n%+v", expectedCode, res.StatusCode, respParsed)
134+
}
135+
}
136+
137+
func TestMetricDeletePropagation(t *testing.T) {
138+
manager := cluster.InitMock()
139+
140+
expectedDeletedSeries, expectedDeletedArchives := 0, 0
141+
for _, peer := range []string{"Peer1", "Peer2", "Peer3"} {
142+
// define how many series/archives are getting deleted by this peer
143+
resp := models.CCacheDeleteResp{
144+
DeletedSeries: 2,
145+
DeletedArchives: 5,
146+
}
147+
expectedDeletedSeries += resp.DeletedSeries
148+
expectedDeletedArchives += resp.DeletedArchives
149+
respEncoded := response.NewJson(200, resp, "")
150+
buf, _ := respEncoded.Body()
151+
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, peer, buf))
152+
}
153+
154+
// define how many series/archives are going to get deleted by this server
155+
delSeries := 1
156+
delArchives := 3
157+
testKey := "test.key"
158+
159+
// add up how many series/archives are expected to be deleted
160+
expectedDeletedSeries += delSeries
161+
expectedDeletedArchives += delArchives
162+
163+
srv, cache := newSrv(delSeries, delArchives, testKey)
164+
req, err := json.Marshal(models.CCacheDelete{
165+
Patterns: []string{"test.*"},
166+
OrgId: 1,
167+
Propagate: true,
168+
})
169+
if err != nil {
170+
t.Fatalf("Unexpected error when marshaling json: %s", err)
171+
}
172+
173+
ts := httptest.NewServer(srv.Macaron)
174+
defer ts.Close()
175+
176+
res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
177+
if err != nil {
178+
t.Fatalf("There was an error in the request: %s", err)
179+
}
180+
181+
buf2 := new(bytes.Buffer)
182+
buf2.ReadFrom(res.Body)
183+
respParsed := models.CCacheDeleteResp{}
184+
json.Unmarshal(buf2.Bytes(), &respParsed)
185+
186+
if len(cache.DelMetricKeys) != 1 || cache.DelMetricKeys[0] != testKey {
187+
t.Fatalf("Expected that key %s has been deleted, but it has not", testKey)
188+
}
189+
190+
deletedArchives := respParsed.DeletedArchives
191+
deletedSeries := respParsed.DeletedSeries
192+
for _, peer := range respParsed.Peers {
193+
deletedArchives += peer.DeletedArchives
194+
deletedSeries += peer.DeletedSeries
195+
}
196+
197+
if deletedSeries != expectedDeletedSeries || deletedArchives != expectedDeletedArchives {
198+
t.Fatalf(
199+
"Expected %d series and %d archives to get deleted, but got %d and %d",
200+
expectedDeletedSeries, expectedDeletedArchives, respParsed.DeletedSeries, respParsed.DeletedArchives,
201+
)
202+
}
203+
}

api/cluster.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (s *Server) appStatus(ctx *middleware.Context) {
4747
func (s *Server) getClusterStatus(ctx *middleware.Context) {
4848
status := models.ClusterStatus{
4949
ClusterName: cluster.ClusterName,
50-
NodeName: cluster.Manager.ThisNode().Name,
50+
NodeName: cluster.Manager.ThisNode().GetName(),
5151
Members: cluster.Manager.MemberList(),
5252
}
5353
response.Write(ctx, response.NewJson(200, status, ""))
@@ -58,7 +58,7 @@ func (s *Server) postClusterMembers(ctx *middleware.Context, req models.ClusterM
5858
var toJoin []string
5959

6060
for _, memberNode := range cluster.Manager.MemberList() {
61-
memberNames[memberNode.Name] = struct{}{}
61+
memberNames[memberNode.GetName()] = struct{}{}
6262
}
6363

6464
for _, peerName := range req.Members {
@@ -231,11 +231,11 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
231231
wg.Add(1)
232232
go func(peer cluster.Node) {
233233
defer wg.Done()
234-
log.Debug("HTTP Render querying %s%s", peer.Name, path)
234+
log.Debug("HTTP Render querying %s%s", peer.GetName(), path)
235235
buf, err := peer.Post(reqCtx, name, path, data)
236236
if err != nil {
237237
cancel()
238-
log.Error(4, "HTTP Render error querying %s%s: %q", peer.Name, path, err)
238+
log.Error(4, "HTTP Render error querying %s%s: %q", peer.GetName(), path, err)
239239
}
240240
responses <- struct {
241241
data PeerResponse

api/dataprocessor.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
145145
if req.Node.IsLocal() {
146146
localReqs = append(localReqs, req)
147147
} else {
148-
remoteReqs[req.Node.Name] = append(remoteReqs[req.Node.Name], req)
148+
remoteReqs[req.Node.GetName()] = append(remoteReqs[req.Node.GetName()], req)
149149
}
150150
}
151151

@@ -204,7 +204,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
204204
wg := sync.WaitGroup{}
205205
wg.Add(len(remoteReqs))
206206
for _, nodeReqs := range remoteReqs {
207-
log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.Name)
207+
log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.GetName())
208208
go func(reqs []models.Req) {
209209
defer wg.Done()
210210
node := reqs[0].Node
@@ -218,11 +218,11 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
218218
_, err = resp.UnmarshalMsg(buf)
219219
if err != nil {
220220
cancel()
221-
log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err)
221+
log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.GetName(), err)
222222
responses <- getTargetsResp{nil, err}
223223
return
224224
}
225-
log.Debug("DP getTargetsRemote: %s returned %d series", node.Name, len(resp.Series))
225+
log.Debug("DP getTargetsRemote: %s returned %d series", node.GetName(), len(resp.Series))
226226
responses <- getTargetsResp{resp.Series, nil}
227227
}(nodeReqs)
228228
}
@@ -563,7 +563,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
563563
}
564564
// it's important that the itgens get added in chronological order,
565565
// currently we rely on cassandra returning results in order
566-
s.Cache.Add(key, prevts, itgen)
566+
s.Cache.Add(key, ctx.Key, prevts, itgen)
567567
prevts = itgen.Ts
568568
iters = append(iters, *it)
569569
}

0 commit comments

Comments
 (0)