Skip to content

Commit

Permalink
delete series api and purger to purge data requested for deletion (gr…
Browse files Browse the repository at this point in the history
…afana#2103)

* delete series api and purger to purge data requested for deletion

delete_store manages delete requests and purge plan records in stores
purger builds delete plans(delete requests sharded by day) and executes them paralelly
only one requests per user would be in execution phase at a time
delete requests gets picked up for deletion after they get older by more than a day

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* moved delete store creation from initStore to initPurger, which is the only component that needs it

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* implemented new methods in MockStorage for writing tests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* removed DeleteClient and using IndexClient in DeleteStore

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* refactored some code, added some tests for chunk store and purger

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* add some tests and fixed some issues found during tests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested in PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* rebased and fixed conflicts

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* updated route for delete handler to look same as prometheus

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* added test for purger restarts and fixed some issues

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* suggested changes from PR review and fixed linter, tests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* fixed panic in modules when stopping purger

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* config changes suggested in PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* updated config doc

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* updated changelog

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* some changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* made init in Purger public to call it from modules to fail early

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
sandeepsukhani authored Mar 3, 2020
1 parent 874ded1 commit e4a94bc
Show file tree
Hide file tree
Showing 10 changed files with 2,786 additions and 14 deletions.
3 changes: 2 additions & 1 deletion aws/dynamodb_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aws
import (
"context"
"testing"
"time"

"github.com/prometheus/common/model"

Expand All @@ -29,7 +30,7 @@ func TestChunksPartialError(t *testing.T) {
}
ctx := context.Background()
// Create more chunks than we can read in one batch
_, chunks, err := testutils.CreateChunks(0, dynamoDBMaxReadBatchSize+50, model.Now())
_, chunks, err := testutils.CreateChunks(0, dynamoDBMaxReadBatchSize+50, model.Now().Add(-time.Hour), model.Now())
require.NoError(t, err)
err = client.PutChunks(ctx, chunks)
require.NoError(t, err)
Expand Down
283 changes: 283 additions & 0 deletions delete_requests_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
package chunk

import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"flag"
"fmt"
"hash/fnv"
"strconv"
"strings"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)

type DeleteRequestStatus string

const (
StatusReceived DeleteRequestStatus = "received"
StatusBuildingPlan DeleteRequestStatus = "buildingPlan"
StatusDeleting DeleteRequestStatus = "deleting"
StatusProcessed DeleteRequestStatus = "processed"

separator = "\000" // separator for series selectors in delete requests
)

var (
pendingDeleteRequestStatuses = []DeleteRequestStatus{StatusReceived, StatusBuildingPlan, StatusDeleting}

ErrDeleteRequestNotFound = errors.New("could not find matching delete request")
)

// DeleteRequest holds all the details about a delete request
type DeleteRequest struct {
RequestID string `json:"request_id"`
UserID string `json:"-"`
StartTime model.Time `json:"start_time"`
EndTime model.Time `json:"end_time"`
Selectors []string `json:"selectors"`
Status DeleteRequestStatus `json:"status"`
Matchers [][]*labels.Matcher `json:"-"`
CreatedAt model.Time `json:"created_at"`
}

// DeleteStore provides all the methods required to manage lifecycle of delete request and things related to it
type DeleteStore struct {
cfg DeleteStoreConfig
indexClient IndexClient
}

// DeleteStoreConfig holds configuration for delete store
type DeleteStoreConfig struct {
Store string `yaml:"store"`
RequestsTableName string `yaml:"requests_table_name"`
}

// RegisterFlags adds the flags required to configure this flag set.
func (cfg *DeleteStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Store, "deletes.store", "", "Store for keeping delete request")
f.StringVar(&cfg.RequestsTableName, "deletes.requests-table-name", "delete_requests", "Name of the table which stores delete requests")
}

// NewDeleteStore creates a store for managing delete requests
func NewDeleteStore(cfg DeleteStoreConfig, indexClient IndexClient) (*DeleteStore, error) {
ds := DeleteStore{
cfg: cfg,
indexClient: indexClient,
}

return &ds, nil
}

// Add creates entries for a new delete request
func (ds *DeleteStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error {
requestID := generateUniqueID(userID, selectors)

for {
_, err := ds.GetDeleteRequest(ctx, userID, string(requestID))
if err != nil {
if err == ErrDeleteRequestNotFound {
break
}
return err
}

// we have a collision here, lets recreate a new requestID and check for collision
time.Sleep(time.Millisecond)
requestID = generateUniqueID(userID, selectors)
}

// userID, requestID
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

// Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status
// We don't want to set anything in hash key here since we would want to find delete requests by just status
writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(ds.cfg.RequestsTableName, "", []byte(userIDAndRequestID), []byte(StatusReceived))

// Add another entry with additional details like creation time, time range of delete request and selectors in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(model.Now()), int64(startTime), int64(endTime))
writeBatch.Add(ds.cfg.RequestsTableName, userIDAndRequestID, []byte(rangeValue), []byte(strings.Join(selectors, separator)))

return ds.indexClient.BatchWrite(ctx, writeBatch)
}

// GetDeleteRequestsByStatus returns all delete requests for given status
func (ds *DeleteStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, []IndexQuery{{TableName: ds.cfg.RequestsTableName, ValueEqual: []byte(status)}})
}

