Skip to content

Commit 0d55f64

Browse files
lunnywolfogretechknowlogickwxiaoguang
authoredAug 24, 2023
chore(actions): support cron schedule task (#26655)
Replace #22751 1. only support the default branch in the repository setting. 2. autoload schedule data from the schedule table after starting the service. 3. support specific syntax like `@yearly`, `@monthly`, `@weekly`, `@daily`, `@hourly` ## How to use See the [GitHub Actions document](https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule) for getting more detailed information. ```yaml on: schedule: - cron: '30 5 * * 1,3' - cron: '30 5 * * 2,4' jobs: test_schedule: runs-on: ubuntu-latest steps: - name: Not on Monday or Wednesday if: github.event.schedule != '30 5 * * 1,3' run: echo "This step will be skipped on Monday and Wednesday" - name: Every time run: echo "This step will always run" ``` Signed-off-by: Bo-Yi.Wu <appleboy.tw@gmail.com> --------- Co-authored-by: Jason Song <i@wolfogre.com> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
1 parent b62c8e7 commit 0d55f64

File tree

13 files changed

+693
-9
lines changed

13 files changed

+693
-9
lines changed
 

‎go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ require (
9090
github.com/prometheus/client_golang v1.16.0
9191
github.com/quasoft/websspi v1.1.2
9292
github.com/redis/go-redis/v9 v9.0.5
93+
github.com/robfig/cron/v3 v3.0.1
9394
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
9495
github.com/sassoftware/go-rpmutils v0.2.0
9596
github.com/sergi/go-diff v1.3.1
@@ -254,7 +255,6 @@ require (
254255
github.com/rhysd/actionlint v1.6.25 // indirect
255256
github.com/rivo/uniseg v0.4.4 // indirect
256257
github.com/robfig/cron v1.2.0 // indirect
257-
github.com/robfig/cron/v3 v3.0.1 // indirect
258258
github.com/rogpeppe/go-internal v1.11.0 // indirect
259259
github.com/rs/xid v1.5.0 // indirect
260260
github.com/russross/blackfriday/v2 v2.1.0 // indirect

‎models/actions/schedule.go

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"context"
8+
"time"
9+
10+
"code.gitea.io/gitea/models/db"
11+
repo_model "code.gitea.io/gitea/models/repo"
12+
user_model "code.gitea.io/gitea/models/user"
13+
"code.gitea.io/gitea/modules/timeutil"
14+
webhook_module "code.gitea.io/gitea/modules/webhook"
15+
16+
"github.com/robfig/cron/v3"
17+
)
18+
19+
// ActionSchedule represents a schedule of a workflow file
20+
type ActionSchedule struct {
21+
ID int64
22+
Title string
23+
Specs []string
24+
RepoID int64 `xorm:"index"`
25+
Repo *repo_model.Repository `xorm:"-"`
26+
OwnerID int64 `xorm:"index"`
27+
WorkflowID string
28+
TriggerUserID int64
29+
TriggerUser *user_model.User `xorm:"-"`
30+
Ref string
31+
CommitSHA string
32+
Event webhook_module.HookEventType
33+
EventPayload string `xorm:"LONGTEXT"`
34+
Content []byte
35+
Created timeutil.TimeStamp `xorm:"created"`
36+
Updated timeutil.TimeStamp `xorm:"updated"`
37+
}
38+
39+
func init() {
40+
db.RegisterModel(new(ActionSchedule))
41+
}
42+
43+
// GetSchedulesMapByIDs returns the schedules by given id slice.
44+
func GetSchedulesMapByIDs(ids []int64) (map[int64]*ActionSchedule, error) {
45+
schedules := make(map[int64]*ActionSchedule, len(ids))
46+
return schedules, db.GetEngine(db.DefaultContext).In("id", ids).Find(&schedules)
47+
}
48+
49+
// GetReposMapByIDs returns the repos by given id slice.
50+
func GetReposMapByIDs(ids []int64) (map[int64]*repo_model.Repository, error) {
51+
repos := make(map[int64]*repo_model.Repository, len(ids))
52+
return repos, db.GetEngine(db.DefaultContext).In("id", ids).Find(&repos)
53+
}
54+
55+
var cronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
56+
57+
// CreateScheduleTask creates new schedule task.
58+
func CreateScheduleTask(ctx context.Context, rows []*ActionSchedule) error {
59+
// Return early if there are no rows to insert
60+
if len(rows) == 0 {
61+
return nil
62+
}
63+
64+
// Begin transaction
65+
ctx, committer, err := db.TxContext(ctx)
66+
if err != nil {
67+
return err
68+
}
69+
defer committer.Close()
70+
71+
// Loop through each schedule row
72+
for _, row := range rows {
73+
// Create new schedule row
74+
if err = db.Insert(ctx, row); err != nil {
75+
return err
76+
}
77+
78+
// Loop through each schedule spec and create a new spec row
79+
now := time.Now()
80+
81+
for _, spec := range row.Specs {
82+
// Parse the spec and check for errors
83+
schedule, err := cronParser.Parse(spec)
84+
if err != nil {
85+
continue // skip to the next spec if there's an error
86+
}
87+
88+
// Insert the new schedule spec row
89+
if err = db.Insert(ctx, &ActionScheduleSpec{
90+
RepoID: row.RepoID,
91+
ScheduleID: row.ID,
92+
Spec: spec,
93+
Next: timeutil.TimeStamp(schedule.Next(now).Unix()),
94+
}); err != nil {
95+
return err
96+
}
97+
}
98+
}
99+
100+
// Commit transaction
101+
return committer.Commit()
102+
}
103+
104+
func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
105+
ctx, committer, err := db.TxContext(ctx)
106+
if err != nil {
107+
return err
108+
}
109+
defer committer.Close()
110+
111+
if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil {
112+
return err
113+
}
114+
115+
if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil {
116+
return err
117+
}
118+
119+
return committer.Commit()
120+
}

‎models/actions/schedule_list.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"context"
8+
9+
"code.gitea.io/gitea/models/db"
10+
repo_model "code.gitea.io/gitea/models/repo"
11+
user_model "code.gitea.io/gitea/models/user"
12+
"code.gitea.io/gitea/modules/container"
13+
14+
"xorm.io/builder"
15+
)
16+
17+
type ScheduleList []*ActionSchedule
18+
19+
// GetUserIDs returns a slice of user's id
20+
func (schedules ScheduleList) GetUserIDs() []int64 {
21+
ids := make(container.Set[int64], len(schedules))
22+
for _, schedule := range schedules {
23+
ids.Add(schedule.TriggerUserID)
24+
}
25+
return ids.Values()
26+
}
27+
28+
func (schedules ScheduleList) GetRepoIDs() []int64 {
29+
ids := make(container.Set[int64], len(schedules))
30+
for _, schedule := range schedules {
31+
ids.Add(schedule.RepoID)
32+
}
33+
return ids.Values()
34+
}
35+
36+
func (schedules ScheduleList) LoadTriggerUser(ctx context.Context) error {
37+
userIDs := schedules.GetUserIDs()
38+
users := make(map[int64]*user_model.User, len(userIDs))
39+
if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil {
40+
return err
41+
}
42+
for _, schedule := range schedules {
43+
if schedule.TriggerUserID == user_model.ActionsUserID {
44+
schedule.TriggerUser = user_model.NewActionsUser()
45+
} else {
46+
schedule.TriggerUser = users[schedule.TriggerUserID]
47+
}
48+
}
49+
return nil
50+
}
51+
52+
func (schedules ScheduleList) LoadRepos() error {
53+
repoIDs := schedules.GetRepoIDs()
54+
repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs)
55+
if err != nil {
56+
return err
57+
}
58+
for _, schedule := range schedules {
59+
schedule.Repo = repos[schedule.RepoID]
60+
}
61+
return nil
62+
}
63+
64+
type FindScheduleOptions struct {
65+
db.ListOptions
66+
RepoID int64
67+
OwnerID int64
68+
}
69+
70+
func (opts FindScheduleOptions) toConds() builder.Cond {
71+
cond := builder.NewCond()
72+
if opts.RepoID > 0 {
73+
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
74+
}
75+
if opts.OwnerID > 0 {
76+
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
77+
}
78+
79+
return cond
80+
}
81+
82+
func FindSchedules(ctx context.Context, opts FindScheduleOptions) (ScheduleList, int64, error) {
83+
e := db.GetEngine(ctx).Where(opts.toConds())
84+
if !opts.ListAll && opts.PageSize > 0 && opts.Page >= 1 {
85+
e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize)
86+
}
87+
var schedules ScheduleList
88+
total, err := e.Desc("id").FindAndCount(&schedules)
89+
return schedules, total, err
90+
}
91+
92+
func CountSchedules(ctx context.Context, opts FindScheduleOptions) (int64, error) {
93+
return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionSchedule))
94+
}

