Skip to content

Commit 58dfaf3

Browse files
wxiaoguangwolfogreGiteaBot
authored
Improve queue & process & stacktrace (#24636)
Although some features are mixed together in this PR, this PR is not that large, and these features are all related. Actually there are more than 70 lines for a toy "test queue", so this PR is quite simple. Major features: 1. Allow site admin to clear a queue (remove all items in a queue) * Because there is no transaction, the "unique queue" could be corrupted in rare cases, that's unfixable. * eg: the item is in the "set" but not in the "list", so the item would never be able to be pushed into the queue. * Now site admin could simply clear the queue, then everything becomes correct, the lost items could be re-pushed into queue by future operations. 2. Split the "admin/monitor" to separate pages 3. Allow to download diagnosis report * In history, there were many users reporting that Gitea queue gets stuck, or Gitea's CPU is 100% * With diagnosis report, maintainers could know what happens clearly The diagnosis report sample: [gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip) , use "go tool pprof profile.dat" to view the report. Screenshots: ![image](https://github.com/go-gitea/gitea/assets/2114189/320659b4-2eda-4def-8dc0-5ea08d578063) ![image](https://github.com/go-gitea/gitea/assets/2114189/c5c46fae-9dc0-44ca-8cd3-57beedc5035e) ![image](https://github.com/go-gitea/gitea/assets/2114189/6168a811-42a1-4e64-a263-0617a6c8c4fe) --------- Co-authored-by: Jason Song <i@wolfogre.com> Co-authored-by: Giteabot <teabot@gitea.io>
1 parent b3af748 commit 58dfaf3

20 files changed

+387
-228
lines changed

modules/queue/base_channel.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
8787
func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
8888
q.mu.Lock()
8989
defer q.mu.Unlock()
90-
90+
if !q.isUnique {
91+
return false, nil
92+
}
9193
return q.set.Contains(string(data)), nil
9294
}
9395

@@ -107,7 +109,9 @@ func (q *baseChannel) Close() error {
107109
defer q.mu.Unlock()
108110

109111
close(q.c)
110-
q.set = container.Set[string]{}
112+
if q.isUnique {
113+
q.set = container.Set[string]{}
114+
}
111115

112116
return nil
113117
}
@@ -119,5 +123,9 @@ func (q *baseChannel) RemoveAll(ctx context.Context) error {
119123
for q.c != nil && len(q.c) > 0 {
120124
<-q.c
121125
}
126+
127+
if q.isUnique {
128+
q.set = container.Set[string]{}
129+
}
122130
return nil
123131
}

modules/queue/base_levelqueue_unique.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,20 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
7777
}
7878
lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))
7979

80+
for lq.q.Len() > 0 {
81+
if _, err := lq.q.LPop(); err != nil {
82+
return err
83+
}
84+
}
85+
86+
// the "set" must be cleared after the "list" because there is no transaction.
87+
// it's better to have duplicate items than losing items.
8088
members, err := lq.set.Members()
8189
if err != nil {
8290
return err // seriously corrupted
8391
}
8492
for _, v := range members {
8593
_, _ = lq.set.Remove(v)
8694
}
87-
for lq.q.Len() > 0 {
88-
if _, err = lq.q.LPop(); err != nil {
89-
return err
90-
}
91-
}
9295
return nil
9396
}

modules/queue/base_redis.go

