diff --git a/Makefile b/Makefile index 6ce0725..180eb66 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ test: - go test -race -v -timeout 1m ./... + go test -race -v -timeout 1m -count=3 ./... coverage: go test -race -v -timeout 1m -coverprofile=coverage.out -covermode=atomic ./... \ No newline at end of file diff --git a/group.go b/group.go index 0ba530e..19ee5ef 100644 --- a/group.go +++ b/group.go @@ -3,6 +3,7 @@ package pond import ( "context" "errors" + "fmt" "sync" "sync/atomic" @@ -103,9 +104,13 @@ func (g *abstractTaskGroup[T, E, O]) submit(task any) { index := int(g.nextIndex.Add(1) - 1) g.taskWaitGroup.Add(1) + fmt.Printf("submitting task %d\n", index) err := g.pool.Go(func() { - defer g.taskWaitGroup.Done() + defer func() { + fmt.Printf("task %d done\n", index) + g.taskWaitGroup.Done() + }() // Check if the context has been cancelled to prevent running tasks that are not needed if err := g.future.Context().Err(); err != nil { diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index 470485a..d35d9bc 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -27,10 +27,9 @@ func NewDispatcher[T any](ctx context.Context, dispatchFunc func([]T), batchSize dispatcher := &Dispatcher[T]{ ctx: ctx, buffer: linkedbuffer.NewLinkedBuffer[T](10, batchSize), - bufferHasElements: make(chan struct{}, 1), + bufferHasElements: make(chan struct{}, 2), // This channel needs to have size 2 in case an element is written to the buffer while the dispatcher is processing elements dispatchFunc: dispatchFunc, batchSize: batchSize, - closed: atomic.Bool{}, } dispatcher.waitGroup.Add(1)