From bb27f4699a3c51f55d31ff245adce817cfb7dbec Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 13 Oct 2020 10:45:28 +0300 Subject: [PATCH 01/13] add export action and action manager draft --- action/action_manager.go | 143 +++++++++++++++++++++++++++ action/action_manager_test.go | 179 ++++++++++++++++++++++++++++++++++ action/handler.go | 14 +++ export/export_handler.go | 91 +++++++++++++++++ export/export_handler_test.go | 81 +++++++++++++++ 5 files changed, 508 insertions(+) create mode 100644 action/action_manager.go create mode 100644 action/action_manager_test.go create mode 100644 action/handler.go create mode 100644 export/export_handler.go create mode 100644 export/export_handler_test.go diff --git a/action/action_manager.go b/action/action_manager.go new file mode 100644 index 00000000000..582934ca27a --- /dev/null +++ b/action/action_manager.go @@ -0,0 +1,143 @@ +package action + +import ( + "github.com/treeverse/lakefs/logging" + "github.com/treeverse/lakefs/parade" + "sync" + "time" +) + +const ( + defaultWorkers = 5 + defaultChannelSize = 10000 + defaultMaxTasks = 100000 + defaultWaitTime = time.Millisecond * 10 + defaultMaxDuration = time.Minute * 30 // Todo(guys): change this +) + +type Properties struct { + workers int + channelSize int + maxTasks int + waitTime *time.Duration + maxDuration *time.Duration +} + +type Action struct { + properties *Properties + handler TaskHandler + parade parade.Parade + quit chan bool + actionGroup *workerPool +} + +func setDefaultProperties(properties *Properties) *Properties { + if properties == nil { + properties = &Properties{} + } + if properties.workers == 0 { + properties.workers = defaultWorkers + } + if properties.channelSize == 0 { + properties.channelSize = defaultChannelSize + } + if properties.maxTasks == 0 { + properties.maxTasks = defaultMaxTasks + } + if properties.waitTime == nil { + waitTime := defaultWaitTime + properties.waitTime = &waitTime + } + if properties.maxDuration == nil { + 
maxDuration := defaultMaxDuration + properties.maxDuration = &maxDuration + } + return properties +} + +func NewAction(handler TaskHandler, parade parade.Parade, properties *Properties) *Action { + a := &Action{ + handler: handler, + parade: parade, + properties: setDefaultProperties(properties), + quit: nil, + } + a.start() + return a +} + +func (a *Action) Close() { + a.quit <- true + a.actionGroup.Close() +} + +func (a *Action) start() { + taskChannel := make(chan parade.OwnedTaskData, a.properties.channelSize) + a.quit = make(chan bool) + a.actionGroup = newWorkerPool(a.handler, taskChannel, a.parade, a.properties.workers) + go func() { + for { + select { + case <-a.quit: + return + default: + ownedTasks, err := a.parade.OwnTasks(a.handler.Actor(), a.properties.maxTasks, a.handler.Actions(), a.properties.maxDuration) + if err != nil { + logging.Default().WithField("actor", a.handler.Actor()).Errorf("manager failed to receive tasks: %s", err) + // Todo(guys): handle error case better ( with growing sleep periods and returning eventually + time.Sleep(*a.properties.waitTime) + } + for _, ot := range ownedTasks { + a.actionGroup.ch <- ot + } + if len(ownedTasks) == 0 { + time.Sleep(*a.properties.waitTime) + } + } + } + }() +} + +type workerPool struct { + handler TaskHandler + ch chan parade.OwnedTaskData + exit chan int + workers int + wg sync.WaitGroup + parade parade.Parade +} + +func newWorkerPool(handler TaskHandler, ch chan parade.OwnedTaskData, parade parade.Parade, workers int) *workerPool { + a := &workerPool{ + handler: handler, + ch: ch, + exit: make(chan int), + workers: workers, + wg: sync.WaitGroup{}, + parade: parade, + } + a.start() + return a +} + +func (a *workerPool) Close() { + close(a.exit) + close(a.ch) + a.wg.Wait() +} + +func (a *workerPool) start() { + a.wg.Add(a.workers) + for i := 0; i < a.workers; i++ { + go func() { + defer a.wg.Done() + for task := range a.ch { + res := a.handler.Handle(task.Action, task.Body) + err := 
a.parade.ReturnTask(task.ID, task.Token, res.Status, res.StatusCode) + if err != nil { + logging.Default().WithField("action", task.Action).Errorf("failed to return task: %w", err) + } + } + }() + } +} diff --git a/action/action_manager_test.go b/action/action_manager_test.go new file mode 100644 index 00000000000..bc067897b98 --- /dev/null +++ b/action/action_manager_test.go @@ -0,0 +1,179 @@ +package action + +import ( + "github.com/treeverse/lakefs/parade" + "sync/atomic" + "testing" + "time" +) + +type ownTaskResult struct { + tasks int + action string +} + +type mockParade struct { + parade.Parade + ownCalled *int32 + returnCalled *int32 + tasks []ownTaskResult +} + +func newMockParade(tasks []ownTaskResult) mockParade { + ownCalled := int32(0) + returnCalled := int32(0) + return mockParade{ + ownCalled: &ownCalled, + returnCalled: &returnCalled, + tasks: tasks, + } +} + +func getKTasks(k int, action string) []parade.OwnedTaskData { + var tasks []parade.OwnedTaskData + for i := 0; i < k; i++ { + task := parade.OwnedTaskData{ + ID: parade.TaskID(i), + Token: parade.PerformanceToken{}, + Action: action, + Body: nil, + } + tasks = append(tasks, task) + } + return tasks +} + +func (m mockParade) OwnTasks(_ parade.ActorID, _ int, _ []string, _ *time.Duration) ([]parade.OwnedTaskData, error) { + cur := atomic.AddInt32(m.ownCalled, 1) + if cur <= int32(len(m.tasks)) { + return getKTasks(m.tasks[cur-1].tasks, m.tasks[cur-1].action), nil + } + return []parade.OwnedTaskData{}, nil +} + +func (m mockParade) ReturnTask(_ parade.TaskID, _ parade.PerformanceToken, _ string, _ parade.TaskStatusCodeValue) error { + atomic.AddInt32(m.returnCalled, 1) + return nil +} + +type mockHandler struct { + handleCalled *int32 +} + +func newMockHandler() mockHandler { + handleCalled := int32(0) + return mockHandler{ + handleCalled: &handleCalled, + } +} + +func (m mockHandler) Handle(_ string, _ *string) HandlerResult { + atomic.AddInt32(m.handleCalled, 1) + return HandlerResult{} +} + 
+func (m mockHandler) Actions() []string { + return []string{"one", "two"} +} + +func (m mockHandler) Actor() parade.ActorID { + return "mock" +} + +func durationPointer(d time.Duration) *time.Duration { + return &d +} + +func TestManager(t *testing.T) { + tests := []struct { + name string + mp mockParade + mh mockHandler + sleepTime time.Duration + expectedReturnTaskCalls int32 + expectedHandleCalled int32 + properties *Properties + }{ + { + name: "no tasks", + mp: newMockParade([]ownTaskResult{}), + mh: newMockHandler(), + sleepTime: 100 * time.Millisecond, + expectedReturnTaskCalls: int32(0), + expectedHandleCalled: int32(0), + properties: nil, + }, + { + name: "50 tasks in one call", + mp: newMockParade([]ownTaskResult{ + { + tasks: 50, + action: "first action", + }, + }), + mh: newMockHandler(), + sleepTime: 100 * time.Millisecond, + expectedReturnTaskCalls: int32(50), + expectedHandleCalled: int32(50), + properties: nil, + }, + { + name: "80 tasks in two calls", + mp: newMockParade([]ownTaskResult{ + { + tasks: 50, + action: "first action", + }, + { + tasks: 30, + action: "second action", + }, + }), + mh: newMockHandler(), + sleepTime: 100 * time.Millisecond, + expectedReturnTaskCalls: int32(80), + expectedHandleCalled: int32(80), + properties: nil, + }, + { + name: "exit before second call", + mp: newMockParade([]ownTaskResult{ + { + tasks: 50, + action: "first action", + }, + { + tasks: 0, + action: "force sleep", + }, + { + tasks: 30, + action: "second action", + }, + }), + mh: newMockHandler(), + sleepTime: 50 * time.Millisecond, + expectedReturnTaskCalls: int32(50), + expectedHandleCalled: int32(50), + properties: &Properties{ + waitTime: durationPointer(time.Millisecond * 100), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := NewAction(tt.mh, tt.mp, tt.properties) + time.Sleep(tt.sleepTime) + a.Close() + returnCalled := atomic.LoadInt32(tt.mp.returnCalled) + if returnCalled != tt.expectedReturnTaskCalls { + 
t.Fatalf("expected ownedTasks to be called: %d times, called %d\n", tt.expectedReturnTaskCalls, returnCalled) + } + handleCalled := atomic.LoadInt32(tt.mh.handleCalled) + if handleCalled != tt.expectedHandleCalled { + t.Fatalf("expected ownedTasks to be called: %d times, called %d\n", tt.expectedHandleCalled, handleCalled) + } + }) + } +} diff --git a/action/handler.go b/action/handler.go new file mode 100644 index 00000000000..0450a02a4c8 --- /dev/null +++ b/action/handler.go @@ -0,0 +1,14 @@ +package action + +import "github.com/treeverse/lakefs/parade" + +type HandlerResult struct { + Status string + StatusCode parade.TaskStatusCodeValue +} + +type TaskHandler interface { + Handle(action string, body *string) HandlerResult + Actions() []string + Actor() parade.ActorID +} diff --git a/export/export_handler.go b/export/export_handler.go new file mode 100644 index 00000000000..a869a5ab2f0 --- /dev/null +++ b/export/export_handler.go @@ -0,0 +1,91 @@ +package export + +import ( + "encoding/json" + "fmt" + act "github.com/treeverse/lakefs/action" + "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/parade" +) + +const actorName parade.ActorID = "EXPORT" +const ( + actionCopy = "export-copy" + actionDelete = "export-delete" + actionNext = "next-export" + actionStart = "start-export" + actionDone = "done-export" +) + +type Handler struct { + adapter block.Adapter +} + +func NewHandler(adapter block.Adapter) *Handler { + return &Handler{ + adapter: adapter, + } +} + +type TaskBody struct { + DestinationNamespace string + DestinationID string + SourceNamespace string + SourceID string +} + +// todo(guys): add logs +func (h *Handler) Handle(action string, body *string) act.HandlerResult { + var params TaskBody + err := json.Unmarshal([]byte(*body), ¶ms) + if err != nil { + return act.HandlerResult{ + Status: err.Error(), + StatusCode: parade.TaskInvalid, + } + } + destinationPointer := block.ObjectPointer{ + StorageNamespace: params.DestinationNamespace, + 
Identifier: params.DestinationID, + } + sourcePointer := block.ObjectPointer{ + StorageNamespace: params.SourceNamespace, + Identifier: params.SourceID, + } + + switch action { + case actionCopy: + err = h.adapter.Copy(sourcePointer, destinationPointer) // todo(guys): add wait for copy in handler + if err != nil { + return act.HandlerResult{ + Status: err.Error(), + StatusCode: parade.TaskInvalid, + } + } + case actionDelete: + err = h.adapter.Remove(destinationPointer) + if err != nil { + return act.HandlerResult{ + Status: err.Error(), + StatusCode: parade.TaskInvalid, + } + } + default: + return act.HandlerResult{ + Status: "UNKNOWN ACTION", + StatusCode: parade.TaskInvalid, + } + } + return act.HandlerResult{ + Status: fmt.Sprintf("Completed"), + StatusCode: parade.TaskCompleted, + } +} + +func (h *Handler) Actions() []string { + return []string{actionCopy, actionDelete} +} + +func (h *Handler) Actor() parade.ActorID { + return actorName +} diff --git a/export/export_handler_test.go b/export/export_handler_test.go new file mode 100644 index 00000000000..b22d49f8b29 --- /dev/null +++ b/export/export_handler_test.go @@ -0,0 +1,81 @@ +package export + +import ( + "encoding/json" + "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/block/mem" + "github.com/treeverse/lakefs/parade" + "github.com/treeverse/lakefs/testutil" + "io/ioutil" + "strings" + "testing" +) + +func TestExportHandler_Handle(t *testing.T) { + tests := []struct { + name string + Body TaskBody + Action string + blockstoreType string + }{ + { + name: "copy on mem", + Action: actionCopy, + Body: TaskBody{ + DestinationNamespace: "local://external-bucket", + DestinationID: "one/two", + SourceNamespace: "local://lakefs-buck", + SourceID: "one/two", + }, + blockstoreType: mem.BlockstoreType, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, tt.blockstoreType) + sourcePointer := 
block.ObjectPointer{ + StorageNamespace: tt.Body.SourceNamespace, + Identifier: tt.Body.SourceID, + } + destinationPointer := block.ObjectPointer{ + StorageNamespace: tt.Body.DestinationNamespace, + Identifier: tt.Body.DestinationID, + } + // add to + testData := "this is the test Data" + testReader := strings.NewReader(testData) + + err := adapter.Put(sourcePointer, testReader.Size(), testReader, block.PutOpts{}) + if err != nil { + t.Fatal(err) + } + h := NewHandler(adapter) + taskBody, err := json.Marshal(tt.Body) + if err != nil { + t.Fatal(err) + } + taskBodyStr := string(taskBody) + task := parade.OwnedTaskData{ + Action: tt.Action, + Body: &taskBodyStr, + } + if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { + t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) + } + + // read Destination + reader, err := adapter.Get(destinationPointer, testReader.Size()) + if err != nil { + t.Error(err) + } + val, err := ioutil.ReadAll(reader) + if err != nil { + t.Error(err) + } + if string(val) != testData { + t.Errorf("expected %s, got %s\n", testData, string(val)) + } + // todo(guys): add tests delete + }) + } +} From 3cb0660f8f8940244e825700b4c7ff9d3d600bd1 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 13 Oct 2020 18:23:39 +0300 Subject: [PATCH 02/13] add touch handle add tests for delete and touch --- export/export_handler.go | 7 +++++- export/export_handler_test.go | 47 ++++++++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/export/export_handler.go b/export/export_handler.go index a869a5ab2f0..47f7fd78074 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -6,12 +6,14 @@ import ( act "github.com/treeverse/lakefs/action" "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/parade" + "strings" ) const actorName parade.ActorID = "EXPORT" const ( actionCopy = "export-copy" actionDelete = "export-delete" + actionTouch = 
"export-touch" actionNext = "next-export" actionStart = "start-export" actionDone = "done-export" @@ -70,6 +72,9 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { StatusCode: parade.TaskInvalid, } } + case actionTouch: + err = h.adapter.Put(destinationPointer, 0, strings.NewReader(""), block.PutOpts{}) + //todo(guys): add cases for other actions or remove them from Actions function default: return act.HandlerResult{ Status: "UNKNOWN ACTION", @@ -83,7 +88,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { } func (h *Handler) Actions() []string { - return []string{actionCopy, actionDelete} + return []string{actionCopy, actionDelete, actionNext, actionStart, actionDone} } func (h *Handler) Actor() parade.ActorID { diff --git a/export/export_handler_test.go b/export/export_handler_test.go index b22d49f8b29..47cd5a7fa4c 100644 --- a/export/export_handler_test.go +++ b/export/export_handler_test.go @@ -29,6 +29,24 @@ func TestExportHandler_Handle(t *testing.T) { }, blockstoreType: mem.BlockstoreType, }, + { + name: "delete on mem", + Action: actionDelete, + Body: TaskBody{ + DestinationNamespace: "local://external-bucket", + DestinationID: "one/two", + }, + blockstoreType: mem.BlockstoreType, + }, + { + name: "touch on mem", + Action: actionTouch, + Body: TaskBody{ + DestinationNamespace: "local://external-bucket", + DestinationID: "one/two", + }, + blockstoreType: mem.BlockstoreType, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -44,11 +62,13 @@ func TestExportHandler_Handle(t *testing.T) { // add to testData := "this is the test Data" testReader := strings.NewReader(testData) - - err := adapter.Put(sourcePointer, testReader.Size(), testReader, block.PutOpts{}) - if err != nil { - t.Fatal(err) + if tt.Action == actionCopy { + err := adapter.Put(sourcePointer, testReader.Size(), testReader, block.PutOpts{}) + if err != nil { + t.Fatal(err) + } } + h := NewHandler(adapter) taskBody, err := 
json.Marshal(tt.Body) if err != nil { @@ -62,20 +82,29 @@ func TestExportHandler_Handle(t *testing.T) { if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) } - // read Destination reader, err := adapter.Get(destinationPointer, testReader.Size()) + if err != nil { - t.Error(err) + if tt.Action == actionDelete { + return + } + t.Fatal(err) + } + if tt.Action == actionDelete { + t.Fatalf("expected to get error on get in action delete") } val, err := ioutil.ReadAll(reader) if err != nil { - t.Error(err) + t.Fatal(err) + } + expect := testData + if tt.Action == actionTouch { + expect = "" } - if string(val) != testData { + if string(val) != expect { t.Errorf("expected %s, got %s\n", testData, string(val)) } - // todo(guys): add tests delete }) } } From 9188e6f9e4a410a1e811abd5212ec6ee1b978aaa Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 13 Oct 2020 18:53:33 +0300 Subject: [PATCH 03/13] add logs to export handler --- export/export_handler.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/export/export_handler.go b/export/export_handler.go index 47f7fd78074..31673ea796a 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -5,6 +5,7 @@ import ( "fmt" act "github.com/treeverse/lakefs/action" "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" "strings" ) @@ -36,11 +37,15 @@ type TaskBody struct { SourceID string } -// todo(guys): add logs func (h *Handler) Handle(action string, body *string) act.HandlerResult { var params TaskBody + lg := logging.Default().WithFields(logging.Fields{ + "actor": actorName, + "action": action, + }) err := json.Unmarshal([]byte(*body), ¶ms) if err != nil { + lg.WithError(err).Error("unmarshal failed") return act.HandlerResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, @@ -59,6 +64,7 @@ func (h 
*Handler) Handle(action string, body *string) act.HandlerResult { case actionCopy: err = h.adapter.Copy(sourcePointer, destinationPointer) // todo(guys): add wait for copy in handler if err != nil { + lg.WithError(err).Error("copy failed") return act.HandlerResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, @@ -67,6 +73,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { case actionDelete: err = h.adapter.Remove(destinationPointer) if err != nil { + lg.WithError(err).Error("delete failed") return act.HandlerResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, @@ -74,8 +81,16 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { } case actionTouch: err = h.adapter.Put(destinationPointer, 0, strings.NewReader(""), block.PutOpts{}) + if err != nil { + lg.WithError(err).Error("touch failed") + return act.HandlerResult{ + Status: err.Error(), + StatusCode: parade.TaskInvalid, + } + } //todo(guys): add cases for other actions or remove them from Actions function default: + lg.Error("unknown action") return act.HandlerResult{ Status: "UNKNOWN ACTION", StatusCode: parade.TaskInvalid, From 53b99e3b358e45b9ee187b7c1f22f2598324fc4c Mon Sep 17 00:00:00 2001 From: guy-har <60321938+guy-har@users.noreply.github.com> Date: Wed, 14 Oct 2020 18:56:43 +0300 Subject: [PATCH 04/13] Update action/action_manager.go Co-authored-by: arielshaqed --- action/action_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/action/action_manager.go b/action/action_manager.go index 582934ca27a..6f5156e5ffb 100644 --- a/action/action_manager.go +++ b/action/action_manager.go @@ -9,8 +9,8 @@ import ( const ( defaultWorkers = 5 - defaultChannelSize = 10000 - defaultMaxTasks = 100000 + defaultChannelSize = 1000 + defaultMaxTasks = 500 defaultWaitTime = time.Millisecond * 10 defaultMaxDuration = time.Minute * 30 // Todo(guys): change this ) From 1f38d2492027addec65dacff3d1724941233a334 Mon Sep 17 00:00:00 2001 
From: guy-har <60321938+guy-har@users.noreply.github.com> Date: Wed, 14 Oct 2020 19:32:38 +0300 Subject: [PATCH 05/13] Update action/action_manager_test.go Co-authored-by: arielshaqed --- action/action_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/action/action_manager_test.go b/action/action_manager_test.go index bc067897b98..86044f4667c 100644 --- a/action/action_manager_test.go +++ b/action/action_manager_test.go @@ -30,7 +30,7 @@ func newMockParade(tasks []ownTaskResult) mockParade { } func getKTasks(k int, action string) []parade.OwnedTaskData { - var tasks []parade.OwnedTaskData + tasks := make([]parade.OwnedTaskData, 0, k) for i := 0; i < k; i++ { task := parade.OwnedTaskData{ ID: parade.TaskID(i), From 7d7d6119173cc766cdfe1c56f2784c2a07178fbb Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Thu, 15 Oct 2020 09:16:24 +0300 Subject: [PATCH 06/13] move action_manager to parade document action_manager add data to logs --- action/action_manager.go | 143 -------------------- export/export_handler.go | 15 +-- export/export_handler_test.go | 8 +- parade/action_manager.go | 156 ++++++++++++++++++++++ {action => parade}/action_manager_test.go | 16 +-- parade/handler.go | 12 ++ 6 files changed, 187 insertions(+), 163 deletions(-) delete mode 100644 action/action_manager.go create mode 100644 parade/action_manager.go rename {action => parade}/action_manager_test.go (91%) create mode 100644 parade/handler.go diff --git a/action/action_manager.go b/action/action_manager.go deleted file mode 100644 index 6f5156e5ffb..00000000000 --- a/action/action_manager.go +++ /dev/null @@ -1,143 +0,0 @@ -package action - -import ( - "github.com/treeverse/lakefs/logging" - "github.com/treeverse/lakefs/parade" - "sync" - "time" -) - -const ( - defaultWorkers = 5 - defaultChannelSize = 1000 - defaultMaxTasks = 500 - defaultWaitTime = time.Millisecond * 10 - defaultMaxDuration = time.Minute * 30 // Todo(guys): change this -) - -type Properties struct { 
- workers int - channelSize int - maxTasks int - waitTime *time.Duration - maxDuration *time.Duration -} - -type Action struct { - properties *Properties - handler TaskHandler - parade parade.Parade - quit chan bool - actionGroup *workerPool -} - -func setDefaultProperties(properties *Properties) *Properties { - if properties == nil { - properties = &Properties{} - } - if properties.workers == 0 { - properties.workers = defaultWorkers - } - if properties.channelSize == 0 { - properties.channelSize = defaultChannelSize - } - if properties.maxTasks == 0 { - properties.maxTasks = defaultMaxTasks - } - if properties.waitTime == nil { - waitTime := defaultWaitTime - properties.waitTime = &waitTime - } - if properties.maxDuration == nil { - maxDuration := defaultMaxDuration - properties.maxDuration = &maxDuration - } - return properties -} - -func NewAction(handler TaskHandler, parade parade.Parade, properties *Properties) *Action { - a := &Action{ - handler: handler, - parade: parade, - properties: setDefaultProperties(properties), - quit: nil, - } - a.start() - return a -} - -func (a *Action) Close() { - a.quit <- true - a.actionGroup.Close() -} - -func (a *Action) start() { - taskChannel := make(chan parade.OwnedTaskData, a.properties.channelSize) - a.quit = make(chan bool) - a.actionGroup = newWorkerPool(a.handler, taskChannel, a.parade, a.properties.workers) - go func() { - for { - select { - case <-a.quit: - return - default: - ownedTasks, err := a.parade.OwnTasks(a.handler.Actor(), a.properties.maxTasks, a.handler.Actions(), a.properties.maxDuration) - if err != nil { - logging.Default().WithField("actor", a.handler.Actor()).Errorf("manager failed to receive tasks: %s", err) - // Todo(guys): handle error case better ( with growing sleep periods and returning eventually - time.Sleep(*a.properties.waitTime) - } - for _, ot := range ownedTasks { - a.actionGroup.ch <- ot - } - if len(ownedTasks) == 0 { - time.Sleep(*a.properties.waitTime) - } - } - } - }() -} - -type 
workerPool struct { - handler TaskHandler - ch chan parade.OwnedTaskData - exit chan int - workers int - wg sync.WaitGroup - parade parade.Parade -} - -func newWorkerPool(handler TaskHandler, ch chan parade.OwnedTaskData, parade parade.Parade, workers int) *workerPool { - a := &workerPool{ - handler: handler, - ch: ch, - exit: make(chan int), - workers: workers, - wg: sync.WaitGroup{}, - parade: parade, - } - a.start() - return a -} - -func (a *workerPool) Close() { - close(a.exit) - close(a.ch) - a.wg.Wait() -} - -func (a *workerPool) start() { - a.wg.Add(a.workers) - for i := 0; i < a.workers; i++ { - go func() { - defer a.wg.Done() - for task := range a.ch { - res := a.handler.Handle(task.Action, task.Body) - err := a.parade.ReturnTask(task.ID, task.Token, res.Status, res.StatusCode) - if err != nil { - logging.Default().WithField("action", task.Action).Errorf("failed to return task: %w", err) - } - } - }() - } -} diff --git a/export/export_handler.go b/export/export_handler.go index 31673ea796a..4fe2de08d8a 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -3,7 +3,6 @@ package export import ( "encoding/json" "fmt" - act "github.com/treeverse/lakefs/action" "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" @@ -37,7 +36,7 @@ type TaskBody struct { SourceID string } -func (h *Handler) Handle(action string, body *string) act.HandlerResult { +func (h *Handler) Handle(action string, body *string) parade.HandlerResult { var params TaskBody lg := logging.Default().WithFields(logging.Fields{ "actor": actorName, @@ -46,7 +45,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { err := json.Unmarshal([]byte(*body), ¶ms) if err != nil { lg.WithError(err).Error("unmarshal failed") - return act.HandlerResult{ + return parade.HandlerResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, } @@ -65,7 +64,7 @@ func (h *Handler) Handle(action string, body *string) 
act.HandlerResult { err = h.adapter.Copy(sourcePointer, destinationPointer) // todo(guys): add wait for copy in handler if err != nil { lg.WithError(err).Error("copy failed") - return act.HandlerResult{ + return parade.HandlerResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, } @@ -74,7 +73,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { err = h.adapter.Remove(destinationPointer) if err != nil { lg.WithError(err).Error("delete failed") - return act.HandlerResult{ + return parade.HandlerResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, } @@ -83,7 +82,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { err = h.adapter.Put(destinationPointer, 0, strings.NewReader(""), block.PutOpts{}) if err != nil { lg.WithError(err).Error("touch failed") - return act.HandlerResult{ + return parade.HandlerResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, } @@ -91,12 +90,12 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult { //todo(guys): add cases for other actions or remove them from Actions function default: lg.Error("unknown action") - return act.HandlerResult{ + return parade.HandlerResult{ Status: "UNKNOWN ACTION", StatusCode: parade.TaskInvalid, } } - return act.HandlerResult{ + return parade.HandlerResult{ Status: fmt.Sprintf("Completed"), StatusCode: parade.TaskCompleted, } diff --git a/export/export_handler_test.go b/export/export_handler_test.go index 47cd5a7fa4c..a9e807db51b 100644 --- a/export/export_handler_test.go +++ b/export/export_handler_test.go @@ -22,9 +22,9 @@ func TestExportHandler_Handle(t *testing.T) { name: "copy on mem", Action: actionCopy, Body: TaskBody{ - DestinationNamespace: "local://external-bucket", + DestinationNamespace: "mem://external-bucket", DestinationID: "one/two", - SourceNamespace: "local://lakefs-buck", + SourceNamespace: "mem://lakefs-buck", SourceID: "one/two", }, blockstoreType: mem.BlockstoreType, @@ -33,7 +33,7 @@ func 
TestExportHandler_Handle(t *testing.T) { name: "delete on mem", Action: actionDelete, Body: TaskBody{ - DestinationNamespace: "local://external-bucket", + DestinationNamespace: "mem://external-bucket", DestinationID: "one/two", }, blockstoreType: mem.BlockstoreType, @@ -42,7 +42,7 @@ func TestExportHandler_Handle(t *testing.T) { name: "touch on mem", Action: actionTouch, Body: TaskBody{ - DestinationNamespace: "local://external-bucket", + DestinationNamespace: "mem://external-bucket", DestinationID: "one/two", }, blockstoreType: mem.BlockstoreType, diff --git a/parade/action_manager.go b/parade/action_manager.go new file mode 100644 index 00000000000..c884be6efb1 --- /dev/null +++ b/parade/action_manager.go @@ -0,0 +1,156 @@ +package parade + +import ( + "github.com/google/uuid" + "github.com/treeverse/lakefs/logging" + "sync" + "time" +) + +const ( + defaultWorkers = 5 + defaultChannelSize = 1000 + defaultMaxTasks = 500 + defaultWaitTime = time.Millisecond * 300 + defaultErrWaitTime = time.Millisecond * 300 + defaultMaxDuration = time.Minute * 30 // Todo(guys): change this +) + +// ManagerProperties defines the configuration properties of an ActionManager +type ManagerProperties struct { + Workers int // number of goroutines handling tasks + ChannelSize int // size of the channel containing tasks for workers + MaxTasks int // max tasks requested in every ownTasks request + WaitTime *time.Duration // time to wait if OwnTasks returned no tasks. + ErrWaitTime *time.Duration // time to wait if OwnTasks returned err. 
+ MaxDuration *time.Duration // maxDuration passed to parade.OwnTasks +} + +// A ActionManager manages the process of requesting and returning tasks for a specific TaskHandler +// The manager requests tasks, sends the tasks to workers through a channel, the workers then handle the task and return it +type ActionManager struct { + properties *ManagerProperties + handler TaskHandler + parade Parade + quit chan struct{} + wp *workerPool +} + +func setDefaultProperties(properties *ManagerProperties) *ManagerProperties { + if properties == nil { + properties = &ManagerProperties{} + } + if properties.Workers == 0 { + properties.Workers = defaultWorkers + } + if properties.ChannelSize == 0 { + properties.ChannelSize = defaultChannelSize + } + if properties.MaxTasks == 0 { + properties.MaxTasks = defaultMaxTasks + } + if properties.WaitTime == nil { + waitTime := defaultWaitTime + properties.WaitTime = &waitTime + } + if properties.ErrWaitTime == nil { + errWaitTime := defaultErrWaitTime + properties.ErrWaitTime = &errWaitTime + } + if properties.MaxDuration == nil { + maxDuration := defaultMaxDuration + properties.MaxDuration = &maxDuration + } + return properties +} + +// NewActionManager initiates an ActionManager with workers and returns a +func NewActionManager(handler TaskHandler, parade Parade, properties *ManagerProperties) *ActionManager { + a := &ActionManager{ + handler: handler, + parade: parade, + properties: setDefaultProperties(properties), + quit: nil, + } + a.start() + return a +} + +func (a *ActionManager) Close() { + close(a.quit) + a.wp.Close() +} + +func (a *ActionManager) start() { + taskChannel := make(chan OwnedTaskData, a.properties.ChannelSize) + a.quit = make(chan struct{}) + a.wp = newWorkerPool(a.handler, taskChannel, a.parade, a.properties.Workers) + go func() { + for { + select { + case <-a.quit: + return + default: + ownedTasks, err := a.parade.OwnTasks(a.handler.Actor(), a.properties.MaxTasks, a.handler.Actions(), a.properties.MaxDuration) 
+ if err != nil { + logging.Default().WithField("actor", a.handler.Actor()).Errorf("manager failed to receive tasks: %s", err) + time.Sleep(*a.properties.WaitTime) + } + for _, ot := range ownedTasks { + a.wp.ch <- ot + } + if len(ownedTasks) == 0 { + time.Sleep(*a.properties.WaitTime) + } + } + } + }() +} + +type workerPool struct { + handler TaskHandler + ch chan OwnedTaskData + workers int + wg sync.WaitGroup + parade Parade +} + +func newWorkerPool(handler TaskHandler, ch chan OwnedTaskData, parade Parade, workers int) *workerPool { + a := &workerPool{ + handler: handler, + ch: ch, + workers: workers, + wg: sync.WaitGroup{}, + parade: parade, + } + a.start() + return a +} + +func (a *workerPool) Close() { + close(a.ch) + a.wg.Wait() +} + +func (a *workerPool) start() { + a.wg.Add(a.workers) + for i := 0; i < a.workers; i++ { + go func() { + workerID := uuid.New() + defer a.wg.Done() + for task := range a.ch { + res := a.handler.Handle(task.Action, task.Body) + err := a.parade.ReturnTask(task.ID, task.Token, res.Status, res.StatusCode) + if err != nil { + logging.Default().WithFields(logging.Fields{ + "action": task.Action, + "task workerID": task.ID, + "status": res.Status, + "status code:": res.StatusCode, + "worker workerID": workerID, + }).Errorf("failed to return task: %w", err) + } + } + }() + } +} diff --git a/action/action_manager_test.go b/parade/action_manager_test.go similarity index 91% rename from action/action_manager_test.go rename to parade/action_manager_test.go index 86044f4667c..b918db9da51 100644 --- a/action/action_manager_test.go +++ b/parade/action_manager_test.go @@ -1,4 +1,4 @@ -package action +package parade_test import ( "github.com/treeverse/lakefs/parade" @@ -67,9 +67,9 @@ func newMockHandler() mockHandler { } } -func (m mockHandler) Handle(_ string, _ *string) HandlerResult { +func (m mockHandler) Handle(_ string, _ *string) parade.HandlerResult { atomic.AddInt32(m.handleCalled, 1) - return HandlerResult{} + return 
parade.HandlerResult{} } func (m mockHandler) Actions() []string { @@ -84,7 +84,7 @@ func durationPointer(d time.Duration) *time.Duration { return &d } -func TestManager(t *testing.T) { +func TestActionManager(t *testing.T) { tests := []struct { name string mp mockParade @@ -92,7 +92,7 @@ func TestManager(t *testing.T) { sleepTime time.Duration expectedReturnTaskCalls int32 expectedHandleCalled int32 - properties *Properties + properties *parade.ManagerProperties }{ { name: "no tasks", @@ -155,15 +155,15 @@ func TestManager(t *testing.T) { sleepTime: 50 * time.Millisecond, expectedReturnTaskCalls: int32(50), expectedHandleCalled: int32(50), - properties: &Properties{ - waitTime: durationPointer(time.Millisecond * 100), + properties: &parade.ManagerProperties{ + WaitTime: durationPointer(time.Millisecond * 100), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - a := NewAction(tt.mh, tt.mp, tt.properties) + a := parade.NewActionManager(tt.mh, tt.mp, tt.properties) time.Sleep(tt.sleepTime) a.Close() returnCalled := atomic.LoadInt32(tt.mp.returnCalled) diff --git a/parade/handler.go b/parade/handler.go new file mode 100644 index 00000000000..c98e96aca90 --- /dev/null +++ b/parade/handler.go @@ -0,0 +1,12 @@ +package parade + +type HandlerResult struct { + Status string + StatusCode TaskStatusCodeValue +} + +type TaskHandler interface { + Handle(action string, body *string) HandlerResult + Actions() []string + Actor() ActorID +} From 42dbc8de370e690d23807f84e691b0d75241179c Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Thu, 15 Oct 2020 14:46:32 +0300 Subject: [PATCH 07/13] align export handler with task_generator --- export/export_handler.go | 127 ++++++++++--------- export/export_handler_test.go | 231 ++++++++++++++++++++-------------- 2 files changed, 206 insertions(+), 152 deletions(-) diff --git a/export/export_handler.go b/export/export_handler.go index 4fe2de08d8a..9a2d732317f 100644 --- a/export/export_handler.go +++
b/export/export_handler.go @@ -6,18 +6,11 @@ import ( "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" + "net/url" "strings" ) const actorName parade.ActorID = "EXPORT" -const ( - actionCopy = "export-copy" - actionDelete = "export-delete" - actionTouch = "export-touch" - actionNext = "next-export" - actionStart = "start-export" - actionDone = "done-export" -) type Handler struct { adapter block.Adapter @@ -36,62 +29,82 @@ type TaskBody struct { SourceID string } -func (h *Handler) Handle(action string, body *string) parade.HandlerResult { - var params TaskBody - lg := logging.Default().WithFields(logging.Fields{ - "actor": actorName, - "action": action, - }) - err := json.Unmarshal([]byte(*body), &params) +func PathToPointer(path string) (block.ObjectPointer, error) { + u, err := url.Parse(path) //TODO: add verify path on create task if err != nil { - lg.WithError(err).Error("unmarshal failed") - return parade.HandlerResult{ - Status: err.Error(), - StatusCode: parade.TaskInvalid, - } + return block.ObjectPointer{}, err } - destinationPointer := block.ObjectPointer{ - StorageNamespace: params.DestinationNamespace, - Identifier: params.DestinationID, + return block.ObjectPointer{ + StorageNamespace: fmt.Sprintf("%s://%s", u.Scheme, u.Host), + Identifier: u.Path, + }, err +} +func (h *Handler) copy(body *string) error { + var copyData CopyData + err := json.Unmarshal([]byte(*body), &copyData) + if err != nil { + return err } - sourcePointer := block.ObjectPointer{ - StorageNamespace: params.SourceNamespace, - Identifier: params.SourceID, + from, err := PathToPointer(copyData.From) + if err != nil { + return err } + to, err := PathToPointer(copyData.To) + if err != nil { + return err + } + return h.adapter.Copy(from, to) // todo(guys): add wait for copy in handler +} +func (h *Handler) remove(body *string) error { + var deleteData DeleteData + err := json.Unmarshal([]byte(*body), &deleteData) + if err != nil { +
return err + } + path, err := PathToPointer(deleteData.File) + if err != nil { + return err + } + return h.adapter.Remove(path) +} + +func (h *Handler) touch(body *string) error { + var successData SuccessData + err := json.Unmarshal([]byte(*body), &successData) + if err != nil { + return err + } + path, err := PathToPointer(successData.File) + if err != nil { + return err + } + return h.adapter.Put(path, 0, strings.NewReader(""), block.PutOpts{}) +} +func (h *Handler) Handle(action string, body *string) parade.HandlerResult { + + var err error switch action { - case actionCopy: - err = h.adapter.Copy(sourcePointer, destinationPointer) // todo(guys): add wait for copy in handler - if err != nil { - lg.WithError(err).Error("copy failed") - return parade.HandlerResult{ - Status: err.Error(), - StatusCode: parade.TaskInvalid, - } - } - case actionDelete: - err = h.adapter.Remove(destinationPointer) - if err != nil { - lg.WithError(err).Error("delete failed") - return parade.HandlerResult{ - Status: err.Error(), - StatusCode: parade.TaskInvalid, - } - } - case actionTouch: - err = h.adapter.Put(destinationPointer, 0, strings.NewReader(""), block.PutOpts{}) - if err != nil { - lg.WithError(err).Error("touch failed") - return parade.HandlerResult{ - Status: err.Error(), - StatusCode: parade.TaskInvalid, - } - } - //todo(guys): add cases for other actions or remove them from Actions function + case CopyAction: + err = h.copy(body) + case DeleteAction: + err = h.remove(body) + case TouchAction: + err = h.touch(body) + case DoneAction: + //todo(guys): handle done action default: - lg.Error("unknown action") + err = fmt.Errorf("unknown action") + } + + if err != nil { + logging.Default().WithFields(logging.Fields{ + "actor": actorName, + "action": action, + }).WithError(err).Error("touch failed") + return parade.HandlerResult{ - Status: "UNKNOWN ACTION", + Status: err.Error(), StatusCode: parade.TaskInvalid, } } @@ -102,7 +115,7 @@ func (h *Handler) Handle(action string, 
body *string) parade.HandlerResult { } func (h *Handler) Actions() []string { - return []string{actionCopy, actionDelete, actionNext, actionStart, actionDone} + return []string{CopyAction, DeleteAction, TouchAction, DoneAction} } func (h *Handler) Actor() parade.ActorID { diff --git a/export/export_handler_test.go b/export/export_handler_test.go index a9e807db51b..56e8da5e952 100644 --- a/export/export_handler_test.go +++ b/export/export_handler_test.go @@ -11,100 +11,141 @@ import ( "testing" ) -func TestExportHandler_Handle(t *testing.T) { - tests := []struct { - name string - Body TaskBody - Action string - blockstoreType string - }{ - { - name: "copy on mem", - Action: actionCopy, - Body: TaskBody{ - DestinationNamespace: "mem://external-bucket", - DestinationID: "one/two", - SourceNamespace: "mem://lakefs-buck", - SourceID: "one/two", - }, - blockstoreType: mem.BlockstoreType, - }, - { - name: "delete on mem", - Action: actionDelete, - Body: TaskBody{ - DestinationNamespace: "mem://external-bucket", - DestinationID: "one/two", - }, - blockstoreType: mem.BlockstoreType, - }, - { - name: "touch on mem", - Action: actionTouch, - Body: TaskBody{ - DestinationNamespace: "mem://external-bucket", - DestinationID: "one/two", - }, - blockstoreType: mem.BlockstoreType, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, tt.blockstoreType) - sourcePointer := block.ObjectPointer{ - StorageNamespace: tt.Body.SourceNamespace, - Identifier: tt.Body.SourceID, - } - destinationPointer := block.ObjectPointer{ - StorageNamespace: tt.Body.DestinationNamespace, - Identifier: tt.Body.DestinationID, - } - // add to - testData := "this is the test Data" - testReader := strings.NewReader(testData) - if tt.Action == actionCopy { - err := adapter.Put(sourcePointer, testReader.Size(), testReader, block.PutOpts{}) - if err != nil { - t.Fatal(err) - } - } - - h := NewHandler(adapter) - taskBody, 
err := json.Marshal(tt.Body) - if err != nil { - t.Fatal(err) - } - taskBodyStr := string(taskBody) - task := parade.OwnedTaskData{ - Action: tt.Action, - Body: &taskBodyStr, - } - if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { - t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) - } - // read Destination - reader, err := adapter.Get(destinationPointer, testReader.Size()) - - if err != nil { - if tt.Action == actionDelete { - return - } - t.Fatal(err) - } - if tt.Action == actionDelete { - t.Fatalf("expected to get error on get in action delete") - } - val, err := ioutil.ReadAll(reader) - if err != nil { - t.Fatal(err) - } - expect := testData - if tt.Action == actionTouch { - expect = "" - } - if string(val) != expect { - t.Errorf("expected %s, got %s\n", testData, string(val)) - } - }) +func TestCopy(t *testing.T) { + adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, mem.BlockstoreType) + sourcePointer := block.ObjectPointer{ + StorageNamespace: "mem://lakeFS-bucket", + Identifier: "/one/two", + } + destinationPointer := block.ObjectPointer{ + StorageNamespace: "mem://external-bucket", + Identifier: "/one/two", + } + from := sourcePointer.StorageNamespace + sourcePointer.Identifier + to := destinationPointer.StorageNamespace + destinationPointer.Identifier + + testData := "this is the test Data" + testReader := strings.NewReader(testData) + err := adapter.Put(sourcePointer, testReader.Size(), testReader, block.PutOpts{}) + if err != nil { + t.Fatal(err) + } + + h := NewHandler(adapter) + taskBody, err := json.Marshal(&CopyData{ + From: from, + To: to, + }) + if err != nil { + t.Fatal(err) + } + taskBodyStr := string(taskBody) + task := parade.OwnedTaskData{ + Action: CopyAction, + Body: &taskBodyStr, + } + if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { + t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) 
+ } + // read Destination + reader, err := adapter.Get(destinationPointer, testReader.Size()) + + if err != nil { + t.Fatal(err) + } + + val, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + expect := testData + if string(val) != expect { + t.Errorf("expected %s, got %s\n", testData, string(val)) + } +} + +func TestDelete(t *testing.T) { + adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, mem.BlockstoreType) + + destinationPointer := block.ObjectPointer{ + StorageNamespace: "mem://external-bucket", + Identifier: "/one/two", + } + path := destinationPointer.StorageNamespace + destinationPointer.Identifier + + testData := "this is the test Data" + testReader := strings.NewReader(testData) + err := adapter.Put(destinationPointer, testReader.Size(), testReader, block.PutOpts{}) + if err != nil { + t.Fatal(err) + } + + h := NewHandler(adapter) + taskBody, err := json.Marshal(&DeleteData{ + File: path, + }) + if err != nil { + t.Fatal(err) + } + taskBodyStr := string(taskBody) + task := parade.OwnedTaskData{ + Action: DeleteAction, + Body: &taskBodyStr, + } + if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { + t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) + } + // read Destination + _, err = adapter.Get(destinationPointer, testReader.Size()) + if err == nil { + t.Errorf("expected path get err file not found") + } + +} + +func TestTouch(t *testing.T) { + adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, mem.BlockstoreType) + destinationPointer := block.ObjectPointer{ + StorageNamespace: "mem://external-bucket", + Identifier: "/one/two", + } + path := destinationPointer.StorageNamespace + destinationPointer.Identifier + + testData := "this is the test Data" + testReader := strings.NewReader(testData) + err := adapter.Put(destinationPointer, testReader.Size(), testReader, block.PutOpts{}) + if err != nil { + t.Fatal(err) + } + + h := 
NewHandler(adapter) + taskBody, err := json.Marshal(&SuccessData{ + File: path, + }) + if err != nil { + t.Fatal(err) + } + taskBodyStr := string(taskBody) + task := parade.OwnedTaskData{ + Action: TouchAction, + Body: &taskBodyStr, + } + if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { + t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) + } + // read Destination + reader, err := adapter.Get(destinationPointer, testReader.Size()) + + if err != nil { + t.Fatal(err) + } + + val, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + expect := "" + if string(val) != expect { + t.Errorf("expected %s, got %s\n", testData, string(val)) } } From 59ce972a9df00a384de0698b1b80ec2d24c5e966 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Sun, 18 Oct 2020 12:21:52 +0300 Subject: [PATCH 08/13] action_manager clean up --- parade/action_manager.go | 20 +++++-- parade/action_manager_test.go | 101 ++++++++++++++-------------------- 2 files changed, 55 insertions(+), 66 deletions(-) diff --git a/parade/action_manager.go b/parade/action_manager.go index c884be6efb1..62024a95f82 100644 --- a/parade/action_manager.go +++ b/parade/action_manager.go @@ -33,6 +33,7 @@ type ActionManager struct { handler TaskHandler parade Parade quit chan struct{} + wg sync.WaitGroup wp *workerPool } @@ -79,28 +80,35 @@ func NewActionManager(handler TaskHandler, parade Parade, properties *ManagerPro func (a *ActionManager) Close() { close(a.quit) a.wp.Close() + a.wg.Wait() } func (a *ActionManager) start() { taskChannel := make(chan OwnedTaskData, a.properties.ChannelSize) a.quit = make(chan struct{}) a.wp = newWorkerPool(a.handler, taskChannel, a.parade, a.properties.Workers) + a.wg.Add(1) go func() { + defer a.wg.Done() + d := time.Duration(0) for { select { case <-a.quit: return - default: + case <-time.After(d): ownedTasks, err := a.parade.OwnTasks(a.handler.Actor(), a.properties.MaxTasks, a.handler.Actions(), 
a.properties.MaxDuration) if err != nil { logging.Default().WithField("actor", a.handler.Actor()).Errorf("manager failed to receive tasks: %s", err) - time.Sleep(*a.properties.WaitTime) - } - for _, ot := range ownedTasks { - a.wp.ch <- ot + d = *a.properties.ErrWaitTime + continue } if len(ownedTasks) == 0 { - time.Sleep(*a.properties.WaitTime) + d = *a.properties.WaitTime + } else { + d = 0 + for _, ot := range ownedTasks { + a.wp.ch <- ot + } } } } diff --git a/parade/action_manager_test.go b/parade/action_manager_test.go index b918db9da51..de041565d17 100644 --- a/parade/action_manager_test.go +++ b/parade/action_manager_test.go @@ -2,6 +2,7 @@ package parade_test import ( "github.com/treeverse/lakefs/parade" + "strconv" "sync/atomic" "testing" "time" @@ -14,26 +15,16 @@ type ownTaskResult struct { type mockParade struct { parade.Parade - ownCalled *int32 - returnCalled *int32 - tasks []ownTaskResult -} - -func newMockParade(tasks []ownTaskResult) mockParade { - ownCalled := int32(0) - returnCalled := int32(0) - return mockParade{ - ownCalled: &ownCalled, - returnCalled: &returnCalled, - tasks: tasks, - } + ownCalled int32 + returnCalled int32 + Tasks []ownTaskResult } func getKTasks(k int, action string) []parade.OwnedTaskData { tasks := make([]parade.OwnedTaskData, 0, k) for i := 0; i < k; i++ { task := parade.OwnedTaskData{ - ID: parade.TaskID(i), + ID: parade.TaskID(strconv.Itoa(i)), Token: parade.PerformanceToken{}, Action: action, Body: nil, @@ -43,40 +34,33 @@ func getKTasks(k int, action string) []parade.OwnedTaskData { return tasks } -func (m mockParade) OwnTasks(_ parade.ActorID, _ int, _ []string, _ *time.Duration) ([]parade.OwnedTaskData, error) { - cur := atomic.AddInt32(m.ownCalled, 1) - if cur <= int32(len(m.tasks)) { - return getKTasks(m.tasks[cur-1].tasks, m.tasks[cur-1].action), nil +func (m *mockParade) OwnTasks(_ parade.ActorID, _ int, _ []string, _ *time.Duration) ([]parade.OwnedTaskData, error) { + cur := atomic.AddInt32(&m.ownCalled, 1) + 
if cur <= int32(len(m.Tasks)) { + return getKTasks(m.Tasks[cur-1].tasks, m.Tasks[cur-1].action), nil } return []parade.OwnedTaskData{}, nil } -func (m mockParade) ReturnTask(_ parade.TaskID, _ parade.PerformanceToken, _ string, _ parade.TaskStatusCodeValue) error { - atomic.AddInt32(m.returnCalled, 1) +func (m *mockParade) ReturnTask(_ parade.TaskID, _ parade.PerformanceToken, _ string, _ parade.TaskStatusCodeValue) error { + atomic.AddInt32(&m.returnCalled, 1) return nil } type mockHandler struct { - handleCalled *int32 + handleCalled int32 } -func newMockHandler() mockHandler { - handleCalled := int32(0) - return mockHandler{ - handleCalled: &handleCalled, - } -} - -func (m mockHandler) Handle(_ string, _ *string) parade.HandlerResult { - atomic.AddInt32(m.handleCalled, 1) +func (m *mockHandler) Handle(_ string, _ *string) parade.HandlerResult { + atomic.AddInt32(&m.handleCalled, 1) return parade.HandlerResult{} } -func (m mockHandler) Actions() []string { +func (m *mockHandler) Actions() []string { return []string{"one", "two"} } -func (m mockHandler) Actor() parade.ActorID { +func (m *mockHandler) Actor() parade.ActorID { return "mock" } @@ -87,8 +71,7 @@ func durationPointer(d time.Duration) *time.Duration { func TestActionManager(t *testing.T) { tests := []struct { name string - mp mockParade - mh mockHandler + Tasks []ownTaskResult sleepTime time.Duration expectedReturnTaskCalls int32 expectedHandleCalled int32 @@ -96,30 +79,25 @@ func TestActionManager(t *testing.T) { }{ { name: "no tasks", - mp: newMockParade([]ownTaskResult{}), - mh: newMockHandler(), - sleepTime: 100 * time.Millisecond, + sleepTime: 50 * time.Millisecond, expectedReturnTaskCalls: int32(0), expectedHandleCalled: int32(0), - properties: nil, }, { name: "50 tasks in one call", - mp: newMockParade([]ownTaskResult{ + Tasks: []ownTaskResult{ { tasks: 50, action: "first action", }, - }), - mh: newMockHandler(), - sleepTime: 100 * time.Millisecond, + }, + sleepTime: 50 * time.Millisecond, 
expectedReturnTaskCalls: int32(50), expectedHandleCalled: int32(50), - properties: nil, }, { - name: "80 tasks in two calls", - mp: newMockParade([]ownTaskResult{ + name: "80 Tasks in two calls", + Tasks: []ownTaskResult{ { tasks: 50, action: "first action", @@ -128,16 +106,14 @@ func TestActionManager(t *testing.T) { tasks: 30, action: "second action", }, - }), - mh: newMockHandler(), - sleepTime: 100 * time.Millisecond, + }, + sleepTime: 50 * time.Millisecond, expectedReturnTaskCalls: int32(80), expectedHandleCalled: int32(80), - properties: nil, }, { name: "exit before second call", - mp: newMockParade([]ownTaskResult{ + Tasks: []ownTaskResult{ { tasks: 50, action: "first action", @@ -146,33 +122,38 @@ func TestActionManager(t *testing.T) { tasks: 0, action: "force sleep", }, + { + tasks: 0, + action: "force sleep", + }, { tasks: 30, action: "second action", }, - }), - mh: newMockHandler(), + }, sleepTime: 50 * time.Millisecond, expectedReturnTaskCalls: int32(50), expectedHandleCalled: int32(50), properties: &parade.ManagerProperties{ - WaitTime: durationPointer(time.Millisecond * 100), + WaitTime: durationPointer(time.Millisecond * 30), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - a := parade.NewActionManager(tt.mh, tt.mp, tt.properties) + mh := &mockHandler{} + mp := &mockParade{ + Tasks: tt.Tasks, + } + a := parade.NewActionManager(mh, mp, tt.properties) time.Sleep(tt.sleepTime) a.Close() - returnCalled := atomic.LoadInt32(tt.mp.returnCalled) - if returnCalled != tt.expectedReturnTaskCalls { - t.Fatalf("expected ownedTasks to be called: %d times, called %d\n", tt.expectedReturnTaskCalls, returnCalled) + if mp.returnCalled != tt.expectedReturnTaskCalls { + t.Errorf("expected ownedTasks to be called: %d times, called %d\n", tt.expectedReturnTaskCalls, mp.returnCalled) } - handleCalled := atomic.LoadInt32(tt.mh.handleCalled) - if handleCalled != tt.expectedHandleCalled { - t.Fatalf("expected ownedTasks to be called: %d times, called
%d\n", tt.expectedHandleCalled, handleCalled) + if mh.handleCalled != tt.expectedHandleCalled { + t.Errorf("expected ownedTasks to be called: %d times, called %d\n", tt.expectedHandleCalled, mh.handleCalled) } }) } From 9889d43e4045e9861bcae6286551a67583838bb6 Mon Sep 17 00:00:00 2001 From: guy-har <60321938+guy-har@users.noreply.github.com> Date: Sun, 18 Oct 2020 12:27:02 +0300 Subject: [PATCH 09/13] Update parade/action_manager.go Co-authored-by: arielshaqed --- parade/action_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parade/action_manager.go b/parade/action_manager.go index 62024a95f82..69fed450fee 100644 --- a/parade/action_manager.go +++ b/parade/action_manager.go @@ -26,8 +26,8 @@ type ManagerProperties struct { MaxDuration *time.Duration // maxDuration passed to parade.OwnTasks } -// A ActionManager manages the process of requesting and returning tasks for a specific TaskHandler -// The manager requests tasks, sends the tasks to workers through a channel, the workers then handle the task and return it +// ActionManager manages the process of requesting and returning tasks for a specific TaskHandler. +// The manager requests tasks, sends the tasks to workers through a channel, the workers then handle the task and return it. 
type ActionManager struct { properties *ManagerProperties handler TaskHandler From 1ab468414e9a11c00e072dcba01729fd538c3b6e Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Sun, 18 Oct 2020 17:16:30 +0300 Subject: [PATCH 10/13] change handler to actor document actor --- action/handler.go | 14 -------------- export/export_handler.go | 8 ++++---- parade/action_manager.go | 24 +++++++++++++----------- parade/action_manager_test.go | 6 +++--- parade/actor.go | 16 ++++++++++++++++ parade/handler.go | 12 ------------ 6 files changed, 36 insertions(+), 44 deletions(-) delete mode 100644 action/handler.go create mode 100644 parade/actor.go delete mode 100644 parade/handler.go diff --git a/action/handler.go b/action/handler.go deleted file mode 100644 index 0450a02a4c8..00000000000 --- a/action/handler.go +++ /dev/null @@ -1,14 +0,0 @@ -package action - -import "github.com/treeverse/lakefs/parade" - -type HandlerResult struct { - Status string - StatusCode parade.TaskStatusCodeValue -} - -type TaskHandler interface { - Handle(action string, body *string) HandlerResult - Actions() []string - Actor() parade.ActorID -} diff --git a/export/export_handler.go b/export/export_handler.go index 9a2d732317f..a53acf166bd 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -81,7 +81,7 @@ func (h *Handler) touch(body *string) error { } return h.adapter.Put(path, 0, strings.NewReader(""), block.PutOpts{}) } -func (h *Handler) Handle(action string, body *string) parade.HandlerResult { +func (h *Handler) Handle(action string, body *string) parade.ActorResult { var err error switch action { @@ -103,12 +103,12 @@ func (h *Handler) Handle(action string, body *string) parade.HandlerResult { "action": action, }).WithError(err).Error("touch failed") - return parade.HandlerResult{ + return parade.ActorResult{ Status: err.Error(), StatusCode: parade.TaskInvalid, } } - return parade.HandlerResult{ + return parade.ActorResult{ Status: fmt.Sprintf("Completed"), StatusCode: 
parade.TaskCompleted, } @@ -118,6 +118,6 @@ func (h *Handler) Actions() []string { return []string{CopyAction, DeleteAction, TouchAction, DoneAction} } -func (h *Handler) Actor() parade.ActorID { +func (h *Handler) ActorID() parade.ActorID { return actorName } diff --git a/parade/action_manager.go b/parade/action_manager.go index 69fed450fee..f1f317c3163 100644 --- a/parade/action_manager.go +++ b/parade/action_manager.go @@ -26,11 +26,11 @@ type ManagerProperties struct { MaxDuration *time.Duration // maxDuration passed to parade.OwnTasks } -// ActionManager manages the process of requesting and returning tasks for a specific TaskHandler. +// ActionManager manages the process of requesting and returning tasks for a specific Actor. // The manager requests tasks, sends the tasks to workers through a channel, the workers then handle the task and return it. type ActionManager struct { properties *ManagerProperties - handler TaskHandler + actor Actor parade Parade quit chan struct{} wg sync.WaitGroup @@ -66,9 +66,9 @@ func setDefaultProperties(properties *ManagerProperties) *ManagerProperties { } // NewActionManager initiates an ActionManager with workers and returns a -func NewActionManager(handler TaskHandler, parade Parade, properties *ManagerProperties) *ActionManager { +func NewActionManager(actor Actor, parade Parade, properties *ManagerProperties) *ActionManager { a := &ActionManager{ - handler: handler, + actor: actor, parade: parade, properties: setDefaultProperties(properties), quit: nil, @@ -86,8 +86,10 @@ func (a *ActionManager) Close() { func (a *ActionManager) start() { taskChannel := make(chan OwnedTaskData, a.properties.ChannelSize) a.quit = make(chan struct{}) - a.wp = newWorkerPool(a.handler, taskChannel, a.parade, a.properties.Workers) + a.wp = newWorkerPool(a.actor, taskChannel, a.parade, a.properties.Workers) a.wg.Add(1) + actorID := a.actor.ActorID() + actions := a.actor.Actions() go func() { defer a.wg.Done() d := time.Duration(0) @@ -96,9 +98,9 
@@ func (a *ActionManager) start() { case <-a.quit: return case <-time.After(d): - ownedTasks, err := a.parade.OwnTasks(a.handler.Actor(), a.properties.MaxTasks, a.handler.Actions(), a.properties.MaxDuration) + ownedTasks, err := a.parade.OwnTasks(actorID, a.properties.MaxTasks, actions, a.properties.MaxDuration) if err != nil { - logging.Default().WithField("actor", a.handler.Actor()).Errorf("manager failed to receive tasks: %s", err) + logging.Default().WithField("actor", actorID).Errorf("manager failed to receive tasks: %s", err) d = *a.properties.ErrWaitTime continue } @@ -116,16 +118,16 @@ func (a *ActionManager) start() { } type workerPool struct { - handler TaskHandler + actor Actor ch chan OwnedTaskData workers int wg sync.WaitGroup parade Parade } -func newWorkerPool(handler TaskHandler, ch chan OwnedTaskData, parade Parade, workers int) *workerPool { +func newWorkerPool(handler Actor, ch chan OwnedTaskData, parade Parade, workers int) *workerPool { a := &workerPool{ - handler: handler, + actor: handler, ch: ch, workers: workers, wg: sync.WaitGroup{}, @@ -147,7 +149,7 @@ func (a *workerPool) start() { workerID := uuid.New() defer a.wg.Done() for task := range a.ch { - res := a.handler.Handle(task.Action, task.Body) + res := a.actor.Handle(task.Action, task.Body) err := a.parade.ReturnTask(task.ID, task.Token, res.Status, res.StatusCode) if err != nil { logging.Default().WithFields(logging.Fields{ diff --git a/parade/action_manager_test.go b/parade/action_manager_test.go index de041565d17..2ed16ae5f9e 100644 --- a/parade/action_manager_test.go +++ b/parade/action_manager_test.go @@ -51,16 +51,16 @@ type mockHandler struct { handleCalled int32 } -func (m *mockHandler) Handle(_ string, _ *string) parade.HandlerResult { +func (m *mockHandler) Handle(_ string, _ *string) parade.ActorResult { atomic.AddInt32(&m.handleCalled, 1) - return parade.HandlerResult{} + return parade.ActorResult{} } func (m *mockHandler) Actions() []string { return []string{"one", "two"} 
} -func (m *mockHandler) Actor() parade.ActorID { +func (m *mockHandler) ActorID() parade.ActorID { return "mock" } diff --git a/parade/actor.go b/parade/actor.go new file mode 100644 index 00000000000..ac736e00821 --- /dev/null +++ b/parade/actor.go @@ -0,0 +1,16 @@ +package parade + +type ActorResult struct { + Status string + StatusCode TaskStatusCodeValue +} + +// Actor handles an action or a group of actions +type Actor interface { + // Handle performs actions with the given body and return the ActorResult + Handle(action string, body *string) ActorResult + // Actions returns the list of actions that could be performed by the Actor + Actions() []string + // ActorID returns the ID of the actor + ActorID() ActorID +} diff --git a/parade/handler.go b/parade/handler.go deleted file mode 100644 index c98e96aca90..00000000000 --- a/parade/handler.go +++ /dev/null @@ -1,12 +0,0 @@ -package parade - -type HandlerResult struct { - Status string - StatusCode TaskStatusCodeValue -} - -type TaskHandler interface { - Handle(action string, body *string) HandlerResult - Actions() []string - Actor() ActorID -} From 1cc4ec16cd5ff6c88186cb7f14f63a41ca7f922e Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Mon, 19 Oct 2020 12:51:30 +0300 Subject: [PATCH 11/13] fix imports --- export/export_handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/export/export_handler.go b/export/export_handler.go index a53acf166bd..356fc4d6381 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -3,11 +3,12 @@ package export import ( "encoding/json" "fmt" + "net/url" + "strings" + "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" - "net/url" - "strings" ) const actorName parade.ActorID = "EXPORT" From 53b0028eeb3b49879b2d4ab78ce3b98fffa2a1e7 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Mon, 19 Oct 2020 12:58:27 +0300 Subject: [PATCH 12/13] fix linter errors --- export/export_handler.go | 
13 ++++++++----- parade/action_manager.go | 6 ++++-- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/export/export_handler.go b/export/export_handler.go index 356fc4d6381..0645abfba04 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -2,6 +2,7 @@ package export import ( "encoding/json" + "errors" "fmt" "net/url" "strings" @@ -31,7 +32,7 @@ type TaskBody struct { } func PathToPointer(path string) (block.ObjectPointer, error) { - u, err := url.Parse(path) //TODO: add verify path on create task + u, err := url.Parse(path) // TODO: add verify path on create task if err != nil { return block.ObjectPointer{}, err } @@ -82,8 +83,10 @@ func (h *Handler) touch(body *string) error { } return h.adapter.Put(path, 0, strings.NewReader(""), block.PutOpts{}) } -func (h *Handler) Handle(action string, body *string) parade.ActorResult { +var errUnknownAction = errors.New("unknown action") + +func (h *Handler) Handle(action string, body *string) parade.ActorResult { var err error switch action { case CopyAction: @@ -93,9 +96,9 @@ func (h *Handler) Handle(action string, body *string) parade.ActorResult { case TouchAction: err = h.touch(body) case DoneAction: - //todo(guys): handle done action + // todo(guys): handle done action default: - err = fmt.Errorf("unknown action") + err = errUnknownAction } if err != nil { @@ -110,7 +113,7 @@ func (h *Handler) Handle(action string, body *string) parade.ActorResult { } } return parade.ActorResult{ - Status: fmt.Sprintf("Completed"), + Status: "Completed", StatusCode: parade.TaskCompleted, } } diff --git a/parade/action_manager.go b/parade/action_manager.go index f1f317c3163..5ce875f0f54 100644 --- a/parade/action_manager.go +++ b/parade/action_manager.go @@ -1,10 +1,12 @@ package parade import ( - "github.com/google/uuid" - "github.com/treeverse/lakefs/logging" "sync" "time" + + "github.com/google/uuid" + + "github.com/treeverse/lakefs/logging" ) const ( From 0abbda080fa7bad75ff555514c8cd4c1f29cfbc1 Mon 
Sep 17 00:00:00 2001 From: guyhardonag Date: Mon, 19 Oct 2020 13:53:56 +0300 Subject: [PATCH 13/13] fix race condition --- parade/action_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parade/action_manager.go b/parade/action_manager.go index 5ce875f0f54..7903721eb93 100644 --- a/parade/action_manager.go +++ b/parade/action_manager.go @@ -81,8 +81,8 @@ func NewActionManager(actor Actor, parade Parade, properties *ManagerProperties) func (a *ActionManager) Close() { close(a.quit) - a.wp.Close() a.wg.Wait() + a.wp.Close() } func (a *ActionManager) start() { @@ -93,6 +93,7 @@ func (a *ActionManager) start() { actorID := a.actor.ActorID() actions := a.actor.Actions() go func() { + defer close(taskChannel) defer a.wg.Done() d := time.Duration(0) for { @@ -140,7 +141,6 @@ func newWorkerPool(handler Actor, ch chan OwnedTaskData, parade Parade, workers } func (a *workerPool) Close() { - close(a.ch) a.wg.Wait() }