Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Mirror Queue a queue #17326

Merged
merged 12 commits into from
Oct 17, 2021
4 changes: 2 additions & 2 deletions custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,10 @@ PATH =
;; Global limit of repositories per user, applied at creation time. -1 means no limit
;MAX_CREATION_LIMIT = -1
;;
;; Mirror sync queue length, increase if mirror syncing starts hanging
;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead)
;MIRROR_QUEUE_LENGTH = 1000
;;
;; Patch test queue length, increase if pull request patch testing starts hanging
;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead)
;PULL_REQUEST_QUEUE_LENGTH = 1000
;;
;; Preferred Licenses to place at the top of the List
Expand Down
39 changes: 36 additions & 3 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
- `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create.
- `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user,
`-1` means no limit.
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`.
as large as possible. Use caution when editing this value.
- `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch
testing starts hanging.
testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`.
- `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at
the top of the list. Name must match file name in options/license or custom/options/license.
- `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the
Expand Down Expand Up @@ -382,6 +382,8 @@ relation to port exhaustion.

## Queue (`queue` and `queue.*`)

Configuration at `[queue]` will set defaults for queues with overrides for individual queues at `[queue.*]`. (However see below.)

- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy`
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.)
- `LENGTH`: **20**: Maximal queue size before channel queues block
Expand All @@ -400,6 +402,37 @@ relation to port exhaustion.
- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
- `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost.

Gitea creates the following non-unique queues:

- `code_indexer`
- `issue_indexer`
- `notification-service`
- `task`
- `mail`
- `push_update`

And the following unique queues:

- `repo_stats_update`
- `repo-archive`
- `mirror`
- `pr_patch_checker`

Certain queues have defaults that override the defaults set in `[queue]` (this occurs mostly to support older configuration):

- `[queue.issue_indexer]`
- `TYPE` this will default to `[queue]` `TYPE` if it is set but if not it will appropriately convert `[indexer]` `ISSUE_INDEXER_QUEUE_TYPE` if that is set.
- `LENGTH` will default to `[indexer]` `UPDATE_BUFFER_LEN` if that is set.
- `BATCH_LENGTH` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_BATCH_NUMBER` if that is set.
- `DATADIR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_DIR` if that is set.
- `CONN_STR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_CONN_STR` if that is set.
- `[queue.mailer]`
- `LENGTH` will default to **100** or whatever `[mailer]` `SEND_BUFFER_LEN` is.
- `[queue.pr_patch_checker]`
- `LENGTH` will default to **1000** or whatever `[repository]` `PULL_REQUEST_QUEUE_LENGTH` is.
- `[queue.mirror]`
- `LENGTH` will default to **1000** or whatever `[repository]` `MIRROR_QUEUE_LENGTH` is.

## Admin (`admin`)

- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
Expand Down Expand Up @@ -588,7 +621,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type
command or full path).
- `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments.
- `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue.
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]`
zeripath marked this conversation as resolved.
Show resolved Hide resolved

## Cache (`cache`)

Expand Down
29 changes: 22 additions & 7 deletions modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sync"

"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)

Expand All @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
table map[Data]bool
table map[string]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
Expand All @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)

queue := &ChannelUniqueQueue{
table: map[Data]bool{},
table: map[string]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
Expand All @@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
// No error is possible here because PushFunc ensures that this can be marshalled
bs, _ := json.Marshal(datum)

queue.lock.Lock()
delete(queue.table, datum)
delete(queue.table, string(bs))
queue.lock.Unlock()

handle(datum)
}
}, config.WorkerPoolConfiguration)
Expand All @@ -94,23 +99,28 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}

bs, err := json.Marshal(data)
if err != nil {
return err
}
q.lock.Lock()
locked := true
defer func() {
if locked {
q.lock.Unlock()
}
}()
if _, ok := q.table[data]; ok {
if _, ok := q.table[string(bs)]; ok {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
q.table[data] = true
q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
delete(q.table, data)
delete(q.table, string(bs))
return err
}
}
Expand All @@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {

// Has checks if the data is in the queue
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
bs, err := json.Marshal(data)
if err != nil {
return false, err
}

Copy link
Contributor

@wxiaoguang wxiaoguang Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can introduce a func generateDataTaskKey(data interface{}) string to generate the keys for data. It's easier for developers to change/improve the key generating method in future.

ps: just a little concern, should we limit the generated key's length? eg: if a key is too long, we can use key[:lengthLimit] + "..." + sha256hex(data) instead.

Copy link
Contributor Author

@zeripath zeripath Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Json marshalability is tightly expected by the queues so it's not really replaceable. Unless you were thinking of something like allowing pushed Data to express some interface(s):

type UniqueKeyed interface {
   UniqueKey() string
}

type Conversion interface {
  ToQueue() ([]byte, error)
  FromQueue([]byte) error
}

type defaultConverter struct {
  Data
}

func (d defaultConverter) ToQueue() ([]byte, error) {
  return json.Marshal(d.Data)
}

func (d defaultConverter) FromQueue(bs []byte) error {
  return json.Unmarshal(bs, d.Data)
}

func AsConversion(data Data) Conversion {
   if conversion, ok := data.(Conversion); ok {
     return conversion
   }
   return defaultConverter{Data: data}
}

// with defaultUniqueKeyed structs and AsUniqueKeyed() similarly.

Then wired in as appropriate throughout the queues.


As this isn't exposed to users I think the size isn't too much of an issue - developers should think carefully about the size of things that they push in to any queue though and we as maintainers should be aware that these things shouldn't be too large.


In some ways as there is no current requirement for this - deciding to do this now we would be choosing an interface and API that would bind our hands but if you really think its needed then the above would be how I would do it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean isn't that complex. I was thinking about:

func generateDataTaskKey(data interface{}) (string, err) {
    bs, err := json.Marshal(data)
    if err != nil {
        return err
    }
    return string(bs)
}

...

dataTaskKey, err := generateDataTaskKey(data)
...
delete(queue.table, dataTaskKey))
...
if _, ok := q.table[dataTaskKey]; ok {
...
}
...
q.table[dataTaskKey] = true
...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with part a, the common function.
I am unsure about part b, I would rather say no as name clashes can then happen more easily.

Copy link
Contributor Author

@zeripath zeripath Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the only unique queue implementation - if we were to change the keying from json marshalling here in the ChannelUniqueQueue we'd have to change it everywhere. This PR is simply a bugfix ensuring that the keying of the ChannelUniqueQueue is the same as the other queues .

If we want to make it possible to change the keying algorithm then we need to do something like above and wire it in correctly otherwise someone will simply change one queue type and think it's fine.

Whilst DRY is a good thing - if you make a helper you're saying this implementation can be changed easily when it absolutely cannot be.

Copy link
Member

@delvh delvh Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still would like this helper function, no matter whether that mechanism is easy to replace or not.
Then I'll approve the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly do not see where this would be helpful.

Please feel free to send me a PR with this helper function because I cannot see where it would be useful.

q.lock.Lock()
defer q.lock.Unlock()
_, has := q.table[data]
_, has := q.table[string(bs)]
return has, nil
}

Expand Down
2 changes: 0 additions & 2 deletions modules/setting/mailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
// Mailer represents mail service.
type Mailer struct {
// Mailer
QueueLength int
Name string
From string
FromName string
Expand Down Expand Up @@ -54,7 +53,6 @@ func newMailService() {
}

MailService = &Mailer{
QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100),
Name: sec.Key("NAME").MustString(AppName),
SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false),
MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}),
Expand Down
64 changes: 39 additions & 25 deletions modules/setting/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package setting

import (
"fmt"
"path/filepath"
"strconv"
"time"

"code.gitea.io/gitea/modules/log"
ini "gopkg.in/ini.v1"
)

// QueueSettings represent the settings for a queue from the ini
Expand Down Expand Up @@ -106,11 +107,8 @@ func NewQueueService() {

// Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer")
sectionMap := map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" {
directlySet := toDirectlySetKeysMap(section)
if !directlySet["TYPE"] && defaultType == "" {
switch Indexer.IssueQueueType {
case LevelQueueType:
_, _ = section.NewKey("TYPE", "level")
Expand All @@ -125,37 +123,53 @@ func NewQueueService() {
Indexer.IssueQueueType)
}
}
if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 {
_, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength))
}
if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 {
_, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 {
_, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber))
}
if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" {
if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" {
_, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
}
if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" {
if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" {
_, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
}

// Handle the old mailer configuration
section = Cfg.Section("queue.mailer")
sectionMap = map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
}
handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))

// Handle the old test pull requests configuration
// Please note this will be a unique queue
section = Cfg.Section("queue.pr_patch_checker")
sectionMap = map[string]bool{}
handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000))

// Handle the old mirror queue configuration
// Please note this will be a unique queue
handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000))
}

// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
func handleOldLengthConfiguration(queueName string, value int) {
// Don't override with 0
if value <= 0 {
return
}

section := Cfg.Section("queue." + queueName)
directlySet := toDirectlySetKeysMap(section)
if !directlySet["LENGTH"] {
_, _ = section.NewKey("LENGTH", strconv.Itoa(value))
}
}

// toDirectlySetKeysMap returns a bool map of keys directly set by this section
// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
// but this section does not.
func toDirectlySetKeysMap(section *ini.Section) map[string]bool {
sectionMap := map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
}
return sectionMap
}
4 changes: 0 additions & 4 deletions modules/setting/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ var (
DefaultPrivate string
DefaultPushCreatePrivate bool
MaxCreationLimit int
MirrorQueueLength int
PullRequestQueueLength int
PreferredLicenses []string
DisableHTTPGit bool
AccessControlAllowOrigin string
Expand Down Expand Up @@ -142,8 +140,6 @@ var (
DefaultPrivate: RepoCreatingLastUserVisibility,
DefaultPushCreatePrivate: true,
MaxCreationLimit: -1,
MirrorQueueLength: 1000,
PullRequestQueueLength: 1000,
PreferredLicenses: []string{"Apache License 2.0", "MIT License"},
DisableHTTPGit: false,
AccessControlAllowOrigin: "",
Expand Down
Loading