Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wxiaoguang committed Nov 15, 2024
1 parent 4121f95 commit 211fc18
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 189 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/pull-db-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,18 @@ jobs:
runs-on: ubuntu-latest
services:
mysql:
image: mysql:8.0
# official mysql image doesn't support MYSQL_EXTRA_FLAGS
image: bitnami/mysql:8.0
env:
MYSQL_ALLOW_EMPTY_PASSWORD: true
ALLOW_EMPTY_PASSWORD: true
MYSQL_DATABASE: testgitea
# https://dev.mysql.com/doc/refman/8.0/en/optimizing-innodb-diskio.html
MYSQL_EXTRA_FLAGS: >
--disable-log-bin
--innodb-buffer-pool-size=1G
--innodb-doublewrite=off
--innodb-flush-log-at-trx-commit=0
--innodb-flush-method=nosync
ports:
- "3306:3306"
elasticsearch:
Expand Down
15 changes: 5 additions & 10 deletions models/migrations/base/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"code.gitea.io/gitea/models/unittest"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/testlogger"

Expand Down Expand Up @@ -91,21 +90,19 @@ func PrepareTestEnv(t *testing.T, skip int, syncModels ...any) (*xorm.Engine, fu
}

func MainTest(m *testing.M) {
log.RegisterEventWriter("test", testlogger.NewTestLoggerWriter)
testlogger.Init()

giteaRoot := base.SetupGiteaRoot()
if giteaRoot == "" {
fmt.Println("Environment variable $GITEA_ROOT not set")
os.Exit(1)
testlogger.Fatalf("Environment variable $GITEA_ROOT not set\n")
}
giteaBinary := "gitea"
if runtime.GOOS == "windows" {
giteaBinary += ".exe"
}
setting.AppPath = filepath.Join(giteaRoot, giteaBinary)
if _, err := os.Stat(setting.AppPath); err != nil {
fmt.Printf("Could not find gitea binary at %s\n", setting.AppPath)
os.Exit(1)
testlogger.Fatalf("Could not find gitea binary at %s\n", setting.AppPath)
}

giteaConf := os.Getenv("GITEA_CONF")
Expand All @@ -122,17 +119,15 @@ func MainTest(m *testing.M) {

tmpDataPath, err := os.MkdirTemp("", "data")
if err != nil {
fmt.Printf("Unable to create temporary data path %v\n", err)
os.Exit(1)
testlogger.Fatalf("Unable to create temporary data path %v\n", err)
}

setting.CustomPath = filepath.Join(setting.AppWorkPath, "custom")
setting.AppDataPath = tmpDataPath

unittest.InitSettings()
if err = git.InitFull(context.Background()); err != nil {
fmt.Printf("Unable to InitFull: %v\n", err)
os.Exit(1)
testlogger.Fatalf("Unable to InitFull: %v\n", err)
}
setting.LoadDBSetting()
setting.InitLoggersForTest()
Expand Down
6 changes: 6 additions & 0 deletions modules/log/color.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ type ColoredValue struct {
colors []ColorAttribute
}

var _ fmt.Formatter = (*ColoredValue)(nil)

func (c *ColoredValue) Format(f fmt.State, verb rune) {
_, _ = f.Write(ColorBytes(c.colors...))
s := fmt.Sprintf(fmt.FormatString(f, verb), c.v)
_, _ = f.Write([]byte(s))
_, _ = f.Write(resetBytes)
}

func (c *ColoredValue) Value() any {
return c.v
}

func NewColoredValue(v any, color ...ColorAttribute) *ColoredValue {
return &ColoredValue{v: v, colors: color}
}
Expand Down
30 changes: 18 additions & 12 deletions modules/queue/workergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
// if none of the items were handled, it should back-off for a few seconds
// in this case the handler (eg: document indexer) may have encountered some errors/failures
if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
if q.isFlushing.Load() {
return // do not requeue items when flushing, since all items failed, requeue them will continue failing.
}
log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
select {
case <-q.ctxRun.Done():
Expand Down Expand Up @@ -193,6 +196,9 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
// doFlush flushes the queue: it tries to read all items from the queue and handles them.
// It is for testing purpose only. It's not designed to work for a cluster.
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
q.isFlushing.Store(true)
defer q.isFlushing.Store(false)

log.Debug("Queue %q starts flushing", q.GetName())
defer log.Debug("Queue %q finishes flushing", q.GetName())

Expand Down Expand Up @@ -236,6 +242,9 @@ loop:
emptyCounter := 0
for {
select {
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
case data, dataOk := <-wg.popItemChan:
if !dataOk {
return
Expand All @@ -251,9 +260,6 @@ loop:
log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
}
return
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
case <-time.After(20 * time.Millisecond):
// There is no reliable way to make sure all queue items are consumed by the Flush, there always might be some items stored in some buffers/temp variables.
// If we run Gitea in a cluster, we can even not guarantee all items are consumed in a deterministic instance.
Expand Down Expand Up @@ -331,6 +337,15 @@ func (q *WorkerPoolQueue[T]) doRun() {
var batchDispatchC <-chan time.Time = infiniteTimerC
for {
select {
case flush := <-q.flushChan:
// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
q.doDispatchBatchToWorker(wg, skipFlushChan)
q.doFlush(wg, flush)
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
case data, dataOk := <-wg.popItemChan:
if !dataOk {
return
Expand All @@ -349,20 +364,11 @@ func (q *WorkerPoolQueue[T]) doRun() {
case <-batchDispatchC:
batchDispatchC = infiniteTimerC
q.doDispatchBatchToWorker(wg, q.flushChan)
case flush := <-q.flushChan:
// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
q.doDispatchBatchToWorker(wg, skipFlushChan)
q.doFlush(wg, flush)
case err := <-wg.popItemErr:
if !q.isCtxRunCanceled() {
log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
}
return
case <-q.ctxRun.Done():
log.Debug("Queue %q is shutting down", q.GetName())
return
}
}
}
5 changes: 3 additions & 2 deletions modules/queue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type WorkerPoolQueue[T any] struct {
baseConfig *BaseConfig
baseQueue baseQueue

batchChan chan []T
flushChan chan flushType
batchChan chan []T
flushChan chan flushType
isFlushing atomic.Bool

batchLength int
workerNum int
Expand Down
105 changes: 62 additions & 43 deletions modules/testlogger/testlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
)

var (
prefix string
SlowTest = 10 * time.Second
SlowFlush = 5 * time.Second
prefix string
TestTimeout = 10 * time.Minute
TestSlowRun = 10 * time.Second
TestSlowFlush = 5 * time.Second
)

var WriterCloser = &testLoggerWriterCloser{}
Expand Down Expand Up @@ -89,79 +90,97 @@ func (w *testLoggerWriterCloser) Reset() {
w.Unlock()
}

// Printf takes a format and args and prints the string to os.Stdout
func Printf(format string, args ...any) {
if !log.CanColorStdout {
for i := 0; i < len(args); i++ {
if c, ok := args[i].(*log.ColoredValue); ok {
args[i] = c.Value()
}
}
}
_, _ = fmt.Fprintf(os.Stdout, format, args...)
}

// PrintCurrentTest prints the current test to os.Stdout
func PrintCurrentTest(t testing.TB, skip ...int) func() {
t.Helper()
start := time.Now()
runStart := time.Now()
actualSkip := util.OptionalArg(skip) + 1
_, filename, line, _ := runtime.Caller(actualSkip)

if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", fmt.Formatter(log.NewColoredValue(t.Name())), strings.TrimPrefix(filename, prefix), line)
} else {
_, _ = fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", t.Name(), strings.TrimPrefix(filename, prefix), line)
}
Printf("=== %s (%s:%d)\n", log.NewColoredValue(t.Name()), strings.TrimPrefix(filename, prefix), line)

WriterCloser.pushT(t)
return func() {
took := time.Since(start)
if took > SlowTest {
if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgYellow)), fmt.Formatter(log.NewColoredValue(took, log.Bold, log.FgYellow)))
} else {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", t.Name(), took)
timeoutChecker := time.AfterFunc(TestTimeout, func() {
l := 128 * 1024
var stack []byte
for {
stack = make([]byte, l)
n := runtime.Stack(stack, true)
if n <= l {
stack = stack[:n]
break
}
l = n
}
timer := time.AfterFunc(SlowFlush, func() {
if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), SlowFlush)
} else {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), SlowFlush)
}
Printf("!!! %s ... timeout: %v ... stacktrace:\n%s\n\n", log.NewColoredValue(t.Name(), log.Bold, log.FgRed), TestTimeout, string(stack))
})
return func() {
flushStart := time.Now()
slowFlushChecker := time.AfterFunc(TestSlowFlush, func() {
Printf("+++ %s ... still flushing after %v ...\n", log.NewColoredValue(t.Name(), log.Bold, log.FgRed), TestSlowFlush)
})
if err := queue.GetManager().FlushAll(context.Background(), -1); err != nil {
t.Errorf("Flushing queues failed with error %v", err)
}
timer.Stop()
flushTook := time.Since(start) - took
if flushTook > SlowFlush {
if log.CanColorStdout {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), fmt.Formatter(log.NewColoredValue(flushTook, log.Bold, log.FgRed)))
} else {
_, _ = fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", t.Name(), flushTook)
}
}
WriterCloser.popT()
}
}
slowFlushChecker.Stop()
timeoutChecker.Stop()

