Skip to content

Commit

Permalink
Added a new rpc call workflow.GetWorkflowContextList to be used by bo…
Browse files Browse the repository at this point in the history
…ots (tinkerbell#237)

## Description
As the worker starts, Boots checks [if there is a workflow](https://github.com/tinkerbell/boots/blob/master/packet/endpoints.go#L48) defined for that worker. If yes, it will continue to PXE and it will not otherwise. (more context [here](tinkerbell/smee#50)).

The PR introduces a new RPC that can be used by Boots to get the workflow contexts without having to maintain a gRPC stream. 

## Why is this needed

Recently there have been [changes](tinkerbell#222) in Tink, to allow a worker to maintain a gRPC stream with the server to get the workflows. The same RPC was being by boots. 

## How Has This Been Tested?
Still a WIP. 

## How are existing users impacted? What migration steps/scripts do we need?

No direct impact on users.

## Checklist:

I have:

- [ ] updated the documentation and/or roadmap (if required)
- [ ] added unit or e2e tests
- [ ] provided instructions on how to upgrade
  • Loading branch information
mergify[bot] authored Aug 6, 2020
2 parents 316396c + 1ab6de4 commit 0f947d8
Show file tree
Hide file tree
Showing 7 changed files with 1,405 additions and 689 deletions.
4 changes: 2 additions & 2 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ func GetWorkflowDataVersion(ctx context.Context, db *sql.DB, workflowID string)
return getLatestVersionWfData(ctx, db, workflowID)
}

// GetfromWfWorkflowTable : returns the list of workflows for a particular worker
func GetfromWfWorkflowTable(db *sql.DB, id string) ([]string, error) {
// GetWorkflowsForWorker : returns the list of workflows for a particular worker
func GetWorkflowsForWorker(db *sql.DB, id string) ([]string, error) {
rows, err := db.Query(`
SELECT workflow_id
FROM workflow_worker_map
Expand Down
40 changes: 33 additions & 7 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@ var (

// GetWorkflowContexts implements tinkerbell.GetWorkflowContexts
func GetWorkflowContexts(req *pb.WorkflowContextRequest, stream pb.WorkflowSvc_GetWorkflowContextsServer, sdb *sql.DB) error {
if len(req.WorkerId) == 0 {
return status.Errorf(codes.InvalidArgument, "worker_id is invalid")
}
wfs, _ := db.GetfromWfWorkflowTable(sdb, req.WorkerId)
if wfs == nil {
return status.Errorf(codes.InvalidArgument, "No workflow found for worker %s ", req.GetWorkerId())
wfs, err := getWorkflowsForWorker(sdb, req.WorkerId)
if err != nil {
return err
}

for _, wf := range wfs {
wfContext, err := db.GetWorkflowContexts(context.Background(), sdb, wf)
if err != nil {
Expand All @@ -39,6 +35,36 @@ func GetWorkflowContexts(req *pb.WorkflowContextRequest, stream pb.WorkflowSvc_G
return nil
}

// GetWorkflowContextList implements tinkerbell.GetWorkflowContextList
func GetWorkflowContextList(context context.Context, req *pb.WorkflowContextRequest, sdb *sql.DB) (*pb.WorkflowContextList, error) {
wfs, err := getWorkflowsForWorker(sdb, req.WorkerId)
if err != nil {
return nil, err
}
wfContexts := []*pb.WorkflowContext{}
for _, wf := range wfs {
wfContext, err := db.GetWorkflowContexts(context, sdb, wf)
if err != nil {
return nil, status.Errorf(codes.Aborted, "Invalid workflow %s found for worker %s", wf, req.WorkerId)
}
wfContexts = append(wfContexts, wfContext)
}
return &pb.WorkflowContextList{
WorkflowContexts: wfContexts,
}, nil
}

func getWorkflowsForWorker(sdb *sql.DB, id string) ([]string, error) {
if len(id) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "worker_id is invalid")
}
wfs, _ := db.GetWorkflowsForWorker(sdb, id)
if wfs == nil {
return nil, status.Errorf(codes.InvalidArgument, "Worker not found for any workflows")
}
return wfs, nil
}

// GetWorkflowActions implements tinkerbell.GetWorkflowActions
func GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest, sdb *sql.DB) (*pb.WorkflowActionList, error) {
wfID := req.GetWorkflowId()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ require (
go.uber.org/atomic v1.2.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.7.1 // indirect
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884
google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.23.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.2.3
gotest.tools v2.2.0+incompatible // indirect
Expand Down
5 changes: 5 additions & 0 deletions grpc-server/tinkerbell.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ func (s *server) GetWorkflowContexts(req *pb.WorkflowContextRequest, stream pb.W
return exec.GetWorkflowContexts(req, stream, s.db)
}

// GetWorkflowContextList implements tinkerbell.GetWorkflowContextList
func (s *server) GetWorkflowContextList(context context.Context, req *pb.WorkflowContextRequest) (*pb.WorkflowContextList, error) {
return exec.GetWorkflowContextList(context, req, s.db)
}

// GetWorkflowActions implements tinkerbell.GetWorkflowActions
func (s *server) GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest) (*pb.WorkflowActionList, error) {
return exec.GetWorkflowActions(context, req, s.db)
Expand Down
Loading

0 comments on commit 0f947d8

Please sign in to comment.