delete series api and purger to purge data requested for deletion (#2103)

* delete series api and purger to purge data requested for deletion

delete_store manages delete requests and purge plan records in stores
purger builds delete plans (delete requests sharded by day) and executes them in parallel
only one request per user is in the execution phase at a time
delete requests get picked up for deletion once they are more than a day old

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* moved delete store creation from initStore to initPurger, which is the only component that needs it

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* implemented new methods in MockStorage for writing tests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* removed DeleteClient and using IndexClient in DeleteStore

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* refactored some code, added some tests for chunk store and purger

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* add some tests and fixed some issues found during tests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested in PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* rebased and fixed conflicts

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* updated route for delete handler to look same as prometheus

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* added test for purger restarts and fixed some issues

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* suggested changes from PR review and fixed linter, tests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* fixed panic in modules when stopping purger

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* config changes suggested in PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* updated config doc

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* updated changelog

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* some changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* made init in Purger public to call it from modules to fail early

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
sandeepsukhani authored Mar 3, 2020
1 parent de30605 commit 1a9d546
Showing 16 changed files with 2,924 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -36,6 +36,7 @@
* [FEATURE] Add /config HTTP endpoint which exposes the current Cortex configuration as YAML. #2165
* [FEATURE] Allow Prometheus remote write directly to ingesters. #1491
* [FEATURE] Add flag `-experimental.tsdb.stripe-size` to expose TSDB stripe size option. #2185
* [FEATURE] Experimental Delete Series: Added support for deleting series with a Prometheus-style API. Needs to be enabled first by setting `-purger.enable` to `true`. Deletion is only supported when using `boltdb` and `filesystem` as index and object store respectively. Support for other stores to follow in separate PRs. #2103
* [ENHANCEMENT] Alertmanager: Expose Per-tenant alertmanager metrics #2124
* [ENHANCEMENT] Add `status` label to `cortex_alertmanager_configs` metric to gauge the number of valid and invalid configs. #2125
* [ENHANCEMENT] Cassandra Authentication: added the `custom_authenticators` config option that allows users to authenticate with cassandra clusters using password authenticators that are not approved by default in [gocql](https://github.com/gocql/gocql/blob/81b8263d9fe526782a588ef94d3fa5c6148e5d67/conn.go#L27) #2093
31 changes: 31 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -112,6 +112,9 @@ Where default_value is the value to use if the environment variable is undefined
# storage.
[compactor: <compactor_config>]

# The purger_config configures the purger which takes care of delete requests
[purger: <purger_config>]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

@@ -1523,6 +1526,15 @@ index_queries_cache_config:
  # The fifo_cache_config configures the local in-memory cache.
  # The CLI flags prefix for this block config is: store.index-cache-read
  [fifocache: <fifo_cache_config>]

delete_store:
  # Store for keeping delete request
  # CLI flag: -deletes.store
  [store: <string> | default = ""]

  # Name of the table which stores delete requests
  # CLI flag: -deletes.requests-table-name
  [requests_table_name: <string> | default = "delete_requests"]
```
### `chunk_store_config`
@@ -2337,3 +2349,22 @@ sharding_ring:
  # CLI flag: -compactor.ring.heartbeat-timeout
  [heartbeat_timeout: <duration> | default = 1m0s]
```

### `purger_config`

The `purger_config` configures the purger which takes care of delete requests

```yaml
# Enable purger to allow deletion of series. Be aware that Delete series feature
# is still experimental
# CLI flag: -purger.enable
[enable: <boolean> | default = false]
# Number of workers executing delete plans in parallel
# CLI flag: -purger.num-workers
[num_workers: <int> | default = 2]
# Name of the object store to use for storing delete plans
# CLI flag: -purger.object-store-type
[object_store_type: <string> | default = ""]
```
6 changes: 6 additions & 0 deletions docs/configuration/single-process-config.yaml
@@ -66,3 +66,9 @@ storage:

  filesystem:
    directory: /tmp/cortex/chunks

  delete_store:
    store: boltdb

purger:
  object_store_type: filesystem
3 changes: 2 additions & 1 deletion pkg/chunk/aws/dynamodb_storage_client_test.go
@@ -3,6 +3,7 @@ package aws
import (
	"context"
	"testing"
	"time"

	"github.com/prometheus/common/model"

@@ -29,7 +30,7 @@ func TestChunksPartialError(t *testing.T) {
	}
	ctx := context.Background()
	// Create more chunks than we can read in one batch
-	_, chunks, err := testutils.CreateChunks(0, dynamoDBMaxReadBatchSize+50, model.Now())
+	_, chunks, err := testutils.CreateChunks(0, dynamoDBMaxReadBatchSize+50, model.Now().Add(-time.Hour), model.Now())
	require.NoError(t, err)
	err = client.PutChunks(ctx, chunks)
	require.NoError(t, err)
283 changes: 283 additions & 0 deletions pkg/chunk/delete_requests_store.go
@@ -0,0 +1,283 @@
package chunk

import (
	"context"
	"encoding/binary"
	"encoding/hex"
	"errors"
	"flag"
	"fmt"
	"hash/fnv"
	"strconv"
	"strings"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

type DeleteRequestStatus string

const (
	StatusReceived     DeleteRequestStatus = "received"
	StatusBuildingPlan DeleteRequestStatus = "buildingPlan"
	StatusDeleting     DeleteRequestStatus = "deleting"
	StatusProcessed    DeleteRequestStatus = "processed"

	separator = "\000" // separator for series selectors in delete requests
)

var (
	pendingDeleteRequestStatuses = []DeleteRequestStatus{StatusReceived, StatusBuildingPlan, StatusDeleting}

	ErrDeleteRequestNotFound = errors.New("could not find matching delete request")
)

// DeleteRequest holds all the details about a delete request
type DeleteRequest struct {
	RequestID string              `json:"request_id"`
	UserID    string              `json:"-"`
	StartTime model.Time          `json:"start_time"`
	EndTime   model.Time          `json:"end_time"`
	Selectors []string            `json:"selectors"`
	Status    DeleteRequestStatus `json:"status"`
	Matchers  [][]*labels.Matcher `json:"-"`
	CreatedAt model.Time          `json:"created_at"`
}

// DeleteStore provides all the methods required to manage lifecycle of delete request and things related to it
type DeleteStore struct {
	cfg         DeleteStoreConfig
	indexClient IndexClient
}

// DeleteStoreConfig holds configuration for delete store
type DeleteStoreConfig struct {
	Store             string `yaml:"store"`
	RequestsTableName string `yaml:"requests_table_name"`
}

// RegisterFlags adds the flags required to configure this flag set.
func (cfg *DeleteStoreConfig) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.Store, "deletes.store", "", "Store for keeping delete request")
	f.StringVar(&cfg.RequestsTableName, "deletes.requests-table-name", "delete_requests", "Name of the table which stores delete requests")
}

// NewDeleteStore creates a store for managing delete requests
func NewDeleteStore(cfg DeleteStoreConfig, indexClient IndexClient) (*DeleteStore, error) {
	ds := DeleteStore{
		cfg:         cfg,
		indexClient: indexClient,
	}

	return &ds, nil
}

// AddDeleteRequest creates entries for a new delete request
func (ds *DeleteStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error {
	requestID := generateUniqueID(userID, selectors)

	for {
		_, err := ds.GetDeleteRequest(ctx, userID, string(requestID))
		if err != nil {
			if err == ErrDeleteRequestNotFound {
				break
			}
			return err
		}

		// we have a collision here, let's generate a new requestID and check for a collision again
		time.Sleep(time.Millisecond)
		requestID = generateUniqueID(userID, selectors)
	}

	// userID, requestID
	userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

	// Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status
	// We don't want to set anything in hash key here since we would want to find delete requests by just status
	writeBatch := ds.indexClient.NewWriteBatch()
	writeBatch.Add(ds.cfg.RequestsTableName, "", []byte(userIDAndRequestID), []byte(StatusReceived))

	// Add another entry with additional details like creation time, time range of delete request and selectors in value
	rangeValue := fmt.Sprintf("%x:%x:%x", int64(model.Now()), int64(startTime), int64(endTime))
	writeBatch.Add(ds.cfg.RequestsTableName, userIDAndRequestID, []byte(rangeValue), []byte(strings.Join(selectors, separator)))

	return ds.indexClient.BatchWrite(ctx, writeBatch)
}

// GetDeleteRequestsByStatus returns all delete requests for given status
func (ds *DeleteStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) {
	return ds.queryDeleteRequests(ctx, []IndexQuery{{TableName: ds.cfg.RequestsTableName, ValueEqual: []byte(status)}})
}

// GetDeleteRequestsForUserByStatus returns all delete requests for a user with given status
func (ds *DeleteStore) GetDeleteRequestsForUserByStatus(ctx context.Context, userID string, status DeleteRequestStatus) ([]DeleteRequest, error) {
	return ds.queryDeleteRequests(ctx, []IndexQuery{
		{TableName: ds.cfg.RequestsTableName, RangeValuePrefix: []byte(userID), ValueEqual: []byte(status)},
	})
}

// GetAllDeleteRequestsForUser returns all delete requests for a user
func (ds *DeleteStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
	return ds.queryDeleteRequests(ctx, []IndexQuery{
		{TableName: ds.cfg.RequestsTableName, RangeValuePrefix: []byte(userID)},
	})
}

// UpdateStatus updates status of a delete request
func (ds *DeleteStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error {
	userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

	writeBatch := ds.indexClient.NewWriteBatch()
	writeBatch.Add(ds.cfg.RequestsTableName, "", []byte(userIDAndRequestID), []byte(newStatus))

	return ds.indexClient.BatchWrite(ctx, writeBatch)
}

// GetDeleteRequest returns delete request with given requestID
func (ds *DeleteStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) {
	userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)

	deleteRequests, err := ds.queryDeleteRequests(ctx, []IndexQuery{
		{TableName: ds.cfg.RequestsTableName, RangeValuePrefix: []byte(userIDAndRequestID)},
	})

	if err != nil {
		return nil, err
	}

	if len(deleteRequests) == 0 {
		return nil, ErrDeleteRequestNotFound
	}

	return &deleteRequests[0], nil
}

// GetPendingDeleteRequestsForUser returns all delete requests for a user which are not processed
func (ds *DeleteStore) GetPendingDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
	pendingDeleteRequests := []DeleteRequest{}
	for _, status := range pendingDeleteRequestStatuses {
		deleteRequests, err := ds.GetDeleteRequestsForUserByStatus(ctx, userID, status)
		if err != nil {
			return nil, err
		}

		pendingDeleteRequests = append(pendingDeleteRequests, deleteRequests...)
	}

	return pendingDeleteRequests, nil
}

// queryDeleteRequests first collects the status records matching deleteQuery,
// then looks up the details record (timestamps and selectors) of each request.
func (ds *DeleteStore) queryDeleteRequests(ctx context.Context, deleteQuery []IndexQuery) ([]DeleteRequest, error) {
	deleteRequests := []DeleteRequest{}
	err := ds.indexClient.QueryPages(ctx, deleteQuery, func(query IndexQuery, batch ReadBatch) (shouldContinue bool) {
		itr := batch.Iterator()
		for itr.Next() {
			userID, requestID := splitUserIDAndRequestID(string(itr.RangeValue()))

			deleteRequests = append(deleteRequests, DeleteRequest{
				UserID:    userID,
				RequestID: requestID,
				Status:    DeleteRequestStatus(itr.Value()),
			})
		}
		return true
	})
	if err != nil {
		return nil, err
	}

	for i, deleteRequest := range deleteRequests {
		deleteRequestQuery := []IndexQuery{{TableName: ds.cfg.RequestsTableName, HashValue: fmt.Sprintf("%s:%s", deleteRequest.UserID, deleteRequest.RequestID)}}

		var parseError error
		err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query IndexQuery, batch ReadBatch) (shouldContinue bool) {
			itr := batch.Iterator()
			if !itr.Next() {
				// Empty batch: nothing to parse here, keep paging.
				return true
			}

			// Parse failures are recorded in parseError and checked after QueryPages returns.
			deleteRequest, parseError = parseDeleteRequestTimestamps(itr.RangeValue(), deleteRequest)
			if parseError != nil {
				return false
			}

			deleteRequest.Selectors = strings.Split(string(itr.Value()), separator)
			deleteRequests[i] = deleteRequest

			return true
		})

		if err != nil {
			return nil, err
		}

		if parseError != nil {
			return nil, parseError
		}
	}

	return deleteRequests, nil
}

func parseDeleteRequestTimestamps(rangeValue []byte, deleteRequest DeleteRequest) (DeleteRequest, error) {
	hexParts := strings.Split(string(rangeValue), ":")
	if len(hexParts) != 3 {
		return deleteRequest, errors.New("invalid key in parsing delete request lookup response")
	}

	createdAt, err := strconv.ParseInt(hexParts[0], 16, 64)
	if err != nil {
		return deleteRequest, err
	}

	from, err := strconv.ParseInt(hexParts[1], 16, 64)
	if err != nil {
		return deleteRequest, err
	}

	through, err := strconv.ParseInt(hexParts[2], 16, 64)
	if err != nil {
		return deleteRequest, err
	}

	deleteRequest.CreatedAt = model.Time(createdAt)
	deleteRequest.StartTime = model.Time(from)
	deleteRequest.EndTime = model.Time(through)

	return deleteRequest, nil
}

// generateUniqueID returns a hex-encoded 32-bit FNV hash of the org ID, the
// current time in nanoseconds and the selectors. An ID is useful in managing delete requests.
func generateUniqueID(orgID string, selectors []string) []byte {
	uniqueID := fnv.New32()
	_, _ = uniqueID.Write([]byte(orgID))

	timeNow := make([]byte, 8)
	binary.LittleEndian.PutUint64(timeNow, uint64(time.Now().UnixNano()))
	_, _ = uniqueID.Write(timeNow)

	for _, selector := range selectors {
		_, _ = uniqueID.Write([]byte(selector))
	}

	return encodeUniqueID(uniqueID.Sum32())
}

// encodeUniqueID encodes t as 8 lowercase hex characters (big-endian byte order).
func encodeUniqueID(t uint32) []byte {
	idBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(idBytes, t)
	encodedID := make([]byte, 8)
	hex.Encode(encodedID, idBytes)
	return encodedID
}

// splitUserIDAndRequestID splits rangeValue on its last ":".
// It assumes rangeValue contains at least one ":"; otherwise the slice expression panics.
func splitUserIDAndRequestID(rangeValue string) (userID, requestID string) {
	lastIndex := strings.LastIndex(rangeValue, ":")

	userID = rangeValue[:lastIndex]
	requestID = rangeValue[lastIndex+1:]

	return
}
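
The details record stores its three timestamps as colon-separated hex in the range value, and the status record is keyed by `userID:requestID`. The following sketch round-trips both encodings using only the standard library. `encodeTimes`, `decodeTimes`, and `splitKey` are hypothetical helpers written for illustration, not part of this commit; `splitKey` additionally returns an error when no `:` is present, which `splitUserIDAndRequestID` does not check.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// encodeTimes mirrors the "%x:%x:%x" range value written by AddDeleteRequest.
func encodeTimes(createdAt, start, end int64) string {
	return fmt.Sprintf("%x:%x:%x", createdAt, start, end)
}

// decodeTimes reverses encodeTimes, rejecting values without exactly three fields.
func decodeTimes(rangeValue string) (createdAt, start, end int64, err error) {
	parts := strings.Split(rangeValue, ":")
	if len(parts) != 3 {
		return 0, 0, 0, errors.New("expected three hex fields")
	}
	vals := make([]int64, 3)
	for i, p := range parts {
		if vals[i], err = strconv.ParseInt(p, 16, 64); err != nil {
			return 0, 0, 0, err
		}
	}
	return vals[0], vals[1], vals[2], nil
}

// splitKey splits "userID:requestID" on the last ':' so that user IDs that
// themselves contain ':' are preserved. Unlike the original helper, it
// reports a malformed key instead of panicking.
func splitKey(key string) (userID, requestID string, err error) {
	i := strings.LastIndex(key, ":")
	if i < 0 {
		return "", "", fmt.Errorf("malformed key %q: missing ':'", key)
	}
	return key[:i], key[i+1:], nil
}

func main() {
	rv := encodeTimes(1583193600000, 1583107200000, 1583150400000)
	createdAt, start, end, err := decodeTimes(rv)
	if err != nil {
		panic(err)
	}
	fmt.Println(rv, createdAt, start, end)

	userID, requestID, err := splitKey("team:a:0a1b2c3d")
	if err != nil {
		panic(err)
	}
	fmt.Println(userID, requestID)
}
```

Splitting on the last `:` works because the request ID is always 8 hex characters and so can never contain a colon, whereas a user ID might.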