Skip to content

Commit

Permalink
Merge pull request #216 from infracloudio/fix_196
Browse files Browse the repository at this point in the history
Fix Issue #196: Filtered the workflow context sent to worker by server
  • Loading branch information
parauliya authored Aug 4, 2020
2 parents 64d8cc2 + 42621d5 commit c0a4a29
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 183 deletions.
1 change: 0 additions & 1 deletion cmd/tink-worker/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func executeAction(ctx context.Context, action *pb.WorkflowAction, wfID string)
if err != nil {
return fmt.Sprintf("Failed to pull Image : %s", action.GetImage()), 1, errors.Wrap(err, "DOCKER PULL")
}

id, err := createContainer(ctx, action, action.Command, wfID)
if err != nil {
return fmt.Sprintf("Failed to create container"), 1, errors.Wrap(err, "DOCKER CREATE")
Expand Down
70 changes: 17 additions & 53 deletions cmd/tink-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,20 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}
log = logger.WithField("worker_id", workerID)
ctx := context.Background()
var err error
cli, err = initializeDockerClient()
if err != nil {
return err
}
for {
err := fetchLatestContext(ctx, client, workerID)
if err != nil {
return err
}

if allWorkflowsFinished() {
log.Infoln("All workflows finished")
return nil
}

cli, err = initializeDockerClient()
res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID})
if err != nil {
return err
fmt.Println("failed to get context")
}

for wfID, wfContext := range workflowcontexts {
actions, ok := workflowactions[wfID]
if !ok {
for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() {
wfID := wfContext.GetWorkflowId()
actions, err := client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID})
if err != nil {
return fmt.Errorf("can't find actions for workflow %s", wfID)
}

Expand Down Expand Up @@ -121,9 +116,6 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}
}
log.Printf("Starting with action %s\n", actions.GetActionList()[actionIndex])
} else {
log.Infof("Sleep for %d seconds\n", retryInterval)
time.Sleep(retryInterval)
}

for turn {
Expand Down Expand Up @@ -176,6 +168,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
if rerr != nil {
exitWithGrpcError(rerr)
}
delete(workflowcontexts, wfID)
return err
}

Expand All @@ -193,6 +186,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {

if len(actions.GetActionList()) == actionIndex+1 {
log.Infoln("Reached to end of workflow")
delete(workflowcontexts, wfID)
turn = false
break
}
Expand All @@ -205,41 +199,11 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}
}
}
// sleep for 3 seconds before asking for new workflows
time.Sleep(retryInterval * time.Second)
}
}

// fetchLatestContext requests the workflow contexts for workerID from the
// server and records each one in the package-level workflowcontexts map,
// keyed by workflow ID. For any workflow whose actions are not yet present in
// workflowactions, it also fetches the action list and caches it there.
//
// It returns the first error reported by either RPC, or nil when every
// context has been processed.
func fetchLatestContext(ctx context.Context, client pb.WorkflowSvcClient, workerID string) error {
	log.Infof("Fetching latest context for worker %s\n", workerID)

	contextList, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID})
	if err != nil {
		return err
	}

	for _, current := range contextList.GetWorkflowContexts() {
		id := current.WorkflowId
		workflowcontexts[id] = current

		// Actions are fetched once per workflow; skip the RPC when cached.
		if _, cached := workflowactions[id]; cached {
			continue
		}
		actionList, err := client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: id})
		if err != nil {
			return err
		}
		workflowactions[id] = actionList
	}
	return nil
}

// allWorkflowsFinished reports whether every workflow tracked in
// workflowcontexts needs no further work: its current action either failed,
// timed out, or succeeded as the last action of the workflow. An empty
// workflowcontexts map counts as finished.
func allWorkflowsFinished() bool {
	for id, current := range workflowcontexts {
		switch current.GetCurrentActionState() {
		case pb.ActionState_ACTION_FAILED, pb.ActionState_ACTION_TIMEOUT:
			// Failed and timed-out workflows are not retried, so they do
			// not keep the worker busy.
		case pb.ActionState_ACTION_SUCCESS:
			if !isLastAction(current, workflowactions[id]) {
				return false
			}
		default:
			// Any other state means the workflow still has work pending.
			return false
		}
	}
	return true
}

