diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 624896bbaacd2..b9de6b69970c2 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -141,8 +141,8 @@ For imports you should use the following format (_without_ the comments) ```go import ( // stdlib - "encoding/json" "fmt" + "math" // local packages "code.gitea.io/gitea/models" diff --git a/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md b/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md index 02e0e6c914ab9..7db7fe705afef 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md +++ b/docs/content/doc/advanced/config-cheat-sheet.zh-cn.md @@ -80,7 +80,7 @@ menu: - `LFS_START_SERVER`: 是否启用 git-lfs 支持. 可以为 `true` 或 `false`, 默认是 `false`。 - `LFS_JWT_SECRET`: LFS 认证密钥,改成自己的。 -- `LFS_CONTENT_PATH`: **已废弃**, 存放 lfs 命令上传的文件的地方,默认是 `data/lfs`。 +- `LFS_CONTENT_PATH`: **已废弃**, 存放 lfs 命令上传的文件的地方,默认是 `data/lfs`。**废弃** 请使用 `[lfs]` 的设置。 ## Database (`database`) diff --git a/docs/content/doc/features/webhooks.en-us.md b/docs/content/doc/features/webhooks.en-us.md index 5ded4512c3114..2dba7b7f83c74 100644 --- a/docs/content/doc/features/webhooks.en-us.md +++ b/docs/content/doc/features/webhooks.en-us.md @@ -28,6 +28,7 @@ All event pushes are POST requests. The methods currently supported are: - Microsoft Teams - Feishu - Wechatwork +- Packagist ### Event information diff --git a/docs/content/doc/features/webhooks.zh-cn.md b/docs/content/doc/features/webhooks.zh-cn.md index f3a312eee23ca..76139460c0a22 100644 --- a/docs/content/doc/features/webhooks.zh-cn.md +++ b/docs/content/doc/features/webhooks.zh-cn.md @@ -27,5 +27,6 @@ Gitea 的存储 webhook。这可以有存储库管路设定页 `/:username/:repo - Microsoft Teams - Feishu - Wechatwork +- Packagist ## TBD diff --git a/docs/content/doc/features/webhooks.zh-tw.md b/docs/content/doc/features/webhooks.zh-tw.md index 697b4139160ba..20fec3d62d876 100644 --- a/docs/content/doc/features/webhooks.zh-tw.md +++ b/docs/content/doc/features/webhooks.zh-tw.md @@ -27,6 +27,7 @@ Gitea 的儲存庫事件支援 web hook。這可以有儲存庫管理員在設 - Microsoft Teams - Feishu - Wechatwork +- Packagist ### 事件資訊 diff --git a/go.mod b/go.mod index abb6a93fb7e36..7266acb751487 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/denisenkom/go-mssqldb v0.10.0 github.com/djherbis/buffer v1.2.0 github.com/djherbis/nio/v3 v3.0.1 - github.com/duo-labs/webauthn v0.0.0-20211221191814-a22482edaa3b + github.com/duo-labs/webauthn v0.0.0-20220122034320-81aea484c951 github.com/dustin/go-humanize v1.0.0 github.com/editorconfig/editorconfig-core-go/v2 v2.4.2 github.com/emirpasic/gods v1.12.0 @@ -54,7 +54,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.2.0 github.com/golang/snappy v0.0.4 // indirect github.com/google/go-github/v39 v39.2.0 - github.com/google/uuid v1.2.0 + github.com/google/uuid v1.3.0 github.com/gorilla/feeds v1.1.1 github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/sessions v1.2.1 @@ -145,8 +145,6 @@ replace github.com/markbates/goth v1.68.0 => github.com/zeripath/goth v1.68.1-0. replace github.com/shurcooL/vfsgen => github.com/lunny/vfsgen v0.0.0-20220105142115-2c99e1ffdfa0 -replace github.com/duo-labs/webauthn => github.com/authelia/webauthn v0.0.0-20211225121951-80d1f2a572e4 - replace github.com/satori/go.uuid v1.2.0 => github.com/gofrs/uuid v4.2.0+incompatible exclude github.com/gofrs/uuid v3.2.0+incompatible diff --git a/go.sum b/go.sum index bccfe158eef1d..898a73dd88fbb 100644 --- a/go.sum +++ b/go.sum @@ -131,8 +131,6 @@ github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535/go.mod h1:o github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= -github.com/authelia/webauthn v0.0.0-20211225121951-80d1f2a572e4 h1:u3eFvgr4A8IjlAokbFt6XY6VdurX7DEYnQMQ4K2yobc= -github.com/authelia/webauthn v0.0.0-20211225121951-80d1f2a572e4/go.mod h1:EYSpSkwoEcryMmQGfhol2IiB3IMN9IIIaNd/wcAQMGQ= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= @@ -276,6 +274,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/dsnet/compress v0.0.1 h1:PlZu0n3Tuv04TzpfPbrnI0HW/YwodEXDS+oPKahKF0Q= github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo= github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY= +github.com/duo-labs/webauthn v0.0.0-20220122034320-81aea484c951 h1:17esZ09oW+29rklBtCVphIguql2u3NxYH2OasFPPZoo= +github.com/duo-labs/webauthn v0.0.0-20220122034320-81aea484c951/go.mod h1:nHy3JdztZWcsjenDeBuE8gn171OAwg12LBN027UP5AE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -491,7 +491,6 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/goccy/go-json v0.7.4 h1:B44qRUFwz/vxPKPISQ1KhvzRi9kZ28RAf6YtjriBZ5k= github.com/goccy/go-json v0.7.4/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0= github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -585,8 +584,8 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= diff --git a/models/webhook/webhook.go b/models/webhook/webhook.go index 21c01d928999e..ffc9b72b64d88 100644 --- a/models/webhook/webhook.go +++ b/models/webhook/webhook.go @@ -161,6 +161,7 @@ const ( FEISHU HookType = "feishu" MATRIX HookType = "matrix" WECHATWORK HookType = "wechatwork" + PACKAGIST HookType = "packagist" ) // HookStatus is the status of a web hook diff --git a/modules/context/context.go b/modules/context/context.go index 998eafe965fcd..0cbdfa023c6b6 100644 --- a/modules/context/context.go +++ b/modules/context/context.go @@ -292,6 +292,7 @@ func (ctx *Context) PlainTextBytes(status int, bs []byte) { } ctx.Resp.WriteHeader(status) ctx.Resp.Header().Set("Content-Type", "text/plain;charset=utf-8") + ctx.Resp.Header().Set("X-Content-Type-Options", "nosniff") if _, err := ctx.Resp.Write(bs); err != nil { log.Error("Write bytes failed: %v", err) } diff --git a/modules/git/diff.go b/modules/git/diff.go index 02ed2c60c42da..38aefabf1a32b 100644 --- a/modules/git/diff.go +++ b/modules/git/diff.go @@ -11,13 +11,11 @@ import ( "fmt" "io" "os" - "os/exec" "regexp" "strconv" "strings" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/process" ) // RawDiffType type of a raw diff. @@ -55,43 +53,41 @@ func GetRepoRawDiffForFile(repo *Repository, startCommit, endCommit string, diff if len(file) > 0 { fileArgs = append(fileArgs, "--", file) } - // FIXME: graceful: These commands should have a timeout - ctx, _, finished := process.GetManager().AddContext(repo.Ctx, fmt.Sprintf("GetRawDiffForFile: [repo_path: %s]", repo.Path)) - defer finished() - var cmd *exec.Cmd + var args []string switch diffType { case RawDiffNormal: if len(startCommit) != 0 { - cmd = exec.CommandContext(ctx, GitExecutable, append([]string{"diff", "-M", startCommit, endCommit}, fileArgs...)...) + args = append([]string{"diff", "-M", startCommit, endCommit}, fileArgs...) } else if commit.ParentCount() == 0 { - cmd = exec.CommandContext(ctx, GitExecutable, append([]string{"show", endCommit}, fileArgs...)...) + args = append([]string{"show", endCommit}, fileArgs...) } else { c, _ := commit.Parent(0) - cmd = exec.CommandContext(ctx, GitExecutable, append([]string{"diff", "-M", c.ID.String(), endCommit}, fileArgs...)...) + args = append([]string{"diff", "-M", c.ID.String(), endCommit}, fileArgs...) } case RawDiffPatch: if len(startCommit) != 0 { query := fmt.Sprintf("%s...%s", endCommit, startCommit) - cmd = exec.CommandContext(ctx, GitExecutable, append([]string{"format-patch", "--no-signature", "--stdout", "--root", query}, fileArgs...)...) + args = append([]string{"format-patch", "--no-signature", "--stdout", "--root", query}, fileArgs...) } else if commit.ParentCount() == 0 { - cmd = exec.CommandContext(ctx, GitExecutable, append([]string{"format-patch", "--no-signature", "--stdout", "--root", endCommit}, fileArgs...)...) + args = append([]string{"format-patch", "--no-signature", "--stdout", "--root", endCommit}, fileArgs...) } else { c, _ := commit.Parent(0) query := fmt.Sprintf("%s...%s", endCommit, c.ID.String()) - cmd = exec.CommandContext(ctx, GitExecutable, append([]string{"format-patch", "--no-signature", "--stdout", query}, fileArgs...)...) + args = append([]string{"format-patch", "--no-signature", "--stdout", query}, fileArgs...) } default: return fmt.Errorf("invalid diffType: %s", diffType) } stderr := new(bytes.Buffer) - - cmd.Dir = repo.Path - cmd.Stdout = writer - cmd.Stderr = stderr - - if err = cmd.Run(); err != nil { + cmd := NewCommandContextNoGlobals(repo.Ctx, args...) + if err = cmd.RunWithContext(&RunContext{ + Timeout: -1, + Dir: repo.Path, + Stdout: writer, + Stderr: stderr, + }); err != nil { return fmt.Errorf("Run: %v - %s", err, stderr) } return nil diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 4c7a1d4f177ca..9ae3abff60bab 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -133,11 +133,11 @@ func Init() { // Create the Queue switch setting.Indexer.RepoType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { idx, err := indexer.get() if idx == nil || err != nil { log.Error("Codes indexer handler: unable to get indexer!") - return + return data } for _, datum := range data { @@ -153,6 +153,7 @@ func Init() { continue } } + return nil } indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 8530210628cbb..729981ec71d2a 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { indexer := holder.get() if indexer == nil { log.Error("Issue indexer handler: unable to get indexer!") - return + return data } iData := make([]*IndexerData, 0, len(data)) @@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) { if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) } + return nil } issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go index b458444697d63..f983fcd11d563 100644 --- a/modules/indexer/stats/queue.go +++ b/modules/indexer/stats/queue.go @@ -17,13 +17,14 @@ import ( var statsQueue queue.UniqueQueue // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { opts := datum.(int64) if err := indexer.Index(opts); err != nil { log.Error("stats queue indexer.Index(%d) failed: %v", opts, err) } } + return nil } func initStatsQueue() error { diff --git a/modules/json/json.go b/modules/json/json.go index 1cbb6582610df..3afa86023c6f8 100644 --- a/modules/json/json.go +++ b/modules/json/json.go @@ -4,6 +4,7 @@ package json +// Allow "encoding/json" import. import ( "bytes" "encoding/binary" diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go index ecedd70193ea3..a27c5f699cbc5 100644 --- a/modules/notification/ui/ui.go +++ b/modules/notification/ui/ui.go @@ -38,13 +38,14 @@ func NewNotifier() base.Notifier { return ns } -func (ns *notificationService) handle(data ...queue.Data) { +func (ns *notificationService) handle(data ...queue.Data) []queue.Data { for _, datum := range data { opts := datum.(issueNotificationOpts) if err := models.CreateOrUpdateIssueNotifications(opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil { log.Error("Was unable to create issue notification: %v", err) } } + return nil } func (ns *notificationService) Run() { diff --git a/modules/public/mime_types.go b/modules/public/mime_types.go new file mode 100644 index 0000000000000..f8c92e824fdfa --- /dev/null +++ b/modules/public/mime_types.go @@ -0,0 +1,41 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package public + +import "strings" + +// wellKnownMimeTypesLower comes from Golang's builtin mime package: `builtinTypesLower`, see the comment of detectWellKnownMimeType +var wellKnownMimeTypesLower = map[string]string{ + ".avif": "image/avif", + ".css": "text/css; charset=utf-8", + ".gif": "image/gif", + ".htm": "text/html; charset=utf-8", + ".html": "text/html; charset=utf-8", + ".jpeg": "image/jpeg", + ".jpg": "image/jpeg", + ".js": "text/javascript; charset=utf-8", + ".json": "application/json", + ".mjs": "text/javascript; charset=utf-8", + ".pdf": "application/pdf", + ".png": "image/png", + ".svg": "image/svg+xml", + ".wasm": "application/wasm", + ".webp": "image/webp", + ".xml": "text/xml; charset=utf-8", + + // well, there are some types missing from the builtin list + ".txt": "text/plain; charset=utf-8", +} + +// detectWellKnownMimeType will return the mime-type for a well-known file ext name +// The purpose of this function is to bypass the unstable behavior of Golang's mime.TypeByExtension +// mime.TypeByExtension would use OS's mime-type config to overwrite the well-known types (see its document). +// If the user's OS has incorrect mime-type config, it would make Gitea can not respond a correct Content-Type to browsers. +// For example, if Gitea returns `text/plain` for a `.js` file, the browser couldn't run the JS due to security reasons. +// detectWellKnownMimeType makes the Content-Type for well-known files stable. +func detectWellKnownMimeType(ext string) string { + ext = strings.ToLower(ext) + return wellKnownMimeTypesLower[ext] +} diff --git a/modules/public/public.go b/modules/public/public.go index 91ecf42a3cac5..7804e945e798a 100644 --- a/modules/public/public.go +++ b/modules/public/public.go @@ -92,6 +92,15 @@ func parseAcceptEncoding(val string) map[string]bool { return types } +// setWellKnownContentType will set the Content-Type if the file is a well-known type. +// See the comments of detectWellKnownMimeType +func setWellKnownContentType(w http.ResponseWriter, file string) { + mimeType := detectWellKnownMimeType(filepath.Ext(file)) + if mimeType != "" { + w.Header().Set("Content-Type", mimeType) + } +} + func (opts *Options) handle(w http.ResponseWriter, req *http.Request, fs http.FileSystem, file string) bool { // use clean to keep the file is a valid path with no . or .. f, err := fs.Open(path.Clean(file)) @@ -122,6 +131,8 @@ func (opts *Options) handle(w http.ResponseWriter, req *http.Request, fs http.Fi return true } + setWellKnownContentType(w, file) + serveContent(w, req, fi, fi.ModTime(), f) return true } diff --git a/modules/public/dynamic.go b/modules/public/serve_dynamic.go similarity index 100% rename from modules/public/dynamic.go rename to modules/public/serve_dynamic.go diff --git a/modules/public/static.go b/modules/public/serve_static.go similarity index 68% rename from modules/public/static.go rename to modules/public/serve_static.go index d373c712ee535..8e82175e39ca0 100644 --- a/modules/public/static.go +++ b/modules/public/serve_static.go @@ -9,15 +9,12 @@ package public import ( "bytes" - "compress/gzip" "io" - "mime" "net/http" "os" "path/filepath" "time" - "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/timeutil" ) @@ -66,24 +63,16 @@ func serveContent(w http.ResponseWriter, req *http.Request, fi os.FileInfo, modt encodings := parseAcceptEncoding(req.Header.Get("Accept-Encoding")) if encodings["gzip"] { if cf, ok := fi.(*vfsgen۰CompressedFileInfo); ok { - rd := bytes.NewReader(cf.GzipBytes()) - w.Header().Set("Content-Encoding", "gzip") - ctype := mime.TypeByExtension(filepath.Ext(fi.Name())) - if ctype == "" { - // read a chunk to decide between utf-8 text and binary - var buf [512]byte - grd, _ := gzip.NewReader(rd) - n, _ := io.ReadFull(grd, buf[:]) - ctype = http.DetectContentType(buf[:n]) - _, err := rd.Seek(0, io.SeekStart) // rewind to output whole file - if err != nil { - log.Error("rd.Seek error: %v", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + rdGzip := bytes.NewReader(cf.GzipBytes()) + // all static files are managed by Gitea, so we can make sure every file has the correct ext name + // then we can get the correct Content-Type, we do not need to do http.DetectContentType on the decompressed data + mimeType := detectWellKnownMimeType(filepath.Ext(fi.Name())) + if mimeType == "" { + mimeType = "application/octet-stream" } - w.Header().Set("Content-Type", ctype) - http.ServeContent(w, req, fi.Name(), modtime, rd) + w.Header().Set("Content-Type", mimeType) + w.Header().Set("Content-Encoding", "gzip") + http.ServeContent(w, req, fi.Name(), modtime, rdGzip) return } } diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go index 3a10c8e1259c6..bb98d468fb09b 100644 --- a/modules/queue/bytefifo.go +++ b/modules/queue/bytefifo.go @@ -16,6 +16,8 @@ type ByteFIFO interface { Pop(ctx context.Context) ([]byte, error) // Close this fifo Close() error + // PushBack pushes data back to the top of the fifo + PushBack(ctx context.Context, data []byte) error } // UniqueByteFIFO defines a FIFO that Uniques its contents @@ -50,6 +52,11 @@ func (*DummyByteFIFO) Len(ctx context.Context) int64 { return 0 } +// PushBack pushes data back to the top of the fifo +func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error { + return nil +} + var _ UniqueByteFIFO = &DummyUniqueByteFIFO{} // DummyUniqueByteFIFO represents a dummy unique fifo diff --git a/modules/queue/manager.go b/modules/queue/manager.go index e0384d15a398e..56298a3e00b5e 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -54,6 +54,18 @@ type Flushable interface { IsEmpty() bool } +// Pausable represents a pool or queue that is Pausable +type Pausable interface { + // IsPaused will return if the pool or queue is paused + IsPaused() bool + // Pause will pause the pool or queue + Pause() + // Resume will resume the pool or queue + Resume() + // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed + IsPausedIsResumed() (paused, resumed <-chan struct{}) +} + // ManagedPool is a simple interface to get certain details from a worker pool type ManagedPool interface { // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group @@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Done() continue } + if pausable, ok := mq.Managed.(Pausable); ok { + // no point flushing paused queues + if pausable.IsPaused() { + wg.Done() + continue + } + } + allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) @@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error log.Debug("All queues are empty") break } - // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign + // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing // but don't delay cancellation here. select { case <-ctx.Done(): @@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can return nil } +// Flushable returns true if the queue is flushable +func (q *ManagedQueue) Flushable() bool { + _, ok := q.Managed.(Flushable) + return ok +} + // Flush flushes the queue with a timeout func (q *ManagedQueue) Flush(timeout time.Duration) error { if flushable, ok := q.Managed.(Flushable); ok { @@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool { return true } +// Pausable returns whether the queue is Pausable +func (q *ManagedQueue) Pausable() bool { + _, ok := q.Managed.(Pausable) + return ok +} + +// Pause pauses the queue +func (q *ManagedQueue) Pause() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Pause() + } +} + +// IsPaused reveals if the queue is paused +func (q *ManagedQueue) IsPaused() bool { + if pausable, ok := q.Managed.(Pausable); ok { + return pausable.IsPaused() + } + return false +} + +// Resume resumes the queue +func (q *ManagedQueue) Resume() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Resume() + } +} + // NumberOfWorkers returns the number of workers in the queue func (q *ManagedQueue) NumberOfWorkers() int { if pool, ok := q.Managed.(ManagedPool); ok { diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 80a9f1f2c729c..3a519651430e1 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -36,7 +36,7 @@ type Type string type Data interface{} // HandlerFunc is a function that takes a variable amount of data and processes it -type HandlerFunc func(...Data) +type HandlerFunc func(...Data) (unhandled []Data) // NewQueueFunc is a function that creates a queue type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error) @@ -61,6 +61,12 @@ type Queue interface { Push(Data) error } +// PushBackable queues can be pushed back to +type PushBackable interface { + // PushBack pushes data back to the top of the fifo + PushBack(Data) error +} + // DummyQueueType is the type for the dummy queue const DummyQueueType Type = "dummy" diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index c4d5d20a896ac..0380497ea675f 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -8,10 +8,12 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" ) // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue @@ -52,8 +54,7 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + q := &ByteFIFOQueue{ byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -65,7 +66,17 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem name: config.Name, waitOnEmpty: config.WaitOnEmpty, pushed: make(chan struct{}, 1), - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Name returns the name of this queue @@ -78,6 +89,24 @@ func (q *ByteFIFOQueue) Push(data Data) error { return q.PushFunc(data, nil) } +// PushBack pushes data to the fifo +func (q *ByteFIFOQueue) PushBack(data Data) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + return q.byteFIFO.PushBack(q.terminateCtx, bs) +} + // PushFunc pushes data to the fifo func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { @@ -87,14 +116,12 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } - if q.waitOnEmpty { - defer func() { - select { - case q.pushed <- struct{}{}: - default: - } - }() - } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } @@ -108,6 +135,15 @@ func (q *ByteFIFOQueue) IsEmpty() bool { return q.byteFIFO.Len(q.terminateCtx) == 0 } +// Flush flushes the ByteFIFOQueue +func (q *ByteFIFOQueue) Flush(timeout time.Duration) error { + select { + case q.pushed <- struct{}{}: + default: + } + return q.WorkerPool.Flush(timeout) +} + // Run runs the bytefifo queue func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) @@ -142,31 +178,67 @@ func (q *ByteFIFOQueue) readToChan() { // Default backoff values backOffTime := time.Millisecond * 100 + backOffTimer := time.NewTimer(0) + util.StopTimer(backOffTimer) + + paused, _ := q.IsPausedIsResumed() loop: for { - err := q.doPop() - if err == errQueueEmpty { - log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + select { + case <-paused: + log.Trace("Queue %s pausing", q.name) + _, resumed := q.IsPausedIsResumed() + select { - case <-q.pushed: - // reset backOffTime - backOffTime = 100 * time.Millisecond - continue loop + case <-resumed: + paused, _ = q.IsPausedIsResumed() + log.Trace("Queue %s resuming", q.name) + if q.HasNoWorkerScaling() { + log.Warn( + "Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", q.name) + q.Pause() + continue loop + } case <-q.shutdownCtx.Done(): - // Oops we've been shutdown whilst waiting - // Make sure the worker pool is shutdown too + // tell the pool to shutdown. q.baseCtxCancel() return + case data := <-q.dataChan: + if err := q.PushBack(data); err != nil { + log.Error("Unable to push back data into queue %s", q.name) + } + atomic.AddInt64(&q.numInQueue, -1) } + default: } - // Reset the backOffTime if there is no error or an unmarshalError - if err == nil || err == errUnmarshal { - backOffTime = 100 * time.Millisecond + // empty the pushed channel + select { + case <-q.pushed: + default: } + err := q.doPop() + + util.StopTimer(backOffTimer) + if err != nil { + if err == errQueueEmpty && q.waitOnEmpty { + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + + // reset the backoff time but don't set the timer + backOffTime = 100 * time.Millisecond + } else if err == errUnmarshal { + // reset the timer and backoff + backOffTime = 100 * time.Millisecond + backOffTimer.Reset(backOffTime) + } else { + // backoff + backOffTimer.Reset(backOffTime) + } + // Need to Backoff select { case <-q.shutdownCtx.Done(): @@ -174,8 +246,13 @@ loop: // Make sure the worker pool is shutdown too q.baseCtxCancel() return - case <-time.After(backOffTime): - // OK we've waited - so backoff a bit + case <-q.pushed: + // Data has been pushed to the fifo (or flush has been called) + // reset the backoff time + backOffTime = 100 * time.Millisecond + continue loop + case <-backOffTimer.C: + // Calculate the next backoff time backOffTime += backOffTime / 2 if backOffTime > maxBackOffTime { backOffTime = maxBackOffTime @@ -183,6 +260,10 @@ loop: continue loop } } + + // Reset the backoff time + backOffTime = 100 * time.Millisecond + select { case <-q.shutdownCtx.Done(): // Oops we've been shutdown @@ -289,9 +370,8 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOUniqueQueue{ + q := &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -302,7 +382,17 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun workers: config.Workers, name: config.Name, }, - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Has checks if the provided data is in the queue diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 4df64b69ee5ee..7de9c17c86243 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,6 +7,8 @@ package queue import ( "context" "fmt" + "sync/atomic" + "time" "code.gitea.io/gitea/modules/log" ) @@ -51,7 +53,6 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -60,6 +61,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro workers: config.Workers, name: config.Name, } + queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data { + unhandled := handle(data...) + if len(unhandled) > 0 { + // We can only pushback to the channel if we're paused. + if queue.IsPaused() { + atomic.AddInt64(&queue.numInQueue, int64(len(unhandled))) + go func() { + for _, datum := range data { + queue.dataChan <- datum + } + }() + return nil + } + } + return unhandled + }, config.WorkerPoolConfiguration) + queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } @@ -81,6 +99,39 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (q *ChannelQueue) Flush(timeout time.Duration) error { + if q.IsPaused() { + return nil + } + ctx, cancel := q.commonRegisterWorkers(1, timeout, true) + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +func (q *ChannelQueue) FlushWithContext(ctx context.Context) error { + log.Trace("ChannelQueue: %d Flush", q.qid) + paused, _ := q.IsPausedIsResumed() + for { + select { + case <-paused: + return nil + case data := <-q.dataChan: + if unhandled := q.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", q.qid) + } + atomic.AddInt64(&q.numInQueue, -1) + case <-q.baseCtx.Done(): + return q.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + // Shutdown processing from this queue func (q *ChannelQueue) Shutdown() { q.lock.Lock() @@ -94,6 +145,7 @@ func (q *ChannelQueue) Shutdown() { log.Trace("ChannelQueue: %s Shutting down", q.name) go func() { log.Trace("ChannelQueue: %s Flushing", q.name) + // We can't use Cleanup here because that will close the channel if err := q.FlushWithContext(q.terminateCtx); err != nil { log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) return diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index f1ddd7ec92104..b700b28a14a5a 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -5,6 +5,7 @@ package queue import ( + "sync" "testing" "time" @@ -13,11 +14,12 @@ import ( func TestChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} @@ -52,12 +54,13 @@ func TestChannelQueue(t *testing.T) { func TestChannelQueue_Batch(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} @@ -95,3 +98,156 @@ func TestChannelQueue_Batch(t *testing.T) { err = queue.Push(test1) assert.Error(t, err) } + +func TestChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + nilFn := func(_ func()) {} + + queue, err = NewChannelQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 1, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + queue.Push(&test1) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) +} diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 911233a5d9a01..2691ab02f51ee 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -94,6 +94,11 @@ func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn fu return fifo.internal.LPush(data) } +// PushBack pushes data to the top of the fifo +func (fifo *LevelQueueByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.internal.RPush(data) +} + // Pop pops data from the start of the fifo func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index f3cd132d7d631..3b21575a0e979 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -51,7 +51,20 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } config := configInterface.(PersistableChannelQueueConfiguration) - channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ + queue := &PersistableChannelQueue{ + closed: make(chan struct{}), + } + + wrappedHandle := func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := queue.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + } + + channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, @@ -84,15 +97,12 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( DataDir: config.DataDir, } - levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) + levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar) if err == nil { - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - internal: levelQueue.(*LevelQueue), - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + internal: levelQueue.(*LevelQueue), + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -102,16 +112,13 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( return nil, ErrInvalidConfiguration{cfg: cfg} } - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - cfg: levelCfg, - underlying: LevelQueueType, - timeout: config.Timeout, - maxAttempts: config.MaxAttempts, - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + cfg: levelCfg, + underlying: LevelQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -132,6 +139,19 @@ func (q *PersistableChannelQueue) Push(data Data) error { } } +// PushBack will push the indexer data to queue +func (q *PersistableChannelQueue) PushBack(data Data) error { + select { + case <-q.closed: + if pbr, ok := q.internal.(PushBackable); ok { + return pbr.PushBack(data) + } + return q.internal.Push(data) + default: + return q.channelQueue.Push(data) + } +} + // Run starts to run the queue func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) @@ -226,6 +246,48 @@ func (q *PersistableChannelQueue) IsEmpty() bool { return q.internal.IsEmpty() } +// IsPaused returns if the pool is paused +func (q *PersistableChannelQueue) IsPaused() bool { + return q.channelQueue.IsPaused() +} + +// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed +func (q *PersistableChannelQueue) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { + return q.channelQueue.IsPausedIsResumed() +} + +// Pause pauses the WorkerPool +func (q *PersistableChannelQueue) Pause() { + q.channelQueue.Pause() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Pause() +} + +// Resume resumes the WorkerPool +func (q *PersistableChannelQueue) Resume() { + q.channelQueue.Resume() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Resume() +} + // Shutdown processing this queue func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index db12d9575c87a..9bbd146efe9be 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -8,7 +8,9 @@ import ( "os" "sync" "testing" + "time" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" @@ -16,7 +18,7 @@ import ( func TestPersistableChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { for _, datum := range data { if datum == nil { continue @@ -24,6 +26,7 @@ func TestPersistableChannelQueue(t *testing.T) { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } lock := sync.Mutex{} @@ -189,3 +192,290 @@ func TestPersistableChannelQueue(t *testing.T) { callback() } } + +func TestPersistableChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + log.Info("pausing") + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + queueShutdown := []func(){} + queueTerminate := []func(){} + + tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data") + assert.NoError(t, err) + defer util.RemoveAll(tmpDir) + + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "first", + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + lock.Lock() + callbacks := make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() + // Now shutdown the queue + for _, callback := range callbacks { + callback() + } + + // Wait til it is closed + <-queue.(*PersistableChannelQueue).closed + + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + // terminate the queue + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + lock.Lock() + pushBack = true + lock.Unlock() + + // Reopen queue + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 1, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "second", + }, &testData{}) + assert.NoError(t, err) + pausable, ok = queue.(Pausable) + if !assert.True(t, ok) { + return + } + + paused, _ = pausable.IsPausedIsResumed() + + go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + case <-paused: + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + pausable.Resume() + + result3 := <-handleChan + result4 := <-handleChan + if result4.TestString == test1.TestString { + result3, result4 = result4, result3 + } + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + lock.Lock() + callbacks = make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { + callback() + } +} diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index a2c21fec085bb..d2d8e135cb6cf 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -17,12 +17,13 @@ import ( func TestLevelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } var lock sync.Mutex diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index a5fb866dc1e11..84ab235d5efbf 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -57,6 +57,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) type redisClient interface { RPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd + LPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd LPop(ctx context.Context, key string) *redis.StringCmd LLen(ctx context.Context, key string) *redis.IntCmd SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd @@ -103,6 +104,11 @@ func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() return fifo.client.RPush(ctx, fifo.queueName, data).Err() } +// PushBack pushes data to the top of the fifo +func (fifo *RedisByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.client.LPush(ctx, fifo.queueName, data).Err() +} + // Pop pops data from the start of the fifo func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() diff --git a/modules/queue/setting.go b/modules/queue/setting.go index caaf123d42b68..61f156c377e26 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -65,6 +65,16 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { log.Error("Unable to create queue for %s: %v", name, err) return nil } + + // Sanity check configuration + if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) { + log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name) + if pausable, ok := returnable.(Pausable); ok { + log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name) + pausable.Pause() + } + } + return returnable } @@ -103,5 +113,15 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un log.Error("Unable to create unique queue for %s: %v", name, err) return nil } + + // Sanity check configuration + if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) { + log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name) + if pausable, ok := returnable.(Pausable); ok { + log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name) + pausable.Pause() + } + } + return returnable.(UniqueQueue) } diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index f617595c04380..b6d2e770fce2d 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -8,6 +8,8 @@ import ( "context" "fmt" "sync" + "sync/atomic" + "time" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" @@ -64,7 +66,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue workers: config.Workers, name: config.Name, } - queue.WorkerPool = NewWorkerPool(func(data ...Data) { + queue.WorkerPool = NewWorkerPool(func(data ...Data) (unhandled []Data) { for _, datum := range data { // No error is possible here because PushFunc ensures that this can be marshalled bs, _ := json.Marshal(datum) @@ -73,8 +75,20 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue delete(queue.table, string(bs)) queue.lock.Unlock() - handle(datum) + if u := handle(datum); u != nil { + if queue.IsPaused() { + // We can only pushback to the channel if we're paused. + go func() { + if err := queue.Push(u[0]); err != nil { + log.Error("Unable to push back to queue %d. Error: %v", queue.qid, err) + } + }() + } else { + unhandled = append(unhandled, u...) + } + } } + return unhandled }, config.WorkerPoolConfiguration) queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar) @@ -143,6 +157,42 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { return has, nil } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error { + if q.IsPaused() { + return nil + } + ctx, cancel := q.commonRegisterWorkers(1, timeout, true) + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error { + log.Trace("ChannelUniqueQueue: %d Flush", q.qid) + paused, _ := q.IsPausedIsResumed() + for { + select { + case <-paused: + return nil + default: + } + select { + case data := <-q.dataChan: + if unhandled := q.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", q.qid) + } + atomic.AddInt64(&q.numInQueue, -1) + case <-q.baseCtx.Done(): + return q.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + // Shutdown processing from this queue func (q *ChannelUniqueQueue) Shutdown() { log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go new file mode 100644 index 0000000000000..ef6752079e149 --- /dev/null +++ b/modules/queue/unique_queue_channel_test.go @@ -0,0 +1,252 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestChannelUniqueQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + nilFn := func(_ func()) {} + + queue, err := NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 0, + MaxWorkers: 10, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, + Workers: 0, + Name: "TestChannelQueue", + }, &testData{}) + assert.NoError(t, err) + + assert.Equal(t, queue.(*ChannelUniqueQueue).WorkerPool.boostWorkers, 5) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + go queue.Push(&test1) + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} + +func TestChannelUniqueQueue_Batch(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + nilFn := func(_ func()) {} + + queue, err := NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + queue.Push(&test1) + go queue.Push(&test2) + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} + +func TestChannelUniqueQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + nilFn := func(_ func()) {} + + queue, err = NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 1, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + queue.Push(&test1) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) +} diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index bb0eb7d950c59..dae32f75a8518 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -93,6 +93,11 @@ func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, return fifo.internal.LPushFunc(data, fn) } +// PushBack pushes data to the top of the fifo +func (fifo *LevelUniqueQueueByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.internal.RPush(data) +} + // Pop pops data from the start of the fifo func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index d71f5e2b0459f..7fc304b17e9bd 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -51,7 +51,20 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac } config := configInterface.(PersistableChannelUniqueQueueConfiguration) - channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{ + queue := &PersistableChannelUniqueQueue{ + closed: make(chan struct{}), + } + + wrappedHandle := func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := queue.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + } + + channelUniqueQueue, err := NewChannelUniqueQueue(wrappedHandle, ChannelUniqueQueueConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, @@ -84,18 +97,16 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac DataDir: config.DataDir, } - queue := &PersistableChannelUniqueQueue{ - channelQueue: channelUniqueQueue.(*ChannelUniqueQueue), - closed: make(chan struct{}), - } + queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue) - levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { + levelQueue, err := NewLevelUniqueQueue(func(data ...Data) []Data { for _, datum := range data { err := queue.Push(datum) if err != nil && err != ErrAlreadyInQueue { log.Error("Unable push to channelled queue: %v", err) } } + return nil }, levelCfg, exemplar) if err == nil { queue.delayedStarter = delayedStarter{ @@ -142,6 +153,19 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err } } +// PushBack will push the indexer data to queue +func (q *PersistableChannelUniqueQueue) PushBack(data Data) error { + select { + case <-q.closed: + if pbr, ok := q.internal.(PushBackable); ok { + return pbr.PushBack(data) + } + return q.internal.Push(data) + default: + return q.channelQueue.Push(data) + } +} + // Has will test if the queue has the data func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { // This is more difficult... @@ -163,13 +187,14 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) q.lock.Lock() if q.internal == nil { - err := q.setInternal(atShutdown, func(data ...Data) { + err := q.setInternal(atShutdown, func(data ...Data) []Data { for _, datum := range data { err := q.Push(datum) if err != nil && err != ErrAlreadyInQueue { log.Error("Unable push to channelled queue: %v", err) } } + return nil }, q.channelQueue.exemplar) q.lock.Unlock() if err != nil { diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 7474c096655d3..477d5dd81f1d5 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -105,6 +105,18 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn f return fifo.client.RPush(ctx, fifo.queueName, data).Err() } +// PushBack pushes data to the top of the fifo +func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error { + added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result() + if err != nil { + return err + } + if added == 0 { + return ErrAlreadyInQueue + } + return fifo.client.LPush(ctx, fifo.queueName, data).Err() +} + // Pop pops data from the start of the fifo func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go index 8c815218ddd21..32fa9ed970dbc 100644 --- a/modules/queue/unique_queue_wrapped.go +++ b/modules/queue/unique_queue_wrapped.go @@ -73,7 +73,7 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue // wrapped.handle is passed to the delayedStarting internal queue and is run to handle // data passed to - wrapped.handle = func(data ...Data) { + wrapped.handle = func(data ...Data) (unhandled []Data) { for _, datum := range data { wrapped.tlock.Lock() if !wrapped.ready { @@ -87,8 +87,11 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } } wrapped.tlock.Unlock() - handle(datum) + if u := handle(datum); u != nil { + unhandled = append(unhandled, u...) + } } + return unhandled } _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar) return wrapped, nil diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 653d0558c8026..da56216dcb124 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -22,6 +22,8 @@ type WorkerPool struct { lock sync.Mutex baseCtx context.Context baseCtxCancel context.CancelFunc + paused chan struct{} + resumed chan struct{} cond *sync.Cond qid int64 maxNumberOfWorkers int @@ -35,6 +37,11 @@ type WorkerPool struct { numInQueue int64 } +var ( + _ Flushable = &WorkerPool{} + _ ManagedPool = &WorkerPool{} +) + // WorkerPoolConfiguration is the basic configuration for a WorkerPool type WorkerPoolConfiguration struct { QueueLength int @@ -50,11 +57,15 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo ctx, cancel := context.WithCancel(context.Background()) dataChan := make(chan Data, config.QueueLength) + resumed := make(chan struct{}) + close(resumed) pool := &WorkerPool{ baseCtx: ctx, baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, + resumed: resumed, + paused: make(chan struct{}), handle: handle, blockTimeout: config.BlockTimeout, boostTimeout: config.BoostTimeout, @@ -69,6 +80,14 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) p.lock.Lock() + select { + case <-p.paused: + p.lock.Unlock() + p.dataChan <- data + return + default: + } + if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { if p.numberOfWorkers == 0 { p.zeroBoost() @@ -82,6 +101,17 @@ func (p *WorkerPool) Push(data Data) { } } +// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting +func (p *WorkerPool) HasNoWorkerScaling() bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.hasNoWorkerScaling() +} + +func (p *WorkerPool) hasNoWorkerScaling() bool { + return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0) +} + func (p *WorkerPool) zeroBoost() { ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) @@ -272,6 +302,12 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, p.cond.Broadcast() cancel() } + if p.hasNoWorkerScaling() { + log.Warn( + "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) + p.pause() + } p.lock.Unlock() }() } @@ -290,13 +326,65 @@ func (p *WorkerPool) Wait() { p.cond.Wait() } +// IsPaused returns if the pool is paused +func (p *WorkerPool) IsPaused() bool { + p.lock.Lock() + defer p.lock.Unlock() + select { + case <-p.paused: + return true + default: + return false + } +} + +// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed +func (p *WorkerPool) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { + p.lock.Lock() + defer p.lock.Unlock() + return p.paused, p.resumed +} + +// Pause pauses the WorkerPool +func (p *WorkerPool) Pause() { + p.lock.Lock() + defer p.lock.Unlock() + p.pause() +} + +func (p *WorkerPool) pause() { + select { + case <-p.paused: + default: + p.resumed = make(chan struct{}) + close(p.paused) + } +} + +// Resume resumes the WorkerPool +func (p *WorkerPool) Resume() { + p.lock.Lock() + defer p.lock.Unlock() + select { + case <-p.resumed: + default: + p.paused = make(chan struct{}) + close(p.resumed) + } +} + // CleanUp will drain the remaining contents of the channel // This should be called after AddWorkers context is closed func (p *WorkerPool) CleanUp(ctx context.Context) { log.Trace("WorkerPool: %d CleanUp", p.qid) close(p.dataChan) for data := range p.dataChan { - p.handle(data) + if unhandled := p.handle(data); unhandled != nil { + if unhandled != nil { + log.Error("Unhandled Data in clean-up of queue %d", p.qid) + } + } + atomic.AddInt64(&p.numInQueue, -1) select { case <-ctx.Done(): @@ -327,7 +415,9 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { for { select { case data := <-p.dataChan: - p.handle(data) + if unhandled := p.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1) case <-p.baseCtx.Done(): return p.baseCtx.Err() @@ -341,13 +431,45 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { func (p *WorkerPool) doWork(ctx context.Context) { delay := time.Millisecond * 300 + + // Create a common timer - we will use this elsewhere + timer := time.NewTimer(0) + util.StopTimer(timer) + + paused, _ := p.IsPausedIsResumed() data := make([]Data, 0, p.batchLength) for { select { + case <-paused: + log.Trace("Worker for Queue %d Pausing", p.qid) + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) + } + _, resumed := p.IsPausedIsResumed() + select { + case <-resumed: + paused, _ = p.IsPausedIsResumed() + log.Trace("Worker for Queue %d Resuming", p.qid) + util.StopTimer(timer) + case <-ctx.Done(): + log.Trace("Worker shutting down") + return + } + default: + } + select { + case <-paused: + // go back around case <-ctx.Done(): if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") @@ -357,59 +479,36 @@ func (p *WorkerPool) doWork(ctx context.Context) { // the dataChan has been closed - we should finish up: if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") return } data = append(data, datum) + util.StopTimer(timer) + if len(data) >= p.batchLength { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) data = make([]Data, 0, p.batchLength) + } else { + timer.Reset(delay) } - default: - timer := time.NewTimer(delay) - select { - case <-ctx.Done(): - util.StopTimer(timer) - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - } - log.Trace("Worker shutting down") - return - case datum, ok := <-p.dataChan: - util.StopTimer(timer) - if !ok { - // the dataChan has been closed - we should finish up: - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - } - log.Trace("Worker shutting down") - return - } - data = append(data, datum) - if len(data) >= p.batchLength { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - data = make([]Data, 0, p.batchLength) - } - case <-timer.C: - delay = time.Millisecond * 100 - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - data = make([]Data, 0, p.batchLength) + case <-timer.C: + delay = time.Millisecond * 100 + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) } - + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) + data = make([]Data, 0, p.batchLength) } } } diff --git a/modules/setting/webhook.go b/modules/setting/webhook.go index b576f9573b329..0bfd7dcb4dd3f 100644 --- a/modules/setting/webhook.go +++ b/modules/setting/webhook.go @@ -36,7 +36,7 @@ func newWebhookService() { Webhook.DeliverTimeout = sec.Key("DELIVER_TIMEOUT").MustInt(5) Webhook.SkipTLSVerify = sec.Key("SKIP_TLS_VERIFY").MustBool() Webhook.AllowedHostList = sec.Key("ALLOWED_HOST_LIST").MustString("") - Webhook.Types = []string{"gitea", "gogs", "slack", "discord", "dingtalk", "telegram", "msteams", "feishu", "matrix", "wechatwork"} + Webhook.Types = []string{"gitea", "gogs", "slack", "discord", "dingtalk", "telegram", "msteams", "feishu", "matrix", "wechatwork", "packagist"} Webhook.PagingNum = sec.Key("PAGING_NUM").MustInt(10) Webhook.ProxyURL = sec.Key("PROXY_URL").MustString("") if Webhook.ProxyURL != "" { diff --git a/modules/structs/hook.go b/modules/structs/hook.go index bb62483cda3c8..e4d7652c72a99 100644 --- a/modules/structs/hook.go +++ b/modules/structs/hook.go @@ -40,7 +40,7 @@ type CreateHookOptionConfig map[string]string // CreateHookOption options when create a hook type CreateHookOption struct { // required: true - // enum: dingtalk,discord,gitea,gogs,msteams,slack,telegram,feishu,wechatwork + // enum: dingtalk,discord,gitea,gogs,msteams,slack,telegram,feishu,wechatwork,packagist Type string `json:"type" binding:"Required"` // required: true Config CreateHookOptionConfig `json:"config" binding:"Required"` diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index 301bd4f66e31b..de0d26d647711 100644 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -1947,6 +1947,10 @@ settings.add_matrix_hook_desc = Integrate Matrix into your repo settings.add_msteams_hook_desc = Integrate Microsoft Teams into your repository. settings.add_feishu_hook_desc = Integrate Feishu into your repository. settings.add_Wechat_hook_desc = Integrate Wechatwork into your repository. +settings.add_packagist_hook_desc = Integrate Packagist into your repository. +settings.packagist_username = Packagist username +settings.packagist_api_token = API token +settings.packagist_package_url = Packagist package URL settings.deploy_keys = Deploy Keys settings.add_deploy_key = Add Deploy Key settings.deploy_key_desc = Deploy keys have read-only pull access to the repository. @@ -2803,6 +2807,12 @@ monitor.queue.pool.flush.title = Flush Queue monitor.queue.pool.flush.desc = Flush will add a worker that will terminate once the queue is empty, or it times out. monitor.queue.pool.flush.submit = Add Flush Worker monitor.queue.pool.flush.added = Flush Worker added for %[1]s +monitor.queue.pool.pause.title = Pause Queue +monitor.queue.pool.pause.desc = Pausing a Queue will prevent it from processing data +monitor.queue.pool.pause.submit = Pause Queue +monitor.queue.pool.resume.title = Resume Queue +monitor.queue.pool.resume.desc = Set this queue to resume work +monitor.queue.pool.resume.submit = Resume Queue monitor.queue.settings.title = Pool Settings monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups. diff --git a/options/locale/locale_ja-JP.ini b/options/locale/locale_ja-JP.ini index fcfb2dcb76bb3..6641fa9dee507 100644 --- a/options/locale/locale_ja-JP.ini +++ b/options/locale/locale_ja-JP.ini @@ -34,6 +34,20 @@ twofa=2要素認証 twofa_scratch=2要素認証スクラッチコード passcode=パスコード +webauthn_insert_key=セキュリティキーを挿入 +webauthn_sign_in=セキュリティキーのボタンを押してください。セキュリティキーにボタンが無い場合は、挿入しなおしてください。 +webauthn_press_button=セキュリティキーのボタンを押してください... +webauthn_use_twofa=携帯電話から2要素認証コードを使用する +webauthn_error=セキュリティキーを読み取ることができません。 +webauthn_unsupported_browser=お使いのブラウザは現在 WebAuthn をサポートしていません。 +webauthn_error_unknown=不明なエラーが発生しました。 もう一度やり直してください。 +webauthn_error_insecure=WebAuthn はセキュアな接続のみをサポートしています。HTTP 経由でテストする場合は、"localhost" または "127.0.0.1" のオリジンが使用できます。 +webauthn_error_unable_to_process=サーバーがリクエストを処理できませんでした。 +webauthn_error_duplicated=このリクエストに対しては、許可されていないセキュリティキーです。 キーが未登録であることを確認してください。 +webauthn_error_empty=このキーに名前を設定する必要があります。 +webauthn_error_timeout=キーを読み取る前にタイムアウトになりました。 このページをリロードしてもう一度やり直してください。 +webauthn_u2f_deprecated=キー: '%s' は非推奨のU2Fプロセスを使用して認証しています。このキーを再登録して古い登録を削除したほうが良いでしょう。 +webauthn_reload=リロード repository=リポジトリ organization=組織 @@ -513,6 +527,7 @@ twofa=2要素認証 account_link=連携アカウント organization=組織 uid=Uid +webauthn=セキュリティキー public_profile=公開プロフィール biography_placeholder=自己紹介を少しだけ @@ -733,6 +748,11 @@ passcode_invalid=パスコードが間違っています。 再度お試しく twofa_enrolled=あなたのアカウントに2要素認証が設定されました。 スクラッチトークン (%s) は一度しか表示しませんので安全な場所に保存してください! twofa_failed_get_secret=シークレットが取得できません。 +webauthn_desc=セキュリティキーは暗号化キーを内蔵するハードウェア ・ デバイスです。 2要素認証に使用できます。 セキュリティキーはWebAuthn Authenticator規格をサポートしている必要があります。 +webauthn_register_key=セキュリティキーを追加 +webauthn_nickname=ニックネーム +webauthn_delete_key=セキュリティキーの登録解除 +webauthn_delete_key_desc=セキュリティキーの登録を解除すると、今後そのセキュリティキーでサインインすることはできなくなります。 続行しますか? manage_account_links=連携アカウントの管理 manage_account_links_desc=これらの外部アカウントがGiteaアカウントと連携されています。 diff --git a/options/locale/locale_uk-UA.ini b/options/locale/locale_uk-UA.ini index 891517d57f438..0f7a098f1c45c 100644 --- a/options/locale/locale_uk-UA.ini +++ b/options/locale/locale_uk-UA.ini @@ -1575,16 +1575,16 @@ activity.title.prs_merged_by=%s злито %s activity.title.prs_opened_by=%s запропоновано %s activity.merged_prs_label=Злито activity.opened_prs_label=Запропоновано -activity.active_issues_count_1=%d Активна проблема +activity.active_issues_count_1=%d Активна задача activity.active_issues_count_n=%d Активні задачі -activity.closed_issues_count_1=Закрита проблема +activity.closed_issues_count_1=Закрита задача activity.closed_issues_count_n=Закриті задачі -activity.title.issues_1=%d Проблема +activity.title.issues_1=%d Задач activity.title.issues_n=%d Задач activity.title.issues_closed_from=%s закрито %s activity.title.issues_created_by=%s створена(і) %s activity.closed_issue_label=Закрито -activity.new_issues_count_1=Нова Проблема +activity.new_issues_count_1=Нова задача activity.new_issues_count_n=Нові Задачі activity.new_issue_label=Відкриті activity.title.unresolved_conv_1=%d Незавершене обговорення @@ -1661,10 +1661,10 @@ settings.use_external_wiki=Використовувати зовнішні Ві settings.external_wiki_url=URL зовнішньої вікі settings.external_wiki_url_error=Зовнішня URL-адреса wiki не є допустимою URL-адресою. settings.external_wiki_url_desc=Відвідувачі будуть перенаправлені на URL-адресу, коли вони клацають по вкладці. -settings.issues_desc=Увімкнути відстеження проблем в репозиторію -settings.use_internal_issue_tracker=Використовувати вбудовану систему відстеження проблем +settings.issues_desc=Увімкнути відстеження задач в репозиторію +settings.use_internal_issue_tracker=Використовувати вбудовану систему відстеження задач settings.use_external_issue_tracker=Використовувати зовнішню систему обліку завдань -settings.external_tracker_url=URL зовнішньої системи відстеження проблем +settings.external_tracker_url=URL зовнішньої системи відстеження задач settings.external_tracker_url_error=URL зовнішнього баг-трекера не є допустимою URL-адресою. settings.external_tracker_url_desc=Відвідувачі перенаправляються на зовнішню URL-адресу, коли натискають вкладку 'Задачі'. settings.tracker_url_format=Формат URL зовнішнього трекера задач @@ -1687,7 +1687,7 @@ settings.pulls.default_delete_branch_after_merge=Видаляти гілку з settings.projects_desc=Увімкнути проєкти у репозиторії settings.admin_settings=Налаштування адміністратора settings.admin_enable_health_check=Включити перевірки працездатності репозиторію (git fsck) -settings.admin_enable_close_issues_via_commit_in_any_branch=Закрити проблему за допомогою коміта, зробленого не головній гілці +settings.admin_enable_close_issues_via_commit_in_any_branch=Закрити задачу за допомогою коміта, зробленого не в головній гілці settings.danger_zone=Небезпечна зона settings.new_owner_has_same_repo=Новий власник вже має репозиторій з такою назвою. Будь ласка, виберіть інше ім'я. settings.convert=Перетворити на звичайний репозиторій @@ -1808,15 +1808,15 @@ settings.event_repository=Репозиторій settings.event_repository_desc=Репозиторій створений або видалено. settings.event_header_issue=Події задачі settings.event_issues=Задачі -settings.event_issues_desc=Проблема відкрита, закрита, повторно відкрита або відредагована. -settings.event_issue_assign=Проблема прив'язана -settings.event_issue_assign_desc=Проблема призначена або скасована. -settings.event_issue_label=Проблема з міткою -settings.event_issue_label_desc=Мітки проблем оновлено або видалено. -settings.event_issue_milestone=Проблеми етапу -settings.event_issue_milestone_desc=Проблема призначена на етап або видалена з етапу. -settings.event_issue_comment=Коментар проблеми -settings.event_issue_comment_desc=Коментар проблеми створено, видалено чи відредаговано. +settings.event_issues_desc=Задача відкрита, закрита, повторно відкрита або відредагована. +settings.event_issue_assign=Задача прив'язана +settings.event_issue_assign_desc=Задачу призначено або скасовано. +settings.event_issue_label=Задача з міткою +settings.event_issue_label_desc=Мітки задачі оновлено або видалено. +settings.event_issue_milestone=Задача з етапом +settings.event_issue_milestone_desc=Задача призначена на етап або видалена з етапу. +settings.event_issue_comment=Коментар задачі +settings.event_issue_comment_desc=Коментар задачі створено, видалено чи відредаговано. settings.event_header_pull_request=Події запиту злиття settings.event_pull_request=Запити до злиття settings.event_pull_request_desc=Запит до злиття відкрито, закрито, перевідкрито або відредаговано. @@ -2584,7 +2584,7 @@ config.default_enable_timetracking=Увімкнути відстеження ч config.default_allow_only_contributors_to_track_time=Враховувати тільки учасників розробки в підрахунку часу config.no_reply_address=Прихований домен електронної пошти config.default_visibility_organization=Видимість за замовчуванням для нових організацій -config.default_enable_dependencies=Увімкнути залежності проблем за замовчуванням +config.default_enable_dependencies=Увімкнути залежності задачі за замовчуванням config.webhook_config=Конфігурація web-хуків config.queue_length=Довжина черги @@ -2739,13 +2739,13 @@ notices.delete_success=Сповіщення системи були видале create_repo=створив(ла) репозиторій %s rename_repo=репозиторій перейменовано з %[1]s на %[3]s commit_repo=надіслав зміни (push) до %[3]s о %[4]s -create_issue=`відкрив проблему %[3]s#%[2]s` -close_issue=`закрив проблему %[3]s#%[2]s` -reopen_issue=`повторно відкрив проблему %[3]s#%[2]s` +create_issue=`відкрив задачу %[3]s#%[2]s` +close_issue=`закрив задачу %[3]s#%[2]s` +reopen_issue=`повторно відкрив задачу %[3]s#%[2]s` create_pull_request=`створив запит злиття %[3]s#%[2]s` close_pull_request=`закрив запит злиття %[3]s#%[2]s` reopen_pull_request=`повторно відкрив запит злиття %[3]s#%[2]s` -comment_issue=`прокоментував проблему %[3]s#%[2]s` +comment_issue=`прокоментував задачу %[3]s#%[2]s` comment_pull=`прокоментував запит злиття %[3]s#%[2]s` merge_pull_request=`прийняв запит злиття %[3]s#%[2]s` transfer_repo=перенесено репозиторій %s у %s diff --git a/public/img/packagist.png b/public/img/packagist.png new file mode 100644 index 0000000000000..76c0e62a20d7c Binary files /dev/null and b/public/img/packagist.png differ diff --git a/routers/web/admin/admin.go b/routers/web/admin/admin.go index 276e1939ad7bb..fac3ef9622520 100644 --- a/routers/web/admin/admin.go +++ b/routers/web/admin/admin.go @@ -394,6 +394,30 @@ func Flush(ctx *context.Context) { ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10)) } +// Pause pauses a queue +func Pause(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { + ctx.Status(404) + return + } + mq.Pause() + ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10)) +} + +// Resume resumes a queue +func Resume(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { + ctx.Status(404) + return + } + mq.Resume() + ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10)) +} + // AddWorkers adds workers to a worker group func AddWorkers(ctx *context.Context) { qid := ctx.ParamsInt64("qid") diff --git a/routers/web/repo/http.go b/routers/web/repo/http.go index 3805ceea76d94..1b5004017fa35 100644 --- a/routers/web/repo/http.go +++ b/routers/web/repo/http.go @@ -12,7 +12,6 @@ import ( "fmt" "net/http" "os" - "os/exec" "path" "regexp" "strconv" @@ -30,7 +29,6 @@ import ( "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/util" @@ -486,18 +484,17 @@ func serviceRPC(ctx gocontext.Context, h serviceHandler, service string) { h.environ = append(h.environ, "GIT_PROTOCOL="+protocol) } - ctx, _, finished := process.GetManager().AddContext(h.r.Context(), fmt.Sprintf("%s %s %s [repo_path: %s]", git.GitExecutable, service, "--stateless-rpc", h.dir)) - defer finished() - var stderr bytes.Buffer - cmd := exec.CommandContext(ctx, git.GitExecutable, service, "--stateless-rpc", h.dir) - cmd.Dir = h.dir - cmd.Env = append(os.Environ(), h.environ...) - cmd.Stdout = h.w - cmd.Stdin = reqBody - cmd.Stderr = &stderr - - if err := cmd.Run(); err != nil { + cmd := git.NewCommandContextNoGlobals(h.r.Context(), service, "--stateless-rpc", h.dir) + cmd.SetDescription(fmt.Sprintf("%s %s %s [repo_path: %s]", git.GitExecutable, service, "--stateless-rpc", h.dir)) + if err := cmd.RunWithContext(&git.RunContext{ + Timeout: -1, + Dir: h.dir, + Env: append(os.Environ(), h.environ...), + Stdout: h.w, + Stdin: reqBody, + Stderr: &stderr, + }); err != nil { log.Error("Fail to serve RPC(%s) in %s: %v - %s", service, h.dir, err, stderr.String()) return } diff --git a/routers/web/repo/webhook.go b/routers/web/repo/webhook.go index 2ec2e8911f695..fb984de7f5857 100644 --- a/routers/web/repo/webhook.go +++ b/routers/web/repo/webhook.go @@ -682,6 +682,59 @@ func WechatworkHooksNewPost(ctx *context.Context) { ctx.Redirect(orCtx.Link) } +// PackagistHooksNewPost response for creating packagist hook +func PackagistHooksNewPost(ctx *context.Context) { + form := web.GetForm(ctx).(*forms.NewPackagistHookForm) + ctx.Data["Title"] = ctx.Tr("repo.settings") + ctx.Data["PageIsSettingsHooks"] = true + ctx.Data["PageIsSettingsHooksNew"] = true + ctx.Data["Webhook"] = webhook.Webhook{HookEvent: &webhook.HookEvent{}} + ctx.Data["HookType"] = webhook.PACKAGIST + + orCtx, err := getOrgRepoCtx(ctx) + if err != nil { + ctx.ServerError("getOrgRepoCtx", err) + return + } + + if ctx.HasError() { + ctx.HTML(http.StatusOK, orCtx.NewTemplate) + return + } + + meta, err := json.Marshal(&webhook_service.PackagistMeta{ + Username: form.Username, + APIToken: form.APIToken, + PackageURL: form.PackageURL, + }) + if err != nil { + ctx.ServerError("Marshal", err) + return + } + + w := &webhook.Webhook{ + RepoID: orCtx.RepoID, + URL: fmt.Sprintf("https://packagist.org/api/update-package?username=%s&apiToken=%s", url.QueryEscape(form.Username), url.QueryEscape(form.APIToken)), + ContentType: webhook.ContentTypeJSON, + HookEvent: ParseHookEvent(form.WebhookForm), + IsActive: form.Active, + Type: webhook.PACKAGIST, + Meta: string(meta), + OrgID: orCtx.OrgID, + IsSystemWebhook: orCtx.IsSystemWebhook, + } + if err := w.UpdateEvent(); err != nil { + ctx.ServerError("UpdateEvent", err) + return + } else if err := webhook.CreateWebhook(db.DefaultContext, w); err != nil { + ctx.ServerError("CreateWebhook", err) + return + } + + ctx.Flash.Success(ctx.Tr("repo.settings.add_hook_success")) + ctx.Redirect(orCtx.Link) +} + func checkWebhook(ctx *context.Context) (*orgRepoCtx, *webhook.Webhook) { ctx.Data["RequireHighlightJS"] = true @@ -719,6 +772,8 @@ func checkWebhook(ctx *context.Context) (*orgRepoCtx, *webhook.Webhook) { ctx.Data["TelegramHook"] = webhook_service.GetTelegramHook(w) case webhook.MATRIX: ctx.Data["MatrixHook"] = webhook_service.GetMatrixHook(w) + case webhook.PACKAGIST: + ctx.Data["PackagistHook"] = webhook_service.GetPackagistHook(w) } ctx.Data["History"], err = w.History(1) @@ -1137,6 +1192,50 @@ func WechatworkHooksEditPost(ctx *context.Context) { ctx.Redirect(fmt.Sprintf("%s/%d", orCtx.Link, w.ID)) } +// PackagistHooksEditPost response for editing packagist hook +func PackagistHooksEditPost(ctx *context.Context) { + form := web.GetForm(ctx).(*forms.NewPackagistHookForm) + ctx.Data["Title"] = ctx.Tr("repo.settings") + ctx.Data["PageIsSettingsHooks"] = true + ctx.Data["PageIsSettingsHooksEdit"] = true + + orCtx, w := checkWebhook(ctx) + if ctx.Written() { + return + } + ctx.Data["Webhook"] = w + + if ctx.HasError() { + ctx.HTML(http.StatusOK, orCtx.NewTemplate) + return + } + + meta, err := json.Marshal(&webhook_service.PackagistMeta{ + Username: form.Username, + APIToken: form.APIToken, + PackageURL: form.PackageURL, + }) + if err != nil { + ctx.ServerError("Marshal", err) + return + } + + w.Meta = string(meta) + w.URL = fmt.Sprintf("https://packagist.org/api/update-package?username=%s&apiToken=%s", url.QueryEscape(form.Username), url.QueryEscape(form.APIToken)) + w.HookEvent = ParseHookEvent(form.WebhookForm) + w.IsActive = form.Active + if err := w.UpdateEvent(); err != nil { + ctx.ServerError("UpdateEvent", err) + return + } else if err := webhook.UpdateWebhook(w); err != nil { + ctx.ServerError("UpdateWebhook", err) + return + } + + ctx.Flash.Success(ctx.Tr("repo.settings.update_hook_success")) + ctx.Redirect(fmt.Sprintf("%s/%d", orCtx.Link, w.ID)) +} + // TestWebhook test if web hook is work fine func TestWebhook(ctx *context.Context) { hookID := ctx.ParamsInt64(":id") diff --git a/routers/web/web.go b/routers/web/web.go index 6415788e446ba..545194aabd3ed 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -402,6 +402,8 @@ func RegisterRoutes(m *web.Route) { m.Post("/add", admin.AddWorkers) m.Post("/cancel/{pid}", admin.WorkerCancel) m.Post("/flush", admin.Flush) + m.Post("/pause", admin.Pause) + m.Post("/resume", admin.Resume) }) }) @@ -446,6 +448,7 @@ func RegisterRoutes(m *web.Route) { m.Post("/msteams/{id}", bindIgnErr(forms.NewMSTeamsHookForm{}), repo.MSTeamsHooksEditPost) m.Post("/feishu/{id}", bindIgnErr(forms.NewFeishuHookForm{}), repo.FeishuHooksEditPost) m.Post("/wechatwork/{id}", bindIgnErr(forms.NewWechatWorkHookForm{}), repo.WechatworkHooksEditPost) + m.Post("/packagist/{id}", bindIgnErr(forms.NewPackagistHookForm{}), repo.PackagistHooksEditPost) }, webhooksEnabled) m.Group("/{configType:default-hooks|system-hooks}", func() { @@ -460,6 +463,7 @@ func RegisterRoutes(m *web.Route) { m.Post("/msteams/new", bindIgnErr(forms.NewMSTeamsHookForm{}), repo.MSTeamsHooksNewPost) m.Post("/feishu/new", bindIgnErr(forms.NewFeishuHookForm{}), repo.FeishuHooksNewPost) m.Post("/wechatwork/new", bindIgnErr(forms.NewWechatWorkHookForm{}), repo.WechatworkHooksNewPost) + m.Post("/packagist/new", bindIgnErr(forms.NewPackagistHookForm{}), repo.PackagistHooksNewPost) }) m.Group("/auths", func() { @@ -655,6 +659,7 @@ func RegisterRoutes(m *web.Route) { m.Post("/msteams/new", bindIgnErr(forms.NewMSTeamsHookForm{}), repo.MSTeamsHooksNewPost) m.Post("/feishu/new", bindIgnErr(forms.NewFeishuHookForm{}), repo.FeishuHooksNewPost) m.Post("/wechatwork/new", bindIgnErr(forms.NewWechatWorkHookForm{}), repo.WechatworkHooksNewPost) + m.Post("/packagist/new", bindIgnErr(forms.NewPackagistHookForm{}), repo.PackagistHooksNewPost) m.Group("/{id}", func() { m.Get("", repo.WebHooksEdit) m.Post("/test", repo.TestWebhook) @@ -670,6 +675,7 @@ func RegisterRoutes(m *web.Route) { m.Post("/msteams/{id}", bindIgnErr(forms.NewMSTeamsHookForm{}), repo.MSTeamsHooksEditPost) m.Post("/feishu/{id}", bindIgnErr(forms.NewFeishuHookForm{}), repo.FeishuHooksEditPost) m.Post("/wechatwork/{id}", bindIgnErr(forms.NewWechatWorkHookForm{}), repo.WechatworkHooksEditPost) + m.Post("/packagist/{id}", bindIgnErr(forms.NewPackagistHookForm{}), repo.PackagistHooksEditPost) }, webhooksEnabled) m.Group("/keys", func() { diff --git a/services/forms/repo_form.go b/services/forms/repo_form.go index 19b5a37664f76..e6bd088da414b 100644 --- a/services/forms/repo_form.go +++ b/services/forms/repo_form.go @@ -396,6 +396,20 @@ func (f *NewWechatWorkHookForm) Validate(req *http.Request, errs binding.Errors) return middleware.Validate(errs, ctx.Data, f, ctx.Locale) } +// NewPackagistHookForm form for creating packagist hook +type NewPackagistHookForm struct { + Username string `binding:"Required"` + APIToken string `binding:"Required"` + PackageURL string `binding:"Required;ValidUrl"` + WebhookForm +} + +// Validate validates the fields +func (f *NewPackagistHookForm) Validate(req *http.Request, errs binding.Errors) binding.Errors { + ctx := context.GetContext(req) + return middleware.Validate(errs, ctx.Data, f, ctx.Locale) +} + // .___ // | | ______ ________ __ ____ // | |/ ___// ___/ | \_/ __ \ diff --git a/services/gitdiff/gitdiff.go b/services/gitdiff/gitdiff.go index 25d5e139d9d26..80e706b5ed25d 100644 --- a/services/gitdiff/gitdiff.go +++ b/services/gitdiff/gitdiff.go @@ -15,7 +15,6 @@ import ( "io" "net/url" "os" - "os/exec" "regexp" "sort" "strings" @@ -30,7 +29,6 @@ import ( "code.gitea.io/gitea/modules/highlight" "code.gitea.io/gitea/modules/lfs" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" "github.com/sergi/go-diff/diffmatchpatch" @@ -1322,10 +1320,6 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff return nil, err } - timeout := time.Duration(setting.Git.Timeout.Default) * time.Second - ctx, _, finished := process.GetManager().AddContextTimeout(gitRepo.Ctx, timeout, fmt.Sprintf("GetDiffRange [repo_path: %s]", repoPath)) - defer finished() - argsLength := 6 if len(opts.WhitespaceBehavior) > 0 { argsLength++ @@ -1375,21 +1369,28 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff diffArgs = append(diffArgs, files...) } - cmd := exec.CommandContext(ctx, git.GitExecutable, diffArgs...) - - cmd.Dir = repoPath - cmd.Stderr = os.Stderr + reader, writer := io.Pipe() + defer func() { + _ = reader.Close() + _ = writer.Close() + }() - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("error creating StdoutPipe: %w", err) - } + go func(ctx context.Context, diffArgs []string, repoPath string, writer *io.PipeWriter) { + cmd := git.NewCommandContextNoGlobals(ctx, diffArgs...) + cmd.SetDescription(fmt.Sprintf("GetDiffRange [repo_path: %s]", repoPath)) + if err := cmd.RunWithContext(&git.RunContext{ + Timeout: time.Duration(setting.Git.Timeout.Default) * time.Second, + Dir: repoPath, + Stderr: os.Stderr, + Stdout: writer, + }); err != nil { + log.Error("error during RunWithContext: %w", err) + } - if err = cmd.Start(); err != nil { - return nil, fmt.Errorf("error during Start: %w", err) - } + _ = writer.Close() + }(gitRepo.Ctx, diffArgs, repoPath, writer) - diff, err := ParsePatch(opts.MaxLines, opts.MaxLineCharacters, opts.MaxFiles, stdout, parsePatchSkipToFile) + diff, err := ParsePatch(opts.MaxLines, opts.MaxLineCharacters, opts.MaxFiles, reader, parsePatchSkipToFile) if err != nil { return nil, fmt.Errorf("unable to ParsePatch: %w", err) } @@ -1408,7 +1409,7 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff IndexFile: indexFilename, WorkTree: worktree, } - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(gitRepo.Ctx) if err := checker.Init(ctx); err != nil { log.Error("Unable to open checker for %s. Error: %v", opts.AfterCommitID, err) } else { @@ -1472,10 +1473,6 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff } } - if err = cmd.Wait(); err != nil { - return nil, fmt.Errorf("error during cmd.Wait: %w", err) - } - separator := "..." if opts.DirectComparison { separator = ".." @@ -1485,12 +1482,12 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff if len(opts.BeforeCommitID) == 0 || opts.BeforeCommitID == git.EmptySHA { shortstatArgs = []string{git.EmptyTreeSHA, opts.AfterCommitID} } - diff.NumFiles, diff.TotalAddition, diff.TotalDeletion, err = git.GetDiffShortStat(ctx, repoPath, shortstatArgs...) + diff.NumFiles, diff.TotalAddition, diff.TotalDeletion, err = git.GetDiffShortStat(gitRepo.Ctx, repoPath, shortstatArgs...) if err != nil && strings.Contains(err.Error(), "no merge base") { // git >= 2.28 now returns an error if base and head have become unrelated. // previously it would return the results of git diff --shortstat base head so let's try that... shortstatArgs = []string{opts.BeforeCommitID, opts.AfterCommitID} - diff.NumFiles, diff.TotalAddition, diff.TotalDeletion, err = git.GetDiffShortStat(ctx, repoPath, shortstatArgs...) + diff.NumFiles, diff.TotalAddition, diff.TotalDeletion, err = git.GetDiffShortStat(gitRepo.Ctx, repoPath, shortstatArgs...) } if err != nil { return nil, err diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go index eeb98b5879127..3ca9b50fc6147 100644 --- a/services/mailer/mailer.go +++ b/services/mailer/mailer.go @@ -346,7 +346,7 @@ func NewContext() { Sender = &dummySender{} } - mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) { + mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) []queue.Data { for _, datum := range data { msg := datum.(*Message) gomailMsg := msg.ToMessage() @@ -357,6 +357,7 @@ func NewContext() { log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info) } } + return nil }, &Message{}) go graceful.GetManager().RunWithShutdownFns(mailQueue.Run) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 26432001743f3..6f285ec467c63 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -130,11 +130,12 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { return nil } -func queueHandle(data ...queue.Data) { +func queueHandle(data ...queue.Data) []queue.Data { for _, datum := range data { req := datum.(*SyncRequest) doMirrorSync(graceful.GetManager().ShutdownContext(), req) } + return nil } // InitSyncMirrors initializes a go routine to sync the mirrors diff --git a/services/pull/check.go b/services/pull/check.go index 3615c6c6544b0..2203fb87499f1 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -216,12 +216,13 @@ func InitializePullRequests(ctx context.Context) { } // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { id, _ := strconv.ParseInt(datum.(string), 10, 64) testPR(id) } + return nil } func testPR(id int64) { diff --git a/services/pull/check_test.go b/services/pull/check_test.go index f0ec096ea9459..4cdd17cc7b5c2 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -22,11 +22,12 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { idChan := make(chan int64, 10) - q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) { + q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) []queue.Data { for _, datum := range data { id, _ := strconv.ParseInt(datum.(string), 10, 64) idChan <- id } + return nil }, queue.ChannelUniqueQueueConfiguration{ WorkerPoolConfiguration: queue.WorkerPoolConfiguration{ QueueLength: 10, diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go index f982e2ef7b181..ad2141ef3301f 100644 --- a/services/repository/archiver/archiver.go +++ b/services/repository/archiver/archiver.go @@ -246,7 +246,7 @@ var archiverQueue queue.UniqueQueue // Init initlize archive func Init() error { - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { for _, datum := range data { archiveReq, ok := datum.(*ArchiveRequest) if !ok { @@ -258,6 +258,7 @@ func Init() error { log.Error("Archive %v failed: %v", datum, err) } } + return nil } archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest)) diff --git a/services/repository/push.go b/services/repository/push.go index 518ad041576d2..fafe4736ab4e5 100644 --- a/services/repository/push.go +++ b/services/repository/push.go @@ -33,13 +33,14 @@ import ( var pushQueue queue.Queue // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { opts := datum.([]*repo_module.PushUpdateOptions) if err := pushUpdates(opts); err != nil { log.Error("pushUpdate failed: %v", err) } } + return nil } func initPushQueue() error { diff --git a/services/task/task.go b/services/task/task.go index 376fe1dce1020..3f823fc224c8e 100644 --- a/services/task/task.go +++ b/services/task/task.go @@ -49,13 +49,14 @@ func Init() error { return nil } -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { task := datum.(*models.Task) if err := Run(task); err != nil { log.Error("Run task failed: %v", err) } } + return nil } // MigrateRepository add migration repository to task diff --git a/services/webhook/packagist.go b/services/webhook/packagist.go new file mode 100644 index 0000000000000..ace93b13ff056 --- /dev/null +++ b/services/webhook/packagist.go @@ -0,0 +1,112 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package webhook + +import ( + "errors" + + webhook_model "code.gitea.io/gitea/models/webhook" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + api "code.gitea.io/gitea/modules/structs" +) + +type ( + // PackagistPayload represents + PackagistPayload struct { + PackagistRepository struct { + URL string `json:"url"` + } `json:"repository"` + } + + // PackagistMeta contains the meta data for the webhook + PackagistMeta struct { + Username string `json:"username"` + APIToken string `json:"api_token"` + PackageURL string `json:"package_url"` + } +) + +// GetPackagistHook returns packagist metadata +func GetPackagistHook(w *webhook_model.Webhook) *PackagistMeta { + s := &PackagistMeta{} + if err := json.Unmarshal([]byte(w.Meta), s); err != nil { + log.Error("webhook.GetPackagistHook(%d): %v", w.ID, err) + } + return s +} + +// JSONPayload Marshals the PackagistPayload to json +func (f *PackagistPayload) JSONPayload() ([]byte, error) { + data, err := json.MarshalIndent(f, "", " ") + if err != nil { + return []byte{}, err + } + return data, nil +} + +var _ PayloadConvertor = &PackagistPayload{} + +// Create implements PayloadConvertor Create method +func (f *PackagistPayload) Create(p *api.CreatePayload) (api.Payloader, error) { + return nil, nil +} + +// Delete implements PayloadConvertor Delete method +func (f *PackagistPayload) Delete(p *api.DeletePayload) (api.Payloader, error) { + return nil, nil +} + +// Fork implements PayloadConvertor Fork method +func (f *PackagistPayload) Fork(p *api.ForkPayload) (api.Payloader, error) { + return nil, nil +} + +// Push implements PayloadConvertor Push method +func (f *PackagistPayload) Push(p *api.PushPayload) (api.Payloader, error) { + return f, nil +} + +// Issue implements PayloadConvertor Issue method +func (f *PackagistPayload) Issue(p *api.IssuePayload) (api.Payloader, error) { + return nil, nil +} + +// IssueComment implements PayloadConvertor IssueComment method +func (f *PackagistPayload) IssueComment(p *api.IssueCommentPayload) (api.Payloader, error) { + return nil, nil +} + +// PullRequest implements PayloadConvertor PullRequest method +func (f *PackagistPayload) PullRequest(p *api.PullRequestPayload) (api.Payloader, error) { + return nil, nil +} + +// Review implements PayloadConvertor Review method +func (f *PackagistPayload) Review(p *api.PullRequestPayload, event webhook_model.HookEventType) (api.Payloader, error) { + return nil, nil +} + +// Repository implements PayloadConvertor Repository method +func (f *PackagistPayload) Repository(p *api.RepositoryPayload) (api.Payloader, error) { + return nil, nil +} + +// Release implements PayloadConvertor Release method +func (f *PackagistPayload) Release(p *api.ReleasePayload) (api.Payloader, error) { + return nil, nil +} + +// GetPackagistPayload converts a packagist webhook into a PackagistPayload +func GetPackagistPayload(p api.Payloader, event webhook_model.HookEventType, meta string) (api.Payloader, error) { + s := new(PackagistPayload) + + packagist := &PackagistMeta{} + if err := json.Unmarshal([]byte(meta), &packagist); err != nil { + return s, errors.New("GetPackagistPayload meta json:" + err.Error()) + } + s.PackagistRepository.URL = packagist.PackageURL + return convertPayloader(s, p, event) +} diff --git a/services/webhook/packagist_test.go b/services/webhook/packagist_test.go new file mode 100644 index 0000000000000..08912924d260a --- /dev/null +++ b/services/webhook/packagist_test.go @@ -0,0 +1,140 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package webhook + +import ( + "testing" + + webhook_model "code.gitea.io/gitea/models/webhook" + api "code.gitea.io/gitea/modules/structs" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPackagistPayload(t *testing.T) { + t.Run("Create", func(t *testing.T) { + p := createTestPayload() + + d := new(PackagistPayload) + pl, err := d.Create(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("Delete", func(t *testing.T) { + p := deleteTestPayload() + + d := new(PackagistPayload) + pl, err := d.Delete(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("Fork", func(t *testing.T) { + p := forkTestPayload() + + d := new(PackagistPayload) + pl, err := d.Fork(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("Push", func(t *testing.T) { + p := pushTestPayload() + + d := new(PackagistPayload) + d.PackagistRepository.URL = "https://packagist.org/api/update-package?username=THEUSERNAME&apiToken=TOPSECRETAPITOKEN" + pl, err := d.Push(p) + require.NoError(t, err) + require.NotNil(t, pl) + require.IsType(t, &PackagistPayload{}, pl) + + assert.Equal(t, "https://packagist.org/api/update-package?username=THEUSERNAME&apiToken=TOPSECRETAPITOKEN", pl.(*PackagistPayload).PackagistRepository.URL) + }) + + t.Run("Issue", func(t *testing.T) { + p := issueTestPayload() + + d := new(PackagistPayload) + p.Action = api.HookIssueOpened + pl, err := d.Issue(p) + require.NoError(t, err) + require.Nil(t, pl) + + p.Action = api.HookIssueClosed + pl, err = d.Issue(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("IssueComment", func(t *testing.T) { + p := issueCommentTestPayload() + + d := new(PackagistPayload) + pl, err := d.IssueComment(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("PullRequest", func(t *testing.T) { + p := pullRequestTestPayload() + + d := new(PackagistPayload) + pl, err := d.PullRequest(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("PullRequestComment", func(t *testing.T) { + p := pullRequestCommentTestPayload() + + d := new(PackagistPayload) + pl, err := d.IssueComment(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("Review", func(t *testing.T) { + p := pullRequestTestPayload() + p.Action = api.HookIssueReviewed + + d := new(PackagistPayload) + pl, err := d.Review(p, webhook_model.HookEventPullRequestReviewApproved) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("Repository", func(t *testing.T) { + p := repositoryTestPayload() + + d := new(PackagistPayload) + pl, err := d.Repository(p) + require.NoError(t, err) + require.Nil(t, pl) + }) + + t.Run("Release", func(t *testing.T) { + p := pullReleaseTestPayload() + + d := new(PackagistPayload) + pl, err := d.Release(p) + require.NoError(t, err) + require.Nil(t, pl) + }) +} + +func TestPackagistJSONPayload(t *testing.T) { + p := pushTestPayload() + + pl, err := new(PackagistPayload).Push(p) + require.NoError(t, err) + require.NotNil(t, pl) + require.IsType(t, &PackagistPayload{}, pl) + + json, err := pl.JSONPayload() + require.NoError(t, err) + assert.NotEmpty(t, json) +} diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index bb7a9692d11b7..607fac963452f 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -58,6 +58,10 @@ var webhooks = map[webhook_model.HookType]*webhook{ name: webhook_model.WECHATWORK, payloadCreator: GetWechatworkPayload, }, + webhook_model.PACKAGIST: { + name: webhook_model.PACKAGIST, + payloadCreator: GetPackagistPayload, + }, } // RegisterWebhook registers a webhook diff --git a/templates/admin/hook_new.tmpl b/templates/admin/hook_new.tmpl index 2cd3fc826c16f..049e54ef833cf 100644 --- a/templates/admin/hook_new.tmpl +++ b/templates/admin/hook_new.tmpl @@ -34,6 +34,8 @@ {{else if eq .HookType "wechatwork"}} + {{else if eq .HookType "packagist"}} + {{end}} @@ -48,6 +50,7 @@ {{template "repo/settings/webhook/feishu" .}} {{template "repo/settings/webhook/matrix" .}} {{template "repo/settings/webhook/wechatwork" .}} + {{template "repo/settings/webhook/packagist" .}} {{template "repo/settings/webhook/history" .}} diff --git a/templates/admin/queue.tmpl b/templates/admin/queue.tmpl index 3d9cc95592c58..d2d2c83baf487 100644 --- a/templates/admin/queue.tmpl +++ b/templates/admin/queue.tmpl @@ -92,6 +92,35 @@ + {{if .Queue.Pausable}} + {{if .Queue.IsPaused}} +

