Skip to content

Commit

Permalink
Added importing structure across proto files
Browse files Browse the repository at this point in the history
Currently we have couple of protos common between workflow and rover
merged them also added importing support across all proto files.
  • Loading branch information
parauliya committed Nov 21, 2019
1 parent aca143f commit c59a333
Show file tree
Hide file tree
Showing 15 changed files with 333 additions and 602 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ server := rover-linux-x86_64
cli := cmd/rover/rover-linux-x86_64
worker := worker/rover-worker-linux-x86_64
binaries := ${server} ${cli} ${worker}
all: ${binaries}
GOPATH := $(shell go env GOPATH)
all: proto-gen ${binaries}

.PHONY: server ${binaries} cli worker test
server: ${server}
Expand All @@ -18,6 +19,12 @@ ${cli}:
${worker}:
CGO_ENABLED=0 GOOS=linux go build -o $@ ./$(@D)

proto-gen:
protoc protos/template/template.proto --go_out=plugins=grpc:$(GOPATH)/src
protoc protos/target/target.proto --go_out=plugins=grpc:$(GOPATH)/src
protoc protos/workflow/workflow.proto --go_out=plugins=grpc:$(GOPATH)/src
protoc -I$(GOPATH)/src --go_out=plugins=grpc:$(GOPATH)/src $(GOPATH)/src/github.com/packethost/rover/protos/rover/rover.proto

run: ${binaries}
docker-compose up -d --build db
docker-compose up --build server cli
Expand Down
19 changes: 10 additions & 9 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
pb "github.com/packethost/rover/protos/rover"
workflowpb "github.com/packethost/rover/protos/workflow"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -366,7 +367,7 @@ func UpdateWorkflow(ctx context.Context, db *sql.DB, wf Workflow, state int32) e
return nil
}

func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *pb.WorkflowContext) error {
func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *workflowpb.WorkflowContext) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
Expand All @@ -392,7 +393,7 @@ func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *pb.Workflow
return nil
}

