Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add export action and action manager #807

Merged
merged 14 commits into from
Oct 21, 2020
143 changes: 0 additions & 143 deletions action/action_manager.go

This file was deleted.

116 changes: 74 additions & 42 deletions export/export_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@ package export
import (
"encoding/json"
"fmt"
act "github.com/treeverse/lakefs/action"
"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/parade"
"net/url"
"strings"
)

const actorName parade.ActorID = "EXPORT"
const (
actionCopy = "export-copy"
actionDelete = "export-delete"
actionNext = "next-export"
actionStart = "start-export"
actionDone = "done-export"
)

type Handler struct {
adapter block.Adapter
Expand All @@ -34,56 +29,93 @@ type TaskBody struct {
SourceID string
}

// todo(guys): add logs
func (h *Handler) Handle(action string, body *string) act.HandlerResult {
var params TaskBody
err := json.Unmarshal([]byte(*body), &params)
func PathToPointer(path string) (block.ObjectPointer, error) {
u, err := url.Parse(path) //TODO: add verify path on create task
if err != nil {
return act.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
return block.ObjectPointer{}, err
}
destinationPointer := block.ObjectPointer{
StorageNamespace: params.DestinationNamespace,
Identifier: params.DestinationID,
return block.ObjectPointer{
StorageNamespace: fmt.Sprintf("%s://%s", u.Scheme, u.Host),
Identifier: u.Path,
}, err
}
func (h *Handler) copy(body *string) error {
var copyData CopyData
err := json.Unmarshal([]byte(*body), &copyData)
if err != nil {
return err
}
sourcePointer := block.ObjectPointer{
StorageNamespace: params.SourceNamespace,
Identifier: params.SourceID,
from, err := PathToPointer(copyData.From)
if err != nil {
return err
}
to, err := PathToPointer(copyData.To)
if err != nil {
return err
}
return h.adapter.Copy(from, to) // todo(guys): add wait for copy in handler
}

func (h *Handler) remove(body *string) error {
var deleteData DeleteData
err := json.Unmarshal([]byte(*body), &deleteData)
if err != nil {
return err
}
path, err := PathToPointer(deleteData.File)
if err != nil {
return err
}
return h.adapter.Remove(path)
}

func (h *Handler) touch(body *string) error {
var successData SuccessData
err := json.Unmarshal([]byte(*body), &successData)
if err != nil {
return err
}
path, err := PathToPointer(successData.File)
if err != nil {
return err
}
return h.adapter.Put(path, 0, strings.NewReader(""), block.PutOpts{})
}
func (h *Handler) Handle(action string, body *string) parade.HandlerResult {

var err error
switch action {
case actionCopy:
err = h.adapter.Copy(sourcePointer, destinationPointer) // todo(guys): add wait for copy in handler
if err != nil {
return act.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
}
case actionDelete:
err = h.adapter.Remove(destinationPointer)
if err != nil {
return act.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
}
case CopyAction:
err = h.copy(body)
case DeleteAction:
err = h.remove(body)
case TouchAction:
err = h.touch(body)
case DoneAction:
//todo(guys): handle done action
default:
return act.HandlerResult{
Status: "UNKNOWN ACTION",
err = fmt.Errorf("unknown action")
}

if err != nil {
logging.Default().WithFields(logging.Fields{
"actor": actorName,
"action": action,
}).WithError(err).Error("touch failed")

return parade.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
}
return act.HandlerResult{
return parade.HandlerResult{
Status: fmt.Sprintf("Completed"),
StatusCode: parade.TaskCompleted,
}
}

func (h *Handler) Actions() []string {
return []string{actionCopy, actionDelete}
return []string{CopyAction, DeleteAction, TouchAction, DoneAction}
}

func (h *Handler) Actor() parade.ActorID {
Expand Down
Loading