Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve testing and try to fix MySQL hanging #32515

Merged
merged 7 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/pull-db-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,15 @@ jobs:
runs-on: ubuntu-latest
services:
mysql:
image: mysql:8.0
# the bitnami mysql image has more options than the official one, it's easier to customize
image: bitnami/mysql:8.0
env:
MYSQL_ALLOW_EMPTY_PASSWORD: true
ALLOW_EMPTY_PASSWORD: true
MYSQL_DATABASE: testgitea
ports:
- "3306:3306"
options: >-
--mount type=tmpfs,destination=/bitnami/mysql/data
elasticsearch:
image: elasticsearch:7.5.0
env:
Expand Down Expand Up @@ -188,7 +191,8 @@ jobs:
- name: run migration tests
run: make test-mysql-migration
- name: run tests
run: make integration-test-coverage
# run: make integration-test-coverage (at the moment, no coverage is really handled)
run: make test-mysql
env:
TAGS: bindata
RACE_ENABLED: true
Expand Down
15 changes: 5 additions & 10 deletions models/migrations/base/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"code.gitea.io/gitea/models/unittest"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/testlogger"

Expand Down Expand Up @@ -91,21 +90,19 @@ func PrepareTestEnv(t *testing.T, skip int, syncModels ...any) (*xorm.Engine, fu
}

