Skip to content

Commit

Permalink
Create db function to run end to end workflow in respect of db changes
Browse files Browse the repository at this point in the history
  • Loading branch information
parauliya authored and gauravgahlot committed Nov 4, 2019
1 parent b103eb2 commit 0c3e2be
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 104 deletions.
95 changes: 90 additions & 5 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,14 @@ func InsertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.

_, err = tx.Exec(`
INSERT INTO
workflow_state (workflow_id, action_list, current_action_index)
workflow_state (workflow_id, current_worker, current_task_name, current_action_name, current_action_state, action_list, current_action_index)
VALUES
($1, $2, $3)
($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (workflow_id)
DO
UPDATE SET
(workflow_id, action_list, current_action_index) = ($1, $2, $3);
`, id, actionData, 0)
(workflow_id, current_worker, current_task_name, current_action_name, current_action_state, action_list, current_action_index) = ($1, $2, $3, $4, $5, $6, $7);
`, id, "", "", "", 0, actionData, 0)
if err != nil {
return errors.Wrap(err, "INSERT in to workflow_state")
}
Expand All @@ -279,6 +279,36 @@ func InsertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
return nil
}

func GetfromWfWorkflowTable(ctx context.Context, db *sql.DB, id string) ([]string, error) {
rows, err := db.Query(`
SELECT workflow_id
FROM workflow_worker_map
WHERE
worker_id = $1;
`, id)
if err != nil {
return nil, err
}
var wfID []string
defer rows.Close()
var workerId string

for rows.Next() {
err = rows.Scan(&workerId)
if err != nil {
err = errors.Wrap(err, "SELECT from worflow_worker_map")
logger.Error(err)
return nil, err
}
wfID = append(wfID, workerId)
}
err = rows.Err()
if err == sql.ErrNoRows {
return nil, nil
}
return wfID, err
}

// GetWorkflow returns a workflow
func GetWorkflow(ctx context.Context, db *sql.DB, id string) (Workflow, error) {
query := `
Expand Down Expand Up @@ -429,7 +459,7 @@ func UpdateWorkflow(ctx context.Context, db *sql.DB, wf Workflow, state int32) e
return nil
}

func UpdateWorkflowStateTable(ctx context.Context, db *sql.DB, wfContext *pb.WorkflowContext) error {
func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *pb.WorkflowContext) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
Expand All @@ -455,6 +485,61 @@ func UpdateWorkflowStateTable(ctx context.Context, db *sql.DB, wfContext *pb.Wor
return nil
}

func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*pb.WorkflowContext, error) {
query := `
SELECT current_worker, current_task_name, current_action_name, current_action_index, current_action_state
FROM workflow_state
WHERE
workflow_id = $1;
`
row := db.QueryRowContext(ctx, query, wfId)
var cw, ct, ca string
var cai int64
var cas pb.ActionState
err := row.Scan(&cw, &ct, &ca, &cai, &cas)
if err == nil {
return &pb.WorkflowContext{
WorkflowId: wfId,
CurrentWorker: cw,
CurrentTask: ct,
CurrentAction: ca,
CurrentActionIndex: cai,
CurrentActionState: cas}, nil
}
if err != sql.ErrNoRows {
err = errors.Wrap(err, "SELECT from worflow_state")
logger.Error(err)
} else {
err = nil
}
return &pb.WorkflowContext{}, nil
}

func GetWorkflowActions(ctx context.Context, db *sql.DB, wfId string) (*pb.WorkflowActionList, error) {
query := `
SELECT action_list
FROM workflow_state
WHERE
workflow_id = $1;
`
row := db.QueryRowContext(ctx, query, wfId)
var actionList string
err := row.Scan(&actionList)
if err == nil {
actions := []*pb.WorkflowAction{}
err = json.Unmarshal([]byte(actionList), &actions)
return &pb.WorkflowActionList{
ActionList: actions}, nil
}
if err != sql.ErrNoRows {
err = errors.Wrap(err, "SELECT from worflow_state")
logger.Error(err)
} else {
err = nil
}
return &pb.WorkflowActionList{}, nil
}

func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *pb.WorkflowActionStatus) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion deploy/docker-entrypoint-initdb.d/rover-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ CREATE TABLE IF NOT EXISTS workflow_state (
, current_task_name VARCHAR(200)
, current_action_name VARCHAR(200)
, current_action_state SMALLINT
, current_worker UUID
, current_worker VARCHAR(200)
, action_list JSONB
, current_action_index int
);
Expand Down
69 changes: 0 additions & 69 deletions executor/data.go

This file was deleted.

42 changes: 22 additions & 20 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ import (
pb "github.com/packethost/rover/protos/rover"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
yaml "gopkg.in/yaml.v2"
)

var (
workflowcontexts = map[string]*pb.WorkflowContext{}
workflowactions = map[string]*pb.WorkflowActionList{}
workers = map[string][]string{}
//workflowcontexts = map[string]*pb.WorkflowContext{}
//workflowactions = map[string]*pb.WorkflowActionList{}
//workers = map[string][]string{}
)

