Skip to content

Commit

Permalink
fix(linkedbuffer): fix bufferHasElements channel length
Browse files Browse the repository at this point in the history
  • Loading branch information
alitto committed Nov 10, 2024
1 parent f1d2a44 commit b15a1b5
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test:
go test -race -v -timeout 1m ./...
go test -race -v -timeout 1m -count=3 ./...

coverage:
go test -race -v -timeout 1m -coverprofile=coverage.out -covermode=atomic ./...
7 changes: 6 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pond
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -103,9 +104,13 @@ func (g *abstractTaskGroup[T, E, O]) submit(task any) {
index := int(g.nextIndex.Add(1) - 1)

g.taskWaitGroup.Add(1)
fmt.Printf("submitting task %d\n", index)

err := g.pool.Go(func() {
defer g.taskWaitGroup.Done()
defer func() {
fmt.Printf("task %d done\n", index)
g.taskWaitGroup.Done()
}()

// Check if the context has been cancelled to prevent running tasks that are not needed
if err := g.future.Context().Err(); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func TestTaskGroupWaitWithContextCanceledAndOngoingTasks(t *testing.T) {
func TestTaskGroupWithStoppedPool(t *testing.T) {

pool := NewPool(100)
pool.EnableDebug()

pool.StopAndWait()

Expand All @@ -186,6 +187,7 @@ func TestTaskGroupWithStoppedPool(t *testing.T) {
func TestTaskGroupWithContextCanceled(t *testing.T) {

pool := NewPool(100)
pool.EnableDebug()

group := pool.NewGroup()

Expand Down Expand Up @@ -227,6 +229,7 @@ func TestTaskGroupWithNoTasks(t *testing.T) {
func TestTaskGroupCanceledShouldSkipRemainingTasks(t *testing.T) {

pool := NewPool(1)
pool.EnableDebug()

group := pool.NewGroup()

Expand Down Expand Up @@ -255,6 +258,8 @@ func TestTaskGroupCanceledShouldSkipRemainingTasks(t *testing.T) {
func TestTaskGroupWithCustomContext(t *testing.T) {
pool := NewPool(1)

pool.EnableDebug()

ctx, cancel := context.WithCancel(context.Background())

group := pool.NewGroupContext(ctx)
Expand All @@ -281,6 +286,7 @@ func TestTaskGroupWithCustomContext(t *testing.T) {

func TestTaskGroupStop(t *testing.T) {
pool := NewPool(1)
pool.EnableDebug()

group := pool.NewGroup()

Expand All @@ -306,6 +312,7 @@ func TestTaskGroupStop(t *testing.T) {

func TestTaskGroupDone(t *testing.T) {
pool := NewPool(10)
pool.EnableDebug()

group := pool.NewGroup()

Expand Down
8 changes: 6 additions & 2 deletions internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Dispatcher[T any] struct {
waitGroup sync.WaitGroup
batchSize int
closed atomic.Bool
Debug bool
}

// NewDispatcher creates a generic dispatcher that can receive values from multiple goroutines in a thread-safe manner
Expand All @@ -27,10 +28,9 @@ func NewDispatcher[T any](ctx context.Context, dispatchFunc func([]T), batchSize
dispatcher := &Dispatcher[T]{
ctx: ctx,
buffer: linkedbuffer.NewLinkedBuffer[T](10, batchSize),
bufferHasElements: make(chan struct{}, 1),
bufferHasElements: make(chan struct{}, 2), // This channel needs to have size 2 in case an element is written to the buffer while the dispatcher is processing elements
dispatchFunc: dispatchFunc,
batchSize: batchSize,
closed: atomic.Bool{},
}

dispatcher.waitGroup.Add(1)
Expand All @@ -39,6 +39,10 @@ func NewDispatcher[T any](ctx context.Context, dispatchFunc func([]T), batchSize
return dispatcher
}

func (d *Dispatcher[T]) EnableDebug() {
d.buffer.Debug = true

Check warning on line 43 in internal/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

internal/dispatcher/dispatcher.go#L42-L43

Added lines #L42 - L43 were not covered by tests
}

// Write writes values to the dispatcher
func (d *Dispatcher[T]) Write(values ...T) error {
// Check if the dispatcher has been closed
Expand Down
15 changes: 9 additions & 6 deletions internal/linkedbuffer/linkedbuffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package linkedbuffer

import (
"fmt"
"sync"
"sync/atomic"
)
Expand All @@ -17,7 +18,8 @@ type LinkedBuffer[T any] struct {
maxCapacity int
writeCount atomic.Uint64
readCount atomic.Uint64
mutex sync.RWMutex
mutex sync.Mutex
Debug bool
}

func NewLinkedBuffer[T any](initialCapacity, maxCapacity int) *LinkedBuffer[T] {
Expand Down Expand Up @@ -78,28 +80,29 @@ func (b *LinkedBuffer[T]) Write(values []T) {

// Read reads values from the buffer and returns the number of elements read
func (b *LinkedBuffer[T]) Read(values []T) int {
b.mutex.Lock()
defer b.mutex.Unlock()

var readBuffer *Buffer[T]

for {
b.mutex.RLock()
readBuffer = b.readBuffer
b.mutex.RUnlock()

// Read element
n, err := readBuffer.Read(values)

if b.Debug {
fmt.Printf("read %d elements: %v\n", n, values)
}

Check warning on line 96 in internal/linkedbuffer/linkedbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/linkedbuffer/linkedbuffer.go#L95-L96

Added lines #L95 - L96 were not covered by tests

if err == ErrEOF {
// Move to next buffer
b.mutex.Lock()
if readBuffer.next == nil {
b.mutex.Unlock()
return n
}
if b.readBuffer != readBuffer.next {
b.readBuffer = readBuffer.next
}
b.mutex.Unlock()
continue
}

Expand Down
6 changes: 6 additions & 0 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type Pool interface {

// Creates a new task group with the specified context.
NewGroupContext(ctx context.Context) TaskGroup

EnableDebug()
}

// pool is an implementation of the Pool interface.
Expand Down Expand Up @@ -139,6 +141,10 @@ func (p *pool) updateMetrics(err error) {
}
}

func (d *pool) EnableDebug() {
d.dispatcher.EnableDebug()
}

func (p *pool) Go(task func()) error {
if err := p.dispatcher.Write(task); err != nil {
return ErrPoolStopped
Expand Down

0 comments on commit b15a1b5

Please sign in to comment.