func MainTest(m *testing.M) {
log.RegisterEventWriter("test", testlogger.NewTestLoggerWriter)
testlogger.Init()

giteaRoot := base.SetupGiteaRoot()
if giteaRoot == "" {
fmt.Println("Environment variable $GITEA_ROOT not set")
os.Exit(1)
testlogger.Fatalf("Environment variable $GITEA_ROOT not set\n")
}
giteaBinary := "gitea"
if runtime.GOOS == "windows" {
giteaBinary += ".exe"
}
setting.AppPath = filepath.Join(giteaRoot, giteaBinary)
if _, err := os.Stat(setting.AppPath); err != nil {
fmt.Printf("Could not find gitea binary at %s\n", setting.AppPath)
os.Exit(1)
testlogger.Fatalf("Could not find gitea binary at %s\n", setting.AppPath)
}

giteaConf := os.Getenv("GITEA_CONF")
Expand All @@ -122,17 +119,15 @@ func MainTest(m *testing.M) {

tmpDataPath, err := os.MkdirTemp("", "data")
if err != nil {
fmt.Printf("Unable to create temporary data path %v\n", err)
os.Exit(1)
testlogger.Fatalf("Unable to create temporary data path %v\n", err)
}

setting.CustomPath = filepath.Join(setting.AppWorkPath, "custom")
setting.AppDataPath = tmpDataPath

unittest.InitSettings()
if err = git.InitFull(context.Background()); err != nil {
fmt.Printf("Unable to InitFull: %v\n", err)
os.Exit(1)
testlogger.Fatalf("Unable to InitFull: %v\n", err)
}
setting.LoadDBSetting()
setting.InitLoggersForTest()
Expand Down
6 changes: 6 additions & 0 deletions modules/log/color.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ type ColoredValue struct {
colors []ColorAttribute
}

var _ fmt.Formatter = (*ColoredValue)(nil)

func (c *ColoredValue) Format(f fmt.State, verb rune) {
_, _ = f.Write(ColorBytes(c.colors...))
s := fmt.Sprintf(fmt.FormatString(f, verb), c.v)
_, _ = f.Write([]byte(s))
_, _ = f.Write(resetBytes)
}

func (c *ColoredValue) Value() any {
return c.v
}

func NewColoredValue(v any, color ...ColorAttribute) *ColoredValue {
return &ColoredValue{v: v, colors: color}
}
Expand Down
34 changes: 21 additions & 13 deletions modules/queue/workergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
)

func init() {
unhandledItemRequeueDuration.Store(int64(5 * time.Second))
unhandledItemRequeueDuration.Store(int64(time.Second))
}

// workerGroup is a group of workers to work with a WorkerPoolQueue
Expand Down Expand Up @@ -104,7 +104,12 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
// if none of the items were handled, it should back-off for a few seconds
// in this case the handler (eg: document indexer) may have encountered some errors/failures
if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
if q.isFlushing.Load() {
return // do not requeue items when flushing, since all items failed, requeue them will continue failing.
}
log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
// TODO: ideally it shouldn't "sleep" here (blocks the worker, then blocks flush).
// It could debounce the requeue operation, and try to requeue the items in the future.
select {
case <-q.ctxRun.Done():
case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
Expand Down Expand Up @@ -193,6 +198,9 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
// doFlush flushes the queue: it tries to read all items from the queue and handles them.
// It is for testing purpose only. It's not designed to work for a cluster.
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
q.isFlushing.Store(true)
defer q.isFlushing.Store(false)

log.Debug("Queue %q starts flushing", q.GetName())
defer log.Debug("Queue %q finishes flushing", q.GetName())

Expand Down Expand Up @@ -236,6 +244,9 @@ loop:
emptyCounter := 0
for {
select {
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
case data, dataOk := <-wg.popItemChan:
if !dataOk {
return
Expand All @@ -251,9 +262,6 @@ loop:
log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
}
return
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
case <-time.After(20 * time.Millisecond):
// There is no reliable way to make sure all queue items are consumed by the Flush, there always might be some items stored in some buffers/temp variables.
// If we run Gitea in a cluster, we can even not guarantee all items are consumed in a deterministic instance.
Expand Down Expand Up @@ -331,6 +339,15 @@ func (q *WorkerPoolQueue[T]) doRun() {
var batchDispatchC <-chan time.Time = infiniteTimerC
for {
select {
case flush := <-q.flushChan:
// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
q.doDispatchBatchToWorker(wg, skipFlushChan)
q.doFlush(wg, flush)
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
case data, dataOk := <-wg.popItemChan:
if !dataOk {
return
Expand All @@ -349,20 +366,11 @@ func (q *WorkerPoolQueue[T]) doRun() {
case <-batchDispatchC:
batchDispatchC = infiniteTimerC
q.doDispatchBatchToWorker(wg, q.flushChan)
case flush := <-q.flushChan:
// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
q.doDispatchBatchToWorker(wg, skipFlushChan)
q.doFlush(wg, flush)
case err := <-wg.popItemErr:
if !q.isCtxRunCanceled() {
log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
}
return
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
}
}
}
5 changes: 3 additions & 2 deletions modules/queue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type WorkerPoolQueue[T any] struct {
baseConfig *BaseConfig
baseQueue baseQueue

batchChan chan []T
flushChan chan flushType
batchChan chan []T
flushChan chan flushType
isFlushing atomic.Bool

batchLength int
workerNum int
Expand Down
105 changes: 62 additions & 43 deletions modules/testlogger/testlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
)

var (
prefix string
SlowTest = 10 * time.Second
SlowFlush = 5 * time.Second
prefix string
TestTimeout = 10 * time.Minute
TestSlowRun = 10 * time.Second
TestSlowFlush = 1 * time.Second
)

var WriterCloser = &testLoggerWriterCloser{}
Expand Down Expand Up @@ -89,79 +90,97 @@ func (w *testLoggerWriterCloser) Reset() {
w.Unlock()
}

// Printf takes a format and args and prints the string to os.Stdout
func Printf(format string, args ...any) {
if !log.CanColorStdout {
for i := 0; i < len(args); i++ {
if c, ok := args[i].(*log.ColoredValue); ok {
args[i] = c.Value()
}
}
}
_, _ = fmt.Fprintf(os.Stdout, format, args...)
}

// PrintCurrentTest prints the current test to os.Stdout
func PrintCurrentTest(t testing.TB, skip ...int) func() {
t.Helper()
start := time.Now()
runStart := time.Now()
actualSkip := util.OptionalArg(skip) + 1
_, filename, line, _ := runtime.Caller(actualSkip)

if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", fmt.Formatter(log.NewColoredValue(t.Name())), strings.TrimPrefix(filename, prefix), line)
} else {
_, _ = fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", t.Name(), strings.TrimPrefix(filename, prefix), line)
}
Printf("=== %s (%s:%d)\n", log.NewColoredValue(t.Name()), strings.TrimPrefix(filename, prefix), line)

WriterCloser.pushT(t)
return func() {
took := time.Since(start)
if took > SlowTest {
if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgYellow)), fmt.Formatter(log.NewColoredValue(took, log.Bold, log.FgYellow)))
} else {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", t.Name(), took)
timeoutChecker := time.AfterFunc(TestTimeout, func() {
l := 128 * 1024
var stack []byte
for {
stack = make([]byte, l)
n := runtime.Stack(stack, true)
if n <= l {
stack = stack[:n]
break
}
l = n
}
timer := time.AfterFunc(SlowFlush, func() {
if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), SlowFlush)
} else {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), SlowFlush)
}
Printf("!!! %s ... timeout: %v ... stacktrace:\n%s\n\n", log.NewColoredValue(t.Name(), log.Bold, log.FgRed), TestTimeout, string(stack))
})
return func() {
flushStart := time.Now()
slowFlushChecker := time.AfterFunc(TestSlowFlush, func() {
Printf("+++ %s ... still flushing after %v ...\n", log.NewColoredValue(t.Name(), log.Bold, log.FgRed), TestSlowFlush)
})
if err := queue.GetManager().FlushAll(context.Background(), -1); err != nil {
t.Errorf("Flushing queues failed with error %v", err)
}
timer.Stop()
flushTook := time.Since(start) - took
if flushTook > SlowFlush {
if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), fmt.Formatter(log.NewColoredValue(flushTook, log.Bold, log.FgRed)))
} else {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", t.Name(), flushTook)
}
}
WriterCloser.popT()
}
}
slowFlushChecker.Stop()
timeoutChecker.Stop()

