From b103eb2fc279dd8599fdd616e14215a4a04f8ffb Mon Sep 17 00:00:00 2001 From: parauliya Date: Sat, 2 Nov 2019 20:50:56 +0530 Subject: [PATCH] Added code for update workflow_state and workflow_event table in db The above table will be updated when the ReportActionStatus endpoint of rover server will be called The ReportActionStatus endpoint is called to update the status of a particular action from worker node --- db/workflow.go | 71 +++++++++++++++++++++++++++++++--------- executor/executor.go | 12 ++++++- grpc-server/rover.go | 2 +- grpc-server/workflow.go | 7 ++-- protos/rover/rover.pb.go | 53 +++++++++++++++--------------- protos/rover/rover.proto | 2 +- 6 files changed, 99 insertions(+), 48 deletions(-) diff --git a/db/workflow.go b/db/workflow.go index 3c2470884..0847cb74f 100644 --- a/db/workflow.go +++ b/db/workflow.go @@ -10,6 +10,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" + pb "github.com/packethost/rover/protos/rover" "github.com/pkg/errors" uuid "github.com/satori/go.uuid" "gopkg.in/yaml.v2" @@ -30,30 +31,19 @@ type ( Name string `yaml:"name"` WorkerAddr string `yaml:"worker"` Actions []Action `yaml:"actions"` - OnFailure string `yaml:"on-failure"` - OnTimeout string `yaml:"on-timeout"` } // Action is the basic executional unit for a workflow Action struct { Name string `yaml:"name"` Image string `yaml:"image"` - Timeout int `yaml:"timeout"` + Timeout int64 `yaml:"timeout"` + Command string `yaml:"command"` OnTimeout string `yaml:"on-timeout"` OnFailure string `yaml:"on-failure"` } ) -type WorkflowAction struct { - TaskName string - WorkerID uuid.UUID - Name string - Image string - Timeout int - OnTimeout string - OnFailure string -} - // Workflow represents a workflow instance in database type Workflow struct { State int32 @@ -229,7 +219,7 @@ func InsertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid. if err != nil { return errors.Wrap(err, "Invalid Template") } - var actionList []WorkflowAction + var actionList []pb.WorkflowAction var uniqueWorkerID uuid.UUID for _, task := range wfymldata.Tasks { workerID, err := getWorkerID(ctx, db, task.WorkerAddr) @@ -247,12 +237,13 @@ func InsertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid. uniqueWorkerID = workerUID } for _, ac := range task.Actions { - action := WorkflowAction{ + action := pb.WorkflowAction{ TaskName: task.Name, - WorkerID: workerUID, + WorkerId: workerUID.String(), Name: ac.Name, Image: ac.Image, Timeout: ac.Timeout, + Command: ac.Command, OnTimeout: ac.OnTimeout, OnFailure: ac.OnFailure, } @@ -437,3 +428,51 @@ func UpdateWorkflow(ctx context.Context, db *sql.DB, wf Workflow, state int32) e } return nil } + +func UpdateWorkflowStateTable(ctx context.Context, db *sql.DB, wfContext *pb.WorkflowContext) error { + tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if err != nil { + return errors.Wrap(err, "BEGIN transaction") + } + + _, err = tx.Exec(` + UPDATE workflow_state + SET current_task_name = $2, + current_action_name = $3, + current_action_state = $4, + current_worker = $5, + current_action_index = $6 + WHERE + workflow_id = $1; + `, wfContext.WorkflowId, wfContext.CurrentTask, wfContext.CurrentAction, wfContext.CurrentActionState, wfContext.CurrentWorker, wfContext.CurrentActionIndex) + if err != nil { + return errors.Wrap(err, "INSERT in to workflow_state") + } + err = tx.Commit() + if err != nil { + return errors.Wrap(err, "COMMIT") + } + return nil +} + +func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *pb.WorkflowActionStatus) error { + tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if err != nil { + return errors.Wrap(err, "BEGIN transaction") + } + + _, err = tx.Exec(` + INSERT INTO + workflow_event (workflow_id, task_name, action_name, execution_time, message, status) + VALUES + ($1, $2, $3, $4, $5, $6); + `, wfEvent.WorkflowId, wfEvent.TaskName, wfEvent.ActionName, wfEvent.Seconds, wfEvent.Message, wfEvent.ActionStatus) + if err != nil { + return errors.Wrap(err, "INSERT in to workflow_state") + } + err = tx.Commit() + if err != nil { + return errors.Wrap(err, "COMMIT") + } + return nil +} diff --git a/executor/executor.go b/executor/executor.go index 32a533f32..07a24905c 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -2,9 +2,11 @@ package executor import ( "context" + "database/sql" "fmt" empty "github.com/golang/protobuf/ptypes/empty" + "github.com/packethost/rover/db" pb "github.com/packethost/rover/protos/rover" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -100,7 +102,7 @@ func GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest) } // ReportActionStatus implements rover.ReportActionStatus -func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) (*empty.Empty, error) { +func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus, sdb *sql.DB) (*empty.Empty, error) { wfID := req.GetWorkflowId() if len(wfID) == 0 { return nil, status.Errorf(codes.InvalidArgument, "workflow_id is invalid") @@ -141,6 +143,14 @@ func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) ( wfContext.CurrentAction = req.GetActionName() wfContext.CurrentActionState = req.GetActionStatus() wfContext.CurrentActionIndex = actionIndex + err := db.UpdateWorkflowStateTable(context, sdb, wfContext) + if err != nil { + return &empty.Empty{}, fmt.Errorf("Failed to update the workflow_state table. Error : %s", err) + } + err = db.InsertIntoWorkflowEventTable(context, sdb, req) + if err != nil { + return &empty.Empty{}, fmt.Errorf("Failed to update the workflow_event table. Error : %s", err) + } fmt.Printf("Current context %s\n", wfContext) return &empty.Empty{}, nil } diff --git a/grpc-server/rover.go b/grpc-server/rover.go index d879f9091..cf128613f 100644 --- a/grpc-server/rover.go +++ b/grpc-server/rover.go @@ -20,5 +20,5 @@ func (s *server) GetWorkflowActions(context context.Context, req *pb.WorkflowAct // ReportActionStatus implements rover.ReportActionStatus func (s *server) ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) (*empty.Empty, error) { - return exec.ReportActionStatus(context, req) + return exec.ReportActionStatus(context, req, s.db) } diff --git a/grpc-server/workflow.go b/grpc-server/workflow.go index 954b5356b..adacfccb8 100644 --- a/grpc-server/workflow.go +++ b/grpc-server/workflow.go @@ -42,18 +42,19 @@ func (s *server) CreateWorkflow(ctx context.Context, in *workflow.CreateRequest) Target: in.Target, State: workflow.State_value[workflow.State_PENDING.String()], } - err := db.CreateWorkflow(ctx, s.db, wf) + data, err := createYaml(ctx, s.db, in.Template, in.Target) if err != nil { return err } - data, err = createYaml(ctx, s.db, in.Template, in.Target) + err = db.InsertActionList(ctx, s.db, data, id) if err != nil { return err } - err = db.InsertActionList(ctx, s.db, data, id) + err = db.CreateWorkflow(ctx, s.db, wf) if err != nil { return err } + return nil } diff --git a/protos/rover/rover.pb.go b/protos/rover/rover.pb.go index b82fc5648..03794ebb1 100644 --- a/protos/rover/rover.pb.go +++ b/protos/rover/rover.pb.go @@ -6,12 +6,13 @@ package rover import ( context "context" fmt "fmt" + math "math" + proto "github.com/golang/protobuf/proto" empty "github.com/golang/protobuf/ptypes/empty" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - math "math" ) // Reference imports to suppress errors if they are not otherwise used. @@ -293,7 +294,7 @@ type WorkflowAction struct { TaskName string `protobuf:"bytes,1,opt,name=task_name,json=taskName,proto3" json:"task_name,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Image string `protobuf:"bytes,3,opt,name=image,proto3" json:"image,omitempty"` - Timeout string `protobuf:"bytes,4,opt,name=timeout,proto3" json:"timeout,omitempty"` + Timeout int64 `protobuf:"varint,4,opt,name=timeout,proto3" json:"timeout,omitempty"` Command string `protobuf:"bytes,5,opt,name=command,proto3" json:"command,omitempty"` OnTimeout string `protobuf:"bytes,6,opt,name=on_timeout,json=onTimeout,proto3" json:"on_timeout,omitempty"` OnFailure string `protobuf:"bytes,7,opt,name=on_failure,json=onFailure,proto3" json:"on_failure,omitempty"` @@ -349,11 +350,11 @@ func (m *WorkflowAction) GetImage() string { return "" } -func (m *WorkflowAction) GetTimeout() string { +func (m *WorkflowAction) GetTimeout() int64 { if m != nil { return m.Timeout } - return "" + return 0 } func (m *WorkflowAction) GetCommand() string { @@ -517,7 +518,7 @@ func init() { func init() { proto.RegisterFile("rover.proto", fileDescriptor_a2b4b2643fc791e2) } var fileDescriptor_a2b4b2643fc791e2 = []byte{ - // 668 bytes of a gzipped FileDescriptorProto + // 670 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x54, 0x5f, 0x4f, 0x1a, 0x4f, 0x14, 0x75, 0x41, 0x54, 0x2e, 0xc2, 0x0f, 0xaf, 0xfe, 0xc8, 0x16, 0x63, 0xa4, 0x9b, 0x34, 0x21, 0x3e, 0x60, 0x63, 0xd3, 0x36, 0x7d, 0x34, 0x08, 0x66, 0x53, 0x8b, 0x66, 0xc1, 0x90, 0xf4, 0x65, @@ -539,27 +540,27 @@ var fileDescriptor_a2b4b2643fc791e2 = []byte{ 0xb2, 0x00, 0xb3, 0x3e, 0xad, 0x02, 0x93, 0x23, 0x99, 0x0e, 0xec, 0x25, 0xff, 0xad, 0xdf, 0x06, 0xd4, 0xb2, 0xb5, 0x49, 0xc8, 0x89, 0xc7, 0x6e, 0xe8, 0x05, 0x44, 0x87, 0x9c, 0x00, 0x03, 0x2f, 0x20, 0x88, 0xb0, 0x29, 0x70, 0x99, 0x92, 0xf8, 0x8d, 0x07, 0x50, 0xf2, 0x03, 0xef, 0x81, 0xa8, - 0x54, 0xe4, 0x01, 0x4d, 0xd8, 0xe6, 0x7e, 0x40, 0x68, 0xcc, 0x55, 0x10, 0xfa, 0x98, 0x30, 0x13, - 0x1a, 0x04, 0x5e, 0x38, 0x15, 0xae, 0x97, 0x1d, 0x7d, 0xc4, 0x23, 0x00, 0x1a, 0xba, 0xba, 0x6c, - 0x4b, 0x90, 0x65, 0x1a, 0x8e, 0x54, 0xa1, 0xa4, 0xef, 0x3d, 0x7f, 0x1e, 0x47, 0xc4, 0xdc, 0xd6, - 0x74, 0x5f, 0x02, 0xd9, 0xed, 0xdc, 0xc9, 0x6d, 0xe7, 0x15, 0x60, 0xf6, 0x7f, 0x0a, 0xfb, 0x3f, - 0x40, 0x45, 0x65, 0x3a, 0xf7, 0x19, 0x57, 0xc6, 0xff, 0x9f, 0x33, 0x5e, 0xea, 0x1d, 0xf0, 0x96, - 0x75, 0xd6, 0x2f, 0x03, 0x0e, 0xb2, 0x74, 0x92, 0x72, 0xcc, 0x5e, 0x5e, 0xf8, 0x8c, 0xbb, 0x85, - 0x9c, 0xbb, 0xc7, 0xcb, 0xcf, 0x11, 0xb4, 0xf4, 0x53, 0xcd, 0x15, 0x82, 0x8f, 0x50, 0x4d, 0xed, - 0x60, 0xcc, 0x84, 0xb5, 0xeb, 0x97, 0x70, 0xd7, 0x4b, 0x7f, 0x97, 0x09, 0xdb, 0x8c, 0x4c, 0x68, - 0x38, 0x65, 0x6a, 0xd3, 0xf5, 0x31, 0x61, 0x02, 0xc2, 0x58, 0x92, 0x9f, 0x34, 0x5c, 0x1f, 0x4f, - 0x38, 0x54, 0x52, 0x0d, 0x11, 0xa1, 0x76, 0xde, 0x1d, 0xd9, 0xd7, 0x03, 0xf7, 0xa6, 0x37, 0xb8, - 0xb0, 0x07, 0x97, 0xf5, 0x0d, 0x6c, 0x00, 0x2a, 0xcc, 0x1e, 0xb8, 0x37, 0xce, 0xf5, 0xa5, 0xd3, - 0x1b, 0x0e, 0xeb, 0x46, 0x4a, 0x3b, 0xbc, 0xed, 0x76, 0x13, 0xac, 0x80, 0x7b, 0x50, 0x55, 0x58, - 0xff, 0xdc, 0xbe, 0xea, 0x5d, 0xd4, 0x8b, 0x29, 0xd9, 0xc8, 0xfe, 0xd2, 0xbb, 0xbe, 0x1d, 0xd5, - 0x37, 0x4f, 0xee, 0xa0, 0xaa, 0x9d, 0x95, 0x73, 0x6b, 0x00, 0xe3, 0x7e, 0x6a, 0x26, 0x42, 0x6d, - 0xdc, 0xcf, 0xcd, 0x93, 0x9a, 0xd5, 0xac, 0x2a, 0x94, 0xc7, 0xfd, 0xd5, 0x1c, 0x49, 0x2f, 0x67, - 0x9c, 0xfd, 0x31, 0xa0, 0xe4, 0x24, 0x8e, 0xa1, 0x03, 0xfb, 0x97, 0x84, 0xe7, 0xae, 0x18, 0xc3, - 0xa3, 0x7f, 0xdc, 0x3d, 0x79, 0xad, 0x9a, 0xcd, 0xf5, 0xb4, 0x58, 0x8d, 0x0d, 0xbc, 0x01, 0x4c, - 0xf5, 0x54, 0x37, 0xf2, 0x59, 0xcb, 0xec, 0x4d, 0x6d, 0xbe, 0x5a, 0x4b, 0xab, 0x8e, 0x9f, 0x01, - 0x1d, 0xf2, 0x48, 0x23, 0x9e, 0xd9, 0xb5, 0xc3, 0xb5, 0x25, 0x92, 0x6c, 0x36, 0x3a, 0xf2, 0x89, - 0xef, 0xe8, 0x27, 0xbe, 0xd3, 0x4b, 0x9e, 0x78, 0x6b, 0xe3, 0x6e, 0x4b, 0x20, 0xef, 0xfe, 0x06, - 0x00, 0x00, 0xff, 0xff, 0xb1, 0x33, 0x5b, 0xb3, 0x18, 0x06, 0x00, 0x00, + 0x54, 0xe4, 0x01, 0x4d, 0xd8, 0xe6, 0x7e, 0x40, 0x68, 0xcc, 0x45, 0x10, 0x45, 0x47, 0x1f, 0x13, + 0x66, 0x42, 0x83, 0xc0, 0x0b, 0xa7, 0xc2, 0xf5, 0xb2, 0xa3, 0x8f, 0x78, 0x04, 0x40, 0x43, 0x57, + 0x97, 0x6d, 0x09, 0xb2, 0x4c, 0xc3, 0x91, 0x2a, 0x94, 0xf4, 0xbd, 0xe7, 0xcf, 0xe3, 0x88, 0x98, + 0xdb, 0x9a, 0xee, 0x4b, 0x20, 0xbb, 0x9d, 0x3b, 0xb9, 0xed, 0xbc, 0x02, 0xcc, 0xfe, 0x4f, 0x61, + 0xff, 0x07, 0xa8, 0xa8, 0x4c, 0xe7, 0x3e, 0xe3, 0xca, 0xf8, 0xff, 0x73, 0xc6, 0x4b, 0xbd, 0x03, + 0xde, 0xb2, 0xce, 0xfa, 0x65, 0xc0, 0x41, 0x96, 0x4e, 0x52, 0x8e, 0xd9, 0xcb, 0x0b, 0x9f, 0x71, + 0xb7, 0x90, 0x73, 0xf7, 0x78, 0xf9, 0x39, 0x82, 0x96, 0x7e, 0xaa, 0xb9, 0x42, 0xf0, 0x11, 0xaa, + 0xa9, 0x1d, 0x8c, 0x99, 0xb0, 0x76, 0xfd, 0x12, 0xee, 0x7a, 0xe9, 0xef, 0x32, 0x61, 0x9b, 0x91, + 0x09, 0x0d, 0xa7, 0x4c, 0x6d, 0xba, 0x3e, 0x26, 0x4c, 0x40, 0x18, 0x4b, 0xf2, 0x93, 0x86, 0xeb, + 0xe3, 0x09, 0x87, 0x4a, 0xaa, 0x21, 0x22, 0xd4, 0xce, 0xbb, 0x23, 0xfb, 0x7a, 0xe0, 0xde, 0xf4, + 0x06, 0x17, 0xf6, 0xe0, 0xb2, 0xbe, 0x81, 0x0d, 0x40, 0x85, 0xd9, 0x03, 0xf7, 0xc6, 0xb9, 0xbe, + 0x74, 0x7a, 0xc3, 0x61, 0xdd, 0x48, 0x69, 0x87, 0xb7, 0xdd, 0x6e, 0x82, 0x15, 0x70, 0x0f, 0xaa, + 0x0a, 0xeb, 0x9f, 0xdb, 0x57, 0xbd, 0x8b, 0x7a, 0x31, 0x25, 0x1b, 0xd9, 0x5f, 0x7a, 0xd7, 0xb7, + 0xa3, 0xfa, 0xe6, 0xc9, 0x1d, 0x54, 0xb5, 0xb3, 0x72, 0x6e, 0x0d, 0x60, 0xdc, 0x4f, 0xcd, 0x44, + 0xa8, 0x8d, 0xfb, 0xb9, 0x79, 0x52, 0xb3, 0x9a, 0x55, 0x85, 0xf2, 0xb8, 0xbf, 0x9a, 0x23, 0xe9, + 0xe5, 0x8c, 0xb3, 0x3f, 0x06, 0x94, 0x9c, 0xc4, 0x31, 0x74, 0x60, 0xff, 0x92, 0xf0, 0xdc, 0x15, + 0x63, 0x78, 0xf4, 0x8f, 0xbb, 0x27, 0xaf, 0x55, 0xb3, 0xb9, 0x9e, 0x16, 0xab, 0xb1, 0x81, 0x37, + 0x80, 0xa9, 0x9e, 0xea, 0x46, 0x3e, 0x6b, 0x99, 0xbd, 0xa9, 0xcd, 0x57, 0x6b, 0x69, 0xd5, 0xf1, + 0x33, 0xa0, 0x43, 0x1e, 0x69, 0xc4, 0x33, 0xbb, 0x76, 0xb8, 0xb6, 0x44, 0x92, 0xcd, 0x46, 0x47, + 0x3e, 0xf1, 0x1d, 0xfd, 0xc4, 0x77, 0x7a, 0xc9, 0x13, 0x6f, 0x6d, 0xdc, 0x6d, 0x09, 0xe4, 0xdd, + 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd8, 0x47, 0x3e, 0xff, 0x18, 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/protos/rover/rover.proto b/protos/rover/rover.proto index 156daff26..4863f4224 100644 --- a/protos/rover/rover.proto +++ b/protos/rover/rover.proto @@ -51,7 +51,7 @@ message WorkflowAction { string task_name = 1; string name = 2; string image = 3; - string timeout = 4; + int64 timeout = 4; string command = 5; string on_timeout = 6; string on_failure = 7;