Skip to content

Commit

Permalink
Create success tasks for nested directories
Browse files Browse the repository at this point in the history
  • Loading branch information
arielshaqed committed Oct 14, 2020
1 parent e406b8e commit fc2b960
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 18 deletions.
89 changes: 73 additions & 16 deletions export/tasks_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"

rinqueue "github.com/erikdubbelboer/ringqueue"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/parade"
)
Expand Down Expand Up @@ -48,18 +49,41 @@ func dirname(path string) string {
return path[0:i]
}

type matchCounter struct {
pred func(s string) bool
m map[string]int
type dirMatchCache struct {
pred func(path string) bool
upMatchCache map[string]*string
}

// matchAndCount tests s and returns true and increments its counter if it matches.
func (mc *matchCounter) matchAndCount(s string) bool {
if !mc.pred(s) {
return false
func (dmc *dirMatchCache) lookup(filename string) (string, bool) {
dir := filename
var ret *string
for {
dir = dirname(dir)
var ok bool
if ret, ok = dmc.upMatchCache[dir]; ok {
break
}
if dmc.pred(dir) {
copy := dir
ret = &copy
break
}
if dir == "" {
break
}
}
mc.m[s]++
return true
for dir = dirname(filename); dir != "" && (ret == nil || dir != *ret); dir = dirname(dir) {
dmc.upMatchCache[dir] = ret
}

if ret == nil {
return "", false
}
return *ret, true
}

func makeDirMatchCache(pred func(path string) bool) *dirMatchCache {
return &dirMatchCache{pred: pred, upMatchCache: make(map[string]*string)}
}

// generateTasksFromDiffs converts diffs into many tasks that depend on startTaskID, with a
Expand Down Expand Up @@ -90,7 +114,11 @@ func GenerateTasksFromDiffs(exportID string, dstPrefix string, diffs catalog.Dif
}
totalTasks := 0

successForDirectory := matchCounter{pred: generateSuccessFor, m: make(map[string]int)}
successDirectoriesCache := makeDirMatchCache(generateSuccessFor)
successForDirectory := make(map[string]struct {
count int
toSignal []parade.TaskID
})

makeTaskForDiff := func(diff *catalog.Difference) (parade.TaskData, error) {
var (
Expand All @@ -101,11 +129,16 @@ func GenerateTasksFromDiffs(exportID string, dstPrefix string, diffs catalog.Dif
err error
)

d := dirname(diff.Path)
if successForDirectory.matchAndCount(d) {
if d, ok := successDirectoriesCache.lookup(diff.Path); ok {
s := successForDirectory[d]
s.count++
successForDirectory[d] = s

toSignal = append(toSignal, makeSuccessTaskID(d))
}
toSignal = append(toSignal, finishedTaskID)
if len(toSignal) == 0 {
toSignal = []parade.TaskID{finishedTaskID}
}

switch diff.Type {
case catalog.DifferenceTypeAdded:
Expand Down Expand Up @@ -163,8 +196,26 @@ func GenerateTasksFromDiffs(exportID string, dstPrefix string, diffs catalog.Dif
ret = append(ret, task)
}

// Add higher-level success directories, e.g. "a/b-success" for "a/b-success/c/d-success/x".
q := rinqueue.NewRingqueue()
for successDirectory := range successForDirectory {
q.Add(successDirectory)
}
for d, ok := q.Remove(); ok; d, ok = q.Remove() {
if upD, ok := successDirectoriesCache.lookup(d.(string)); ok {
s := successForDirectory[upD]
s.count++
successForDirectory[upD] = s

s = successForDirectory[d.(string)]
s.toSignal = append(s.toSignal, makeSuccessTaskID(upD))
successForDirectory[d.(string)] = s
}
}

// Create any needed "success file" tasks
for successDirectory, td := range successForDirectory.m {
for successDirectory, td := range successForDirectory {
fmt.Println("[DEBUG] success directory", successDirectory, td)
successPath := fmt.Sprintf("%s/%s", successDirectory, successFilename)
data := SuccessData{
File: makeDestination(successPath),
Expand All @@ -176,13 +227,19 @@ func GenerateTasksFromDiffs(exportID string, dstPrefix string, diffs catalog.Dif
bodyStr := string(body)
totalDependencies := td // copy to get new address each time

toSignal := totalDependencies.toSignal
if len(toSignal) == 0 {
toSignal = []parade.TaskID{finishedTaskID}
}

ret = append(ret, parade.TaskData{
ID: parade.TaskID(makeSuccessTaskID(successPath)),
ID: parade.TaskID(makeSuccessTaskID(successDirectory)),
Action: TouchAction,
Body: &bodyStr,
StatusCode: parade.TaskPending,
MaxTries: &numTries,
TotalDependencies: &totalDependencies,
TotalDependencies: &totalDependencies.count,
ToSignalAfter: toSignal,
})
totalTasks++
}
Expand Down
31 changes: 29 additions & 2 deletions export/tasks_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ func TestTasksGenerator_SuccessFiles(t *testing.T) {
}, {
Type: catalog.DifferenceTypeRemoved,
Entry: catalog.Entry{Path: "a/plain/1", PhysicalAddress: "/remove2"},
}, {
Type: catalog.DifferenceTypeRemoved,
Entry: catalog.Entry{Path: "a/success/sub/success/11", PhysicalAddress: "/remove11"},
}, {
Type: catalog.DifferenceTypeRemoved,
Entry: catalog.Entry{Path: "a/success/sub/success/12", PhysicalAddress: "/remove12"},
}, {
Type: catalog.DifferenceTypeRemoved,
Entry: catalog.Entry{Path: "b/success/1", PhysicalAddress: "/remove3"},
Expand All @@ -276,7 +282,18 @@ func TestTasksGenerator_SuccessFiles(t *testing.T) {
}{
{before: "foo:delete:/remove1", after: "foo:make-success:a/success"},
{before: "foo:delete:/remove1", after: "foo:finished"},
{before: "foo:delete:/remove2", after: "foo:make-success/a/plain", avoid: true},

{before: "foo:delete:/remove2", after: "foo:make-success:a/plain", avoid: true},
{before: "foo:delete:/remove2", after: "foo:finished"},

{before: "foo:delete:/remove11", after: "foo:make-success:a/success/sub/success"},
{before: "foo:delete:/remove11", after: "foo:finished"},

{before: "foo:delete:/remove12", after: "foo:make-success:a/success/sub/success"},
{before: "foo:delete:/remove12", after: "foo:finished"},

{before: "foo:make-success:a/success/sub/success", after: "foo:make-success:a/success"},

{before: "foo:delete:/remove3", after: "foo:make-success:b/success"},
{before: "foo:delete:/remove3", after: "foo:finished"},
{before: "foo:delete:/remove4", after: "foo:make-success:a/success"},
Expand Down Expand Up @@ -311,5 +328,15 @@ func TestTasksGenerator_SuccessFiles(t *testing.T) {
}
}

// TODO(ariels): Verify no _other_ deps?
// Verify the right number of dependencies appeared. Above we verified all indirect
// dependencies. Direct dependencies exactly form a tree, so every task signals one
// other task... except for the finishing task.
for _, task := range tasksWithIDs {
if task.Action != export.DoneAction && len(task.ToSignalAfter) != 1 {
t.Errorf("expected single task to signal after %+v", task)
}
if task.Action == export.DoneAction && len(task.ToSignalAfter) != 0 {
t.Errorf("expected no tasks to signal after %+v", task)
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654 // indirect
github.com/dlmiddlecote/sqlstats v1.0.1
github.com/erikdubbelboer/ringqueue v0.0.0-20160827083827-3b6233a3f71d
github.com/go-openapi/errors v0.19.6
github.com/go-openapi/loads v0.19.5
github.com/go-openapi/runtime v0.19.20
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/erikdubbelboer/ringqueue v0.0.0-20160827083827-3b6233a3f71d h1:wl1qJ9Srzwnz4LBLMeiR2aOe1bthOQUnYNYV6gRhDp8=
github.com/erikdubbelboer/ringqueue v0.0.0-20160827083827-3b6233a3f71d/go.mod h1:WmwDlji5j/ObfYLTskvSTMJg/29nXOeBd8Ei1NQ+lew=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
Expand Down

0 comments on commit fc2b960

Please sign in to comment.