Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename grpc pipeline to workflow #2173

Merged
merged 3 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
)

func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc {
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.LogFunc {
return func(step *backend.Step, rc multipart.Reader) error {
loglogger := logger.With().
Str("image", step.Image).
Expand All @@ -41,7 +41,7 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo
uploads.Add(1)

var secrets []string
for _, secret := range work.Config.Secrets {
for _, secret := range workflow.Config.Secrets {
if secret.Mask {
secrets = append(secrets, secret.Value)
}
Expand Down
30 changes: 15 additions & 15 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
}, nil
}

// Next returns the next pipeline in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) {
// Next returns the next workflow in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) {
var res *proto.NextResponse
var err error
retry := c.newBackOff()
Expand Down Expand Up @@ -115,17 +115,17 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error)
return nil, nil
}

p := new(rpc.Pipeline)
p.ID = res.GetPipeline().GetId()
p.Timeout = res.GetPipeline().GetTimeout()
p.Config = new(backend.Config)
if err := json.Unmarshal(res.GetPipeline().GetPayload(), p.Config); err != nil {
log.Error().Err(err).Msgf("could not unmarshal pipeline config of '%s'", p.ID)
w := new(rpc.Workflow)
w.ID = res.GetPipeline().GetId()
w.Timeout = res.GetPipeline().GetTimeout()
w.Config = new(backend.Config)
if err := json.Unmarshal(res.GetPipeline().GetPayload(), w.Config); err != nil {
log.Error().Err(err).Msgf("could not unmarshal workflow config of '%s'", w.ID)
}
return p, nil
return w, nil
}

// Wait blocks until the pipeline is complete.
// Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
Expand Down Expand Up @@ -159,7 +159,7 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
return nil
}

// Init signals the pipeline is initialized.
// Init signals the workflow is initialized.
func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.InitRequest)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro
return nil
}

// Done signals the pipeline is complete.
// Done signals the work is complete.
func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.DoneRequest)
Expand Down Expand Up @@ -241,7 +241,7 @@ func (c *client) Done(ctx context.Context, id string, state rpc.State) (err erro
return nil
}

// Extend extends the pipeline deadline
// Extend extends the workflow deadline
func (c *client) Extend(ctx context.Context, id string) (err error) {
retry := c.newBackOff()
req := new(proto.ExtendRequest)
Expand Down Expand Up @@ -275,7 +275,7 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
return nil
}

// Update updates the pipeline state.
// Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
Expand Down Expand Up @@ -316,7 +316,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er
return nil
}

// Log writes the pipeline log entry.
// Log writes the workflow log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)
Expand Down
4 changes: 2 additions & 2 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
)

func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, work *rpc.Pipeline) pipeline.TraceFunc {
func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error {
steplogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Expand All @@ -50,7 +50,7 @@ func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, wo
defer func() {
steplogger.Debug().Msg("update step status")

if uerr := r.client.Update(ctxmeta, work.ID, stepState); uerr != nil {
if uerr := r.client.Update(ctxmeta, workflow.ID, stepState); uerr != nil {
steplogger.Debug().
Err(uerr).
Msg("update step status error")
Expand Down
22 changes: 11 additions & 11 deletions pipeline/rpc/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type (
Labels map[string]string `json:"labels"`
}

// State defines the pipeline state.
// State defines the workflow state.
State struct {
Step string `json:"step"`
Exited bool `json:"exited"`
Expand All @@ -37,8 +37,8 @@ type (
Error string `json:"error"`
}

// Pipeline defines the pipeline execution details.
Pipeline struct {
// Workflow defines the workflow execution details.
Workflow struct {
ID string `json:"id"`
Config *backend.Config `json:"config"`
Timeout int64 `json:"timeout"`
Expand All @@ -55,25 +55,25 @@ type Peer interface {
// Version returns the server- & grpc-version
Version(c context.Context) (*Version, error)

// Next returns the next pipeline in the queue.
Next(c context.Context, f Filter) (*Pipeline, error)
// Next returns the next workflow in the queue
Next(c context.Context, f Filter) (*Workflow, error)

// Wait blocks until the pipeline is complete.
// Wait blocks until the workflow is complete
Wait(c context.Context, id string) error

// Init signals the pipeline is initialized.
// Init signals the workflow is initialized
Init(c context.Context, id string, state State) error

// Done signals the pipeline is complete.
// Done signals the workflow is complete
Done(c context.Context, id string, state State) error

// Extend extends the pipeline deadline
// Extend extends the workflow deadline
Extend(c context.Context, id string) error

// Update updates the pipeline state.
// Update updates the workflow state
Update(c context.Context, id string, state State) error

// Log writes the pipeline log entry.
// Log writes the workflow log entry
Log(c context.Context, logEntry *LogEntry) error

// RegisterAgent register our agent to the server
Expand Down
4 changes: 2 additions & 2 deletions pipeline/rpc/proto/woodpecker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ message Filter {
map<string, string> labels = 1;
}

message Pipeline {
6543 marked this conversation as resolved.
Show resolved Hide resolved
message Workflow {
string id = 1;
int64 timeout = 2;
bytes payload = 3;
Expand Down Expand Up @@ -126,7 +126,7 @@ message VersionResponse {
}

message NextResponse {
Pipeline pipeline = 1;
Workflow workflow = 1;
}

message RegisterAgentResponse {
Expand Down
8 changes: 4 additions & 4 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type RPC struct {
}

// Next implements the rpc.Next function
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, error) {
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) {
metadata, ok := grpcMetadata.FromIncomingContext(c)
if ok {
hostname, ok := metadata["hostname"]
Expand Down Expand Up @@ -82,9 +82,9 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, er
}

if task.ShouldRun() {
pipeline := new(rpc.Pipeline)
err = json.Unmarshal(task.Data, pipeline)
return pipeline, err
workflow := new(rpc.Workflow)
err = json.Unmarshal(task.Data, workflow)
return workflow, err
}

if err := s.Done(c, task.ID, rpc.State{}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func queuePipeline(repo *model.Repo, pipelineItems []*pipeline.Item) error {
task.RunOn = item.RunsOn
task.DepStatus = make(map[string]model.StatusValue)

task.Data, _ = json.Marshal(rpc.Pipeline{
task.Data, _ = json.Marshal(rpc.Workflow{
ID: fmt.Sprint(item.Workflow.ID),
Config: item.Config,
Timeout: repo.Timeout,
Expand Down