Skip to content

Commit

Permalink
Merge conflicts with latest workflow branch
Browse files Browse the repository at this point in the history
  • Loading branch information
parauliya committed Jan 13, 2020
2 parents db81df3 + 1639339 commit 2220479
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 133 deletions.
61 changes: 61 additions & 0 deletions cmd/rover/cmd/workflow/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package workflow

import (
"context"
"fmt"
"log"
"strconv"

"github.com/packethost/rover/client"
"github.com/packethost/rover/protos/workflow"
uuid "github.com/satori/go.uuid"
"github.com/spf13/cobra"
)

var (
version string
fVersion = "version"
)

// dataCmd represents the data subcommand for workflow command
var dataCmd = &cobra.Command{
Use: "data [id]",
Short: "get workflow data",
Example: "rover workflow data [id] [flags]",
Args: func(c *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("%v requires an argument", c.UseLine())
}
for _, arg := range args {
if _, err := uuid.FromString(arg); err != nil {
return fmt.Errorf("invalid uuid: %s", arg)
}
}
return nil
},
Run: func(c *cobra.Command, args []string) {
for _, arg := range args {
req := &workflow.GetWorkflowDataRequest{WorkflowID: arg}
if version != "" {
v, err := strconv.ParseInt(version, 10, 64)
if err != nil {
log.Fatal(fmt.Errorf("invalid version: %v", version))
return
}
req.Version = v
}
res, err := client.WorkflowClient.GetWorkflowData(context.Background(), req)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(res.Data))
}
},
}

