diff --git a/os/grpool/grpool.go b/os/grpool/grpool.go index 5d03507114e..dd9dcf7e623 100644 --- a/os/grpool/grpool.go +++ b/os/grpool/grpool.go @@ -9,11 +9,14 @@ package grpool import ( "context" + "time" "github.com/gogf/gf/v2/container/glist" "github.com/gogf/gf/v2/container/gtype" "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/gtimer" + "github.com/gogf/gf/v2/util/grand" ) // Func is the pool function which contains context parameter. @@ -26,15 +29,20 @@ type RecoverFunc func(ctx context.Context, err error) type Pool struct { limit int // Max goroutine count limit. count *gtype.Int // Current running goroutine count. - list *glist.List // Job list for asynchronous job adding purpose. + list *glist.List // List for asynchronous job adding purpose. closed *gtype.Bool // Is pool closed or not. } -type internalPoolItem struct { +type localPoolItem struct { Ctx context.Context Func Func } +const ( + minTimerDuration = 500 * time.Millisecond + maxTimerDuration = 1500 * time.Millisecond +) + // Default goroutine pool. var ( pool = New() @@ -53,6 +61,8 @@ func New(limit ...int) *Pool { if len(limit) > 0 && limit[0] > 0 { p.limit = limit[0] } + timerDuration := grand.D(minTimerDuration, maxTimerDuration) + gtimer.Add(context.Background(), timerDuration, p.supervisor) return p } @@ -66,8 +76,8 @@ func Add(ctx context.Context, f Func) error { // The optional `recoverFunc` is called when any panic during executing of `userFunc`. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`. // The job will be executed asynchronously. -func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...RecoverFunc) error { - return pool.AddWithRecover(ctx, userFunc, recoverFunc...) +func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error { + return pool.AddWithRecover(ctx, userFunc, recoverFunc) } // Size returns current goroutine count of default goroutine pool. @@ -84,42 +94,68 @@ func Jobs() int { // The job will be executed asynchronously. func (p *Pool) Add(ctx context.Context, f Func) error { for p.closed.Val() { - return gerror.NewCode(gcode.CodeInvalidOperation, "pool closed") + return gerror.NewCode( + gcode.CodeInvalidOperation, + "goroutine pool is already closed", + ) } - p.list.PushFront(&internalPoolItem{ + p.list.PushFront(&localPoolItem{ Ctx: ctx, Func: f, }) + // Check and fork new worker. + p.checkAndFork() + return nil +} + +// checkAndFork checks and creates a new goroutine worker. +// Note that the worker dies if the job function panics and the job has no recover handling. +func (p *Pool) checkAndFork() { // Check whether fork new goroutine or not. var n int for { n = p.count.Val() if p.limit != -1 && n >= p.limit { // No need fork new goroutine. - return nil + return } if p.count.Cas(n, n+1) { // Use CAS to guarantee atomicity. break } } - p.fork() - return nil + // Create job function in goroutine. + go func() { + defer p.count.Add(-1) + + var ( + listItem interface{} + poolItem *localPoolItem + ) + for !p.closed.Val() { + listItem = p.list.PopBack() + if listItem == nil { + return + } + poolItem = listItem.(*localPoolItem) + poolItem.Func(poolItem.Ctx) + } + }() } // AddWithRecover pushes a new job to the pool with specified recover function. // The optional `recoverFunc` is called when any panic during executing of `userFunc`. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`. // The job will be executed asynchronously. -func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...RecoverFunc) error { +func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error { return p.Add(ctx, func(ctx context.Context) { defer func() { if exception := recover(); exception != nil { - if len(recoverFunc) > 0 && recoverFunc[0] != nil { + if recoverFunc != nil { if v, ok := exception.(error); ok && gerror.HasStack(v) { - recoverFunc[0](ctx, v) + recoverFunc(ctx, v) } else { - recoverFunc[0](ctx, gerror.Newf(`%+v`, exception)) + recoverFunc(ctx, gerror.Newf(`%+v`, exception)) } } } @@ -146,27 +182,6 @@ func (p *Pool) Jobs() int { return p.list.Size() } -// fork creates a new goroutine worker. -// Note that the worker dies if the job function panics. -func (p *Pool) fork() { - go func() { - defer p.count.Add(-1) - - var ( - listItem interface{} - poolItem *internalPoolItem - ) - for !p.closed.Val() { - if listItem = p.list.PopBack(); listItem != nil { - poolItem = listItem.(*internalPoolItem) - poolItem.Func(poolItem.Ctx) - } else { - return - } - } - }() -} - // IsClosed returns if pool is closed. func (p *Pool) IsClosed() bool { return p.closed.Val() diff --git a/os/grpool/grpool_supervisor.go b/os/grpool/grpool_supervisor.go new file mode 100644 index 00000000000..d57e9695c28 --- /dev/null +++ b/os/grpool/grpool_supervisor.go @@ -0,0 +1,30 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package grpool + +import ( + "context" + + "github.com/gogf/gf/v2/os/gtimer" +) + +// supervisor checks the job list and fork new worker goroutine to handle the job +// if there are jobs but no workers in pool. +func (p *Pool) supervisor(ctx context.Context) { + if p.IsClosed() { + gtimer.Exit() + } + if p.list.Size() > 0 && p.count.Val() == 0 { + var number = p.list.Size() + if p.limit > 0 { + number = p.limit + } + for i := 0; i < number; i++ { + p.checkAndFork() + } + } +} diff --git a/os/grpool/grpool_z_unit_test.go b/os/grpool/grpool_z_unit_test.go index ceb0b922558..eb73c5fa46f 100644 --- a/os/grpool/grpool_z_unit_test.go +++ b/os/grpool/grpool_z_unit_test.go @@ -4,8 +4,6 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://github.com/gogf/gf. -// go test *.go -bench=".*" -count=1 - package grpool_test import ( @@ -22,19 +20,23 @@ import ( func Test_Basic(t *testing.T) { gtest.C(t, func(t *gtest.T) { var ( + err error wg = sync.WaitGroup{} array = garray.NewArray(true) size = 100 ) wg.Add(size) for i := 0; i < size; i++ { - grpool.Add(ctx, func(ctx context.Context) { + err = grpool.Add(ctx, func(ctx context.Context) { array.Append(1) wg.Done() }) + t.AssertNil(err) } wg.Wait() + time.Sleep(100 * time.Millisecond) + t.Assert(array.Len(), size) t.Assert(grpool.Jobs(), 0) t.Assert(grpool.Size(), 0) @@ -64,6 +66,7 @@ func Test_Limit1(t *testing.T) { func Test_Limit2(t *testing.T) { gtest.C(t, func(t *gtest.T) { var ( + err error wg = sync.WaitGroup{} array = garray.NewArray(true) size = 100 @@ -71,10 +74,11 @@ func Test_Limit2(t *testing.T) { ) wg.Add(size) for i := 0; i < size; i++ { - pool.Add(ctx, func(ctx context.Context) { + err = pool.Add(ctx, func(ctx context.Context) { defer wg.Done() array.Append(1) }) + t.AssertNil(err) } wg.Wait() t.Assert(array.Len(), size) @@ -111,18 +115,25 @@ func Test_Limit3(t *testing.T) { func Test_AddWithRecover(t *testing.T) { gtest.C(t, func(t *gtest.T) { - array := garray.NewArray(true) - grpool.AddWithRecover(ctx, func(ctx context.Context) { + var ( + err error + array = garray.NewArray(true) + ) + err = grpool.AddWithRecover(ctx, func(ctx context.Context) { array.Append(1) panic(1) }, func(ctx context.Context, err error) { array.Append(1) }) - grpool.AddWithRecover(ctx, func(ctx context.Context) { + t.AssertNil(err) + err = grpool.AddWithRecover(ctx, func(ctx context.Context) { panic(1) array.Append(1) - }) + }, nil) + t.AssertNil(err) + time.Sleep(500 * time.Millisecond) + t.Assert(array.Len(), 2) }) }