Skip to content

Commit

Permalink
Added code for update workflow_state and workflow_event table in db
Browse files Browse the repository at this point in the history
The above table will be updated  when the ReportActionStatus endpoint of rover server will be called
The ReportActionStatus endpoint is called to update the status of a particular action from worker node
  • Loading branch information
parauliya authored and gauravgahlot committed Nov 4, 2019
1 parent b3c37f5 commit b103eb2
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 48 deletions.
71 changes: 55 additions & 16 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
pb "github.com/packethost/rover/protos/rover"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gopkg.in/yaml.v2"
Expand All @@ -30,30 +31,19 @@ type (
Name string `yaml:"name"`
WorkerAddr string `yaml:"worker"`
Actions []Action `yaml:"actions"`
OnFailure string `yaml:"on-failure"`
OnTimeout string `yaml:"on-timeout"`
}

// Action is the basic executional unit for a workflow
Action struct {
Name string `yaml:"name"`
Image string `yaml:"image"`
Timeout int `yaml:"timeout"`
Timeout int64 `yaml:"timeout"`
Command string `yaml:"command"`
OnTimeout string `yaml:"on-timeout"`
OnFailure string `yaml:"on-failure"`
}
)

type WorkflowAction struct {
TaskName string
WorkerID uuid.UUID
Name string
Image string
Timeout int
OnTimeout string
OnFailure string
}

// Workflow represents a workflow instance in database
type Workflow struct {
State int32
Expand Down Expand Up @@ -229,7 +219,7 @@ func InsertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
if err != nil {
return errors.Wrap(err, "Invalid Template")
}
var actionList []WorkflowAction
var actionList []pb.WorkflowAction
var uniqueWorkerID uuid.UUID
for _, task := range wfymldata.Tasks {
workerID, err := getWorkerID(ctx, db, task.WorkerAddr)
Expand All @@ -247,12 +237,13 @@ func InsertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
uniqueWorkerID = workerUID
}
for _, ac := range task.Actions {
action := WorkflowAction{
action := pb.WorkflowAction{
TaskName: task.Name,
WorkerID: workerUID,
WorkerId: workerUID.String(),
Name: ac.Name,
Image: ac.Image,
Timeout: ac.Timeout,
Command: ac.Command,
OnTimeout: ac.OnTimeout,
OnFailure: ac.OnFailure,
}
Expand Down Expand Up @@ -437,3 +428,51 @@ func UpdateWorkflow(ctx context.Context, db *sql.DB, wf Workflow, state int32) e
}
return nil
}

func UpdateWorkflowStateTable(ctx context.Context, db *sql.DB, wfContext *pb.WorkflowContext) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
}

_, err = tx.Exec(`
UPDATE workflow_state
SET current_task_name = $2,
current_action_name = $3,
current_action_state = $4,
current_worker = $5,
current_action_index = $6
WHERE
workflow_id = $1;
`, wfContext.WorkflowId, wfContext.CurrentTask, wfContext.CurrentAction, wfContext.CurrentActionState, wfContext.CurrentWorker, wfContext.CurrentActionIndex)
if err != nil {
return errors.Wrap(err, "INSERT in to workflow_state")
}
err = tx.Commit()
if err != nil {
return errors.Wrap(err, "COMMIT")
}
return nil
}

func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *pb.WorkflowActionStatus) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
}

_, err = tx.Exec(`
INSERT INTO
workflow_event (workflow_id, task_name, action_name, execution_time, message, status)
VALUES
($1, $2, $3, $4, $5, $6);
`, wfEvent.WorkflowId, wfEvent.TaskName, wfEvent.ActionName, wfEvent.Seconds, wfEvent.Message, wfEvent.ActionStatus)
if err != nil {
return errors.Wrap(err, "INSERT in to workflow_state")
}
err = tx.Commit()
if err != nil {
return errors.Wrap(err, "COMMIT")
}
return nil
}
12 changes: 11 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package executor

import (
"context"
"database/sql"
"fmt"

empty "github.com/golang/protobuf/ptypes/empty"
"github.com/packethost/rover/db"
pb "github.com/packethost/rover/protos/rover"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -100,7 +102,7 @@ func GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest)
}

// ReportActionStatus implements rover.ReportActionStatus
func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) (*empty.Empty, error) {
func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus, sdb *sql.DB) (*empty.Empty, error) {
wfID := req.GetWorkflowId()
if len(wfID) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "workflow_id is invalid")
Expand Down Expand Up @@ -141,6 +143,14 @@ func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) (
wfContext.CurrentAction = req.GetActionName()
wfContext.CurrentActionState = req.GetActionStatus()
wfContext.CurrentActionIndex = actionIndex
err := db.UpdateWorkflowStateTable(context, sdb, wfContext)
if err != nil {
return &empty.Empty{}, fmt.Errorf("Failed to update the workflow_state table. Error : %s", err)
}
err = db.InsertIntoWorkflowEventTable(context, sdb, req)
if err != nil {
return &empty.Empty{}, fmt.Errorf("Failed to update the workflow_event table. Error : %s", err)
}
fmt.Printf("Current context %s\n", wfContext)
return &empty.Empty{}, nil
}
2 changes: 1 addition & 1 deletion grpc-server/rover.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ func (s *server) GetWorkflowActions(context context.Context, req *pb.WorkflowAct

// ReportActionStatus implements rover.ReportActionStatus
func (s *server) ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) (*empty.Empty, error) {
return exec.ReportActionStatus(context, req)
return exec.ReportActionStatus(context, req, s.db)
}
7 changes: 4 additions & 3 deletions grpc-server/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,19 @@ func (s *server) CreateWorkflow(ctx context.Context, in *workflow.CreateRequest)
Target: in.Target,
State: workflow.State_value[workflow.State_PENDING.String()],
}
err := db.CreateWorkflow(ctx, s.db, wf)
data, err := createYaml(ctx, s.db, in.Template, in.Target)
if err != nil {
return err
}
data, err = createYaml(ctx, s.db, in.Template, in.Target)
err = db.InsertActionList(ctx, s.db, data, id)
if err != nil {
return err
}
err = db.InsertActionList(ctx, s.db, data, id)
err = db.CreateWorkflow(ctx, s.db, wf)
if err != nil {
return err
}

return nil
}

Expand Down
53 changes: 27 additions & 26 deletions protos/rover/rover.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protos/rover/rover.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ message WorkflowAction {
string task_name = 1;
string name = 2;
string image = 3;
string timeout = 4;
int64 timeout = 4;
string command = 5;
string on_timeout = 6;
string on_failure = 7;
Expand Down

0 comments on commit b103eb2

Please sign in to comment.