func init() {
flags := dataCmd.PersistentFlags()
flags.StringVarP(&version, fVersion, "v", "", "data version")

SubCommands = append(SubCommands, dataCmd)
}
25 changes: 14 additions & 11 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ func InsertIntoWfDataTable(ctx context.Context, db *sql.DB, req *pb.UpdateWorkfl

_, err = tx.Exec(`
INSERT INTO
workflow_data (workflow_id, version, worker_id, action_name, data)
workflow_data (workflow_id, version, metadata, data)
VALUES
($1, $2, $3, $4, $5);
`, req.GetWorkflowID(), version, req.GetWorkerID(), req.GetActionName(), req.GetData())
($1, $2, $3, $4);
`, req.GetWorkflowID(), version, string(req.GetMetadata()), string(req.GetData()))
if err != nil {
return errors.Wrap(err, "INSERT Into workflow_data")
}
Expand All @@ -207,23 +207,26 @@ func InsertIntoWfDataTable(ctx context.Context, db *sql.DB, req *pb.UpdateWorkfl
return nil
}

func GetfromWfDataTable(ctx context.Context, db *sql.DB, id string) ([]byte, error) {

version, err := getLatestVersionWfData(ctx, db, id)
if err != nil {
return []byte(""), err
func GetfromWfDataTable(ctx context.Context, db *sql.DB, req *pb.GetWorkflowDataRequest) ([]byte, error) {
version := int(req.GetVersion())
if req.Version == 0 {
v, err := getLatestVersionWfData(ctx, db, req.GetWorkflowID())
if err != nil {
return []byte(""), err
}
version = v
}
query := `
SELECT data
FROM workflow_data
WHERE
workflow_id = $1 AND version = $2
`
row := db.QueryRowContext(ctx, query, id, version)
row := db.QueryRowContext(ctx, query, req.GetWorkflowID(), version)
buf := []byte{}
err = row.Scan(&buf)
err := row.Scan(&buf)
if err == nil {
return buf, nil
return []byte(buf), nil
}

if err != sql.ErrNoRows {
Expand Down
5 changes: 2 additions & 3 deletions deploy/docker-entrypoint-initdb.d/rover-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ CREATE TABLE IF NOT EXISTS workflow_worker_map (
CREATE TABLE IF NOT EXISTS workflow_data (
workflow_id UUID NOT NULL
, version INT
, worker_id VARCHAR(200)
, action_name VARCHAR(200)
, data BYTEA
, metadata JSONB
, data JSONB
);
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func GetWorkflowData(context context.Context, req *pb.GetWorkflowDataRequest, sd
if len(wfID) == 0 {
return &pb.GetWorkflowDataResponse{Data: []byte("")}, status.Errorf(codes.InvalidArgument, "workflow_id is invalid")
}
data, err := db.GetfromWfDataTable(context, sdb, wfID)
data, err := db.GetfromWfDataTable(context, sdb, req)
if err != nil {
return &pb.GetWorkflowDataResponse{Data: []byte("")}, status.Errorf(codes.Unknown, err.Error())
}
Expand Down
167 changes: 84 additions & 83 deletions protos/workflow/workflow.pb.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions protos/workflow/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ message WorkflowActionList {

message GetWorkflowDataRequest {
string workflow_iD = 1;
int64 version = 2;
}

message GetWorkflowDataResponse {
Expand All @@ -121,7 +122,6 @@ message GetWorkflowDataResponse {

message UpdateWorkflowDataRequest {
string workflow_iD = 1;
string worker_iD = 2;
string action_name = 3;
bytes data = 4;
bytes metadata = 2;
bytes data = 3;
}
13 changes: 7 additions & 6 deletions worker/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ var (
cli *client.Client
)

func executeAction(ctx context.Context, action *pb.WorkflowAction) (string, pb.ActionState, error) {
func executeAction(ctx context.Context, action *pb.WorkflowAction, wfID string) (string, pb.ActionState, error) {
err := pullActionImage(ctx, action)
if err != nil {
return fmt.Sprintf("Failed to pull Image : %s", action.GetImage()), 1, errors.Wrap(err, "DOCKER PULL")
}

id, err := createContainer(ctx, action, action.Command)
id, err := createContainer(ctx, action, action.Command, wfID)
if err != nil {
return fmt.Sprintf("Failed to create container"), 1, errors.Wrap(err, "DOCKER CREATE")
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func executeAction(ctx context.Context, action *pb.WorkflowAction) (string, pb.A
}
if status != 0 {
if status == pb.ActionState_ACTION_FAILED && action.OnFailure != "" {
id, err = createContainer(ctx, action, action.OnFailure)
id, err = createContainer(ctx, action, action.OnFailure, wfID)
if err != nil {
fmt.Println("Failed to create on-failure command: ", err)
}
Expand All @@ -101,7 +101,7 @@ func executeAction(ctx context.Context, action *pb.WorkflowAction) (string, pb.A
fmt.Println("Failed to run on-failure command: ", err)
}
} else if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != "" {
id, err = createContainer(ctx, action, action.OnTimeout)
id, err = createContainer(ctx, action, action.OnTimeout, wfID)
if err != nil {
fmt.Println("Failed to create on-timeout command: ", err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func pullActionImage(ctx context.Context, action *pb.WorkflowAction) error {
return nil
}

func createContainer(ctx context.Context, action *pb.WorkflowAction, cmd string) (string, error) {
func createContainer(ctx context.Context, action *pb.WorkflowAction, cmd string, wfID string) (string, error) {
config := &container.Config{
Image: registry + "/" + action.GetImage(),
AttachStdout: true,
Expand All @@ -165,8 +165,9 @@ func createContainer(ctx context.Context, action *pb.WorkflowAction, cmd string)
config.Cmd = []string{cmd}
}

wfDir := dataDir + string(os.PathSeparator) + wfID
hostConfig := &container.HostConfig{
Binds: []string{workflowData},
Binds: []string{wfDir + ":/workflow"},
}

resp, err := cli.ContainerCreate(ctx, config, hostConfig, nil, action.GetName())
Expand Down
107 changes: 81 additions & 26 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

const (
dataFile = "/workflow/data"
dataFile = "data"
dataDir = "/worker"
maxFileSize = "MAX_FILE_SIZE" // in bytes
defaultMaxFileSize int64 = 10485760 //10MB ~= 10485760Bytes
)
Expand All @@ -30,6 +31,15 @@ var (
workflowDataSHA = map[string]string{}
)

// WorkflowMetadata is the metadata related to workflow data
type WorkflowMetadata struct {
WorkerID string
Action string
Task string
UpdatedAt time.Time
SHA string
}

func initializeWorker(client pb.WorkflowSvcClient) error {
workerID := os.Getenv("WORKER_ID")
if workerID == "" {
Expand Down Expand Up @@ -70,9 +80,6 @@ func initializeWorker(client pb.WorkflowSvcClient) error {
} else {
switch wfContext.GetCurrentActionState() {
case pb.ActionState_ACTION_SUCCESS:
// send updated workflow data
updateWorkflowData(ctx, client, wfContext)

if isLastAction(wfContext, actions) {
fmt.Printf("Workflow %s completed successfully\n", wfID)
continue
Expand All @@ -96,7 +103,25 @@ func initializeWorker(client pb.WorkflowSvcClient) error {
}

if turn {
fmt.Printf("Starting with action %s\n", actions.GetActionList()[actionIndex])
wfDir := dataDir + string(os.PathSeparator) + wfID
if _, err := os.Stat(wfDir); !os.IsNotExist(err) {
err := os.Mkdir(wfDir, os.FileMode(0755))
if err != nil {
log.Fatal(err)
}

f := openDataFile(wfDir)
_, err = f.Write([]byte("{}"))
if err != nil {
log.Fatal(err)
}

f.Close()
if err != nil {
log.Fatal(err)
}
}
log.Printf("Starting with action %s\n", actions.GetActionList()[actionIndex])
} else {
fmt.Printf("Sleep for %d seconds\n", retryInterval)
time.Sleep(retryInterval)
Expand Down Expand Up @@ -126,7 +151,7 @@ func initializeWorker(client pb.WorkflowSvcClient) error {

// start executing the action
start := time.Now()
message, status, err := executeAction(ctx, actions.GetActionList()[actionIndex])
message, status, err := executeAction(ctx, actions.GetActionList()[actionIndex], wfID)
elapsed := time.Since(start)

actionStatus := &pb.WorkflowActionStatus{
Expand Down Expand Up @@ -162,6 +187,9 @@ func initializeWorker(client pb.WorkflowSvcClient) error {
}
fmt.Printf("Sent action status %s\n", actionStatus)

// send workflow data, if updated
updateWorkflowData(ctx, client, actionStatus)

if len(actions.GetActionList()) == actionIndex+1 {
fmt.Printf("Reached to end of workflow\n")
turn = false
Expand Down Expand Up @@ -245,19 +273,24 @@ func getWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workflowI
log.Fatal(err)
}

f := openDataFile()
defer f.Close()
if len(res.Data) == 0 {
f.Write([]byte("{}"))
} else {
if len(res.GetData()) != 0 {
log.Printf("Data received: %x", res.GetData())
wfDir := dataDir + string(os.PathSeparator) + workflowID
f := openDataFile(wfDir)
defer f.Close()

_, err := f.Write(res.GetData())
if err != nil {
log.Fatal(err)
}
h := sha.New()
workflowDataSHA[workflowID] = base64.StdEncoding.EncodeToString(h.Sum(res.Data))
f.Write(res.Data)
}
}

func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workflowCtx *pb.WorkflowContext) {
f := openDataFile()
func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, actionStatus *pb.WorkflowActionStatus) {
wfDir := dataDir + string(os.PathSeparator) + actionStatus.GetWorkflowId()
f := openDataFile(wfDir)
defer f.Close()

data, err := ioutil.ReadAll(f)
Expand All @@ -267,23 +300,45 @@ func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workfl

if isValidDataFile(f, data) {
h := sha.New()
newSHA := base64.StdEncoding.EncodeToString(h.Sum(data))
if !strings.EqualFold(workflowDataSHA[workflowCtx.GetWorkflowId()], newSHA) {
_, err := client.UpdateWorkflowData(ctx, &pb.UpdateWorkflowDataRequest{
WorkflowID: workflowCtx.GetWorkflowId(),
Data: data,
ActionName: workflowCtx.GetCurrentAction(),
WorkerID: workflowCtx.GetCurrentWorker(),
})
if err != nil {
log.Fatal(err)
if _, ok := workflowDataSHA[actionStatus.GetWorkflowId()]; !ok {
checksum := base64.StdEncoding.EncodeToString(h.Sum(data))
workflowDataSHA[actionStatus.GetWorkflowId()] = checksum
sendUpdate(ctx, client, actionStatus, data, checksum)
} else {
newSHA := base64.StdEncoding.EncodeToString(h.Sum(data))
if !strings.EqualFold(workflowDataSHA[actionStatus.GetWorkflowId()], newSHA) {
sendUpdate(ctx, client, actionStatus, data, newSHA)
}
}
}
}

func openDataFile() *os.File {
f, err := os.OpenFile(dataFile, os.O_RDWR|os.O_CREATE, 0644)
func sendUpdate(ctx context.Context, client pb.WorkflowSvcClient, st *pb.WorkflowActionStatus, data []byte, checksum string) {
meta := WorkflowMetadata{
WorkerID: st.GetWorkerId(),
Action: st.GetActionName(),
Task: st.GetTaskName(),
UpdatedAt: time.Now(),
SHA: checksum,
}
metadata, err := json.Marshal(meta)
if err != nil {
log.Fatal(err)
}

log.Printf("Sending updated data: %v\n", string(data))
_, err = client.UpdateWorkflowData(ctx, &pb.UpdateWorkflowDataRequest{
WorkflowID: st.GetWorkflowId(),
Data: data,
Metadata: metadata,
})
if err != nil {
log.Fatal(err)
}
}

func openDataFile(wfDir string) *os.File {
f, err := os.OpenFile(wfDir+string(os.PathSeparator)+dataFile, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
log.Fatal(err)
}
Expand Down

0 comments on commit 2220479

Please sign in to comment.