Skip to content

Commit

Permalink
Fixed the issue tinkerbell#222 as well in this PR
Browse files Browse the repository at this point in the history
Following things are fixed:
1. Modified the filter from the server side while sending the workflow context
2. Added a gRPC stream for getting the workflow context
3. Continue the loop for worker even after completing the workflow
  • Loading branch information
parauliya committed Jul 30, 2020
1 parent 4688164 commit ec40f7f
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 189 deletions.
1 change: 0 additions & 1 deletion cmd/tink-worker/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func executeAction(ctx context.Context, action *pb.WorkflowAction, wfID string)
if err != nil {
return fmt.Sprintf("Failed to pull Image : %s", action.GetImage()), 1, errors.Wrap(err, "DOCKER PULL")
}

id, err := createContainer(ctx, action, action.Command, wfID)
if err != nil {
return fmt.Sprintf("Failed to create container"), 1, errors.Wrap(err, "DOCKER CREATE")
Expand Down
64 changes: 13 additions & 51 deletions cmd/tink-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,20 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}
log = logger.WithField("worker_id", workerID)
ctx := context.Background()
var err error
cli, err = initializeDockerClient()
if err != nil {
return err
}
for {
err := fetchLatestContext(ctx, client, workerID)
if err != nil {
return err
}

if allWorkflowsFinished() {
log.Infoln("All workflows finished")
return nil
}

cli, err = initializeDockerClient()
res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID})
if err != nil {
return err
fmt.Println("failed to get context")
}

for wfID, wfContext := range workflowcontexts {
actions, ok := workflowactions[wfID]
if !ok {
for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() {
wfID := wfContext.GetWorkflowId()
actions, err := client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID})
if err != nil {
return fmt.Errorf("can't find actions for workflow %s", wfID)
}

Expand Down Expand Up @@ -121,9 +116,6 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}
}
log.Printf("Starting with action %s\n", actions.GetActionList()[actionIndex])
} else {
log.Infof("Sleep for %d seconds\n", retryInterval)
time.Sleep(retryInterval)
}

for turn {
Expand Down Expand Up @@ -207,41 +199,11 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}
}
}
// sleep for 3 seconds before asking for new workflows
time.Sleep(retryInterval * time.Second)
}
}

func fetchLatestContext(ctx context.Context, client pb.WorkflowSvcClient, workerID string) error {
log.Debugf("Fetching latest context for worker %s\n", workerID)
res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID})
if err != nil {
return err
}
for _, wfContext := range res.GetWorkflowContexts() {
workflowcontexts[wfContext.WorkflowId] = wfContext
if _, ok := workflowactions[wfContext.WorkflowId]; !ok {
wfActions, err := client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfContext.WorkflowId})
if err != nil {
return err
}
workflowactions[wfContext.WorkflowId] = wfActions
}
}
return nil
}

func allWorkflowsFinished() bool {
for wfID, wfContext := range workflowcontexts {
actions := workflowactions[wfID]
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_FAILED || wfContext.GetCurrentActionState() == pb.ActionState_ACTION_TIMEOUT {
continue
}
if !(wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS && isLastAction(wfContext, actions)) {
return false
}
}
return true
}

func exitWithGrpcError(err error) {
if err != nil {
errStatus, _ := status.FromError(err)
Expand Down
4 changes: 2 additions & 2 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ func GetWorkflowDataVersion(ctx context.Context, db *sql.DB, workflowID string)
return getLatestVersionWfData(ctx, db, workflowID)
}

// GetfromWfWorkflowTable : gives you the current workflow
func GetfromWfWorkflowTable(ctx context.Context, db *sql.DB, id string) ([]string, error) {
// GetfromWfWorkflowTable : returns the list of workflows for a particular worker
func GetfromWfWorkflowTable(db *sql.DB, id string) ([]string, error) {
rows, err := db.Query(`
SELECT workflow_id
FROM workflow_worker_map
Expand Down
40 changes: 19 additions & 21 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,25 @@ var (
)

// GetWorkflowContexts implements tinkerbell.GetWorkflowContexts
func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest, sdb *sql.DB) (*pb.WorkflowContextList, error) {
func GetWorkflowContexts(req *pb.WorkflowContextRequest, stream pb.WorkflowSvc_GetWorkflowContextsServer, sdb *sql.DB) error {
if len(req.WorkerId) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "worker_id is invalid")
return status.Errorf(codes.InvalidArgument, "worker_id is invalid")
}
wfs, _ := db.GetfromWfWorkflowTable(context, sdb, req.WorkerId)
wfs, _ := db.GetfromWfWorkflowTable(sdb, req.WorkerId)
if wfs == nil {
return nil, status.Errorf(codes.InvalidArgument, "worker not found for any workflows")
return status.Errorf(codes.InvalidArgument, "No workflow found for worker %s ", req.GetWorkerId())
}

wfContexts := []*pb.WorkflowContext{}

for _, wf := range wfs {
wfContext, err := db.GetWorkflowContexts(context, sdb, wf)
wfContext, err := db.GetWorkflowContexts(context.Background(), sdb, wf)
if err != nil {
return nil, status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId)
return status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId)
}
if isApplicableToSend(context, wfContext, req.WorkerId, sdb) {
wfContexts = append(wfContexts, wfContext)
if isApplicableToSend(context.Background(), wfContext, req.WorkerId, sdb) {
stream.Send(wfContext)
}
}

return &pb.WorkflowContextList{
WorkflowContexts: wfContexts,
}, nil
return nil
}

// GetWorkflowActions implements tinkerbell.GetWorkflowActions
Expand Down Expand Up @@ -173,15 +168,18 @@ func isApplicableToSend(context context.Context, wfContext *pb.WorkflowContext,
if err != nil {
return false
}
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS && isLastAction(wfContext, actions) {
log.Println("This workflow is completed ", wfContext.GetWorkflowId())
} else if int(wfContext.GetCurrentActionIndex()) == 0 {
if actions.ActionList[wfContext.GetCurrentActionIndex()].GetWorkerId() == workerID {
log.Println("Send the workflow context ", wfContext.GetWorkflowId())
return true
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS {
if isLastAction(wfContext, actions) {
return false
}
if wfContext.GetCurrentActionIndex() == 0 {
if actions.ActionList[wfContext.GetCurrentActionIndex()+1].GetWorkerId() == workerID {
log.Println("Send the workflow context ", wfContext.GetWorkflowId())
return true
}
}
} else {
if actions.ActionList[wfContext.GetCurrentActionIndex()+1].GetWorkerId() == workerID {
if actions.ActionList[wfContext.GetCurrentActionIndex()].GetWorkerId() == workerID {
log.Println("Send the workflow context ", wfContext.GetWorkflowId())
return true
}
Expand Down
4 changes: 2 additions & 2 deletions grpc-server/tinkerbell.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

// GetWorkflowContexts implements tinkerbell.GetWorkflowContexts
func (s *server) GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest) (*pb.WorkflowContextList, error) {
return exec.GetWorkflowContexts(context, req, s.db)
func (s *server) GetWorkflowContexts(req *pb.WorkflowContextRequest, stream pb.WorkflowSvc_GetWorkflowContextsServer) error {
return exec.GetWorkflowContexts(req, stream, s.db)
}

// GetWorkflowActions implements tinkerbell.GetWorkflowActions
Expand Down
Loading

0 comments on commit ec40f7f

Please sign in to comment.