func exitWithGrpcError(err error) {
if err != nil {
errStatus, _ := status.FromError(err)
Expand All @@ -257,8 +221,8 @@ func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, action
for r := 1; r <= retries; r++ {
_, err = client.ReportActionStatus(ctx, actionStatus)
if err != nil {
log.Println("Report action status to server failed as : ", err)
log.Printf("Retrying after %v seconds", retryInterval)
log.Errorln("Report action status to server failed as : ", err)
log.Errorf("Retrying after %v seconds", retryInterval)
<-time.After(retryInterval * time.Second)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ func GetWorkflowDataVersion(ctx context.Context, db *sql.DB, workflowID string)
return getLatestVersionWfData(ctx, db, workflowID)
}

// GetfromWfWorkflowTable : gives you the current workflow
func GetfromWfWorkflowTable(ctx context.Context, db *sql.DB, id string) ([]string, error) {
// GetfromWfWorkflowTable : returns the list of workflows for a particular worker
func GetfromWfWorkflowTable(db *sql.DB, id string) ([]string, error) {
rows, err := db.Query(`
SELECT workflow_id
FROM workflow_worker_map
Expand Down
58 changes: 45 additions & 13 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"log"
"time"

"github.com/tinkerbell/tink/db"
Expand All @@ -17,28 +18,25 @@ var (
)

// GetWorkflowContexts implements tinkerbell.GetWorkflowContexts
func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest, sdb *sql.DB) (*pb.WorkflowContextList, error) {
func GetWorkflowContexts(req *pb.WorkflowContextRequest, stream pb.WorkflowSvc_GetWorkflowContextsServer, sdb *sql.DB) error {
if len(req.WorkerId) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "worker_id is invalid")
return status.Errorf(codes.InvalidArgument, "worker_id is invalid")
}
wfs, _ := db.GetfromWfWorkflowTable(context, sdb, req.WorkerId)
wfs, _ := db.GetfromWfWorkflowTable(sdb, req.WorkerId)
if wfs == nil {
return nil, status.Errorf(codes.InvalidArgument, "worker not found for any workflows")
return status.Errorf(codes.InvalidArgument, "No workflow found for worker %s ", req.GetWorkerId())
}

wfContexts := []*pb.WorkflowContext{}

for _, wf := range wfs {
wfContext, err := db.GetWorkflowContexts(context, sdb, wf)
wfContext, err := db.GetWorkflowContexts(context.Background(), sdb, wf)
if err != nil {
return nil, status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId)
return status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId)
}
if isApplicableToSend(context.Background(), wfContext, req.WorkerId, sdb) {
stream.Send(wfContext)
}
wfContexts = append(wfContexts, wfContext)
}

return &pb.WorkflowContextList{
WorkflowContexts: wfContexts,
}, nil
return nil
}

// GetWorkflowActions implements tinkerbell.GetWorkflowActions
Expand Down Expand Up @@ -160,3 +158,37 @@ func GetWorkflowDataVersion(context context.Context, workflowID string, sdb *sql
}
return &pb.GetWorkflowDataResponse{Version: version}, nil
}

// isApplicableToSend reports whether wfContext should be sent to the worker
// identified by workerID. The decision is based on the state of the
// workflow's current action and on the worker the relevant action targets.
//
// It returns false when:
//   - the current action has failed or timed out;
//   - the workflow's actions cannot be loaded;
//   - the current action succeeded and it is the last action;
//   - the current action succeeded at an index other than 0;
//   - the action to inspect lies outside the action list;
//   - the action to inspect targets a different worker.
//
// Otherwise it logs the workflow ID and returns true.
func isApplicableToSend(context context.Context, wfContext *pb.WorkflowContext, workerID string, sdb *sql.DB) bool {
	state := wfContext.GetCurrentActionState()
	if state == pb.ActionState_ACTION_FAILED || state == pb.ActionState_ACTION_TIMEOUT {
		return false
	}

	actions, err := GetWorkflowActions(context, &pb.WorkflowActionsRequest{WorkflowId: wfContext.GetWorkflowId()}, sdb)
	if err != nil {
		return false
	}

	// By default the action to inspect is the current one; after a success
	// it is the one that follows.
	target := int(wfContext.GetCurrentActionIndex())
	if state == pb.ActionState_ACTION_SUCCESS {
		if isLastAction(wfContext, actions) {
			return false
		}
		// NOTE(review): a succeeded action at an index other than 0 never
		// results in the context being sent, even when the next action
		// targets this worker. This mirrors the existing behavior; confirm
		// that it is intended.
		if target != 0 {
			return false
		}
		target++
	}

	// Guard against an empty action list or a stored index that does not
	// match the list; indexing without this check would panic the handler.
	actionList := actions.GetActionList()
	if target < 0 || target >= len(actionList) {
		return false
	}
	if actionList[target].GetWorkerId() != workerID {
		return false
	}

	log.Println("Send the workflow context ", wfContext.GetWorkflowId())
	return true
}

// isLastAction reports whether the current action index recorded in
// wfContext equals the index of the final entry in the action list.
func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool {
	lastIndex := len(actions.GetActionList()) - 1
	return lastIndex == int(wfContext.GetCurrentActionIndex())
}
4 changes: 2 additions & 2 deletions grpc-server/tinkerbell.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

// GetWorkflowContexts implements tinkerbell.GetWorkflowContexts
func (s *server) GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest) (*pb.WorkflowContextList, error) {
return exec.GetWorkflowContexts(context, req, s.db)
func (s *server) GetWorkflowContexts(req *pb.WorkflowContextRequest, stream pb.WorkflowSvc_GetWorkflowContextsServer) error {
return exec.GetWorkflowContexts(req, stream, s.db)
}

// GetWorkflowActions implements tinkerbell.GetWorkflowActions
Expand Down
Loading

0 comments on commit c0a4a29

Please sign in to comment.