Skip to content

Commit

Permalink
add option to stop mailbox after receiving all elements from queue (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic authored Nov 11, 2023
1 parent 1f1cbcc commit 329821f
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 39 deletions.
6 changes: 4 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ linters-settings:
- github.com/vladopajic/go-actor
forbidigo:
forbid:
- 'time.Sleep*(# Do not sleep)?'
- 'time\.Sleep*(# Do not sleep)?'
- 'panic*(# Do not panic)?'
- 'os.Exit*(# Do not exit)?'
- 'os\.Exit*(# Do not exit)?'
- p: ^fmt\.Print*$
msg: Do not commit print statements.
gomoddirectives:
retract-allow-no-explanation: true
maintidx:
Expand Down
8 changes: 4 additions & 4 deletions actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func (w *worker) DoWork(c Context) WorkerStatus {
func New(w Worker, opt ...Option) Actor {
return &actor{
worker: w,
options: newOptions(opt),
options: newOptions(opt).Actor,
}
}

type actor struct {
worker Worker
options options
options optionsActor
ctx *context
workEndedSigC chan struct{}
workerRunning bool
Expand Down Expand Up @@ -152,7 +152,7 @@ func (a *actor) onStart() {
w.OnStart(a.ctx)
}

if fn := a.options.Actor.OnStartFunc; fn != nil {
if fn := a.options.OnStartFunc; fn != nil {
fn(a.ctx)
}
}
Expand All @@ -162,7 +162,7 @@ func (a *actor) onStop() {
w.OnStop()
}

if fn := a.options.Actor.OnStopFunc; fn != nil {
if fn := a.options.OnStopFunc; fn != nil {
fn()
}
}
Expand Down
4 changes: 2 additions & 2 deletions actor/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func wrapActors(
onStopFunc func(),
) []Actor {
wrapActorStruct := func(a *actor) *actor {
prevOnStopFunc := a.options.Actor.OnStopFunc
prevOnStopFunc := a.options.OnStopFunc

a.options.Actor.OnStopFunc = func() {
a.options.OnStopFunc = func() {
if prevOnStopFunc != nil {
prevOnStopFunc()
}
Expand Down
13 changes: 10 additions & 3 deletions actor/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ const (
MinQueueCapacity = minQueueCapacity
)

type ActorImpl = actor
type (
ActorImpl = actor
OptionsMailbox = optionsMailbox
)

func NewActorImpl(w Worker, opt ...Option) *ActorImpl {
a := New(w, opt...)
Expand Down Expand Up @@ -38,9 +41,13 @@ func NewZeroOptions() options {
func NewMailboxWorker[T any](
sendC,
receiveC chan T,
queue *queue[T],
mOpts optionsMailbox,
) *mailboxWorker[T] {
return newMailboxWorker(sendC, receiveC, queue)
return newMailboxWorker(sendC, receiveC, mOpts)
}

func (w *mailboxWorker[T]) Queue() *queue[T] {
return w.queue
}

func NewQueue[T any](capacity, minimum int) *queue[T] {
Expand Down
36 changes: 24 additions & 12 deletions actor/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func NewMailboxes[T any](count int, opt ...MailboxOption) []Mailbox[T] {
// will never block, all messages are going to be queued and Actors on
// receiving end of the Mailbox will get all messages in FIFO order.
func NewMailbox[T any](opt ...MailboxOption) Mailbox[T] {
mOpts := newOptions(opt).Mailbox
options := newOptions(opt).Mailbox

if mOpts.AsChan {
c := make(chan T, mOpts.Capacity)
if options.AsChan {
c := make(chan T, options.Capacity)

return &mailbox[T]{
Actor: Idle(OptOnStop(func() { close(c) })),
Expand All @@ -75,8 +75,7 @@ func NewMailbox[T any](opt ...MailboxOption) Mailbox[T] {
var (
sendC = make(chan T)
receiveC = make(chan T)
queue = newQueue[T](mOpts.Capacity, mOpts.MinCapacity)
w = newMailboxWorker(sendC, receiveC, queue)
w = newMailboxWorker(sendC, receiveC, options)
)

return &mailbox[T]{
Expand Down Expand Up @@ -109,47 +108,60 @@ type mailboxWorker[T any] struct {
receiveC chan T
sendC chan T
queue *queue[T]
options optionsMailbox
}

func newMailboxWorker[T any](
sendC,
receiveC chan T,
queue *queue[T],
options optionsMailbox,
) *mailboxWorker[T] {
queue := newQueue[T](options.Capacity, options.MinCapacity)

return &mailboxWorker[T]{
sendC: sendC,
receiveC: receiveC,
queue: queue,
options: options,
}
}

func (w *mailboxWorker[T]) DoWork(c Context) WorkerStatus {
if w.queue.IsEmpty() {
select {
case <-c.Done():
return WorkerEnd

case value := <-w.sendC:
w.queue.PushBack(value)
return WorkerContinue

case <-c.Done():
return WorkerEnd
}
}

select {
case <-c.Done():
return WorkerEnd

case w.receiveC <- w.queue.Front():
w.queue.PopFront()
return WorkerContinue

case value := <-w.sendC:
w.queue.PushBack(value)
return WorkerContinue

case <-c.Done():
return WorkerEnd
}
}

func (w *mailboxWorker[T]) OnStop() {
// close sendC to prevent anyone from writing to this mailbox
close(w.sendC)

// close receiveC channel after all data from queue is received
if w.options.StopAfterReceivingAll {
for !w.queue.IsEmpty() {
w.receiveC <- w.queue.PopFront()
}
}

close(w.receiveC)
}
65 changes: 62 additions & 3 deletions actor/mailbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ func Test_MailboxWorker_EndSignal(t *testing.T) {

sendC := make(chan any)
receiveC := make(chan any)
q := NewQueue[any](0, 0)
options := OptionsMailbox{}

w := NewMailboxWorker(sendC, receiveC, q)
w := NewMailboxWorker(sendC, receiveC, options)
assert.NotNil(t, w)

// Worker should signal end with empty queue
assert.Equal(t, WorkerEnd, w.DoWork(ContextEnded()))

// Worker should signal end with non-empty queue
q.PushBack(`🌹`)
w.Queue().PushBack(`🌹`)
assert.Equal(t, WorkerEnd, w.DoWork(ContextEnded()))
}

Expand Down Expand Up @@ -181,6 +181,65 @@ func Test_MailboxOptAsChan(t *testing.T) {
})
}

// This test asserts that Mailbox will end only after all messages have been received.
func Test_Mailbox_OptEndAferReceivingAll(t *testing.T) {
t.Parallel()

const messagesCount = 1000

sendMessages := func(m Mailbox[any]) {
t.Helper()

for i := 0; i < messagesCount; i++ {
assert.NoError(t, m.Send(ContextStarted(), `🥥`))
}
}
assertGotAllMessages := func(m Mailbox[any]) {
t.Helper()

gotMessages := 0

for msg := range m.ReceiveC() {
assert.Equal(t, `🥥`, msg)
gotMessages++
}

assert.Equal(t, messagesCount, gotMessages)
}

t.Run("the-best-way", func(t *testing.T) {
t.Parallel()

m := NewMailbox[any](OptStopAfterReceivingAll())
m.Start()
sendMessages(m)

// Stop has to be called in goroutine because Stop is blocking until
// actor (mailbox) has fully ended. And current thread of execution is needed
// to read data from mailbox.
go m.Stop()

assertGotAllMessages(m)
})

t.Run("suboptimal-way", func(t *testing.T) {
t.Parallel()

m := NewMailbox[any](OptStopAfterReceivingAll())
m.Start()
sendMessages(m)

// This time we start gorotune which will read all messages from mailbox instead of
// stopping in separate goroutine.
// There are no guaranees that this gorutine will finish after Stop is called, so
// it could be the case that this gorotuine has received all messages from mailbox,
// even before mailbox was stopped. Which wouldn't correctly assert this feature.
go assertGotAllMessages(m)

m.Stop()
})
}

func assertSendReceive(t *testing.T, m Mailbox[any], val any) {
t.Helper()

Expand Down
39 changes: 26 additions & 13 deletions actor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func OptAsChan() MailboxOption {
}
}

// OptStopAfterReceivingAll will close ReceiveC channel of Mailbox
// after all messages have been received from this channel.
func OptStopAfterReceivingAll() MailboxOption {
return func(o *options) {
o.Mailbox.StopAfterReceivingAll = true
}
}

// OptStopTogether will stop all actors when any of combined
// actors is stopped.
func OptStopTogether() CombinedOption {
Expand All @@ -78,21 +86,26 @@ type (
)

type options struct {
Actor struct {
OnStartFunc func(Context)
OnStopFunc func()
}
Actor optionsActor
Combined optionsCombined
Mailbox optionsMailbox
}

Combined struct {
StopTogether bool
OnStopFunc func()
}
type optionsActor struct {
OnStartFunc func(Context)
OnStopFunc func()
}

Mailbox struct {
AsChan bool
Capacity int
MinCapacity int
}
type optionsCombined struct {
StopTogether bool
OnStopFunc func()
}

type optionsMailbox struct {
AsChan bool
Capacity int
MinCapacity int
StopAfterReceivingAll bool
}

func newOptions[T ~func(o *options)](opts []T) options {
Expand Down
8 changes: 8 additions & 0 deletions actor/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func testMailboxOptions(t *testing.T) {
assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Combined)
}

{ // Assert that OptStopAfterReceivingAll will be set
opts := NewOptions(OptStopAfterReceivingAll())
assert.True(t, opts.Mailbox.StopAfterReceivingAll)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Combined)
}
}

func testCombinedOptions(t *testing.T) {
Expand Down

0 comments on commit 329821f

Please sign in to comment.