Skip to content

Commit 4ce4614

Browse files
committed
Make the Mirror Queue a queue (go-gitea#17326)
Convert the old mirror syncing queue to the more modern queue format. Fix a bug in the from the repo-archive queue PR - the assumption was made that uniqueness could be enforced with by checking equality in a map in channel unique queues - however this only works for primitive types - which was the initial intention but is an imperfect. This is fixed by marshalling the data and placing the martialled data in the unique map instead. The documentation is also updated to add information about the deprecated configuration values. Signed-off-by: Andrew Thornton <art27@cantab.net>
1 parent 86ab980 commit 4ce4614

File tree

7 files changed

+164
-76
lines changed

7 files changed

+164
-76
lines changed

Diff for: custom/conf/app.example.ini

+2-2
Original file line numberDiff line numberDiff line change
@@ -769,10 +769,10 @@ PATH =
769769
;; Global limit of repositories per user, applied at creation time. -1 means no limit
770770
;MAX_CREATION_LIMIT = -1
771771
;;
772-
;; Mirror sync queue length, increase if mirror syncing starts hanging
772+
;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead)
773773
;MIRROR_QUEUE_LENGTH = 1000
774774
;;
775-
;; Patch test queue length, increase if pull request patch testing starts hanging
775+
;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead)
776776
;PULL_REQUEST_QUEUE_LENGTH = 1000
777777
;;
778778
;; Preferred Licenses to place at the top of the List

Diff for: docs/content/doc/advanced/config-cheat-sheet.en-us.md

+36-3
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
5454
- `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create.
5555
- `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user,
5656
`-1` means no limit.
57-
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
57+
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`.
5858
as large as possible. Use caution when editing this value.
5959
- `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch
60-
testing starts hanging.
60+
testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`.
6161
- `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at
6262
the top of the list. Name must match file name in options/license or custom/options/license.
6363
- `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the
@@ -382,6 +382,8 @@ relation to port exhaustion.
382382

383383
## Queue (`queue` and `queue.*`)
384384

385+
Configuration at `[queue]` will set defaults for queues with overrides for individual queues at `[queue.*]`. (However see below.)
386+
385387
- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy`
386388
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.)
387389
- `LENGTH`: **20**: Maximal queue size before channel queues block
@@ -400,6 +402,37 @@ relation to port exhaustion.
400402
- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
401403
- `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost.
402404

405+
Gitea creates the following non-unique queues:
406+
407+
- `code_indexer`
408+
- `issue_indexer`
409+
- `notification-service`
410+
- `task`
411+
- `mail`
412+
- `push_update`
413+
414+
And the following unique queues:
415+
416+
- `repo_stats_update`
417+
- `repo-archive`
418+
- `mirror`
419+
- `pr_patch_checker`
420+
421+
Certain queues have defaults that override the defaults set in `[queue]` (this occurs mostly to support older configuration):
422+
423+
- `[queue.issue_indexer]`
424+
- `TYPE` this will default to `[queue]` `TYPE` if it is set but if not it will appropriately convert `[indexer]` `ISSUE_INDEXER_QUEUE_TYPE` if that is set.
425+
- `LENGTH` will default to `[indexer]` `UPDATE_BUFFER_LEN` if that is set.
426+
- `BATCH_LENGTH` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_BATCH_NUMBER` if that is set.
427+
- `DATADIR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_DIR` if that is set.
428+
- `CONN_STR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_CONN_STR` if that is set.
429+
- `[queue.mailer]`
430+
- `LENGTH` will default to **100** or whatever `[mailer]` `SEND_BUFFER_LEN` is.
431+
- `[queue.pr_patch_checker]`
432+
- `LENGTH` will default to **1000** or whatever `[repository]` `PULL_REQUEST_QUEUE_LENGTH` is.
433+
- `[queue.mirror]`
434+
- `LENGTH` will default to **1000** or whatever `[repository]` `MIRROR_QUEUE_LENGTH` is.
435+
403436
## Admin (`admin`)
404437

