Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add doctor commands to interrogate levelDB queues #18732

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b06b088
Ensure that conflict status is reset if there are no conflicts
zeripath Feb 11, 2022
db1931c
Add some more trace and debug logging to process and testpatch
zeripath Feb 11, 2022
bd066ca
Add doctor commands for looking at the level DB
zeripath Feb 11, 2022
2bce719
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Feb 13, 2022
1475c62
As per review
zeripath Feb 13, 2022
93d8275
refix process manager trace ended
zeripath Feb 13, 2022
53232c4
No revive - we want this to stutter
zeripath Feb 13, 2022
1500a0d
Update modules/doctor/queue.go
zeripath Mar 27, 2022
a9ae5c3
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Mar 27, 2022
4e02994
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jun 4, 2022
a639e44
remove trace logging from process manager
zeripath Jun 4, 2022
2821ddd
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jul 12, 2022
bd5ca04
use constants for queues
zeripath Jul 13, 2022
bbe2915
Merge branch 'main' into reset-conflict-status
6543 Jul 13, 2022
bdfa176
Merge branch 'main' into reset-conflict-status
zeripath Jul 16, 2022
4680195
placate linter
zeripath Jul 17, 2022
760c257
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jul 17, 2022
fc7301d
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jan 31, 2023
2e9cdfb
placate the bloody linter
zeripath Jan 31, 2023
ff707de
replacate lint.
zeripath Feb 1, 2023
9e8f61b
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Feb 26, 2023
900bb10
remove mismerged files
zeripath Feb 26, 2023
99c2b4b
add actions job queue name
zeripath Feb 26, 2023
71e16d6
restore licenses deleted by mistake
zeripath Feb 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions modules/doctor/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package doctor

import (
"context"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/nosql"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
zeripath marked this conversation as resolved.
Show resolved Hide resolved

"gitea.com/lunny/levelqueue"
)

var levelqueueTypes = []string{
string(queue.PersistableChannelQueueType),
string(queue.PersistableChannelUniqueQueueType),
string(queue.LevelQueueType),
string(queue.LevelUniqueQueueType),
}

func checkUniqueQueues(ctx context.Context, logger log.Logger, autofix bool) error {
for _, name := range queue.KnownUniqueQueueNames {
q := setting.GetQueueSettings(string(name))
if q.Type == "" {
q.Type = string(queue.PersistableChannelQueueType)
}
found := false
for _, typ := range levelqueueTypes {
if typ == q.Type {
found = true
break
}
}
if !found {
logger.Info("Queue: %s\nType: %s\nNo LevelDB", q.Name, q.Type)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
continue
}

connection := q.ConnectionString
if connection == "" {
connection = q.DataDir
}

db, err := nosql.GetManager().GetLevelDB(connection)
if err != nil {
logger.Error("Queue: %s\nUnable to open DB connection %q: %v", q.Name, connection, err)
return err
}
defer db.Close()

prefix := q.Name

iQueue, err := levelqueue.NewQueue(db, []byte(prefix), false)
if err != nil {
logger.Error("Queue: %s\nUnable to open Queue component: %v", q.Name, err)
return err
}

iSet, err := levelqueue.NewSet(db, []byte(prefix+"-unique"), false)
if err != nil {
logger.Error("Queue: %s\nUnable to open Set component: %v", q.Name, err)
return err
}

qLen := iQueue.Len()
sMembers, err := iSet.Members()
if err != nil {
logger.Error("Queue: %s\nUnable to get members of Set component: %v", q.Name, err)
return err
}
sLen := len(sMembers)

if int(qLen) == sLen {
if qLen == 0 {
logger.Info("Queue: %s\nType: %s\nLevelDB: %s", q.Name, q.Type, "empty")
zeripath marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.Info("Queue: %s\nType: %s\nLevelDB contains: %d entries", q.Name, q.Type, qLen)
}
continue
}
logger.Warn("Queue: %s\nType: %s\nContains different numbers of elements in Queue component %d to Set component %d", q.Name, q.Type, qLen, sLen)
if !autofix {
continue
}

// Empty out the old set members
for _, member := range sMembers {
_, err := iSet.Remove(member)
if err != nil {
logger.Error("Queue: %s\nUnable to remove Set member %s: %v", q.Name, string(member), err)
return err
}
}

// Now iterate across the queue
for i := int64(0); i < qLen; i++ {
// Pop from the left
qData, err := iQueue.LPop()
if err != nil {
logger.Error("Queue: %s\nUnable to LPop out: %v", q.Name, err)
return err
}
// And add to the right
err = iQueue.RPush(qData)
if err != nil {
logger.Error("Queue: %s\nUnable to RPush back: %v", q.Name, err)
return err
}
// And add back to the set
_, err = iSet.Add(qData)
if err != nil {
logger.Error("Queue: %s\nUnable to add back in to Set: %v", q.Name, err)
return err
}
}
}
return nil
}