+3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ func (q *baseRedis) Close() error {
123123
func (q *baseRedis) RemoveAll(ctx context.Context) error {
124124
q.mu.Lock()
125125
defer q.mu.Unlock()
126+
126127
c1 := q.client.Del(ctx, q.cfg.QueueFullName)
128+
// the "set" must be cleared after the "list" because there is no transaction.
129+
// it's better to have duplicate items than losing items.
127130
c2 := q.client.Del(ctx, q.cfg.SetFullName)
128131
if c1.Err() != nil {
129132
return c1.Err()

modules/queue/manager.go

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ type ManagedWorkerPoolQueue interface {
3333
// FlushWithContext tries to make the handler process all items in the queue synchronously.
3434
// It is for testing purpose only. It's not designed to be used in a cluster.
3535
FlushWithContext(ctx context.Context, timeout time.Duration) error
36+
37+
// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
38+
RemoveAllItems(ctx context.Context) error
3639
}
3740

3841
var manager *Manager

modules/queue/workerqueue.go

+5
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
130130
}
131131
}
132132

133+
// RemoveAllItems removes all items in the base queue
134+
func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
135+
return q.baseQueue.RemoveAll(ctx)
136+
}
137+
133138
func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
134139
bs, err := json.Marshal(data)
135140
if err != nil {

options/locale/locale_en-US.ini

+7-46
Original file line numberDiff line numberDiff line change
@@ -3040,8 +3040,9 @@ monitor.next = Next Time
30403040
monitor.previous = Previous Time
30413041
monitor.execute_times = Executions
30423042
monitor.process = Running Processes
3043-
monitor.stacktrace = Stacktraces
3044-
monitor.goroutines = %d Goroutines
3043+
monitor.stacktrace = Stacktrace
3044+
monitor.processes_count = %d Processes
3045+
monitor.download_diagnosis_report = Download diagnosis report
30453046
monitor.desc = Description
30463047
monitor.start = Start Time
30473048
monitor.execute_time = Execution Time
@@ -3050,6 +3051,7 @@ monitor.process.cancel = Cancel process
30503051
monitor.process.cancel_desc = Cancelling a process may cause data loss
30513052
monitor.process.cancel_notices = Cancel: <strong>%s</strong>?
30523053
monitor.process.children = Children
3054+
30533055
monitor.queues = Queues
30543056
monitor.queue = Queue: %s
30553057
monitor.queue.name = Name
@@ -3060,56 +3062,15 @@ monitor.queue.maxnumberworkers = Max Number of Workers
30603062
monitor.queue.numberinqueue = Number in Queue
30613063
monitor.queue.review = Review Config
30623064
monitor.queue.review_add = Review/Add Workers
3063-
monitor.queue.configuration = Initial Configuration
3064-
monitor.queue.nopool.title = No Worker Pool
3065-
monitor.queue.nopool.desc = This queue wraps other queues and does not itself have a worker pool.
3066-
monitor.queue.wrapped.desc = A wrapped queue wraps a slow starting queue, buffering queued requests in a channel. It does not have a worker pool itself.
3067-
monitor.queue.persistable-channel.desc = A persistable-channel wraps two queues, a channel queue that has its own worker pool and a level queue for persisted requests from previous shutdowns. It does not have a worker pool itself.
3068-
monitor.queue.flush = Flush worker
3069-
monitor.queue.pool.timeout = Timeout
3070-
monitor.queue.pool.addworkers.title = Add Workers
3071-
monitor.queue.pool.addworkers.submit = Add Workers
3072-
monitor.queue.pool.addworkers.desc = Add Workers to this pool with or without a timeout. If you set a timeout these workers will be removed from the pool after the timeout has lapsed.
3073-
monitor.queue.pool.addworkers.numberworkers.placeholder = Number of Workers
3074-
monitor.queue.pool.addworkers.timeout.placeholder = Set to 0 for no timeout
3075-
monitor.queue.pool.addworkers.mustnumbergreaterzero = Number of Workers to add must be greater than zero
3076-
monitor.queue.pool.addworkers.musttimeoutduration = Timeout must be a golang duration eg. 5m or be 0
3077-
monitor.queue.pool.flush.title = Flush Queue
3078-
monitor.queue.pool.flush.desc = Flush will add a worker that will terminate once the queue is empty, or it times out.
3079-
monitor.queue.pool.flush.submit = Add Flush Worker
3080-
monitor.queue.pool.flush.added = Flush Worker added for %[1]s
3081-
monitor.queue.pool.pause.title = Pause Queue
3082-
monitor.queue.pool.pause.desc = Pausing a Queue will prevent it from processing data
3083-
monitor.queue.pool.pause.submit = Pause Queue
3084-
monitor.queue.pool.resume.title = Resume Queue
3085-
monitor.queue.pool.resume.desc = Set this queue to resume work
3086-
monitor.queue.pool.resume.submit = Resume Queue
3087-
30883065
monitor.queue.settings.title = Pool Settings
3089-
monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups.
3090-
monitor.queue.settings.timeout = Boost Timeout
3091-
monitor.queue.settings.timeout.placeholder = Currently %[1]v
3092-
monitor.queue.settings.timeout.error = Timeout must be a golang duration eg. 5m or be 0
3093-
monitor.queue.settings.numberworkers = Boost Number of Workers
3094-
monitor.queue.settings.numberworkers.placeholder = Currently %[1]d
3095-
monitor.queue.settings.numberworkers.error = Number of Workers to add must be greater than or equal to zero
3066+
monitor.queue.settings.desc = Pools dynamically grow in response to their worker queue blocking.
30963067
monitor.queue.settings.maxnumberworkers = Max Number of workers
30973068
monitor.queue.settings.maxnumberworkers.placeholder = Currently %[1]d
30983069
monitor.queue.settings.maxnumberworkers.error = Max number of workers must be a number
30993070
monitor.queue.settings.submit = Update Settings
31003071
monitor.queue.settings.changed = Settings Updated
3101-
monitor.queue.settings.blocktimeout = Current Block Timeout
3102-
monitor.queue.settings.blocktimeout.value = %[1]v
3103-
3104-
monitor.queue.pool.none = This queue does not have a Pool
3105-
monitor.queue.pool.added = Worker Group Added
3106-
monitor.queue.pool.max_changed = Maximum number of workers changed
3107-
monitor.queue.pool.workers.title = Active Worker Groups
3108-
monitor.queue.pool.workers.none = No worker groups.
3109-
monitor.queue.pool.cancel = Shutdown Worker Group
3110-
monitor.queue.pool.cancelling = Worker Group shutting down
3111-
monitor.queue.pool.cancel_notices = Shutdown this group of %s workers?
3112-
monitor.queue.pool.cancel_desc = Leaving a queue without any worker groups may cause requests to block indefinitely.
3072+
monitor.queue.settings.remove_all_items = Remove all
3073+
monitor.queue.settings.remove_all_items_done = All items in the queue have been removed.
31133074
31143075
notices.system_notice_list = System Notices
31153076
notices.view_detail_header = View Notice Details

routers/web/admin/admin.go

+7-40
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
activities_model "code.gitea.io/gitea/models/activities"
1414
"code.gitea.io/gitea/modules/base"
1515
"code.gitea.io/gitea/modules/context"
16-
"code.gitea.io/gitea/modules/process"
17-
"code.gitea.io/gitea/modules/queue"
1816
"code.gitea.io/gitea/modules/setting"
1917
"code.gitea.io/gitea/modules/updatechecker"
2018
"code.gitea.io/gitea/modules/web"
@@ -24,7 +22,8 @@ import (
2422

2523
const (
2624
tplDashboard base.TplName = "admin/dashboard"
27-
tplMonitor base.TplName = "admin/monitor"
25+
tplCron base.TplName = "admin/cron"
26+
tplQueue base.TplName = "admin/queue"
2827
tplStacktrace base.TplName = "admin/stacktrace"
2928
tplQueueManage base.TplName = "admin/queue_manage"
3029
)
@@ -142,47 +141,15 @@ func DashboardPost(ctx *context.Context) {
142141
}
143142
}
144143
if form.From == "monitor" {
145-
ctx.Redirect(setting.AppSubURL + "/admin/monitor")
144+
ctx.Redirect(setting.AppSubURL + "/admin/monitor/cron")
146145
} else {
147146
ctx.Redirect(setting.AppSubURL + "/admin")
148147
}
149148
}
150149