func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*pb.WorkflowContext, error) {
func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*workflowpb.WorkflowContext, error) {
query := `
SELECT current_worker, current_task_name, current_action_name, current_action_index, current_action_state, total_number_of_actions
FROM workflow_state
Expand All @@ -402,10 +403,10 @@ func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*pb.Work
row := db.QueryRowContext(ctx, query, wfId)
var cw, ct, ca string
var cai, tact int64
var cas pb.ActionState
var cas workflowpb.ActionState
err := row.Scan(&cw, &ct, &ca, &cai, &cas, &tact)
if err == nil {
return &pb.WorkflowContext{
return &workflowpb.WorkflowContext{
WorkflowId: wfId,
CurrentWorker: cw,
CurrentTask: ct,
Expand All @@ -420,7 +421,7 @@ func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*pb.Work
} else {
err = nil
}
return &pb.WorkflowContext{}, nil
return &workflowpb.WorkflowContext{}, nil
}

func GetWorkflowActions(ctx context.Context, db *sql.DB, wfId string) (*pb.WorkflowActionList, error) {
Expand Down Expand Up @@ -448,7 +449,7 @@ func GetWorkflowActions(ctx context.Context, db *sql.DB, wfId string) (*pb.Workf
return &pb.WorkflowActionList{}, nil
}

func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *pb.WorkflowActionStatus, time time.Time) error {
func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *workflowpb.WorkflowActionStatus, time time.Time) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
Expand All @@ -472,7 +473,7 @@ func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *pb.W
}

// ShowWorkflowEvents returns all workflows
func ShowWorkflowEvents(db *sql.DB, wfId string, fn func(wfs pb.WorkflowActionStatus) error) error {
func ShowWorkflowEvents(db *sql.DB, wfId string, fn func(wfs workflowpb.WorkflowActionStatus) error) error {
rows, err := db.Query(`
SELECT worker_id, task_name, action_name, execution_time, message, status, created_at
FROM workflow_event
Expand Down Expand Up @@ -502,13 +503,13 @@ func ShowWorkflowEvents(db *sql.DB, wfId string, fn func(wfs pb.WorkflowActionSt
return err
}
createdAt, _ := ptypes.TimestampProto(evTime)
wfs := pb.WorkflowActionStatus{
wfs := workflowpb.WorkflowActionStatus{
WorkerId: id,
TaskName: tName,
ActionName: aName,
Seconds: secs,
Message: msg,
ActionStatus: pb.ActionState(status),
ActionStatus: workflowpb.ActionState(status),
CreatedAt: createdAt,
}
err = fn(wfs)
Expand Down
7 changes: 4 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
empty "github.com/golang/protobuf/ptypes/empty"
"github.com/packethost/rover/db"
pb "github.com/packethost/rover/protos/rover"
workflowpb "github.com/packethost/rover/protos/workflow"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -23,7 +24,7 @@ func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest
return nil, status.Errorf(codes.InvalidArgument, "Worker not found for any workflows")
}

wfContexts := []*pb.WorkflowContext{}
wfContexts := []*workflowpb.WorkflowContext{}

for _, wf := range wfs {
wfContext, err := db.GetWorkflowContexts(context, sdb, wf)
Expand Down Expand Up @@ -52,7 +53,7 @@ func GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest,
}

// ReportActionStatus implements rover.ReportActionStatus
func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus, sdb *sql.DB) (*empty.Empty, error) {
func ReportActionStatus(context context.Context, req *workflowpb.WorkflowActionStatus, sdb *sql.DB) (*empty.Empty, error) {
wfID := req.GetWorkflowId()
if len(wfID) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "workflow_id is invalid")
Expand All @@ -76,7 +77,7 @@ func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus, s
// We need bunch of checks here considering
// Considering concurrency and network latencies & accuracy for proceeding of WF
actionIndex := wfContext.GetCurrentActionIndex()
if req.GetActionStatus() == pb.ActionState_ACTION_IN_PROGRESS {
if req.GetActionStatus() == workflowpb.ActionState_ACTION_IN_PROGRESS {
if wfContext.GetCurrentAction() != "" {
actionIndex = actionIndex + 1
}
Expand Down
3 changes: 2 additions & 1 deletion grpc-server/rover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/golang/protobuf/ptypes/empty"
exec "github.com/packethost/rover/executor"
pb "github.com/packethost/rover/protos/rover"
workflowpb "github.com/packethost/rover/protos/workflow"
)

// GetWorkflowContexts implements rover.GetWorkflowContexts
Expand All @@ -19,6 +20,6 @@ func (s *server) GetWorkflowActions(context context.Context, req *pb.WorkflowAct
}

// ReportActionStatus implements rover.ReportActionStatus
func (s *server) ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) (*empty.Empty, error) {
func (s *server) ReportActionStatus(context context.Context, req *workflowpb.WorkflowActionStatus) (*empty.Empty, error) {
return exec.ReportActionStatus(context, req, s.db)
}
8 changes: 4 additions & 4 deletions grpc-server/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/packethost/rover/db"
"github.com/packethost/rover/metrics"
"github.com/packethost/rover/protos/rover"
"github.com/packethost/rover/protos/workflow"
workflowpb "github.com/packethost/rover/protos/workflow"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -165,7 +165,7 @@ func (s *server) ListWorkflows(_ *workflow.Empty, stream workflow.WorkflowSvc_Li
timer := prometheus.NewTimer(metrics.CacheDuration.With(labels))
defer timer.ObserveDuration()
err := db.ListWorkflows(s.db, func(w db.Workflow) error {
wf := &workflow.Workflow{
wf := &workflowpb.Workflow{
Id: w.ID,
Template: w.Template,
Target: w.Target,
Expand Down Expand Up @@ -194,7 +194,7 @@ func (s *server) GetWorkflowContext(ctx context.Context, in *workflow.GetRequest
labels["op"] = "get"
msg = "getting a workflow"

fn := func() (*rover.WorkflowContext, error) { return db.GetWorkflowContexts(ctx, s.db, in.Id) }
fn := func() (*workflowpb.WorkflowContext, error) { return db.GetWorkflowContexts(ctx, s.db, in.Id) }
metrics.CacheTotals.With(labels).Inc()
timer := prometheus.NewTimer(metrics.CacheDuration.With(labels))
defer timer.ObserveDuration()
Expand Down Expand Up @@ -240,7 +240,7 @@ func (s *server) ShowWorkflowEvents(req *workflow.GetRequest, stream workflow.Wo

timer := prometheus.NewTimer(metrics.CacheDuration.With(labels))
defer timer.ObserveDuration()
err := db.ShowWorkflowEvents(s.db, req.Id, func(w rover.WorkflowActionStatus) error {
err := db.ShowWorkflowEvents(s.db, req.Id, func(w workflowpb.WorkflowActionStatus) error {
wfs := &workflow.WorkflowActionStatus{
WorkerId: w.WorkerId,
TaskName: w.TaskName,
Expand Down
Loading

0 comments on commit c59a333

Please sign in to comment.