func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error {
connections := []string{}
queueNames := make([]string, 0, len(queue.KnownUniqueQueueNames)+len(queue.KnownQueueNames))
for _, name := range queue.KnownUniqueQueueNames {
queueNames = append(queueNames, string(name))
}
for _, name := range queue.KnownQueueNames {
queueNames = append(queueNames, string(name))
}

for _, name := range queueNames {
q := setting.GetQueueSettings(name)
if q.Type == "" {
q.Type = string(queue.PersistableChannelQueueType)
}
found := false
for _, typ := range levelqueueTypes {
if typ == q.Type {
found = true
break
}
}
if !found {
continue
}
if q.ConnectionString != "" {
found := false
for _, connection := range connections {
if connection == q.ConnectionString {
found = true
zeripath marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if !found {
connections = append(connections, q.ConnectionString)
}
continue
}
found = false
for _, connection := range connections {
if connection == q.DataDir {
found = true
zeripath marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if !found {
connections = append(connections, q.DataDir)
}
}

for _, connection := range connections {
logger.Info("LevelDB: %s", connection)
db, err := nosql.GetManager().GetLevelDB(connection)
if err != nil {
logger.Error("Connection: %q Unable to open DB: %v", connection, err)
return err
}
defer db.Close()
iter := db.NewIterator(nil, nil)
for iter.Next() {
logger.Info("%s\n%s", log.NewColoredIDValue(string(iter.Key())), string(iter.Value()))
}
iter.Release()
}
return nil
}

func init() {
Register(&Check{
Title: "Check if there are corrupt level uniquequeues",
Name: "uniquequeues-corrupt",
IsDefault: false,
Run: checkUniqueQueues,
AbortIfFailed: false,
SkipDatabaseInitialization: false,
Priority: 1,
})

Register(&Check{
Title: "List all entries in leveldb",
Name: "queues-listdb",
IsDefault: false,
Run: queueListDB,
AbortIfFailed: false,
SkipDatabaseInitialization: false,
Priority: 1,
})
}
2 changes: 1 addition & 1 deletion modules/indexer/code/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func Init() {
return unhandled
}

indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
indexerQueue = queue.CreateUniqueQueue(queue.CodeIndexerQueueName, handler, &IndexerData{})
if indexerQueue == nil {
log.Fatal("Unable to create codes indexer queue")
}
Expand Down
2 changes: 1 addition & 1 deletion modules/indexer/issues/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func InitIssueIndexer(syncReindex bool) {
return nil
}

issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
issueIndexerQueue = queue.CreateQueue(queue.IssueIndexerQueueName, handler, &IndexerData{})

if issueIndexerQueue == nil {
log.Fatal("Unable to create issue indexer queue")
Expand Down
4 changes: 2 additions & 2 deletions modules/indexer/stats/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func handle(data ...queue.Data) []queue.Data {
}

func initStatsQueue() error {
statsQueue = queue.CreateUniqueQueue("repo_stats_update", handle, int64(0))
statsQueue = queue.CreateUniqueQueue(queue.RepoStatsUpdateQueueName, handle, int64(0))
if statsQueue == nil {
return fmt.Errorf("Unable to create repo_stats_update Queue")
return fmt.Errorf("unable to create repo_stats_update Queue")
}

go graceful.GetManager().RunWithShutdownFns(statsQueue.Run)
Expand Down
2 changes: 1 addition & 1 deletion modules/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func StartSyncMirrors(queueHandle func(data ...queue.Data) []queue.Data) {
if !setting.Mirror.Enabled {
return
}
mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
mirrorQueue = queue.CreateUniqueQueue(queue.MirrorQueueName, queueHandle, new(SyncRequest))

go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/notification/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ base.Notifier = &notificationService{}
// NewNotifier create a new notificationService notifier
func NewNotifier() base.Notifier {
ns := &notificationService{}
ns.issueQueue = queue.CreateQueue("notification-service", ns.handle, issueNotificationOpts{})
ns.issueQueue = queue.CreateQueue(queue.NotificationQueueName, ns.handle, issueNotificationOpts{})
return ns
}

Expand Down
79 changes: 74 additions & 5 deletions modules/queue/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,52 @@ import (
"code.gitea.io/gitea/modules/setting"
)

// UniqueQueueName represents an expected name for an UniqueQueue
type UniqueQueueName string

// list of all expected UniqueQueues
const (
CodeIndexerQueueName UniqueQueueName = "code_indexer"
RepoStatsUpdateQueueName UniqueQueueName = "repo_stats_update"
MirrorQueueName UniqueQueueName = "mirror"
PRPatchQueueName UniqueQueueName = "pr_patch_checker"
RepoArchiveQueueName UniqueQueueName = "repo-archive"
PRAutoMergeQueueName UniqueQueueName = "pr_auto_merge"
WebhookDeliveryQueueName UniqueQueueName = "webhook_sender"
ActionsJobQueueName UniqueQueueName = "actions_ready_job"
)

// KnownUniqueQueueNames represents the list of expected unique queues
var KnownUniqueQueueNames = []UniqueQueueName{
CodeIndexerQueueName,
RepoStatsUpdateQueueName,
MirrorQueueName,
PRPatchQueueName,
RepoArchiveQueueName,
ActionsJobQueueName,
}

// QueueName represents an expected name for Queue
type QueueName string //nolint // allow this to stutter

// list of all expected Queues
const (
IssueIndexerQueueName QueueName = "issue_indexer"
NotificationQueueName QueueName = "notification-service"
MailerQueueName QueueName = "mail"
PushUpdateQueueName QueueName = "push_update"
TaskQueueName QueueName = "task"
)

// KnownQueueNames represents the list of expected queues
var KnownQueueNames = []QueueName{
IssueIndexerQueueName,
NotificationQueueName,
MailerQueueName,
PushUpdateQueueName,
TaskQueueName,
}

func validType(t string) (Type, error) {
if len(t) == 0 {
return PersistableChannelQueueType, nil
Expand All @@ -36,8 +82,19 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) {
}
zeripath marked this conversation as resolved.
Show resolved Hide resolved

// CreateQueue for name with provided handler and exemplar
func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
q, cfg := getQueueSettings(name)
func CreateQueue(name QueueName, handle HandlerFunc, exemplar interface{}) Queue {
found := false
for _, expected := range KnownQueueNames {
if name == expected {
found = true
break
}
}
if !found {
log.Warn("%s is not an expected name for an Queue", name)
}

q, cfg := getQueueSettings(string(name))
if len(cfg) == 0 {
return nil
}
Expand All @@ -57,7 +114,7 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
MaxAttempts: q.MaxAttempts,
Config: cfg,
QueueLength: q.QueueLength,
Name: name,
Name: string(name),
}, exemplar)
}
if err != nil {
Expand All @@ -78,8 +135,19 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
}

// CreateUniqueQueue for name with provided handler and exemplar
func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
q, cfg := getQueueSettings(name)
func CreateUniqueQueue(name UniqueQueueName, handle HandlerFunc, exemplar interface{}) UniqueQueue {
found := false
for _, expected := range KnownUniqueQueueNames {
if name == expected {
found = true
break
}
}
if !found {
log.Warn("%s is not an expected name for an UniqueQueue", name)
}

q, cfg := getQueueSettings(string(name))
if len(cfg) == 0 {
return nil
}
Expand All @@ -106,6 +174,7 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un
MaxAttempts: q.MaxAttempts,
Config: cfg,
QueueLength: q.QueueLength,
Name: string(name),
}, exemplar)
}
if err != nil {
Expand Down
Loading