‎models/actions/schedule_spec.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"context"
8+
9+
"code.gitea.io/gitea/models/db"
10+
repo_model "code.gitea.io/gitea/models/repo"
11+
"code.gitea.io/gitea/modules/timeutil"
12+
13+
"github.com/robfig/cron/v3"
14+
)
15+
16+
// ActionScheduleSpec represents a schedule spec of a workflow file
17+
type ActionScheduleSpec struct {
18+
ID int64
19+
RepoID int64 `xorm:"index"`
20+
Repo *repo_model.Repository `xorm:"-"`
21+
ScheduleID int64 `xorm:"index"`
22+
Schedule *ActionSchedule `xorm:"-"`
23+
24+
// Next time the job will run, or the zero time if Cron has not been
25+
// started or this entry's schedule is unsatisfiable
26+
Next timeutil.TimeStamp `xorm:"index"`
27+
// Prev is the last time this job was run, or the zero time if never.
28+
Prev timeutil.TimeStamp
29+
Spec string
30+
31+
Created timeutil.TimeStamp `xorm:"created"`
32+
Updated timeutil.TimeStamp `xorm:"updated"`
33+
}
34+
35+
func (s *ActionScheduleSpec) Parse() (cron.Schedule, error) {
36+
return cronParser.Parse(s.Spec)
37+
}
38+
39+
func init() {
40+
db.RegisterModel(new(ActionScheduleSpec))
41+
}
42+
43+
func UpdateScheduleSpec(ctx context.Context, spec *ActionScheduleSpec, cols ...string) error {
44+
sess := db.GetEngine(ctx).ID(spec.ID)
45+
if len(cols) > 0 {
46+
sess.Cols(cols...)
47+
}
48+
_, err := sess.Update(spec)
49+
return err
50+
}