+ {{.i18n.Tr "admin.monitor.queue.pool.resume.title"}} +

+
+

{{.i18n.Tr "admin.monitor.queue.pool.resume.desc"}}

+
+ {{$.CsrfTokenHtml}} +
+ +
+
+
+ {{else}} +

+ {{.i18n.Tr "admin.monitor.queue.pool.pause.title"}} +

+
+

{{.i18n.Tr "admin.monitor.queue.pool.pause.desc"}}

+
+ {{$.CsrfTokenHtml}} +
+ +
+
+
+ {{end}} + {{end}}

{{.i18n.Tr "admin.monitor.queue.pool.flush.title"}}

diff --git a/templates/org/settings/hook_new.tmpl b/templates/org/settings/hook_new.tmpl index 43351d0cebdba..5e8ebb51e9427 100644 --- a/templates/org/settings/hook_new.tmpl +++ b/templates/org/settings/hook_new.tmpl @@ -29,6 +29,8 @@ {{else if eq .HookType "wechatwork"}} + {{else if eq .HookType "packagist"}} + {{end}} @@ -43,6 +45,7 @@ {{template "repo/settings/webhook/feishu" .}} {{template "repo/settings/webhook/matrix" .}} {{template "repo/settings/webhook/wechatwork" .}} + {{template "repo/settings/webhook/packagist" .}} {{template "repo/settings/webhook/history" .}} diff --git a/templates/repo/settings/webhook/base_list.tmpl b/templates/repo/settings/webhook/base_list.tmpl index e96d086039c84..f16c43bad60e0 100644 --- a/templates/repo/settings/webhook/base_list.tmpl +++ b/templates/repo/settings/webhook/base_list.tmpl @@ -34,6 +34,9 @@ Wechatwork + + Packagist + diff --git a/templates/repo/settings/webhook/new.tmpl b/templates/repo/settings/webhook/new.tmpl index 6df128f40ac86..a438a4c71a3d7 100644 --- a/templates/repo/settings/webhook/new.tmpl +++ b/templates/repo/settings/webhook/new.tmpl @@ -27,6 +27,8 @@ {{else if eq .HookType "wechatwork"}} + {{else if eq .HookType "packagist"}} + {{end}} @@ -41,6 +43,7 @@ {{template "repo/settings/webhook/feishu" .}} {{template "repo/settings/webhook/matrix" .}} {{template "repo/settings/webhook/wechatwork" .}} + {{template "repo/settings/webhook/packagist" .}} {{template "repo/settings/webhook/history" .}} diff --git a/templates/repo/settings/webhook/packagist.tmpl b/templates/repo/settings/webhook/packagist.tmpl new file mode 100644 index 0000000000000..04161dc40f84d --- /dev/null +++ b/templates/repo/settings/webhook/packagist.tmpl @@ -0,0 +1,19 @@ +{{if eq .HookType "packagist"}} +

{{.i18n.Tr "repo.settings.add_packagist_hook_desc" "https://packagist.org" | Str2html}}

+
+ {{.CsrfTokenHtml}} +
+ + +
+
+ + +
+
+ + +
+ {{template "repo/settings/webhook/settings" .}} +
+{{end}} diff --git a/templates/swagger/v1_json.tmpl b/templates/swagger/v1_json.tmpl index bba728363a3e5..768c4c69ee389 100644 --- a/templates/swagger/v1_json.tmpl +++ b/templates/swagger/v1_json.tmpl @@ -13517,7 +13517,8 @@ "slack", "telegram", "feishu", - "wechatwork" + "wechatwork", + "packagist" ], "x-go-name": "Type" }