405438
- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
@@ -588,7 +621,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type
588621
command or full path).
589622
- `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments.
590623
- `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail
591-
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue.
624+
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]`
592625

593626
## Cache (`cache`)
594627

Diff for: modules/queue/unique_queue_channel.go

+22-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"sync"
1111

12+
"code.gitea.io/gitea/modules/json"
1213
"code.gitea.io/gitea/modules/log"
1314
)
1415

@@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
2930
type ChannelUniqueQueue struct {
3031
*WorkerPool
3132
lock sync.Mutex
32-
table map[Data]bool
33+
table map[string]bool
3334
shutdownCtx context.Context
3435
shutdownCtxCancel context.CancelFunc
3536
terminateCtx context.Context
@@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
5455
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
5556

5657
queue := &ChannelUniqueQueue{
57-
table: map[Data]bool{},
58+
table: map[string]bool{},
5859
shutdownCtx: shutdownCtx,
5960
shutdownCtxCancel: shutdownCtxCancel,
6061
terminateCtx: terminateCtx,
@@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
6566
}
6667
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
6768
for _, datum := range data {
69+
// No error is possible here because PushFunc ensures that this can be marshalled
70+
bs, _ := json.Marshal(datum)
71+
6872
queue.lock.Lock()
69-
delete(queue.table, datum)
73+
delete(queue.table, string(bs))
7074
queue.lock.Unlock()
75+
7176
handle(datum)
7277
}
7378
}, config.WorkerPoolConfiguration)
@@ -94,23 +99,28 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
9499
if !assignableTo(data, q.exemplar) {
95100
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
96101
}
102+
103+
bs, err := json.Marshal(data)
104+
if err != nil {
105+
return err
106+
}
97107
q.lock.Lock()
98108
locked := true
99109
defer func() {
100110
if locked {
101111
q.lock.Unlock()
102112
}
103113
}()
104-
if _, ok := q.table[data]; ok {
114+
if _, ok := q.table[string(bs)]; ok {
105115
return ErrAlreadyInQueue
106116
}
107117
// FIXME: We probably need to implement some sort of limit here
108118
// If the downstream queue blocks this table will grow without limit
109-
q.table[data] = true
119+
q.table[string(bs)] = true
110120
if fn != nil {
111121
err := fn()
112122
if err != nil {
113-
delete(q.table, data)
123+
delete(q.table, string(bs))
114124
return err
115125
}
116126
}
@@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
122132

123133
// Has checks if the data is in the queue
124134
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
135+
bs, err := json.Marshal(data)
136+
if err != nil {
137+
return false, err
138+
}
139+
125140
q.lock.Lock()
126141
defer q.lock.Unlock()
127-
_, has := q.table[data]
142+
_, has := q.table[string(bs)]
128143
return has, nil
129144
}
130145

Diff for: modules/setting/mailer.go

-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
// Mailer represents mail service.
1717
type Mailer struct {
1818
// Mailer
19-
QueueLength int
2019
Name string
2120
From string
2221
FromName string
@@ -54,7 +53,6 @@ func newMailService() {
5453
}
5554

5655
MailService = &Mailer{
57-
QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100),
5856
Name: sec.Key("NAME").MustString(AppName),
5957
SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false),
6058
MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}),

Diff for: modules/setting/queue.go

+39-25
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
package setting
66

77
import (
8-
"fmt"
98
"path/filepath"
9+
"strconv"
1010
"time"
1111

1212
"code.gitea.io/gitea/modules/log"
13+
ini "gopkg.in/ini.v1"
1314
)
1415

1516
// QueueSettings represent the settings for a queue from the ini
@@ -106,11 +107,8 @@ func NewQueueService() {
106107

107108
// Now handle the old issue_indexer configuration
108109
section := Cfg.Section("queue.issue_indexer")
109-
sectionMap := map[string]bool{}
110-
for _, key := range section.Keys() {
111-
sectionMap[key.Name()] = true
112-
}
113-
if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" {
110+
directlySet := toDirectlySetKeysMap(section)
111+
if !directlySet["TYPE"] && defaultType == "" {
114112
switch Indexer.IssueQueueType {
115113
case LevelQueueType:
116114
_, _ = section.NewKey("TYPE", "level")
@@ -125,37 +123,53 @@ func NewQueueService() {
125123
Indexer.IssueQueueType)
126124
}
127125
}
128-
if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 {
129-
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
126+
if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 {
127+
_, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength))
130128
}
131-
if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 {
132-
_, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
129+
if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 {
130+
_, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber))
133131
}
134-
if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" {
132+
if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" {
135133
_, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
136134
}
137-
if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" {
135+
if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" {
138136
_, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
139137
}
140138

141139
// Handle the old mailer configuration
142-
section = Cfg.Section("queue.mailer")
143-
sectionMap = map[string]bool{}
144-
for _, key := range section.Keys() {
145-
sectionMap[key.Name()] = true
146-
}
147-
if _, ok := sectionMap["LENGTH"]; !ok {
148-
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
149-
}
140+
handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))
150141

151142
// Handle the old test pull requests configuration
152143
// Please note this will be a unique queue
153-
section = Cfg.Section("queue.pr_patch_checker")
154-
sectionMap = map[string]bool{}
144+
handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000))
145+
146+
// Handle the old mirror queue configuration
147+
// Please note this will be a unique queue
148+
handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000))
149+
}
150+
151+
// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
152+
// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
153+
func handleOldLengthConfiguration(queueName string, value int) {
154+
// Don't override with 0
155+
if value <= 0 {
156+
return
157+
}
158+
159+
section := Cfg.Section("queue." + queueName)
160+
directlySet := toDirectlySetKeysMap(section)
161+
if !directlySet["LENGTH"] {
162+
_, _ = section.NewKey("LENGTH", strconv.Itoa(value))
163+
}
164+
}
165+
166+
// toDirectlySetKeysMap returns a bool map of keys directly set by this section
167+
// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
168+
// but this section does not.
169+
func toDirectlySetKeysMap(section *ini.Section) map[string]bool {
170+
sectionMap := map[string]bool{}
155171
for _, key := range section.Keys() {
156172
sectionMap[key.Name()] = true
157173
}
158-
if _, ok := sectionMap["LENGTH"]; !ok {
159-
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
160-
}
174+
return sectionMap
161175
}

Diff for: modules/setting/repository.go

-4
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ var (
2929
DefaultPrivate string
3030
DefaultPushCreatePrivate bool
3131
MaxCreationLimit int
32-
MirrorQueueLength int
33-
PullRequestQueueLength int
3432
PreferredLicenses []string
3533
DisableHTTPGit bool
3634
AccessControlAllowOrigin string
@@ -142,8 +140,6 @@ var (
142140
DefaultPrivate: RepoCreatingLastUserVisibility,
143141
DefaultPushCreatePrivate: true,
144142
MaxCreationLimit: -1,
145-
MirrorQueueLength: 1000,
146-
PullRequestQueueLength: 1000,
147143
PreferredLicenses: []string{"Apache License 2.0", "MIT License"},
148144
DisableHTTPGit: false,
149145
AccessControlAllowOrigin: "",

0 commit comments

Comments
 (0)