diff --git a/frame/g/g_func.go b/frame/g/g_func.go index c7092b17f8f..883b0924a20 100644 --- a/frame/g/g_func.go +++ b/frame/g/g_func.go @@ -17,6 +17,20 @@ import ( "github.com/gogf/gf/v2/util/gutil" ) +type ( + Func = gutil.Func // Func is the function which contains context parameter. + RecoverFunc = gutil.RecoverFunc // RecoverFunc is the panic recover function which contains context parameter. +) + +// Go creates a new asynchronous goroutine function with specified recover function. +// +// The parameter `recoverFunc` is called when any panic during executing of `goroutineFunc`. +// If `recoverFunc` is given nil, it ignores the panic from `goroutineFunc` and no panic will +// throw to parent goroutine. +func Go(ctx context.Context, goroutineFunc Func, recoverFunc RecoverFunc) { + gutil.Go(ctx, goroutineFunc, recoverFunc) +} + // NewVar returns a gvar.Var. func NewVar(i interface{}, safe ...bool) *Var { return gvar.New(i, safe...) diff --git a/frame/g/g_z_unit_test.go b/frame/g/g_z_unit_test.go index 80dc13b5de9..56bd9e87462 100644 --- a/frame/g/g_z_unit_test.go +++ b/frame/g/g_z_unit_test.go @@ -9,8 +9,10 @@ package g_test import ( "context" "os" + "sync" "testing" + "github.com/gogf/gf/v2/container/garray" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/test/gtest" "github.com/gogf/gf/v2/util/gutil" @@ -114,3 +116,19 @@ func Test_Object(t *testing.T) { t.AssertNE(g.Validator(), nil) }) } + +func Test_Go(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + wg = sync.WaitGroup{} + array = garray.NewArray(true) + ) + wg.Add(1) + g.Go(context.Background(), func(ctx context.Context) { + defer wg.Done() + array.Append(1) + }, nil) + wg.Wait() + t.Assert(array.Len(), 1) + }) +} diff --git a/os/grpool/grpool.go b/os/grpool/grpool.go index aac2f4329ad..1abf991950a 100644 --- a/os/grpool/grpool.go +++ b/os/grpool/grpool.go @@ -13,8 +13,6 @@ import ( "github.com/gogf/gf/v2/container/glist" "github.com/gogf/gf/v2/container/gtype" - "github.com/gogf/gf/v2/errors/gcode" - "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/os/gtimer" "github.com/gogf/gf/v2/util/grand" ) @@ -23,7 +21,7 @@ import ( type Func func(ctx context.Context) // RecoverFunc is the pool runtime panic recover function which contains context parameter. -type RecoverFunc func(ctx context.Context, err error) +type RecoverFunc func(ctx context.Context, exception error) // Pool manages the goroutines using pool. type Pool struct { @@ -33,161 +31,66 @@ type Pool struct { closed *gtype.Bool // Is pool closed or not. } +// localPoolItem is the job item storing in job list. type localPoolItem struct { - Ctx context.Context - Func Func + Ctx context.Context // Context. + Func Func // Job function. } const ( - minTimerDuration = 500 * time.Millisecond - maxTimerDuration = 1500 * time.Millisecond + minSupervisorTimerDuration = 500 * time.Millisecond + maxSupervisorTimerDuration = 1500 * time.Millisecond ) // Default goroutine pool. var ( - pool = New() + defaultPool = New() ) // New creates and returns a new goroutine pool object. // The parameter `limit` is used to limit the max goroutine count, // which is not limited in default. func New(limit ...int) *Pool { - p := &Pool{ - limit: -1, - count: gtype.NewInt(), - list: glist.New(true), - closed: gtype.NewBool(), - } + var ( + pool = &Pool{ + limit: -1, + count: gtype.NewInt(), + list: glist.New(true), + closed: gtype.NewBool(), + } + timerDuration = grand.D( + minSupervisorTimerDuration, + maxSupervisorTimerDuration, + ) + ) if len(limit) > 0 && limit[0] > 0 { - p.limit = limit[0] + pool.limit = limit[0] } - timerDuration := grand.D(minTimerDuration, maxTimerDuration) - gtimer.Add(context.Background(), timerDuration, p.supervisor) - return p + gtimer.Add(context.Background(), timerDuration, pool.supervisor) + return pool } -// Add pushes a new job to the pool using default goroutine pool. +// Add pushes a new job to the default goroutine pool. // The job will be executed asynchronously. func Add(ctx context.Context, f Func) error { - return pool.Add(ctx, f) + return defaultPool.Add(ctx, f) } -// AddWithRecover pushes a new job to the pool with specified recover function. +// AddWithRecover pushes a new job to the default pool with specified recover function. +// // The optional `recoverFunc` is called when any panic during executing of `userFunc`. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`. // The job will be executed asynchronously. func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error { - return pool.AddWithRecover(ctx, userFunc, recoverFunc) + return defaultPool.AddWithRecover(ctx, userFunc, recoverFunc) } // Size returns current goroutine count of default goroutine pool. func Size() int { - return pool.Size() + return defaultPool.Size() } // Jobs returns current job count of default goroutine pool. func Jobs() int { - return pool.Jobs() -} - -// Add pushes a new job to the pool. -// The job will be executed asynchronously. -func (p *Pool) Add(ctx context.Context, f Func) error { - for p.closed.Val() { - return gerror.NewCode( - gcode.CodeInvalidOperation, - "goroutine pool is already closed", - ) - } - p.list.PushFront(&localPoolItem{ - Ctx: ctx, - Func: f, - }) - // Check and fork new worker. - p.checkAndFork() - return nil -} - -// checkAndFork checks and creates a new goroutine worker. -// Note that the worker dies if the job function panics and the job has no recover handling. -func (p *Pool) checkAndFork() { - // Check whether fork new goroutine or not. - var n int - for { - n = p.count.Val() - if p.limit != -1 && n >= p.limit { - // No need fork new goroutine. - return - } - if p.count.Cas(n, n+1) { - // Use CAS to guarantee atomicity. - break - } - } - // Create job function in goroutine. - go func() { - defer p.count.Add(-1) - - var ( - listItem interface{} - poolItem *localPoolItem - ) - for !p.closed.Val() { - listItem = p.list.PopBack() - if listItem == nil { - return - } - poolItem = listItem.(*localPoolItem) - poolItem.Func(poolItem.Ctx) - } - }() -} - -// AddWithRecover pushes a new job to the pool with specified recover function. -// The optional `recoverFunc` is called when any panic during executing of `userFunc`. -// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`. -// The job will be executed asynchronously. -func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error { - return p.Add(ctx, func(ctx context.Context) { - defer func() { - if exception := recover(); exception != nil { - if recoverFunc != nil { - if v, ok := exception.(error); ok && gerror.HasStack(v) { - recoverFunc(ctx, v) - } else { - recoverFunc(ctx, gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception)) - } - } - } - }() - userFunc(ctx) - }) -} - -// Cap returns the capacity of the pool. -// This capacity is defined when pool is created. -// It returns -1 if there's no limit. -func (p *Pool) Cap() int { - return p.limit -} - -// Size returns current goroutine count of the pool. -func (p *Pool) Size() int { - return p.count.Val() -} - -// Jobs returns current job count of the pool. -// Note that, it does not return worker/goroutine count but the job/task count. -func (p *Pool) Jobs() int { - return p.list.Size() -} - -// IsClosed returns if pool is closed. -func (p *Pool) IsClosed() bool { - return p.closed.Val() -} - -// Close closes the goroutine pool, which makes all goroutines exit. -func (p *Pool) Close() { - p.closed.Set(true) + return defaultPool.Jobs() } diff --git a/os/grpool/grpool_pool.go b/os/grpool/grpool_pool.go new file mode 100644 index 00000000000..707e37e13ac --- /dev/null +++ b/os/grpool/grpool_pool.go @@ -0,0 +1,119 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package grpool + +import ( + "context" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" +) + +// Add pushes a new job to the pool. +// The job will be executed asynchronously. +func (p *Pool) Add(ctx context.Context, f Func) error { + for p.closed.Val() { + return gerror.NewCode( + gcode.CodeInvalidOperation, + "goroutine defaultPool is already closed", + ) + } + p.list.PushFront(&localPoolItem{ + Ctx: ctx, + Func: f, + }) + // Check and fork new worker. + p.checkAndForkNewGoroutineWorker() + return nil +} + +// AddWithRecover pushes a new job to the pool with specified recover function. +// +// The optional `recoverFunc` is called when any panic during executing of `userFunc`. +// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`. +// The job will be executed asynchronously. +func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error { + return p.Add(ctx, func(ctx context.Context) { + defer func() { + if exception := recover(); exception != nil { + if recoverFunc != nil { + if v, ok := exception.(error); ok && gerror.HasStack(v) { + recoverFunc(ctx, v) + } else { + recoverFunc(ctx, gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception)) + } + } + } + }() + userFunc(ctx) + }) +} + +// Cap returns the capacity of the pool. +// This capacity is defined when pool is created. +// It returns -1 if there's no limit. +func (p *Pool) Cap() int { + return p.limit +} + +// Size returns current goroutine count of the pool. +func (p *Pool) Size() int { + return p.count.Val() +} + +// Jobs returns current job count of the pool. +// Note that, it does not return worker/goroutine count but the job/task count. +func (p *Pool) Jobs() int { + return p.list.Size() +} + +// IsClosed returns if pool is closed. +func (p *Pool) IsClosed() bool { + return p.closed.Val() +} + +// Close closes the goroutine pool, which makes all goroutines exit. +func (p *Pool) Close() { + p.closed.Set(true) +} + +// checkAndForkNewGoroutineWorker checks and creates a new goroutine worker. +// Note that the worker dies if the job function panics and the job has no recover handling. +func (p *Pool) checkAndForkNewGoroutineWorker() { + // Check whether fork new goroutine or not. + var n int + for { + n = p.count.Val() + if p.limit != -1 && n >= p.limit { + // No need fork new goroutine. + return + } + if p.count.Cas(n, n+1) { + // Use CAS to guarantee atomicity. + break + } + } + + // Create job function in goroutine. + go func() { + defer p.count.Add(-1) + + var ( + listItem interface{} + poolItem *localPoolItem + ) + // Harding working, one by one, job never empty, worker never die. + for !p.closed.Val() { + listItem = p.list.PopBack() + if listItem == nil { + return + } + poolItem = listItem.(*localPoolItem) + poolItem.Func(poolItem.Ctx) + } + }() +} diff --git a/os/grpool/grpool_supervisor.go b/os/grpool/grpool_supervisor.go index d57e9695c28..f4a7a3b6c14 100644 --- a/os/grpool/grpool_supervisor.go +++ b/os/grpool/grpool_supervisor.go @@ -14,7 +14,7 @@ import ( // supervisor checks the job list and fork new worker goroutine to handle the job // if there are jobs but no workers in pool. -func (p *Pool) supervisor(ctx context.Context) { +func (p *Pool) supervisor(_ context.Context) { if p.IsClosed() { gtimer.Exit() } @@ -24,7 +24,7 @@ func (p *Pool) supervisor(ctx context.Context) { number = p.limit } for i := 0; i < number; i++ { - p.checkAndFork() + p.checkAndForkNewGoroutineWorker() } } } diff --git a/util/gutil/gutil_goroutine.go b/util/gutil/gutil_goroutine.go new file mode 100644 index 00000000000..111eae4d1b6 --- /dev/null +++ b/util/gutil/gutil_goroutine.go @@ -0,0 +1,45 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gutil + +import ( + "context" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" +) + +// Func is the function which contains context parameter. +type Func func(ctx context.Context) + +// RecoverFunc is the panic recover function which contains context parameter. +type RecoverFunc func(ctx context.Context, exception error) + +// Go creates a new asynchronous goroutine function with specified recover function. +// +// The parameter `recoverFunc` is called when any panic during executing of `goroutineFunc`. +// If `recoverFunc` is given nil, it ignores the panic from `goroutineFunc` and no panic will +// throw to parent goroutine. +func Go(ctx context.Context, goroutineFunc Func, recoverFunc RecoverFunc) { + if goroutineFunc == nil { + return + } + go func() { + defer func() { + if exception := recover(); exception != nil { + if recoverFunc != nil { + if v, ok := exception.(error); ok && gerror.HasStack(v) { + recoverFunc(ctx, v) + } else { + recoverFunc(ctx, gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception)) + } + } + } + }() + goroutineFunc(ctx) + }() +} diff --git a/util/gutil/gutil_z_unit_goroutine_test.go b/util/gutil/gutil_z_unit_goroutine_test.go new file mode 100644 index 00000000000..8a86055a132 --- /dev/null +++ b/util/gutil/gutil_z_unit_goroutine_test.go @@ -0,0 +1,64 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gutil_test + +import ( + "context" + "sync" + "testing" + + "github.com/gogf/gf/v2/container/garray" + "github.com/gogf/gf/v2/test/gtest" + "github.com/gogf/gf/v2/util/gutil" +) + +func Test_Go(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + wg = sync.WaitGroup{} + array = garray.NewArray(true) + ) + wg.Add(1) + gutil.Go(ctx, func(ctx context.Context) { + defer wg.Done() + array.Append(1) + }, nil) + wg.Wait() + t.Assert(array.Len(), 1) + }) + // recover + gtest.C(t, func(t *gtest.T) { + var ( + wg = sync.WaitGroup{} + array = garray.NewArray(true) + ) + wg.Add(1) + gutil.Go(ctx, func(ctx context.Context) { + defer wg.Done() + panic("error") + array.Append(1) + }, nil) + wg.Wait() + t.Assert(array.Len(), 0) + }) + gtest.C(t, func(t *gtest.T) { + var ( + wg = sync.WaitGroup{} + array = garray.NewArray(true) + ) + wg.Add(1) + gutil.Go(ctx, func(ctx context.Context) { + panic("error") + }, func(ctx context.Context, exception error) { + defer wg.Done() + array.Append(exception) + }) + wg.Wait() + t.Assert(array.Len(), 1) + t.Assert(array.At(0), "error") + }) +}