// GetDeleteRequestsForUserByStatus returns all delete requests for a user with given status
func (ds *DeleteStore) GetDeleteRequestsForUserByStatus(ctx context.Context, userID string, status DeleteRequestStatus) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, []IndexQuery{
{TableName: ds.cfg.RequestsTableName, RangeValuePrefix: []byte(userID), ValueEqual: []byte(status)},
})
}

// GetAllDeleteRequestsForUser returns all delete requests for a user
func (ds *DeleteStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
return ds.queryDeleteRequests(ctx, []IndexQuery{
{TableName: ds.cfg.RequestsTableName, RangeValuePrefix: []byte(userID)},
})
}

// UpdateStatus updates status of a delete request
func (ds *DeleteStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error {
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(ds.cfg.RequestsTableName, "", []byte(userIDAndRequestID), []byte(newStatus))

return ds.indexClient.BatchWrite(ctx, writeBatch)
}

// GetDeleteRequest returns delete request with given requestID
func (ds *DeleteStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) {
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

deleteRequests, err := ds.queryDeleteRequests(ctx, []IndexQuery{
{TableName: ds.cfg.RequestsTableName, RangeValuePrefix: []byte(userIDAndRequestID)},
})

if err != nil {
return nil, err
}

if len(deleteRequests) == 0 {
return nil, ErrDeleteRequestNotFound
}

return &deleteRequests[0], nil
}

// GetPendingDeleteRequestsForUser returns all delete requests for a user which are not processed
func (ds *DeleteStore) GetPendingDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
pendingDeleteRequests := []DeleteRequest{}
for _, status := range pendingDeleteRequestStatuses {
deleteRequests, err := ds.GetDeleteRequestsForUserByStatus(ctx, userID, status)
if err != nil {
return nil, err
}

pendingDeleteRequests = append(pendingDeleteRequests, deleteRequests...)
}

return pendingDeleteRequests, nil
}

func (ds *DeleteStore) queryDeleteRequests(ctx context.Context, deleteQuery []IndexQuery) ([]DeleteRequest, error) {
deleteRequests := []DeleteRequest{}
err := ds.indexClient.QueryPages(ctx, deleteQuery, func(query IndexQuery, batch ReadBatch) (shouldContinue bool) {
itr := batch.Iterator()
for itr.Next() {
userID, requestID := splitUserIDAndRequestID(string(itr.RangeValue()))

deleteRequests = append(deleteRequests, DeleteRequest{
UserID: userID,
RequestID: requestID,
Status: DeleteRequestStatus(itr.Value()),
})
}
return true
})
if err != nil {
return nil, err
}

for i, deleteRequest := range deleteRequests {
deleteRequestQuery := []IndexQuery{{TableName: ds.cfg.RequestsTableName, HashValue: fmt.Sprintf("%s:%s", deleteRequest.UserID, deleteRequest.RequestID)}}

var parseError error
err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query IndexQuery, batch ReadBatch) (shouldContinue bool) {
itr := batch.Iterator()
itr.Next()

deleteRequest, err = parseDeleteRequestTimestamps(itr.RangeValue(), deleteRequest)
if err != nil {
parseError = err
return false
}

deleteRequest.Selectors = strings.Split(string(itr.Value()), separator)
deleteRequests[i] = deleteRequest

return true
})

if err != nil {
return nil, err
}

if parseError != nil {
return nil, parseError
}
}

return deleteRequests, nil
}

func parseDeleteRequestTimestamps(rangeValue []byte, deleteRequest DeleteRequest) (DeleteRequest, error) {
hexParts := strings.Split(string(rangeValue), ":")
if len(hexParts) != 3 {
return deleteRequest, errors.New("invalid key in parsing delete request lookup response")
}

createdAt, err := strconv.ParseInt(hexParts[0], 16, 64)
if err != nil {
return deleteRequest, err
}

from, err := strconv.ParseInt(hexParts[1], 16, 64)
if err != nil {
return deleteRequest, err

}
through, err := strconv.ParseInt(hexParts[2], 16, 64)
if err != nil {
return deleteRequest, err

}

deleteRequest.CreatedAt = model.Time(createdAt)
deleteRequest.StartTime = model.Time(from)
deleteRequest.EndTime = model.Time(through)

return deleteRequest, nil
}

// An id is useful in managing delete requests
func generateUniqueID(orgID string, selectors []string) []byte {
uniqueID := fnv.New32()
_, _ = uniqueID.Write([]byte(orgID))

timeNow := make([]byte, 8)
binary.LittleEndian.PutUint64(timeNow, uint64(time.Now().UnixNano()))
_, _ = uniqueID.Write(timeNow)

for _, selector := range selectors {
_, _ = uniqueID.Write([]byte(selector))
}

return encodeUniqueID(uniqueID.Sum32())
}

func encodeUniqueID(t uint32) []byte {
throughBytes := make([]byte, 4)
binary.BigEndian.PutUint32(throughBytes, t)
encodedThroughBytes := make([]byte, 8)
hex.Encode(encodedThroughBytes, throughBytes)
return encodedThroughBytes
}

func splitUserIDAndRequestID(rangeValue string) (userID, requestID string) {
lastIndex := strings.LastIndex(rangeValue, ":")

userID = rangeValue[:lastIndex]
requestID = rangeValue[lastIndex+1:]

return
}
Loading

0 comments on commit e4a94bc

Please sign in to comment.