Skip to content

Commit

Permalink
staging manager: drop by prefix (#1036)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyaug authored Dec 14, 2020
1 parent 2a70506 commit 77ca99e
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 47 deletions.
11 changes: 7 additions & 4 deletions graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,17 @@ type StagingManager interface {
// Set writes a value under the given staging token and key.
Set(ctx context.Context, st StagingToken, key Key, value Value) error

// Delete deletes a value by staging token and key
Delete(ctx context.Context, st StagingToken, key Key) error

// List returns a ValueIterator for the given staging token
List(ctx context.Context, st StagingToken) (ValueIterator, error)

// DropKey clears a value by staging token and key
DropKey(ctx context.Context, st StagingToken, key Key) error

// Drop clears the given staging area
Drop(ctx context.Context, st StagingToken) error

// DropByPrefix drops all keys starting with the given prefix, from the given staging area
DropByPrefix(ctx context.Context, st StagingToken, prefix Key) error
}

var (
Expand Down Expand Up @@ -679,7 +682,7 @@ func (g *graveler) Delete(ctx context.Context, repositoryID RepositoryID, branch
if err != nil {
return err
}
return g.StagingManager.Delete(ctx, branch.stagingToken, key)
return g.StagingManager.DropKey(ctx, branch.stagingToken, key)
}

func (g *graveler) List(ctx context.Context, repositoryID RepositoryID, ref Ref, prefix, from, delimiter Key) (ListingIterator, error) {
Expand Down
6 changes: 5 additions & 1 deletion graveler/graveler_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type stagingMock struct {
stagingToken graveler.StagingToken
}

func (s *stagingMock) DropByPrefix(_ context.Context, _ graveler.StagingToken, _ graveler.Key) error {
return nil
}

func (s *stagingMock) Drop(_ context.Context, _ graveler.StagingToken) error {
if s.err != nil {
return s.err
Expand All @@ -82,7 +86,7 @@ func (s *stagingMock) Set(_ context.Context, _ graveler.StagingToken, _ graveler
return nil
}

func (s *stagingMock) Delete(_ context.Context, _ graveler.StagingToken, _ graveler.Key) error {
func (s *stagingMock) DropKey(_ context.Context, _ graveler.StagingToken, _ graveler.Key) error {
return nil
}

Expand Down
34 changes: 33 additions & 1 deletion graveler/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package graveler

import (
"context"
"math"

sq "github.com/Masterminds/squirrel"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
)
Expand Down Expand Up @@ -46,7 +48,7 @@ func (p *stagingManager) Set(ctx context.Context, st StagingToken, key Key, valu
return err
}

func (p *stagingManager) Delete(ctx context.Context, st StagingToken, key Key) error {
func (p *stagingManager) DropKey(ctx context.Context, st StagingToken, key Key) error {
_, err := p.db.Transact(func(tx db.Tx) (interface{}, error) {
return tx.Exec("DELETE FROM kv_staging WHERE staging_token=$1 AND key=$2", st, key)
}, p.txOpts(ctx)...)
Expand All @@ -64,6 +66,36 @@ func (p *stagingManager) Drop(ctx context.Context, st StagingToken) error {
return err
}

func (p *stagingManager) DropByPrefix(ctx context.Context, st StagingToken, prefix Key) error {
upperBound := getUpperBoundForPrefix(prefix)
builder := sq.Delete("kv_staging").Where(sq.Eq{"staging_token": st}).Where("key >= ?::bytea", prefix)
_, err := p.db.Transact(func(tx db.Tx) (interface{}, error) {
if upperBound != nil {
builder = builder.Where("key < ?::bytea", upperBound)
}
query, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql()
if err != nil {
return nil, err
}
return tx.Exec(query, args...)
}, p.txOpts(ctx)...)
return err
}

func getUpperBoundForPrefix(prefix Key) Key {
idx := len(prefix) - 1
for idx >= 0 && prefix[idx] == math.MaxUint8 {
idx--
}
if idx == -1 {
return nil
}
upperBound := make(Key, idx+1)
copy(upperBound, prefix[:idx+1])
upperBound[idx]++
return upperBound
}

func (p *stagingManager) txOpts(ctx context.Context, opts ...db.TxOpt) []db.TxOpt {
o := []db.TxOpt{
db.WithContext(ctx),
Expand Down
Loading

0 comments on commit 77ca99e

Please sign in to comment.