‎models/actions/schedule_spec_list.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"context"
8+
9+
"code.gitea.io/gitea/models/db"
10+
repo_model "code.gitea.io/gitea/models/repo"
11+
"code.gitea.io/gitea/modules/container"
12+
13+
"xorm.io/builder"
14+
)
15+
16+
type SpecList []*ActionScheduleSpec
17+
18+
func (specs SpecList) GetScheduleIDs() []int64 {
19+
ids := make(container.Set[int64], len(specs))
20+
for _, spec := range specs {
21+
ids.Add(spec.ScheduleID)
22+
}
23+
return ids.Values()
24+
}
25+
26+
func (specs SpecList) LoadSchedules() error {
27+
scheduleIDs := specs.GetScheduleIDs()
28+
schedules, err := GetSchedulesMapByIDs(scheduleIDs)
29+
if err != nil {
30+
return err
31+
}
32+
for _, spec := range specs {
33+
spec.Schedule = schedules[spec.ScheduleID]
34+
}
35+
36+
repoIDs := specs.GetRepoIDs()
37+
repos, err := GetReposMapByIDs(repoIDs)
38+
if err != nil {
39+
return err
40+
}
41+
for _, spec := range specs {
42+
spec.Repo = repos[spec.RepoID]
43+
}
44+
45+
return nil
46+
}
47+
48+
func (specs SpecList) GetRepoIDs() []int64 {
49+
ids := make(container.Set[int64], len(specs))
50+
for _, spec := range specs {
51+
ids.Add(spec.RepoID)
52+
}
53+
return ids.Values()
54+
}
55+
56+
func (specs SpecList) LoadRepos() error {
57+
repoIDs := specs.GetRepoIDs()
58+
repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs)
59+
if err != nil {
60+
return err
61+
}
62+
for _, spec := range specs {
63+
spec.Repo = repos[spec.RepoID]
64+
}
65+
return nil
66+
}
67+
68+
type FindSpecOptions struct {
69+
db.ListOptions
70+
RepoID int64
71+
Next int64
72+
}
73+
74+
func (opts FindSpecOptions) toConds() builder.Cond {
75+
cond := builder.NewCond()
76+
if opts.RepoID > 0 {
77+
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
78+
}
79+
80+
if opts.Next > 0 {
81+
cond = cond.And(builder.Lte{"next": opts.Next})
82+
}
83+
84+
return cond
85+
}
86+
87+
func FindSpecs(ctx context.Context, opts FindSpecOptions) (SpecList, int64, error) {
88+
e := db.GetEngine(ctx).Where(opts.toConds())
89+
if opts.PageSize > 0 && opts.Page >= 1 {
90+
e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize)
91+
}
92+
var specs SpecList
93+
total, err := e.Desc("id").FindAndCount(&specs)
94+
if err != nil {
95+
return nil, 0, err
96+
}
97+
98+
if err := specs.LoadSchedules(); err != nil {
99+
return nil, 0, err
100+
}
101+
return specs, total, nil
102+
}
103+
104+
func CountSpecs(ctx context.Context, opts FindSpecOptions) (int64, error) {
105+
return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionScheduleSpec))
106+
}

