Skip to content

Commit

Permalink
Added "rover workflow state" cli
Browse files Browse the repository at this point in the history
  • Loading branch information
parauliya committed Nov 6, 2019
1 parent 1fad471 commit 6482276
Show file tree
Hide file tree
Showing 8 changed files with 500 additions and 89 deletions.
71 changes: 71 additions & 0 deletions cmd/rover/cmd/workflow/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package workflow

import (
"context"
"fmt"
"log"
"os"
"strconv"

"github.com/jedib0t/go-pretty/table"
"github.com/packethost/rover/client"
"github.com/packethost/rover/protos/workflow"
"github.com/spf13/cobra"
)

// getCmd represents the get subcommand for workflow command
var stateCmd = &cobra.Command{
Use: "state [id]",
Short: "get the current workflow context",
Example: "rover workflow state [id]",
Args: func(c *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("%v requires an argument", c.UseLine())
}
return validateID(args[0])
},
Run: func(c *cobra.Command, args []string) {
for _, arg := range args {
req := workflow.GetRequest{Id: arg}
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"Field Name", "Values"})
wf, err := client.WorkflowClient.GetWorkflowContext(context.Background(), &req)
if err != nil {
log.Fatal(err)
}
wfProgress := "0%"
wfProgress = calWorkflowProgress(wf.CurrentActionIndex, wf.TotalNumberOfActions, int(wf.CurrentActionState))
t.AppendRow(table.Row{"Workflow ID", wf.WorkflowId})
t.AppendRow(table.Row{"Workflow Progress", wfProgress})
t.AppendRow(table.Row{"Current Task", wf.CurrentTask})
t.AppendRow(table.Row{"Current Action", wf.CurrentAction})
t.AppendRow(table.Row{"Current Worker", wf.CurrentWorker})
t.AppendRow(table.Row{"Current Action State", wf.CurrentActionState})

t.Render()

}
},
}

func calWorkflowProgress(cur int64, total int64, state int) string {
if total == 0 || (cur == 0 && state != 2) {
return "0%"
}
var taskCompleted int64
if state == 2 {
taskCompleted = cur + 1
} else {
taskCompleted = cur
}
progress := (taskCompleted * 100) / total
fmt.Println("Value of progress is ", progress)
perc := strconv.Itoa(int(progress)) + "%"

return fmt.Sprintf("%s", perc)
}

func init() {
SubCommands = append(SubCommands, stateCmd)
}
34 changes: 17 additions & 17 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func insertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
return errors.Wrap(err, "Invalid Template")
}
var actionList []pb.WorkflowAction
var totalActions int64
var uniqueWorkerID uuid.UUID
for _, task := range wfymldata.Tasks {
workerID, err := getWorkerID(ctx, db, task.WorkerAddr)
Expand Down Expand Up @@ -148,26 +149,24 @@ func insertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
OnFailure: ac.OnFailure,
}
actionList = append(actionList, action)
totalActions++
}
}
actionData, err := json.Marshal(actionList)
if err != nil {
return err
}
/*tx, err = db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
}*/