// Printf takes a format and args and prints the string to os.Stdout
func Printf(format string, args ...any) {
if log.CanColorStdout {
for i := 0; i < len(args); i++ {
args[i] = log.NewColoredValue(args[i])
runDuration := time.Since(runStart)
flushDuration := time.Since(flushStart)
if runDuration > TestSlowRun {
Printf("+++ %s is a slow test (run: %v, flush: %v)\n", log.NewColoredValue(t.Name(), log.Bold, log.FgYellow), runDuration, flushDuration)
}
WriterCloser.popT()
}
_, _ = fmt.Fprintf(os.Stdout, "\t"+format, args...)
}

// TestLogEventWriter is a logger which will write to the testing log
type TestLogEventWriter struct {
*log.EventWriterBaseImpl
}

// NewTestLoggerWriter creates a TestLogEventWriter as a log.LoggerProvider
func NewTestLoggerWriter(name string, mode log.WriterMode) log.EventWriter {
// newTestLoggerWriter creates a TestLogEventWriter as a log.LoggerProvider
func newTestLoggerWriter(name string, mode log.WriterMode) log.EventWriter {
w := &TestLogEventWriter{}
w.EventWriterBaseImpl = log.NewEventWriterBase(name, "test-log-writer", mode)
w.OutputWriteCloser = WriterCloser
return w
}

func init() {
func Init() {
const relFilePath = "modules/testlogger/testlogger.go"
_, filename, _, _ := runtime.Caller(0)
if !strings.HasSuffix(filename, relFilePath) {
panic("source code file path doesn't match expected: " + relFilePath)
}
prefix = strings.TrimSuffix(filename, relFilePath)

log.RegisterEventWriter("test", newTestLoggerWriter)

duration, err := time.ParseDuration(os.Getenv("GITEA_TEST_SLOW_RUN"))
if err == nil && duration > 0 {
TestSlowRun = duration
}

duration, err = time.ParseDuration(os.Getenv("GITEA_TEST_SLOW_FLUSH"))
if err == nil && duration > 0 {
TestSlowFlush = duration
}
}

func Fatalf(format string, args ...any) {
Printf(format+"\n", args...)
os.Exit(1)
}
14 changes: 2 additions & 12 deletions tests/integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,8 @@ We appreciate that some testing machines may not be very powerful and
the default timeouts for declaring a slow test or a slow clean-up flush
may not be appropriate.

You can either:

* Within the test ini file set the following section:

```ini
[integration-tests]
SLOW_TEST = 10s ; 10s is the default value
SLOW_FLUSH = 5S ; 5s is the default value
```

* Set the following environment variables:
You can set the following environment variables:

```bash
GITEA_SLOW_TEST_TIME="10s" GITEA_SLOW_FLUSH_TIME="5s" make test-sqlite
GITEA_TEST_SLOW_RUN="10s" GITEA_TEST_SLOW_FLUSH="1s" make test-sqlite
```
4 changes: 2 additions & 2 deletions tests/integration/api_repo_file_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func TestAPIGetRawFileOrLFS(t *testing.T) {

t.Run("Partial Clone", doPartialGitClone(dstPath2, u))

lfs := lfsCommitAndPushTest(t, dstPath, littleSize)[0]
lfs := lfsCommitAndPushTest(t, dstPath, testFileSizeSmall)[0]

reqLFS := NewRequest(t, "GET", "/api/v1/repos/user2/repo1/media/"+lfs)
respLFS := MakeRequestNilResponseRecorder(t, reqLFS, http.StatusOK)
assert.Equal(t, littleSize, respLFS.Length)
assert.Equal(t, testFileSizeSmall, respLFS.Length)

doAPIDeleteRepository(httpContext)
})
Expand Down
Loading
Loading