‎models/migrations/migrations.go

+2
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,8 @@ var migrations = []Migration{
526526
NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable),
527527
// v272 -> v273
528528
NewMigration("Add Version to ActionRun table", v1_21.AddVersionToActionRunTable),
529+
// v273 -> v274
530+
NewMigration("Add Action Schedule Table", v1_21.AddActionScheduleTable),
529531
}
530532

531533
// GetCurrentDBVersion returns the current db version

‎models/migrations/v1_21/v273.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package v1_21 //nolint
5+
import (
6+
"code.gitea.io/gitea/modules/timeutil"
7+
8+
"xorm.io/xorm"
9+
)
10+
11+
func AddActionScheduleTable(x *xorm.Engine) error {
12+
type ActionSchedule struct {
13+
ID int64
14+
Title string
15+
Specs []string
16+
RepoID int64 `xorm:"index"`
17+
OwnerID int64 `xorm:"index"`
18+
WorkflowID string
19+
TriggerUserID int64
20+
Ref string
21+
CommitSHA string
22+
Event string
23+
EventPayload string `xorm:"LONGTEXT"`
24+
Content []byte
25+
Created timeutil.TimeStamp `xorm:"created"`
26+
Updated timeutil.TimeStamp `xorm:"updated"`
27+
}
28+
29+
type ActionScheduleSpec struct {
30+
ID int64
31+
RepoID int64 `xorm:"index"`
32+
ScheduleID int64 `xorm:"index"`
33+
Spec string
34+
Next timeutil.TimeStamp `xorm:"index"`
35+
Prev timeutil.TimeStamp
36+
37+
Created timeutil.TimeStamp `xorm:"created"`
38+
Updated timeutil.TimeStamp `xorm:"updated"`
39+
}
40+
41+
return x.Sync(
42+
new(ActionSchedule),
43+
new(ActionScheduleSpec),
44+
)
45+
}

‎models/repo.go