// LoadWorkflow loads workflow in memory and polulates required constructs
func LoadWorkflow(id, data string) error {
// LoadWorkflow loads workflow in memory and populates required constructs
/*func LoadWorkflow(id, data string) error {
var wf Workflow
err := yaml.Unmarshal([]byte(data), &wf)
if err != nil {
Expand All @@ -36,6 +35,7 @@ func LoadWorkflow(id, data string) error {
return nil
}
func updateWorkflowActions(id string, tasks []Task) {
list := []*pb.WorkflowAction{}
for _, task := range tasks {
Expand All @@ -61,23 +61,25 @@ func updateWorkflowActions(id string, tasks []Task) {
}
}
workflowactions[id] = &pb.WorkflowActionList{ActionList: list}
}
}*/

// GetWorkflowContexts implements rover.GetWorkflowContexts
func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest) (*pb.WorkflowContextList, error) {
func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest, sdb *sql.DB) (*pb.WorkflowContextList, error) {
if len(req.WorkerId) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "worker_id is invalid")
}
wfs, ok := workers[req.WorkerId]
if !ok {
wfs, _ := db.GetfromWfWorkflowTable(context, sdb, req.WorkerId)
//wfs, ok := workers[req.WorkerId]
if wfs == nil {
return nil, status.Errorf(codes.InvalidArgument, "Worker not found for any workflows")
}

wfContexts := []*pb.WorkflowContext{}

for _, wf := range wfs {
wfContext, ok := workflowcontexts[wf]
if !ok {
wfContext, err := db.GetWorkflowContexts(context, sdb, wf)
//wfContext, ok := workflowcontexts[wf]
if err != nil {
return nil, status.Errorf(codes.Aborted, "Invalid workflow %s found for worker %s", wf, req.WorkerId)
}
wfContexts = append(wfContexts, wfContext)
Expand All @@ -89,13 +91,13 @@ func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest
}

// GetWorkflowActions implements rover.GetWorkflowActions
func GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest) (*pb.WorkflowActionList, error) {
func GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest, sdb *sql.DB) (*pb.WorkflowActionList, error) {
wfID := req.GetWorkflowId()
if len(wfID) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "workflow_id is invalid")
}
actions, ok := workflowactions[wfID]
if !ok {
actions, err := db.GetWorkflowActions(context, sdb, wfID)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "workflow_id is invalid")
}
return actions, nil
Expand All @@ -114,12 +116,12 @@ func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus, s
return nil, status.Errorf(codes.InvalidArgument, "action_name is invalid")
}
fmt.Printf("Received action status: %s\n", req)
wfContext, ok := workflowcontexts[wfID]
if !ok {
wfContext, err := db.GetWorkflowContexts(context, sdb, wfID)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Workflow context not found for workflow %s", wfID)
}
wfActions, ok := workflowactions[wfID]
if !ok {
wfActions, err := db.GetWorkflowActions(context, sdb, wfID)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Workflow actions not found for workflow %s", wfID)
}

Expand All @@ -143,7 +145,7 @@ func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus, s
wfContext.CurrentAction = req.GetActionName()
wfContext.CurrentActionState = req.GetActionStatus()
wfContext.CurrentActionIndex = actionIndex
err := db.UpdateWorkflowStateTable(context, sdb, wfContext)
err = db.UpdateWorkflowState(context, sdb, wfContext)
if err != nil {
return &empty.Empty{}, fmt.Errorf("Failed to update the workflow_state table. Error : %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions grpc-server/rover.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

// GetWorkflowContexts implements rover.GetWorkflowContexts
func (s *server) GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest) (*pb.WorkflowContextList, error) {
return exec.GetWorkflowContexts(context, req)
return exec.GetWorkflowContexts(context, req, s.db)
}

// GetWorkflowActions implements rover.GetWorkflowActions
func (s *server) GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest) (*pb.WorkflowActionList, error) {
return exec.GetWorkflowActions(context, req)
return exec.GetWorkflowActions(context, req, s.db)
}

// ReportActionStatus implements rover.ReportActionStatus
Expand Down
14 changes: 7 additions & 7 deletions grpc-server/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"text/template"

"github.com/packethost/rover/db"
exec "github.com/packethost/rover/executor"
//exec "github.com/packethost/rover/executor"
"github.com/packethost/rover/metrics"
"github.com/packethost/rover/protos/workflow"
"github.com/pkg/errors"
Expand All @@ -34,7 +34,7 @@ func (s *server) CreateWorkflow(ctx context.Context, in *workflow.CreateRequest)
labels["op"] = "createworkflow"
msg = "creating a new workflow"
id := uuid.NewV4()
var data string
//var data string
fn := func() error {
wf := db.Workflow{
ID: id.String(),
Expand Down Expand Up @@ -62,13 +62,13 @@ func (s *server) CreateWorkflow(ctx context.Context, in *workflow.CreateRequest)
timer := prometheus.NewTimer(metrics.CacheDuration.With(labels))
defer timer.ObserveDuration()

err := exec.LoadWorkflow(id.String(), data)
if err != nil {
return &workflow.CreateResponse{}, err
}
//err := exec.LoadWorkflow(id.String(), data)
//if err != nil {
// return &workflow.CreateResponse{}, err
//}

logger.Info(msg)
err = fn()
err := fn()
logger.Info("done " + msg)
if err != nil {
metrics.CacheErrors.With(labels).Inc()
Expand Down

0 comments on commit 0c3e2be

Please sign in to comment.