_, err = tx.Exec(`
INSERT INTO
workflow_state (workflow_id, current_worker, current_task_name, current_action_name, current_action_state, action_list, current_action_index)
workflow_state (workflow_id, current_worker, current_task_name, current_action_name, current_action_state, action_list, current_action_index, total_number_of_actions)
VALUES
($1, $2, $3, $4, $5, $6, $7)
($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (workflow_id)
DO
UPDATE SET
(workflow_id, current_worker, current_task_name, current_action_name, current_action_state, action_list, current_action_index) = ($1, $2, $3, $4, $5, $6, $7);
`, id, "", "", "", 0, actionData, 0)
(workflow_id, current_worker, current_task_name, current_action_name, current_action_state, action_list, current_action_index, total_number_of_actions) = ($1, $2, $3, $4, $5, $6, $7, $8);
`, id, "", "", "", 0, actionData, 0, totalActions)
if err != nil {
return errors.Wrap(err, "INSERT in to workflow_state")
}
Expand Down Expand Up @@ -382,24 +381,25 @@ func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *pb.Workflow

func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*pb.WorkflowContext, error) {
query := `
SELECT current_worker, current_task_name, current_action_name, current_action_index, current_action_state
SELECT current_worker, current_task_name, current_action_name, current_action_index, current_action_state, total_number_of_actions
FROM workflow_state
WHERE
workflow_id = $1;
`
row := db.QueryRowContext(ctx, query, wfId)
var cw, ct, ca string
var cai int64
var cai, tact int64
var cas pb.ActionState
err := row.Scan(&cw, &ct, &ca, &cai, &cas)
err := row.Scan(&cw, &ct, &ca, &cai, &cas, &tact)
if err == nil {
return &pb.WorkflowContext{
WorkflowId: wfId,
CurrentWorker: cw,
CurrentTask: ct,
CurrentAction: ca,
CurrentActionIndex: cai,
CurrentActionState: cas}, nil
WorkflowId: wfId,
CurrentWorker: cw,
CurrentTask: ct,
CurrentAction: ca,
CurrentActionIndex: cai,
CurrentActionState: cas,
TotalNumberOfActions: tact}, nil
}
if err != sql.ErrNoRows {
err = errors.Wrap(err, "SELECT from worflow_state")
Expand Down
1 change: 1 addition & 0 deletions deploy/docker-entrypoint-initdb.d/rover-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ CREATE TABLE IF NOT EXISTS workflow_state (
, current_worker VARCHAR(200)
, action_list JSONB
, current_action_index int
, total_number_of_actions INT
);

CREATE INDEX IF NOT EXISTS idx_wfid ON workflow_state (workflow_id);
Expand Down
42 changes: 42 additions & 0 deletions grpc-server/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/packethost/rover/db"
"github.com/packethost/rover/metrics"
"github.com/packethost/rover/protos/rover"
"github.com/packethost/rover/protos/workflow"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -42,6 +43,9 @@ func (s *server) CreateWorkflow(ctx context.Context, in *workflow.CreateRequest)
State: workflow.State_value[workflow.State_PENDING.String()],
}
data, err := createYaml(ctx, s.db, in.Template, in.Target)
if err != nil {
return errors.Wrap(err, "Failed to create Yaml")
}
err = db.CreateWorkflow(ctx, s.db, wf, data, id)
if err != nil {
return err
Expand Down Expand Up @@ -181,6 +185,44 @@ func (s *server) ListWorkflows(_ *workflow.Empty, stream workflow.WorkflowSvc_Li
return nil
}

func (s *server) GetWorkflowContext(ctx context.Context, in *workflow.GetRequest) (*workflow.WorkflowContext, error) {
logger.Info("GetworkflowContext")
labels := prometheus.Labels{"method": "GetWorkflowContext", "op": ""}
metrics.CacheInFlight.With(labels).Inc()
defer metrics.CacheInFlight.With(labels).Dec()

msg := ""
labels["op"] = "get"
msg = "getting a workflow"

fn := func() (*rover.WorkflowContext, error) { return db.GetWorkflowContexts(ctx, s.db, in.Id) }
metrics.CacheTotals.With(labels).Inc()
timer := prometheus.NewTimer(metrics.CacheDuration.With(labels))
defer timer.ObserveDuration()

logger.Info(msg)
w, err := fn()
logger.Info("done " + msg)
if err != nil {
metrics.CacheErrors.With(labels).Inc()
l := logger
if pqErr := db.Error(err); pqErr != nil {
l = l.With("detail", pqErr.Detail, "where", pqErr.Where)
}
l.Error(err)
}
wf := &workflow.WorkflowContext{
WorkflowId: w.WorkflowId,
CurrentWorker: w.CurrentWorker,
CurrentTask: w.CurrentTask,
CurrentAction: w.CurrentAction,
CurrentActionIndex: w.CurrentActionIndex,
CurrentActionState: workflow.ActionState(w.CurrentActionState),
TotalNumberOfActions: w.TotalNumberOfActions,
}
return wf, err
}

func createYaml(ctx context.Context, sqlDB *sql.DB, temp string, tar string) (string, error) {
tempData, err := db.GetTemplate(ctx, sqlDB, temp)
if err != nil {
Expand Down
99 changes: 54 additions & 45 deletions protos/rover/rover.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protos/rover/rover.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ message WorkflowContext {
string current_action = 4;
int64 current_action_index = 5;
ActionState current_action_state = 6;
int64 total_number_of_actions = 7;
}

message WorkflowContextList {
Expand Down
Loading

0 comments on commit 6482276

Please sign in to comment.