+2
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error {
170170
&actions_model.ActionRunJob{RepoID: repoID},
171171
&actions_model.ActionRun{RepoID: repoID},
172172
&actions_model.ActionRunner{RepoID: repoID},
173+
&actions_model.ActionScheduleSpec{RepoID: repoID},
174+
&actions_model.ActionSchedule{RepoID: repoID},
173175
&actions_model.ActionArtifact{RepoID: repoID},
174176
); err != nil {
175177
return fmt.Errorf("deleteBeans: %w", err)

‎modules/actions/workflows.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -95,25 +95,40 @@ func GetEventsFromContent(content []byte) ([]*jobparser.Event, error) {
9595
return events, nil
9696
}
9797

98-
func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader) ([]*DetectedWorkflow, error) {
98+
func DetectWorkflows(
99+
gitRepo *git.Repository,
100+
commit *git.Commit,
101+
triggedEvent webhook_module.HookEventType,
102+
payload api.Payloader,
103+
) ([]*DetectedWorkflow, []*DetectedWorkflow, error) {
99104
entries, err := ListWorkflows(commit)
100105
if err != nil {
101-
return nil, err
106+
return nil, nil, err
102107
}
103108

104109
workflows := make([]*DetectedWorkflow, 0, len(entries))
110+
schedules := make([]*DetectedWorkflow, 0, len(entries))
105111
for _, entry := range entries {
106112
content, err := GetContentFromEntry(entry)
107113
if err != nil {
108-
return nil, err
114+
return nil, nil, err
109115
}
116+
110117
events, err := GetEventsFromContent(content)
111118
if err != nil {
112119
log.Warn("ignore invalid workflow %q: %v", entry.Name(), err)
113120
continue
114121
}
115122
for _, evt := range events {
116123
log.Trace("detect workflow %q for event %#v matching %q", entry.Name(), evt, triggedEvent)
124+
if evt.IsSchedule() {
125+
dwf := &DetectedWorkflow{
126+
EntryName: entry.Name(),
127+
TriggerEvent: evt.Name,
128+
Content: content,
129+
}
130+
schedules = append(schedules, dwf)
131+
}
117132
if detectMatched(gitRepo, commit, triggedEvent, payload, evt) {
118133
dwf := &DetectedWorkflow{
119134
EntryName: entry.Name(),
@@ -125,7 +140,7 @@ func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent w
125140
}
126141
}
127142

128-
return workflows, nil
143+
return workflows, schedules, nil
129144
}
130145

131146
func detectMatched(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader, evt *jobparser.Event) bool {

‎options/locale/locale_en-US.ini

+1
Original file line numberDiff line numberDiff line change
@@ -2756,6 +2756,7 @@ dashboard.gc_lfs = Garbage collect LFS meta objects
27562756
dashboard.stop_zombie_tasks = Stop zombie tasks
27572757
dashboard.stop_endless_tasks = Stop endless tasks
27582758
dashboard.cancel_abandoned_jobs = Cancel abandoned jobs
2759+
dashboard.start_schedule_tasks = Start schedule tasks
27592760
dashboard.sync_branch.started = Branches Sync started
27602761
dashboard.rebuild_issue_indexer = Rebuild issue indexer
27612762

‎services/actions/notifier_helper.go

+104-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package actions
55

66
import (
7+
"bytes"
78
"context"
89
"fmt"
910
"strings"
@@ -24,6 +25,7 @@ import (
2425
"code.gitea.io/gitea/services/convert"
2526

2627
"github.com/nektos/act/pkg/jobparser"
28+
"github.com/nektos/act/pkg/model"
2729
)
2830

2931
var methodCtxKey struct{}
@@ -143,15 +145,15 @@ func notify(ctx context.Context, input *notifyInput) error {
143145
}
144146

145147
var detectedWorkflows []*actions_module.DetectedWorkflow
146-
workflows, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
148+
actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
149+
workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
147150
if err != nil {
148151
return fmt.Errorf("DetectWorkflows: %w", err)
149152
}
153+
150154
if len(workflows) == 0 {
151155
log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
152156
} else {
153-
actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
154-
155157
for _, wf := range workflows {
156158
if actionsConfig.IsWorkflowDisabled(wf.EntryName) {
157159
log.Trace("repo %s has disable workflows %s", input.Repo.RepoPath(), wf.EntryName)
@@ -171,7 +173,7 @@ func notify(ctx context.Context, input *notifyInput) error {
171173
if err != nil {
172174
return fmt.Errorf("gitRepo.GetCommit: %w", err)
173175
}
174-
baseWorkflows, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
176+
baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
175177
if err != nil {
176178
return fmt.Errorf("DetectWorkflows: %w", err)
177179
}
@@ -186,7 +188,22 @@ func notify(ctx context.Context, input *notifyInput) error {
186188
}
187189
}
188190

191+
if err := handleSchedules(ctx, schedules, commit, input); err != nil {
192+
return err
193+
}
194+
195+
return handleWorkflows(ctx, detectedWorkflows, commit, input, ref)
196+
}
197+
198+
func handleWorkflows(
199+
ctx context.Context,
200+
detectedWorkflows []*actions_module.DetectedWorkflow,
201+
commit *git.Commit,
202+
input *notifyInput,
203+
ref string,
204+
) error {
189205
if len(detectedWorkflows) == 0 {
206+
log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
190207
return nil
191208
}
192209

@@ -350,3 +367,86 @@ func ifNeedApproval(ctx context.Context, run *actions_model.ActionRun, repo *rep
350367
log.Trace("need approval because it's the first time user %d triggered actions", user.ID)
351368
return true, nil
352369
}
370+
371+
func handleSchedules(
372+
ctx context.Context,
373+
detectedWorkflows []*actions_module.DetectedWorkflow,
374+
commit *git.Commit,
375+
input *notifyInput,
376+
) error {
377+
if len(detectedWorkflows) == 0 {
378+
log.Trace("repo %s with commit %s couldn't find schedules", input.Repo.RepoPath(), commit.ID)
379+
return nil
380+
}
381+
382+
branch, err := commit.GetBranchName()
383+
if err != nil {
384+
return err
385+
}
386+
if branch != input.Repo.DefaultBranch {
387+
log.Trace("commit branch is not default branch in repo")
388+
return nil
389+
}
390+
391+
rows, _, err := actions_model.FindSchedules(ctx, actions_model.FindScheduleOptions{RepoID: input.Repo.ID})
392+
if err != nil {
393+
log.Error("FindCrons: %v", err)
394+
return err
395+
}
396+
397+
if len(rows) > 0 {
398+
if err := actions_model.DeleteScheduleTaskByRepo(ctx, input.Repo.ID); err != nil {
399+
log.Error("DeleteCronTaskByRepo: %v", err)
400+
}
401+
}
402+
403+
p, err := json.Marshal(input.Payload)
404+
if err != nil {
405+
return fmt.Errorf("json.Marshal: %w", err)
406+
}
407+
408+
crons := make([]*actions_model.ActionSchedule, 0, len(detectedWorkflows))
409+
for _, dwf := range detectedWorkflows {
410+
// Check cron job condition. Only working in default branch
411+
workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content))
412+
if err != nil {
413+
log.Error("ReadWorkflow: %v", err)
414+
continue
415+
}
416+
schedules := workflow.OnSchedule()
417+
if len(schedules) == 0 {
418+
log.Warn("no schedule event")
419+
continue
420+
}
421+
422+
run := &actions_model.ActionSchedule{
423+
Title: strings.SplitN(commit.CommitMessage, "\n", 2)[0],
424+
RepoID: input.Repo.ID,
425+
OwnerID: input.Repo.OwnerID,
426+
WorkflowID: dwf.EntryName,
427+
TriggerUserID: input.Doer.ID,
428+
Ref: input.Ref,
429+
CommitSHA: commit.ID.String(),
430+
Event: input.Event,
431+
EventPayload: string(p),
432+
Specs: schedules,
433+
Content: dwf.Content,
434+
}
435+
436+
// cancel running jobs if the event is push
437+
if run.Event == webhook_module.HookEventPush {
438+
// cancel running jobs of the same workflow
439+
if err := actions_model.CancelRunningJobs(
440+
ctx,
441+
run.RepoID,
442+
run.Ref,
443+
run.WorkflowID,
444+
); err != nil {
445+
log.Error("CancelRunningJobs: %v", err)
446+
}
447+
}
448+
crons = append(crons, run)
449+
}
450+
451+
return actions_model.CreateScheduleTask(ctx, crons)
452+
}

‎services/actions/schedule_tasks.go

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"time"
10+
11+
actions_model "code.gitea.io/gitea/models/actions"
12+
"code.gitea.io/gitea/models/db"
13+
"code.gitea.io/gitea/modules/log"
14+
"code.gitea.io/gitea/modules/timeutil"
15+
webhook_module "code.gitea.io/gitea/modules/webhook"
16+
17+
"github.com/nektos/act/pkg/jobparser"
18+
)
19+
20+
// StartScheduleTasks start the task
21+
func StartScheduleTasks(ctx context.Context) error {
22+
return startTasks(ctx)
23+
}
24+
25+
// startTasks retrieves specifications in pages, creates a schedule task for each specification,
26+
// and updates the specification's next run time and previous run time.
27+
// The function returns an error if there's an issue with finding or updating the specifications.
28+
func startTasks(ctx context.Context) error {
29+
// Set the page size
30+
pageSize := 50
31+
32+
// Retrieve specs in pages until all specs have been retrieved
33+
now := time.Now()
34+
for page := 1; ; page++ {
35+
// Retrieve the specs for the current page
36+
specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
37+
ListOptions: db.ListOptions{
38+
Page: page,
39+
PageSize: pageSize,
40+
},
41+
Next: now.Unix(),
42+
})
43+
if err != nil {
44+
return fmt.Errorf("find specs: %w", err)
45+
}
46+
47+
// Loop through each spec and create a schedule task for it
48+
for _, row := range specs {
49+
// cancel running jobs if the event is push
50+
if row.Schedule.Event == webhook_module.HookEventPush {
51+
// cancel running jobs of the same workflow
52+
if err := actions_model.CancelRunningJobs(
53+
ctx,
54+
row.RepoID,
55+
row.Schedule.Ref,
56+
row.Schedule.WorkflowID,
57+
); err != nil {
58+
log.Error("CancelRunningJobs: %v", err)
59+
}
60+
}
61+
62+
if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
63+
log.Error("CreateScheduleTask: %v", err)
64+
return err
65+
}
66+
67+
// Parse the spec
68+
schedule, err := row.Parse()
69+
if err != nil {
70+
log.Error("Parse: %v", err)
71+
return err
72+
}
73+
74+
// Update the spec's next run time and previous run time
75+
row.Prev = row.Next
76+
row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
77+
if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
78+
log.Error("UpdateScheduleSpec: %v", err)
79+
return err
80+
}
81+
}
82+
83+
// Stop if all specs have been retrieved
84+
if len(specs) < pageSize {
85+
break
86+
}
87+
}
88+
89+
return nil
90+
}
91+
92+
// CreateScheduleTask creates a scheduled task from a cron action schedule.
93+
// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
94+
func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
95+
// Create a new action run based on the schedule
96+
run := &actions_model.ActionRun{
97+
Title: cron.Title,
98+
RepoID: cron.RepoID,
99+
OwnerID: cron.OwnerID,
100+
WorkflowID: cron.WorkflowID,
101+
TriggerUserID: cron.TriggerUserID,
102+
Ref: cron.Ref,
103+
CommitSHA: cron.CommitSHA,
104+
Event: cron.Event,
105+
EventPayload: cron.EventPayload,
106+
Status: actions_model.StatusWaiting,
107+
}
108+
109+
// Parse the workflow specification from the cron schedule
110+
workflows, err := jobparser.Parse(cron.Content)
111+
if err != nil {
112+
return err
113+
}
114+
115+
// Insert the action run and its associated jobs into the database
116+
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
117+
return err
118+
}
119+
120+
// Retrieve the jobs for the newly created action run
121+
jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID})
122+
if err != nil {
123+
return err
124+
}
125+
126+
// Create commit statuses for each job
127+
for _, job := range jobs {
128+
if err := createCommitStatus(ctx, job); err != nil {
129+
return err
130+
}
131+
}
132+
133+
// Return nil if no errors occurred
134+
return nil
135+
}

‎services/cron/tasks_actions.go

+14
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func initActionsTasks() {
1818
registerStopZombieTasks()
1919
registerStopEndlessTasks()
2020
registerCancelAbandonedJobs()
21+
registerScheduleTasks()
2122
}
2223

2324
func registerStopZombieTasks() {
@@ -49,3 +50,16 @@ func registerCancelAbandonedJobs() {
4950
return actions_service.CancelAbandonedJobs(ctx)
5051
})
5152
}
53+
54+
// registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks.
55+
func registerScheduleTasks() {
56+
// Register the task with a unique name, enabled status, and schedule for every minute.
57+
RegisterTaskFatal("start_schedule_tasks", &BaseConfig{
58+
Enabled: true,
59+
RunAtStart: false,
60+
Schedule: "@every 1m",
61+
}, func(ctx context.Context, _ *user_model.User, cfg Config) error {
62+
// Call the function to start schedule tasks and pass the context.
63+
return actions_service.StartScheduleTasks(ctx)
64+
})
65+
}

0 commit comments

Comments
 (0)
Please sign in to comment.