Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support upload outputs and use needs context on Actions #24230

Merged
merged 19 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module code.gitea.io/gitea
go 1.19

require (
code.gitea.io/actions-proto-go v0.2.0
code.gitea.io/actions-proto-go v0.2.1
code.gitea.io/gitea-vet v0.2.2
code.gitea.io/sdk/gitea v0.15.1
codeberg.org/gusted/mcaptcha v0.0.0-20220723083913-4f3072e1d570
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
code.gitea.io/actions-proto-go v0.2.0 h1:nYh9nhhfk67YA4wVNLsCzd//RCvXnljwXClJ33+HPVk=
code.gitea.io/actions-proto-go v0.2.0/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
code.gitea.io/actions-proto-go v0.2.1 h1:ToMN/8thz2q10TuCq8dL2d8mI+/pWpJcHCvG+TELwa0=
code.gitea.io/actions-proto-go v0.2.1/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
code.gitea.io/gitea-vet v0.2.1/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE=
code.gitea.io/gitea-vet v0.2.2 h1:TEOV/Glf38iGmKzKP0EB++Z5OSL4zGg3RrAvlwaMuvk=
code.gitea.io/gitea-vet v0.2.2/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE=
Expand Down
56 changes: 56 additions & 0 deletions models/actions/task_output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
"context"
"crypto/sha1"
wolfogre marked this conversation as resolved.
Show resolved Hide resolved
"fmt"

"code.gitea.io/gitea/models/db"
)

// ActionTaskOutput represents an output of ActionTask.
// So the outputs are bound to a task, that means when a completed job has been rerun,
// the outputs of the job will be reset because the task is new.
// It's by design, to avoid the outputs of the old task to be mixed with the new task.
type ActionTaskOutput struct {
ID int64
TaskID int64 `xorm:"INDEX UNIQUE(task_id_key)"`
Key string `xorm:"VARCHAR(255)"`
KeyHash string `xorm:"CHAR(40) UNIQUE(task_id_key)"`
Value string `xorm:"TEXT"`
wolfogre marked this conversation as resolved.
Show resolved Hide resolved
}

// FindTaskOutputByTaskID returns the outputs of the task.
func FindTaskOutputByTaskID(ctx context.Context, taskID int64) ([]*ActionTaskOutput, error) {
var outputs []*ActionTaskOutput
return outputs, db.GetEngine(ctx).Where("task_id=?", taskID).Find(&outputs)
}

// FindTaskOutputKeyByTaskID returns the keys of the outputs of the task.
func FindTaskOutputKeyByTaskID(ctx context.Context, taskID int64) ([]string, error) {
var keys []string
return keys, db.GetEngine(ctx).Table(ActionTaskOutput{}).Where("task_id=?", taskID).Cols("key").Find(&keys)
}

// InsertTaskOutputIfNotExist inserts a new task output if it does not exist.
func InsertTaskOutputIfNotExist(ctx context.Context, taskID int64, key, value string) error {
keyHash := fmt.Sprintf("%x", sha1.Sum([]byte(key)))
return db.WithTx(ctx, func(ctx context.Context) error {
sess := db.GetEngine(ctx)
if exist, err := sess.Exist(&ActionTaskOutput{TaskID: taskID, KeyHash: keyHash}); err != nil {
return err
} else if exist {
return nil
}
_, err := sess.Insert(&ActionTaskOutput{
TaskID: taskID,
Key: key,
KeyHash: keyHash,
Value: value,
})
return err
})
}
2 changes: 2 additions & 0 deletions models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ var migrations = []Migration{
NewMigration("Fix incorrect admin team unit access mode", v1_20.FixIncorrectAdminTeamUnitAccessMode),
// v253 -> v254
NewMigration("Fix ExternalTracker and ExternalWiki accessMode in owner and admin team", v1_20.FixExternalTrackerAndExternalWikiAccessModeInOwnerAndAdminTeam),
// v254 -> v255
NewMigration("Add ActionTaskOutput table", v1_20.AddActionTaskOutputTable),
}

