This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Clear cache api #555

Merged: 24 commits, Jan 4, 2018
107 changes: 107 additions & 0 deletions api/ccache.go
@@ -0,0 +1,107 @@
package api

import (
    "context"
    "encoding/json"
    "sync"

    "github.com/grafana/metrictank/api/middleware"
    "github.com/grafana/metrictank/api/models"
    "github.com/grafana/metrictank/api/response"
    "github.com/grafana/metrictank/cluster"
    "github.com/raintank/worldping-api/pkg/log"
)

func (s *Server) ccacheDelete(ctx *middleware.Context, req models.CCacheDelete) {
    res := models.CCacheDeleteResp{}
    code := 200

    if req.Propagate {
        res.Peers = s.ccacheDeletePropagate(ctx.Req.Context(), &req)
        for _, peer := range res.Peers {
            if peer.Errors > 0 {
                code = 500
            }
        }
    }

    fullFlush := false
    for _, pattern := range req.Patterns {
        if pattern == "**" {
            fullFlush = true
        }
    }

    if fullFlush {
        delSeries, delArchives := s.Cache.Reset()
        res.DeletedSeries += delSeries
        res.DeletedArchives += delArchives
    } else {
        for _, pattern := range req.Patterns {
            nodes, err := s.MetricIndex.Find(req.OrgId, pattern, 0)
            if err != nil {
                if res.Errors == 0 {
                    res.FirstError = err.Error()
                }
                res.Errors += 1
                code = 500
            } else {
                for _, node := range nodes {
                    for _, def := range node.Defs {
                        delSeries, delArchives := s.Cache.DelMetric(def.NameWithTags())
                        res.DeletedSeries += delSeries
                        res.DeletedArchives += delArchives
                    }
                }
            }
        }
    }
    response.Write(ctx, response.NewJson(code, res, ""))
}

func (s *Server) ccacheDeletePropagate(ctx context.Context, req *models.CCacheDelete) map[string]models.CCacheDeleteResp {
    // we never want to propagate more than once to avoid loops
    req.Propagate = false

    peers := cluster.Manager.MemberList()
    peerResults := make(map[string]models.CCacheDeleteResp)
    var mu sync.Mutex
    var wg sync.WaitGroup
    for _, peer := range peers {
        if peer.IsLocal() {
            continue
        }
        wg.Add(1)
        go func(peer cluster.Node) {
            mu.Lock()
            defer mu.Unlock()
            peerResults[peer.GetName()] = s.ccacheDeleteRemote(ctx, req, peer)
            wg.Done()
        }(peer)
    }
    wg.Wait()

    return peerResults
}

func (s *Server) ccacheDeleteRemote(ctx context.Context, req *models.CCacheDelete, peer cluster.Node) models.CCacheDeleteResp {
    var res models.CCacheDeleteResp

    log.Debug("HTTP metricDelete calling %s/ccache/delete", peer.GetName())
    buf, err := peer.Post(ctx, "ccacheDeleteRemote", "/ccache/delete", *req)
    if err != nil {
        log.Error(4, "HTTP ccacheDelete error querying %s/ccache/delete: %q", peer.GetName(), err)
        res.FirstError = err.Error()
        res.Errors++
        return res
    }

    err = json.Unmarshal(buf, &res)
    if err != nil {
        log.Error(4, "HTTP ccacheDelete error unmarshaling body from %s/ccache/delete: %q", peer.GetName(), err)
        res.FirstError = err.Error()
        res.Errors++
    }

    return res
}
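As a rough illustration (not part of this diff), a client might exercise the new endpoint as sketched below. It reuses the models.CCacheDelete and models.CCacheDeleteResp types introduced in this PR; the localhost:6060 address is only a placeholder for a running metrictank instance.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/grafana/metrictank/api/models"
)

func main() {
    // Evict cached chunks for every series matching "test.*" in org 1 and ask
    // the receiving node to fan the request out to its cluster peers.
    body, err := json.Marshal(models.CCacheDelete{
        Patterns:  []string{"test.*"},
        OrgId:     1,
        Propagate: true,
    })
    if err != nil {
        panic(err)
    }

    // localhost:6060 is a placeholder; point this at any metrictank instance.
    resp, err := http.Post("http://localhost:6060/ccache/delete", "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // The handler replies with a models.CCacheDeleteResp; a 500 status means
    // this node or one of its peers reported errors while deleting.
    var res models.CCacheDeleteResp
    if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
        panic(err)
    }
    fmt.Printf("status=%d deleted=%d series / %d archives\n", resp.StatusCode, res.DeletedSeries, res.DeletedArchives)
}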
203 changes: 203 additions & 0 deletions api/ccache_test.go
@@ -0,0 +1,203 @@
package api

import (
    "bytes"
    "encoding/json"
    "net/http"
    "net/http/httptest"
    "testing"
    "time"

    "github.com/grafana/metrictank/api/models"
    "github.com/grafana/metrictank/api/response"
    "github.com/grafana/metrictank/cluster"
    "github.com/grafana/metrictank/conf"
    "github.com/grafana/metrictank/idx/memory"
    "github.com/grafana/metrictank/mdata"
    "github.com/grafana/metrictank/mdata/cache"
    "gopkg.in/raintank/schema.v1"
)

func newSrv(delSeries, delArchives int, key string) (*Server, *cache.MockCache) {
    srv, _ := NewServer()
    srv.RegisterRoutes()

    mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
    mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))

    store := mdata.NewDevnullStore()
    srv.BindBackendStore(store)

    mockCache := cache.NewMockCache()
    mockCache.DelMetricSeries = delSeries
    mockCache.DelMetricArchives = delArchives
    metrics := mdata.NewAggMetrics(store, mockCache, false, 0, 0, 0)
    srv.BindMemoryStore(metrics)
    srv.BindCache(mockCache)

    metricIndex := memory.New()
    metricIndex.AddOrUpdate(
        &schema.MetricData{
            Id:       "123",
            OrgId:    1,
            Name:     key,
            Metric:   key,
            Interval: 10,
            Value:    1,
        },
        0,
    )
    srv.BindMetricIndex(metricIndex)
    return srv, mockCache
}

func TestMetricDelete(t *testing.T) {
    cluster.Init("default", "test", time.Now(), "http", 6060)

    delSeries := 1
    delArchives := 3
    testKey := "test.key"

    srv, cache := newSrv(delSeries, delArchives, testKey)
    req, _ := json.Marshal(models.CCacheDelete{
        Patterns:  []string{"test.*"},
        OrgId:     1,
        Propagate: false,
    })

    ts := httptest.NewServer(srv.Macaron)
    defer ts.Close()

    res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
    if err != nil {
        t.Fatalf("There was an error in the request: %s", err)
    }

    respParsed := models.CCacheDeleteResp{}
    buf := new(bytes.Buffer)
    buf.ReadFrom(res.Body)
    json.Unmarshal(buf.Bytes(), &respParsed)

    if len(cache.DelMetricKeys) != 1 || cache.DelMetricKeys[0] != testKey {
        t.Fatalf("Expected that key %s has been deleted, but it has not", testKey)
    }

    if respParsed.DeletedSeries != delSeries || respParsed.DeletedArchives != delArchives {
        t.Fatalf("Expected %d series and %d archives to get deleted, but got %d and %d", delSeries, delArchives, respParsed.DeletedSeries, respParsed.DeletedArchives)
    }

Contributor:
I'm confused about why we tell the cache to return these fake, made-up numbers and then check them, while we also check the real numbers above. Can this be simplified? For example, have the MockCache return deleted series == deleted metric keys, and pin deleted archives to maybe 3x the number of deleted keys? Does this idea make sense? If not, please add a comment somewhere that explains this.
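For reference, a minimal sketch of what that suggestion could look like. The DelMetricKeys field and the (series, archives) return shape follow the MockCache usage in this PR; the one-series/three-archives-per-key ratio is just the reviewer's example, not existing metrictank behaviour.

// Hypothetical variant of MockCache.DelMetric that derives its reported counts
// from the keys it was actually asked to delete, instead of preset numbers.
func (mc *MockCache) DelMetric(key string) (int, int) {
    mc.DelMetricKeys = append(mc.DelMetricKeys, key)
    // one series per deleted key, archives pinned to 3x the number of keys
    return 1, 3
}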

Contributor (author):
This is only testing the request handler, not the cache. It's testing one request with propagation disabled (TestMetricDelete) and one with propagation enabled (TestMetricDeletePropagation). I'm not sure how creating additional rules like archives = 3 * series would simplify things; it would rather make it more complicated, because in order to understand the test a reader would first need to be aware of this rule.
Maybe I should rename the tests to TestMetricDeleteRequestHandlerWithoutPropagation() and TestMetricDeleteRequestHandlerWithPropagation()?

Contributor:
It's confusing because the mock cache claims to have deleted a number of series and archives that has nothing to do with the delete request we actually issued on it. The numbers don't seem to make sense: we assert that it only received 1 metric key delete, so how could any cache have deleted 3 series if there was only 1 key? I know it's a mock and not a real cache, but the numbers should make more sense, I think.

Contributor:
That would clear things up if the DelMetric method took patterns. But according to the argument names of the interface function and its implementations (and the docs for CCache.DelMetric), it only takes 1 metric key, not a pattern.

Contributor (author):
I found a bug while looking at that just now: b168558

}

func TestMetricDeleteWithErrorInPropagation(t *testing.T) {
    manager := cluster.InitMock()

    // define how many series/archives are getting deleted by peer 0
    resp := models.CCacheDeleteResp{
        DeletedSeries:   1,
        DeletedArchives: 1,
        Errors:          1,
    }

    respEncoded := response.NewJson(500, resp, "")
    buf, _ := respEncoded.Body()
    manager.Peers = append(manager.Peers, cluster.NewMockNode(false, "0", buf))

    // define how many series/archives are going to get deleted by this server
    delSeries := 1
    delArchives := 3
    testKey := "test.key"

    srv, _ := newSrv(delSeries, delArchives, testKey)
    req, err := json.Marshal(models.CCacheDelete{
        Patterns:  []string{"test.*"},
        OrgId:     1,
        Propagate: true,
    })
    if err != nil {
        t.Fatalf("Unexpected error when marshaling json: %s", err)
    }

    ts := httptest.NewServer(srv.Macaron)
    defer ts.Close()

    res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
    if err != nil {
        t.Fatalf("There was an error in the request: %s", err)
    }

    expectedCode := 500
    if res.StatusCode != expectedCode {
        buf2 := new(bytes.Buffer)
        buf2.ReadFrom(res.Body)
        respParsed := models.CCacheDeleteResp{}
        json.Unmarshal(buf2.Bytes(), &respParsed)
        t.Fatalf("Expected status code %d, but got %d:\n%+v", expectedCode, res.StatusCode, respParsed)
    }
}

func TestMetricDeletePropagation(t *testing.T) {
    manager := cluster.InitMock()

    expectedDeletedSeries, expectedDeletedArchives := 0, 0
    for _, peer := range []string{"Peer1", "Peer2", "Peer3"} {
        // define how many series/archives are getting deleted by this peer
        resp := models.CCacheDeleteResp{
            DeletedSeries:   2,
            DeletedArchives: 5,
        }
        expectedDeletedSeries += resp.DeletedSeries
        expectedDeletedArchives += resp.DeletedArchives
        respEncoded := response.NewJson(200, resp, "")
        buf, _ := respEncoded.Body()
        manager.Peers = append(manager.Peers, cluster.NewMockNode(false, peer, buf))
    }

    // define how many series/archives are going to get deleted by this server
    delSeries := 1
    delArchives := 3
    testKey := "test.key"

    // add up how many series/archives are expected to be deleted
    expectedDeletedSeries += delSeries
    expectedDeletedArchives += delArchives

    srv, cache := newSrv(delSeries, delArchives, testKey)
    req, err := json.Marshal(models.CCacheDelete{
        Patterns:  []string{"test.*"},
        OrgId:     1,
        Propagate: true,
    })
    if err != nil {
        t.Fatalf("Unexpected error when marshaling json: %s", err)
    }

    ts := httptest.NewServer(srv.Macaron)
    defer ts.Close()

    res, err := http.Post(ts.URL+"/ccache/delete", "application/json", bytes.NewReader(req))
    if err != nil {
        t.Fatalf("There was an error in the request: %s", err)
    }

    buf2 := new(bytes.Buffer)
    buf2.ReadFrom(res.Body)
    respParsed := models.CCacheDeleteResp{}
    json.Unmarshal(buf2.Bytes(), &respParsed)

    if len(cache.DelMetricKeys) != 1 || cache.DelMetricKeys[0] != testKey {
        t.Fatalf("Expected that key %s has been deleted, but it has not", testKey)
    }

    deletedArchives := respParsed.DeletedArchives
    deletedSeries := respParsed.DeletedSeries
    for _, peer := range respParsed.Peers {
        deletedArchives += peer.DeletedArchives
        deletedSeries += peer.DeletedSeries
    }

    if deletedSeries != expectedDeletedSeries || deletedArchives != expectedDeletedArchives {
        t.Fatalf(
            "Expected %d series and %d archives to get deleted, but got %d and %d",
            expectedDeletedSeries, expectedDeletedArchives, deletedSeries, deletedArchives,
        )
    }
}
8 changes: 4 additions & 4 deletions api/cluster.go
@@ -47,7 +47,7 @@ func (s *Server) appStatus(ctx *middleware.Context) {
func (s *Server) getClusterStatus(ctx *middleware.Context) {
    status := models.ClusterStatus{
        ClusterName: cluster.ClusterName,
-       NodeName:    cluster.Manager.ThisNode().Name,
+       NodeName:    cluster.Manager.ThisNode().GetName(),
        Members:     cluster.Manager.MemberList(),
    }
    response.Write(ctx, response.NewJson(200, status, ""))
@@ -58,7 +58,7 @@ func (s *Server) postClusterMembers(ctx *middleware.Context, req models.ClusterM
    var toJoin []string

    for _, memberNode := range cluster.Manager.MemberList() {
-       memberNames[memberNode.Name] = struct{}{}
+       memberNames[memberNode.GetName()] = struct{}{}
    }

    for _, peerName := range req.Members {
@@ -231,11 +231,11 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
    wg.Add(1)
    go func(peer cluster.Node) {
        defer wg.Done()
-       log.Debug("HTTP Render querying %s%s", peer.Name, path)
+       log.Debug("HTTP Render querying %s%s", peer.GetName(), path)
        buf, err := peer.Post(reqCtx, name, path, data)
        if err != nil {
            cancel()
-           log.Error(4, "HTTP Render error querying %s%s: %q", peer.Name, path, err)
+           log.Error(4, "HTTP Render error querying %s%s: %q", peer.GetName(), path, err)
        }
        responses <- struct {
            data PeerResponse
10 changes: 5 additions & 5 deletions api/dataprocessor.go
@@ -145,7 +145,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
        if req.Node.IsLocal() {
            localReqs = append(localReqs, req)
        } else {
-           remoteReqs[req.Node.Name] = append(remoteReqs[req.Node.Name], req)
+           remoteReqs[req.Node.GetName()] = append(remoteReqs[req.Node.GetName()], req)
        }
    }

@@ -204,7 +204,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
    wg := sync.WaitGroup{}
    wg.Add(len(remoteReqs))
    for _, nodeReqs := range remoteReqs {
-       log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.Name)
+       log.Debug("DP getTargetsRemote: handling %d reqs from %s", len(nodeReqs), nodeReqs[0].Node.GetName())
        go func(reqs []models.Req) {
            defer wg.Done()
            node := reqs[0].Node
@@ -218,11 +218,11 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
            _, err = resp.UnmarshalMsg(buf)
            if err != nil {
                cancel()
-               log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.Name, err)
+               log.Error(3, "DP getTargetsRemote: error unmarshaling body from %s/getdata: %q", node.GetName(), err)
                responses <- getTargetsResp{nil, err}
                return
            }
-           log.Debug("DP getTargetsRemote: %s returned %d series", node.Name, len(resp.Series))
+           log.Debug("DP getTargetsRemote: %s returned %d series", node.GetName(), len(resp.Series))
            responses <- getTargetsResp{resp.Series, nil}
        }(nodeReqs)
    }
@@ -563,7 +563,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
        }
        // it's important that the itgens get added in chronological order,
        // currently we rely on cassandra returning results in order
-       s.Cache.Add(key, prevts, itgen)
+       s.Cache.Add(key, ctx.Key, prevts, itgen)
        prevts = itgen.Ts
        iters = append(iters, *it)
    }