// Printf takes a format and args and prints the string to os.Stdout
func Printf(format string, args ...any) {
if log.CanColorStdout {
for i := 0; i < len(args); i++ {
args[i] = log.NewColoredValue(args[i])
runDuration := time.Since(runStart)
flushDuration := time.Since(flushStart)
if runDuration > TestSlowRun {
Printf("+++ %s is a slow test (run: %v, flush: %v)\n", log.NewColoredValue(t.Name(), log.Bold, log.FgYellow), runDuration, flushDuration)
}
WriterCloser.popT()
}
_, _ = fmt.Fprintf(os.Stdout, "\t"+format, args...)
}

// TestLogEventWriter is a logger which will write to the testing log
type TestLogEventWriter struct {
*log.EventWriterBaseImpl
}

// NewTestLoggerWriter creates a TestLogEventWriter as a log.LoggerProvider
func NewTestLoggerWriter(name string, mode log.WriterMode) log.EventWriter {
// newTestLoggerWriter creates a TestLogEventWriter as a log.LoggerProvider
func newTestLoggerWriter(name string, mode log.WriterMode) log.EventWriter {
w := &TestLogEventWriter{}
w.EventWriterBaseImpl = log.NewEventWriterBase(name, "test-log-writer", mode)
w.OutputWriteCloser = WriterCloser
return w
}

func init() {
func Init() {
const relFilePath = "modules/testlogger/testlogger.go"
_, filename, _, _ := runtime.Caller(0)
if !strings.HasSuffix(filename, relFilePath) {
panic("source code file path doesn't match expected: " + relFilePath)
}
prefix = strings.TrimSuffix(filename, relFilePath)

log.RegisterEventWriter("test", newTestLoggerWriter)

duration, err := time.ParseDuration(os.Getenv("GITEA_TEST_SLOW_RUN"))
if err == nil && duration > 0 {
TestSlowRun = duration
}

duration, err = time.ParseDuration(os.Getenv("GITEA_TEST_SLOW_FLUSH"))
if err == nil && duration > 0 {
TestSlowFlush = duration
}
}

func Fatalf(format string, args ...any) {
Printf(format+"\n", args...)
os.Exit(1)
}
14 changes: 2 additions & 12 deletions tests/integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,8 @@ We appreciate that some testing machines may not be very powerful and
the default timeouts for declaring a slow test or a slow clean-up flush
may not be appropriate.

You can either:

* Within the test ini file set the following section:

```ini
[integration-tests]
SLOW_TEST = 10s ; 10s is the default value
SLOW_FLUSH = 5S ; 5s is the default value
```

* Set the following environment variables:
You can set the following environment variables:

```bash
GITEA_SLOW_TEST_TIME="10s" GITEA_SLOW_FLUSH_TIME="5s" make test-sqlite
GITEA_TEST_SLOW_RUN="10s" GITEA_TEST_SLOW_FLUSH="5s" make test-sqlite
```
Loading

0 comments on commit 211fc18

Please sign in to comment.