// GetCurrentDBVersion returns the current db version
Expand Down
19 changes: 19 additions & 0 deletions models/migrations/v1_20/v254.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package v1_20 //nolint

import (
"xorm.io/xorm"
)

func AddActionTaskOutputTable(x *xorm.Engine) error {
type ActionTaskOutput struct {
ID int64
TaskID int64 `xorm:"INDEX UNIQUE(task_id_key)"`
Key string `xorm:"VARCHAR(255)"`
KeyHash string `xorm:"CHAR(40) UNIQUE(task_id_key)"`
Value string `xorm:"TEXT"`
}
return x.Sync(new(ActionTaskOutput))
}
19 changes: 18 additions & 1 deletion routers/api/actions/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *Service) Register(
// FetchTask assigns a task to the runner
func (s *Service) FetchTask(
ctx context.Context,
req *connect.Request[runnerv1.FetchTaskRequest],
_ *connect.Request[runnerv1.FetchTaskRequest],
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
runner := GetRunner(ctx)

Expand Down Expand Up @@ -145,6 +145,22 @@ func (s *Service) UpdateTask(
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}

for k, v := range req.Msg.Outputs {
if len(k) > 255 {
log.Warn("Ignore the output of task %d because the key is too long: %q", task.ID, k)
continue
}
if err := actions_model.InsertTaskOutputIfNotExist(ctx, task.ID, k, v); err != nil {
wolfogre marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("Failed to insert the output %q of task %d: %v", k, task.ID, err)
// It's ok not to return errors, the runner will resend the outputs.
}
}
sentOutputs, err := actions_model.FindTaskOutputKeyByTaskID(ctx, task.ID)
if err != nil {
log.Warn("Failed to find the sent outputs of task %d: %v", task.ID, err)
// It's not to return errors, it can be handled when the runner resends sent outputs.
}

if err := task.LoadJob(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "load job: %v", err)
}
Expand All @@ -162,6 +178,7 @@ func (s *Service) UpdateTask(
Id: req.Msg.State.Id,
Result: task.Status.AsResult(),
},
SentOutputs: sentOutputs,
}), nil
}

Expand Down
54 changes: 54 additions & 0 deletions routers/api/actions/runner/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ func pickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
Context: generateTaskContext(t),
Secrets: getSecretsOfTask(ctx, t),
}

if needs, err := findTaskNeeds(ctx, t); err != nil {
log.Error("Cannot find needs for task %v: %v", t.ID, err)
// Go on with empty needs.
// If return error, the task will be wild, which means the runner will never get it when it has been assigned to the runner.
// In contrast, missing needs is less serious.
// And the task will fail and the runner will report the error in the logs.
} else {
task.Needs = needs
}

return task, true, nil
}

Expand Down Expand Up @@ -124,3 +135,46 @@ func generateTaskContext(t *actions_model.ActionTask) *structpb.Struct {

return taskContext
}

func findTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*runnerv1.TaskNeed, error) {
if err := task.LoadAttributes(ctx); err != nil {
return nil, fmt.Errorf("LoadAttributes: %w", err)
}
if len(task.Job.Needs) == 0 {
return nil, nil
}
needs := map[string]struct{}{}
for _, v := range task.Job.Needs {
needs[v] = struct{}{}
}

jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: task.Job.RunID})
if err != nil {
return nil, fmt.Errorf("FindRunJobs: %w", err)
}

ret := make(map[string]*runnerv1.TaskNeed, len(needs))
for _, job := range jobs {
if _, ok := needs[job.JobID]; !ok {
continue
}
if job.TaskID == 0 || !job.Status.IsDone() {
// it shouldn't happen, or the job has been rerun
continue
}
outputs := make(map[string]string)
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
if err != nil {
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
}
for _, v := range got {
outputs[v.Key] = v.Value
}
ret[job.JobID] = &runnerv1.TaskNeed{
Outputs: outputs,
Result: runnerv1.Result(job.Status),
}
}

return ret, nil
}