diff --git a/modules/doctor/queue.go b/modules/doctor/queue.go new file mode 100644 index 0000000000000..251906d395d67 --- /dev/null +++ b/modules/doctor/queue.go @@ -0,0 +1,210 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package doctor + +import ( + "context" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/nosql" + "code.gitea.io/gitea/modules/queue" + "code.gitea.io/gitea/modules/setting" + + "gitea.com/lunny/levelqueue" +) + +var levelqueueTypes = []string{ + string(queue.PersistableChannelQueueType), + string(queue.PersistableChannelUniqueQueueType), + string(queue.LevelQueueType), + string(queue.LevelUniqueQueueType), +} + +func checkUniqueQueues(ctx context.Context, logger log.Logger, autofix bool) error { + for _, name := range queue.KnownUniqueQueueNames { + q := setting.GetQueueSettings(string(name)) + if q.Type == "" { + q.Type = string(queue.PersistableChannelQueueType) + } + found := false + for _, typ := range levelqueueTypes { + if typ == q.Type { + found = true + break + } + } + if !found { + logger.Info("Queue: %s\nType: %s\nNo LevelDB", q.Name, q.Type) + continue + } + + connection := q.ConnectionString + if connection == "" { + connection = q.DataDir + } + + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + logger.Error("Queue: %s\nUnable to open DB connection %q: %v", q.Name, connection, err) + return err + } + defer db.Close() + + prefix := q.Name + + iQueue, err := levelqueue.NewQueue(db, []byte(prefix), false) + if err != nil { + logger.Error("Queue: %s\nUnable to open Queue component: %v", q.Name, err) + return err + } + + iSet, err := levelqueue.NewSet(db, []byte(prefix+"-unique"), false) + if err != nil { + logger.Error("Queue: %s\nUnable to open Set component: %v", q.Name, err) + return err + } + + qLen := iQueue.Len() + sMembers, err := iSet.Members() + if err != nil { + logger.Error("Queue: %s\nUnable to get members of Set component: %v", q.Name, err) + return err + } + sLen := len(sMembers) + + if int(qLen) == sLen { + if qLen == 0 { + logger.Info("Queue: %s\nType: %s\nLevelDB: %s", q.Name, q.Type, "empty") + } else { + logger.Info("Queue: %s\nType: %s\nLevelDB contains: %d entries", q.Name, q.Type, qLen) + } + continue + } + logger.Warn("Queue: %s\nType: %s\nContains different numbers of elements in Queue component %d to Set component %d", q.Name, q.Type, qLen, sLen) + if !autofix { + continue + } + + // Empty out the old set members + for _, member := range sMembers { + _, err := iSet.Remove(member) + if err != nil { + logger.Error("Queue: %s\nUnable to remove Set member %s: %v", q.Name, string(member), err) + return err + } + } + + // Now iterate across the queue + for i := int64(0); i < qLen; i++ { + // Pop from the left + qData, err := iQueue.LPop() + if err != nil { + logger.Error("Queue: %s\nUnable to LPop out: %v", q.Name, err) + return err + } + // And add to the right + err = iQueue.RPush(qData) + if err != nil { + logger.Error("Queue: %s\nUnable to RPush back: %v", q.Name, err) + return err + } + // And add back to the set + _, err = iSet.Add(qData) + if err != nil { + logger.Error("Queue: %s\nUnable to add back in to Set: %v", q.Name, err) + return err + } + } + } + return nil +} + +func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error { + connections := []string{} + queueNames := make([]string, 0, len(queue.KnownUniqueQueueNames)+len(queue.KnownQueueNames)) + for _, name := range queue.KnownUniqueQueueNames { + queueNames = append(queueNames, string(name)) + } + for _, name := range queue.KnownQueueNames { + queueNames = append(queueNames, string(name)) + } + + for _, name := range queueNames { + q := setting.GetQueueSettings(name) + if q.Type == "" { + q.Type = string(queue.PersistableChannelQueueType) + } + found := false + for _, typ := range levelqueueTypes { + if typ == q.Type { + found = true + break + } + } + if !found { + continue + } + if q.ConnectionString != "" { + found := false + for _, connection := range connections { + if connection == q.ConnectionString { + found = true + break + } + } + if !found { + connections = append(connections, q.ConnectionString) + } + continue + } + found = false + for _, connection := range connections { + if connection == q.DataDir { + found = true + break + } + } + if !found { + connections = append(connections, q.DataDir) + } + } + + for _, connection := range connections { + logger.Info("LevelDB: %s", connection) + db, err := nosql.GetManager().GetLevelDB(connection) + if err != nil { + logger.Error("Connection: %q Unable to open DB: %v", connection, err) + return err + } + defer db.Close() + iter := db.NewIterator(nil, nil) + for iter.Next() { + logger.Info("%s\n%s", log.NewColoredIDValue(string(iter.Key())), string(iter.Value())) + } + iter.Release() + } + return nil +} + +func init() { + Register(&Check{ + Title: "Check if there are corrupt level uniquequeues", + Name: "uniquequeues-corrupt", + IsDefault: false, + Run: checkUniqueQueues, + AbortIfFailed: false, + SkipDatabaseInitialization: false, + Priority: 1, + }) + + Register(&Check{ + Title: "List all entries in leveldb", + Name: "queues-listdb", + IsDefault: false, + Run: queueListDB, + AbortIfFailed: false, + SkipDatabaseInitialization: false, + Priority: 1, + }) +} diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 027d13555c054..7fe3e6bdfd31a 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -165,7 +165,7 @@ func Init() { return unhandled } - indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{}) + indexerQueue = queue.CreateUniqueQueue(queue.CodeIndexerQueueName, handler, &IndexerData{}) if indexerQueue == nil { log.Fatal("Unable to create codes indexer queue") } diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 55d3c7bc0914a..633f56ade2d97 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -157,7 +157,7 @@ func InitIssueIndexer(syncReindex bool) { return nil } - issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) + issueIndexerQueue = queue.CreateQueue(queue.IssueIndexerQueueName, handler, &IndexerData{}) if issueIndexerQueue == nil { log.Fatal("Unable to create issue indexer queue") diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go index 32379f2859d86..5d0e8d9377a2e 100644 --- a/modules/indexer/stats/queue.go +++ b/modules/indexer/stats/queue.go @@ -27,9 +27,9 @@ func handle(data ...queue.Data) []queue.Data { } func initStatsQueue() error { - statsQueue = queue.CreateUniqueQueue("repo_stats_update", handle, int64(0)) + statsQueue = queue.CreateUniqueQueue(queue.RepoStatsUpdateQueueName, handle, int64(0)) if statsQueue == nil { - return fmt.Errorf("Unable to create repo_stats_update Queue") + return fmt.Errorf("unable to create repo_stats_update Queue") } go graceful.GetManager().RunWithShutdownFns(statsQueue.Run) diff --git a/modules/mirror/mirror.go b/modules/mirror/mirror.go index 37b4c2ac95788..da96524a9dd7c 100644 --- a/modules/mirror/mirror.go +++ b/modules/mirror/mirror.go @@ -33,7 +33,7 @@ func StartSyncMirrors(queueHandle func(data ...queue.Data) []queue.Data) { if !setting.Mirror.Enabled { return } - mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest)) + mirrorQueue = queue.CreateUniqueQueue(queue.MirrorQueueName, queueHandle, new(SyncRequest)) go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) } diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go index 73ea9227482ff..5812092d157ee 100644 --- a/modules/notification/ui/ui.go +++ b/modules/notification/ui/ui.go @@ -37,7 +37,7 @@ var _ base.Notifier = ¬ificationService{} // NewNotifier create a new notificationService notifier func NewNotifier() base.Notifier { ns := ¬ificationService{} - ns.issueQueue = queue.CreateQueue("notification-service", ns.handle, issueNotificationOpts{}) + ns.issueQueue = queue.CreateQueue(queue.NotificationQueueName, ns.handle, issueNotificationOpts{}) return ns } diff --git a/modules/queue/setting.go b/modules/queue/setting.go index 1e5259fcfba62..fc173fccef58f 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -12,6 +12,52 @@ import ( "code.gitea.io/gitea/modules/setting" ) +// UniqueQueueName represents an expected name for an UniqueQueue +type UniqueQueueName string + +// list of all expected UniqueQueues +const ( + CodeIndexerQueueName UniqueQueueName = "code_indexer" + RepoStatsUpdateQueueName UniqueQueueName = "repo_stats_update" + MirrorQueueName UniqueQueueName = "mirror" + PRPatchQueueName UniqueQueueName = "pr_patch_checker" + RepoArchiveQueueName UniqueQueueName = "repo-archive" + PRAutoMergeQueueName UniqueQueueName = "pr_auto_merge" + WebhookDeliveryQueueName UniqueQueueName = "webhook_sender" + ActionsJobQueueName UniqueQueueName = "actions_ready_job" +) + +// KnownUniqueQueueNames represents the list of expected unique queues +var KnownUniqueQueueNames = []UniqueQueueName{ + CodeIndexerQueueName, + RepoStatsUpdateQueueName, + MirrorQueueName, + PRPatchQueueName, + RepoArchiveQueueName, + ActionsJobQueueName, +} + +// QueueName represents an expected name for Queue +type QueueName string //nolint // allow this to stutter + +// list of all expected Queues +const ( + IssueIndexerQueueName QueueName = "issue_indexer" + NotificationQueueName QueueName = "notification-service" + MailerQueueName QueueName = "mail" + PushUpdateQueueName QueueName = "push_update" + TaskQueueName QueueName = "task" +) + +// KnownQueueNames represents the list of expected queues +var KnownQueueNames = []QueueName{ + IssueIndexerQueueName, + NotificationQueueName, + MailerQueueName, + PushUpdateQueueName, + TaskQueueName, +} + func validType(t string) (Type, error) { if len(t) == 0 { return PersistableChannelQueueType, nil @@ -36,8 +82,19 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) { } // CreateQueue for name with provided handler and exemplar -func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { - q, cfg := getQueueSettings(name) +func CreateQueue(name QueueName, handle HandlerFunc, exemplar interface{}) Queue { + found := false + for _, expected := range KnownQueueNames { + if name == expected { + found = true + break + } + } + if !found { + log.Warn("%s is not an expected name for an Queue", name) + } + + q, cfg := getQueueSettings(string(name)) if len(cfg) == 0 { return nil } @@ -57,7 +114,7 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { MaxAttempts: q.MaxAttempts, Config: cfg, QueueLength: q.QueueLength, - Name: name, + Name: string(name), }, exemplar) } if err != nil { @@ -78,8 +135,19 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { } // CreateUniqueQueue for name with provided handler and exemplar -func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue { - q, cfg := getQueueSettings(name) +func CreateUniqueQueue(name UniqueQueueName, handle HandlerFunc, exemplar interface{}) UniqueQueue { + found := false + for _, expected := range KnownUniqueQueueNames { + if name == expected { + found = true + break + } + } + if !found { + log.Warn("%s is not an expected name for an UniqueQueue", name) + } + + q, cfg := getQueueSettings(string(name)) if len(cfg) == 0 { return nil } @@ -106,6 +174,7 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un MaxAttempts: q.MaxAttempts, Config: cfg, QueueLength: q.QueueLength, + Name: string(name), }, exemplar) } if err != nil { diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index b32128cb82148..2c8e380cd68ef 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -530,7 +530,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { case <-paused: log.Trace("Worker for Queue %d Pausing", p.qid) if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) + log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data) if unhandled := p.handle(data...); unhandled != nil { log.Error("Unhandled Data in queue %d", p.qid) } @@ -564,7 +564,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { // go back around case <-ctx.Done(): if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) + log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data) if unhandled := p.handle(data...); unhandled != nil { log.Error("Unhandled Data in queue %d", p.qid) } @@ -576,7 +576,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if !ok { // the dataChan has been closed - we should finish up: if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) + log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data) if unhandled := p.handle(data...); unhandled != nil { log.Error("Unhandled Data in queue %d", p.qid) } @@ -589,7 +589,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { util.StopTimer(timer) if len(data) >= p.batchLength { - log.Trace("Handling: %d data, %v", len(data), data) + log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data) if unhandled := p.handle(data...); unhandled != nil { log.Error("Unhandled Data in queue %d", p.qid) } @@ -601,7 +601,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { case <-timer.C: delay = time.Millisecond * 100 if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) + log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data) if unhandled := p.handle(data...); unhandled != nil { log.Error("Unhandled Data in queue %d", p.qid) } diff --git a/services/actions/init.go b/services/actions/init.go index 3fd03eeb6f26c..5e762910e8c3c 100644 --- a/services/actions/init.go +++ b/services/actions/init.go @@ -15,7 +15,7 @@ func Init() { return } - jobEmitterQueue = queue.CreateUniqueQueue("actions_ready_job", jobEmitterQueueHandle, new(jobUpdate)) + jobEmitterQueue = queue.CreateUniqueQueue(queue.ActionsJobQueueName, jobEmitterQueueHandle, new(jobUpdate)) go graceful.GetManager().RunWithShutdownFns(jobEmitterQueue.Run) notification.RegisterNotifier(NewNotifier()) diff --git a/services/automerge/automerge.go b/services/automerge/automerge.go index 99460476403a9..5e3adbf863aa1 100644 --- a/services/automerge/automerge.go +++ b/services/automerge/automerge.go @@ -29,7 +29,7 @@ var prAutoMergeQueue queue.UniqueQueue // Init runs the task queue to that handles auto merges func Init() error { - prAutoMergeQueue = queue.CreateUniqueQueue("pr_auto_merge", handle, "") + prAutoMergeQueue = queue.CreateUniqueQueue(queue.PRAutoMergeQueueName, handle, "") if prAutoMergeQueue == nil { return fmt.Errorf("Unable to create pr_auto_merge Queue") } diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go index 91cc8cb405e0b..a3e77dd8e5b1f 100644 --- a/services/mailer/mailer.go +++ b/services/mailer/mailer.go @@ -369,7 +369,7 @@ func NewContext(ctx context.Context) { Sender = &smtpSender{} } - mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) []queue.Data { + mailQueue = queue.CreateQueue(queue.MailerQueueName, func(data ...queue.Data) []queue.Data { for _, datum := range data { msg := datum.(*Message) gomailMsg := msg.ToMessage() diff --git a/services/pull/check.go b/services/pull/check.go index 02d901541461f..e460b7622b541 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -183,6 +183,7 @@ func checkAndUpdateStatus(ctx context.Context, pr *issues_model.PullRequest) { return } + log.Trace("Updating %-v: Status:%d Conflicts:%s Protected:%s", pr, pr.Status, pr.ConflictedFiles, pr.ChangedProtectedFiles) if err := pr.UpdateColsIfNotMerged(ctx, "merge_base", "status", "conflicted_files", "changed_protected_files"); err != nil { log.Error("Update[%-v]: %v", pr, err) } @@ -370,6 +371,8 @@ func testPR(id int64) { } return } + log.Trace("%-v: patch tested new Status:%d ConflictedFiles:%s ChangedProtectedFiles:%s", pr, pr.Status, pr.ConflictedFiles, pr.ChangedProtectedFiles) + checkAndUpdateStatus(ctx, pr) } @@ -389,7 +392,7 @@ func CheckPRsForBaseBranch(baseRepo *repo_model.Repository, baseBranchName strin // Init runs the task queue to test all the checking status pull requests func Init() error { - prPatchCheckerQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "") + prPatchCheckerQueue = queue.CreateUniqueQueue(queue.PRPatchQueueName, handle, "") if prPatchCheckerQueue == nil { return fmt.Errorf("Unable to create pr_patch_checker Queue") diff --git a/services/pull/patch.go b/services/pull/patch.go index c2ccc75bdccdd..31a601e870471 100644 --- a/services/pull/patch.go +++ b/services/pull/patch.go @@ -303,11 +303,13 @@ func checkConflicts(ctx context.Context, pr *issues_model.PullRequest, gitRepo * treeHash, _, err = git.NewCommand(ctx, "write-tree").RunStdString(&git.RunOpts{Dir: tmpBasePath}) if err != nil { lsfiles, _, _ := git.NewCommand(ctx, "ls-files", "-u").RunStdString(&git.RunOpts{Dir: tmpBasePath}) + log.Debug("Unable to write unconflicted tree for PR[%d] %s/%s#%d. Error: %v", pr.ID, pr.BaseRepo.OwnerName, pr.BaseRepo.Name, pr.Index, err) return false, fmt.Errorf("unable to write unconflicted tree: %w\n`git ls-files -u`:\n%s", err, lsfiles) } treeHash = strings.TrimSpace(treeHash) baseTree, err := gitRepo.GetTree("base") if err != nil { + log.Debug("Unable to get base tree for PR[%d] %s/%s#%d. Error: %v", pr.ID, pr.BaseRepo.OwnerName, pr.BaseRepo.Name, pr.Index, err) return false, err } diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go index 1da4425cfc6e9..e98fa3b5b5476 100644 --- a/services/repository/archiver/archiver.go +++ b/services/repository/archiver/archiver.go @@ -314,9 +314,9 @@ func Init() error { return nil } - archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest)) + archiverQueue = queue.CreateUniqueQueue(queue.RepoArchiveQueueName, handler, new(ArchiveRequest)) if archiverQueue == nil { - return errors.New("unable to create codes indexer queue") + return errors.New("unable to create repo archiver queue") } go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run) diff --git a/services/repository/push.go b/services/repository/push.go index 8aa8be6aa2fd6..52c866b4b0a02 100644 --- a/services/repository/push.go +++ b/services/repository/push.go @@ -43,7 +43,7 @@ func handle(data ...queue.Data) []queue.Data { } func initPushQueue() error { - pushQueue = queue.CreateQueue("push_update", handle, []*repo_module.PushUpdateOptions{}) + pushQueue = queue.CreateQueue(queue.PushUpdateQueueName, handle, []*repo_module.PushUpdateOptions{}) if pushQueue == nil { return errors.New("unable to create push_update Queue") } diff --git a/services/task/task.go b/services/task/task.go index 41bc07f2f6eb7..41841bc33d41f 100644 --- a/services/task/task.go +++ b/services/task/task.go @@ -31,16 +31,16 @@ func Run(t *admin_model.Task) error { case structs.TaskTypeMigrateRepo: return runMigrateTask(t) default: - return fmt.Errorf("Unknown task type: %d", t.Type) + return fmt.Errorf("unknown task type: %d", t.Type) } } // Init will start the service to get all unfinished tasks and run them func Init() error { - taskQueue = queue.CreateQueue("task", handle, &admin_model.Task{}) + taskQueue = queue.CreateQueue(queue.TaskQueueName, handle, &admin_model.Task{}) if taskQueue == nil { - return fmt.Errorf("Unable to create Task Queue") + return fmt.Errorf("unable to create Task Queue") } go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index e389b1f9fe670..32d7ac5fa52f1 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -282,7 +282,7 @@ func Init() error { }, } - hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, int64(0)) + hookQueue = queue.CreateUniqueQueue(queue.WebhookDeliveryQueueName, handle, int64(0)) if hookQueue == nil { return fmt.Errorf("Unable to create webhook_sender Queue") }