Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Mirror Queue a queue #17326

Merged
merged 12 commits into from
Oct 17, 2021
4 changes: 2 additions & 2 deletions custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,10 @@ PATH =
;; Global limit of repositories per user, applied at creation time. -1 means no limit
;MAX_CREATION_LIMIT = -1
;;
;; Mirror sync queue length, increase if mirror syncing starts hanging
;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead)
;MIRROR_QUEUE_LENGTH = 1000
;;
;; Patch test queue length, increase if pull request patch testing starts hanging
;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead)
;PULL_REQUEST_QUEUE_LENGTH = 1000
;;
;; Preferred Licenses to place at the top of the List
Expand Down
24 changes: 21 additions & 3 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
- `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create.
- `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user,
`-1` means no limit.
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`.
as large as possible. Use caution when editing this value.
- `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch
testing starts hanging.
testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`.
- `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at
the top of the list. Name must match file name in options/license or custom/options/license.
- `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the
Expand Down Expand Up @@ -382,6 +382,8 @@ relation to port exhaustion.

## Queue (`queue` and `queue.*`)

Configuration at `[queue]` will set defaults for all queues with overrides for individual queues at `[queue.*]`.

- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy`
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.)
- `LENGTH`: **20**: Maximal queue size before channel queues block
Expand All @@ -400,6 +402,22 @@ relation to port exhaustion.
- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
- `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost.

Gitea creates the following non-unique queues:

- `code_indexer`
- `issue_indexer`
- `notification-service`
- `task`
- `mail`
- `push_update`

And the following unique queues:

- `repo_stats_update`
- `repo-archive`
- `mirror`
- `pr_patch_checker`

## Admin (`admin`)

- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
Expand Down Expand Up @@ -588,7 +606,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type
command or full path).
- `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments.
- `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue.
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]`
zeripath marked this conversation as resolved.
Show resolved Hide resolved

## Cache (`cache`)

Expand Down
34 changes: 25 additions & 9 deletions modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sync"

"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)

Expand All @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
table map[Data]bool
table map[string]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
Expand All @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)

queue := &ChannelUniqueQueue{
table: map[Data]bool{},
table: map[string]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
Expand All @@ -65,9 +66,14 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
queue.lock.Lock()
delete(queue.table, datum)
queue.lock.Unlock()
bs, err := json.Marshal(datum)
if err != nil {
log.Error("unable to marshal data: %v", datum)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
} else {
queue.lock.Lock()
delete(queue.table, string(bs))
queue.lock.Unlock()
}
zeripath marked this conversation as resolved.
Show resolved Hide resolved
handle(datum)
}
}, config.WorkerPoolConfiguration)
Expand All @@ -94,23 +100,28 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}

bs, err := json.Marshal(data)
if err != nil {
return err
}
q.lock.Lock()
locked := true
defer func() {
if locked {
q.lock.Unlock()
}
}()
if _, ok := q.table[data]; ok {
if _, ok := q.table[string(bs)]; ok {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
q.table[data] = true
q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
delete(q.table, data)
delete(q.table, string(bs))
return err
}
}
Expand All @@ -122,9 +133,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {

// Has checks if the data is in the queue
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
bs, err := json.Marshal(data)
if err != nil {
return false, err
}

Copy link
Contributor

@wxiaoguang wxiaoguang Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can introduce a func generateDataTaskKey(data interface{}) string to generate the keys for data. It's easier for developers to change/improve the key generating method in future.

ps: just a little concern, should we limit the generated key's length? eg: if a key is too long, we can use key[:lengthLimit] + "..." + sha256hex(data) instead.

Copy link
Contributor Author

@zeripath zeripath Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Json marshalability is tightly expected by the queues so it's not really replaceable. Unless you were thinking of something like allowing pushed Data to express some interface(s):

type UniqueKeyed interface {
   UniqueKey() string
}

type Conversion interface {
  ToQueue() ([]byte, error)
  FromQueue([]byte) error
}

type defaultConverter struct {
  Data
}

func (d defaultConverter) ToQueue() ([]byte, error) {
  return json.Marshal(d.Data)
}

func (d defaultConverter) FromQueue(bs []byte) error {
  return json.Unmarshal(bs, d.Data)
}

func AsConversion(data Data) Conversion {
   if conversion, ok := data.(Conversion); ok {
     return conversion
   }
   return defaultConverter{Data: data}
}

// with defaultUniqueKeyed structs and AsUniqueKeyed() similarly.

Then wired in as appropriate throughout the queues.


As this isn't exposed to users I think the size isn't too much of an issue - developers should think carefully about the size of things that they push in to any queue though and we as maintainers should be aware that these things shouldn't be too large.


In some ways as there is no current requirement for this - deciding to do this now we would be choosing an interface and API that would bind our hands but if you really think its needed then the above would be how I would do it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean isn't that complex. I was thinking about:

func generateDataTaskKey(data interface{}) (string, err) {
    bs, err := json.Marshal(data)
    if err != nil {
        return err
    }
    return string(bs)
}

...

dataTaskKey, err := generateDataTaskKey(data)
...
delete(queue.table, dataTaskKey))
...
if _, ok := q.table[dataTaskKey]; ok {
...
}
...
q.table[dataTaskKey] = true
...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with part a, the common function.
I am unsure about part b, I would rather say no as name clashes can then happen more easily.

Copy link
Contributor Author

@zeripath zeripath Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the only unique queue implementation - if we were to change the keying from json marshalling here in the ChannelUniqueQueue we'd have to change it everywhere. This PR is simply a bugfix ensuring that the keying of the ChannelUniqueQueue is the same as the other queues .