151-
// Monitor show admin monitor page
152-
func Monitor(ctx *context.Context) {
153-
ctx.Data["Title"] = ctx.Tr("admin.monitor")
154-
ctx.Data["PageIsAdminMonitor"] = true
155-
ctx.Data["Processes"], ctx.Data["ProcessCount"] = process.GetManager().Processes(false, true)
150+
func CronTasks(ctx *context.Context) {
151+
ctx.Data["Title"] = ctx.Tr("admin.monitor.cron")
152+
ctx.Data["PageIsAdminMonitorCron"] = true
156153
ctx.Data["Entries"] = cron.ListTasks()
157-
ctx.Data["Queues"] = queue.GetManager().ManagedQueues()
158-
159-
ctx.HTML(http.StatusOK, tplMonitor)
160-
}
161-
162-
// GoroutineStacktrace show admin monitor goroutines page
163-
func GoroutineStacktrace(ctx *context.Context) {
164-
ctx.Data["Title"] = ctx.Tr("admin.monitor")
165-
ctx.Data["PageIsAdminMonitor"] = true
166-
167-
processStacks, processCount, goroutineCount, err := process.GetManager().ProcessStacktraces(false, false)
168-
if err != nil {
169-
ctx.ServerError("GoroutineStacktrace", err)
170-
return
171-
}
172-
173-
ctx.Data["ProcessStacks"] = processStacks
174-
175-
ctx.Data["GoroutineCount"] = goroutineCount
176-
ctx.Data["ProcessCount"] = processCount
177-
178-
ctx.HTML(http.StatusOK, tplStacktrace)
179-
}
180-
181-
// MonitorCancel cancels a process
182-
func MonitorCancel(ctx *context.Context) {
183-
pid := ctx.Params("pid")
184-
process.GetManager().Cancel(process.IDType(pid))
185-
ctx.JSON(http.StatusOK, map[string]interface{}{
186-
"redirect": setting.AppSubURL + "/admin/monitor",
187-
})
154+
ctx.HTML(http.StatusOK, tplCron)
188155
}

routers/web/admin/diagnosis.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2023 The Gitea Authors.
2+
// SPDX-License-Identifier: MIT
3+
4+
package admin
5+
6+
import (
7+
"archive/zip"
8+
"fmt"
9+
"runtime/pprof"
10+
"time"
11+
12+
"code.gitea.io/gitea/modules/context"
13+
"code.gitea.io/gitea/modules/httplib"
14+
)
15+
16+
func MonitorDiagnosis(ctx *context.Context) {
17+
seconds := ctx.FormInt64("seconds")
18+
if seconds <= 5 {
19+
seconds = 5
20+
}
21+
if seconds > 300 {
22+
seconds = 300
23+
}
24+
25+
httplib.ServeSetHeaders(ctx.Resp, &httplib.ServeHeaderOptions{
26+
ContentType: "application/zip",
27+
Disposition: "attachment",
28+
Filename: fmt.Sprintf("gitea-diagnosis-%s.zip", time.Now().Format("20060102-150405")),
29+
})
30+
31+
zipWriter := zip.NewWriter(ctx.Resp)
32+
defer zipWriter.Close()
33+
34+
f, err := zipWriter.CreateHeader(&zip.FileHeader{Name: "goroutine-before.txt", Method: zip.Deflate, Modified: time.Now()})
35+
if err != nil {
36+
ctx.ServerError("Failed to create zip file", err)
37+
return
38+
}
39+
_ = pprof.Lookup("goroutine").WriteTo(f, 1)
40+
41+
f, err = zipWriter.CreateHeader(&zip.FileHeader{Name: "cpu-profile.dat", Method: zip.Deflate, Modified: time.Now()})
42+
if err != nil {
43+
ctx.ServerError("Failed to create zip file", err)
44+
return
45+
}
46+
47+
err = pprof.StartCPUProfile(f)
48+
if err == nil {
49+
time.Sleep(time.Duration(seconds) * time.Second)
50+
pprof.StopCPUProfile()
51+
} else {
52+
_, _ = f.Write([]byte(err.Error()))
53+
}
54+
55+
f, err = zipWriter.CreateHeader(&zip.FileHeader{Name: "goroutine-after.txt", Method: zip.Deflate, Modified: time.Now()})
56+
if err != nil {
57+
ctx.ServerError("Failed to create zip file", err)
58+
return
59+
}
60+
_ = pprof.Lookup("goroutine").WriteTo(f, 1)
61+
}

routers/web/admin/queue.go

+32-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,18 @@ import (
1212
"code.gitea.io/gitea/modules/setting"
1313
)
1414

15-
// Queue shows details for a specific queue
16-
func Queue(ctx *context.Context) {
15+
func Queues(ctx *context.Context) {
16+
if !setting.IsProd {
17+
initTestQueueOnce()
18+
}
19+
ctx.Data["Title"] = ctx.Tr("admin.monitor.queue")
20+
ctx.Data["PageIsAdminMonitorQueue"] = true
21+
ctx.Data["Queues"] = queue.GetManager().ManagedQueues()
22+
ctx.HTML(http.StatusOK, tplQueue)
23+
}
24+
25+
// QueueManage shows details for a specific queue
26+
func QueueManage(ctx *context.Context) {
1727
qid := ctx.ParamsInt64("qid")
1828
mq := queue.GetManager().GetManagedQueue(qid)
1929
if mq == nil {
@@ -57,3 +67,23 @@ func QueueSet(ctx *context.Context) {
5767
ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
5868
ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
5969
}
70+
71+
func QueueRemoveAllItems(ctx *context.Context) {
72+
// Gitea's queue doesn't have transaction support
73+
// So in rare cases, the queue could be corrupted/out-of-sync
74+
// Site admin could remove all items from the queue to make it work again
75+
qid := ctx.ParamsInt64("qid")
76+
mq := queue.GetManager().GetManagedQueue(qid)
77+
if mq == nil {
78+
ctx.Status(http.StatusNotFound)
79+
return
80+
}
81+
82+
if err := mq.RemoveAllItems(ctx); err != nil {
83+
ctx.ServerError("RemoveAllItems", err)
84+
return
85+
}
86+
87+
ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.remove_all_items_done"))
88+
ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
89+
}

0 commit comments

Comments
 (0)