If we want to make it possible to change the keying algorithm then we need to do something like above and wire it in correctly otherwise someone will simply change one queue type and think it's fine.

Whilst DRY is a good thing - if you make a helper you're saying this implementation can be changed easily when it absolutely cannot be.

Copy link
Member

@delvh delvh Oct 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still would like this helper function, no matter whether that mechanism is easy to replace or not.
Then I'll approve the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly do not see where this would be helpful.

Please feel free to send me a PR with this helper function because I cannot see where it would be useful.

q.lock.Lock()
defer q.lock.Unlock()
_, has := q.table[data]
_, has := q.table[string(bs)]
return has, nil
}

Expand Down
2 changes: 0 additions & 2 deletions modules/setting/mailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
// Mailer represents mail service.
type Mailer struct {
// Mailer
QueueLength int
Name string
From string
FromName string
Expand Down Expand Up @@ -54,7 +53,6 @@ func newMailService() {
}

MailService = &Mailer{
QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100),
Name: sec.Key("NAME").MustString(AppName),
SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false),
MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}),
Expand Down
12 changes: 12 additions & 0 deletions modules/setting/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,16 @@ func NewQueueService() {
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
}

// Handle the old mirror queue configuration
// Please note this will be a unique queue
section = Cfg.Section("queue.mirror")
sectionMap = map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.MirrorQueueLength))
}

zeripath marked this conversation as resolved.
Show resolved Hide resolved
}
99 changes: 66 additions & 33 deletions services/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,43 @@ package mirror
import (
"context"
"fmt"
"strconv"
"strings"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
zeripath marked this conversation as resolved.
Show resolved Hide resolved
)

// mirrorQueue holds an UniqueQueue object of the mirror
var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
var mirrorQueue queue.UniqueQueue

// RequestType type of mirror request
type RequestType int

const (
// PullRequestType for pull mirrors
PullRequestType RequestType = iota
zeripath marked this conversation as resolved.
Show resolved Hide resolved
// PushRequestType for push mirrors
PushRequestType
)

// Request for the mirror queue
type Request struct {
Type RequestType
RepoID int64
}

// doMirror causes this request to mirror itself
func doMirror(ctx context.Context, req *Request) {
switch req.Type {
case PushRequestType:
_ = SyncPushMirror(ctx, req.RepoID)
case PullRequestType:
_ = SyncPullMirror(ctx, req.RepoID)
default:
log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID)
}
}

// Update checks and updates mirror repositories.
func Update(ctx context.Context) error {
Expand All @@ -29,19 +54,25 @@ func Update(ctx context.Context) error {
log.Trace("Doing: Update")

handler := func(idx int, bean interface{}) error {
var item string
var item Request
if m, ok := bean.(*models.Mirror); ok {
if m.Repo == nil {
log.Error("Disconnected mirror found: %d", m.ID)
return nil
}
item = fmt.Sprintf("pull %d", m.RepoID)
item = Request{
Type: PullRequestType,
RepoID: m.RepoID,
}
} else if m, ok := bean.(*models.PushMirror); ok {
if m.Repo == nil {
log.Error("Disconnected push-mirror found: %d", m.ID)
return nil
}
item = fmt.Sprintf("push %d", m.ID)
item = Request{
Type: PushRequestType,
RepoID: m.RepoID,
}
} else {
log.Error("Unknown bean: %v", bean)
return nil
Expand All @@ -51,8 +82,7 @@ func Update(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("Aborted")
default:
mirrorQueue.Add(item)
return nil
return mirrorQueue.Push(&item)
}
}

Expand All @@ -68,26 +98,10 @@ func Update(ctx context.Context) error {
return nil
}

// syncMirrors checks and syncs mirrors.
// FIXME: graceful: this should be a persistable queue
func syncMirrors(ctx context.Context) {
// Start listening on new sync requests.
for {
select {
case <-ctx.Done():
mirrorQueue.Close()
return
case item := <-mirrorQueue.Queue():
id, _ := strconv.ParseInt(item[5:], 10, 64)
if strings.HasPrefix(item, "pull") {
_ = SyncPullMirror(ctx, id)
} else if strings.HasPrefix(item, "push") {
_ = SyncPushMirror(ctx, id)
} else {
log.Error("Unknown item in queue: %v", item)
}
mirrorQueue.Remove(item)
}
func queueHandle(data ...queue.Data) {
for _, datum := range data {
req := datum.(*Request)
doMirror(graceful.GetManager().ShutdownContext(), req)
}
}

Expand All @@ -96,21 +110,40 @@ func InitSyncMirrors() {
if !setting.Mirror.Enabled {
return
}
go graceful.GetManager().RunWithShutdownContext(syncMirrors)
mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(Request))

go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}

// StartToMirror adds repoID to mirror queue
func StartToMirror(repoID int64) {
if !setting.Mirror.Enabled {
return
}
go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID))
go func() {
err := mirrorQueue.Push(&Request{
Type: PushRequestType,
RepoID: repoID,
})
if err != nil {
log.Error("Unable to push push mirror request to the queue for repo[%d]: Error: %v", repoID, err)
}
}()
}

// AddPushMirrorToQueue adds the push mirror to the queue
func AddPushMirrorToQueue(mirrorID int64) {
if !setting.Mirror.Enabled {
return
}
go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID))
go func() {

err := mirrorQueue.Push(&Request{
Type: PullRequestType,
RepoID: mirrorID,
})
if err != nil {
log.Error("Unable to push pull mirror request to the queue for repo[%d]: Error: %v", mirrorID, err)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
}
}()
}