Skip to content

Commit

Permalink
add onStop() option for combined actor (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic authored Oct 29, 2023
1 parent 200a727 commit 7a4cb1c
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 52 deletions.
112 changes: 90 additions & 22 deletions actor/combine.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package actor

import "sync/atomic"
import (
"sync"
"sync/atomic"
)

// Combine returns builder which is used to create single Actor that combines all
// specified actors into one.
Expand All @@ -20,16 +23,16 @@ type CombineBuilder struct {

// Build returns combined Actor.
func (b *CombineBuilder) Build() Actor {
combined := &combinedActor{
stopping: &atomic.Bool{},
actors: b.actors,
a := &combinedActor{
actors: b.actors,
onStopFunc: b.options.Combined.OnStopFunc,
stopTogether: b.options.Combined.StopTogether,
stopping: &atomic.Bool{},
}

if b.options.Combined.StopTogether {
combined.actors = wrapActors(combined.actors, combined.Stop)
}
a.actors = wrapActors(a.actors, a.onActorStopped)

return combined
return a
}

// WithOptions adds options for combined actor.
Expand All @@ -39,53 +42,118 @@ func (b *CombineBuilder) WithOptions(opt ...CombinedOption) *CombineBuilder {
}

type combinedActor struct {
stopping *atomic.Bool
actors []Actor
actors []Actor
onStopFunc func()
stopTogether bool

runningCount atomic.Int32
running bool
runningLock sync.Mutex
stopping *atomic.Bool
}

func (a *combinedActor) onActorStopped() {
a.runningLock.Lock()

runningCount := a.runningCount.Add(-1)
wasRunning := a.running
a.running = runningCount != 0

a.runningLock.Unlock()

// Last actor to end should call onStopFunc
if runningCount == 0 && wasRunning && a.onStopFunc != nil {
a.onStopFunc()
}

// First actor to stop should stop other actors
if a.stopTogether && a.stopping.CompareAndSwap(false, true) {
// Run stop in goroutine because wrapped actor
// should not wait for other actors to stop.
go a.Stop()
}
}

func (a *combinedActor) Stop() {
if !a.stopping.CompareAndSwap(false, true) {
a.runningLock.Lock()

if !a.running {
a.runningLock.Unlock()
return
}

for _, a := range a.actors {
a.Stop()
a.running = false
a.runningLock.Unlock()

for _, actor := range a.actors {
actor.Stop()
}
}

func (a *combinedActor) Start() {
a.runningLock.Lock()

if a.running {
a.runningLock.Unlock()
return
}

a.stopping.Store(false)
a.running = true

a.runningLock.Unlock()

for _, a := range a.actors {
a.Start()
for _, actor := range a.actors {
a.runningCount.Add(1)
actor.Start()
}
}

func wrapActors(actors []Actor, onStop func()) []Actor {
func wrapActors(
actors []Actor,
onStopFunc func(),
) []Actor {
wrapActorStruct := func(a *actor) *actor {
prevOnStopFunc := a.options.Actor.OnStopFunc

a.options.Actor.OnStopFunc = func() {
if prevOnStopFunc != nil {
prevOnStopFunc()
}

go onStop()
onStopFunc()
}

return a
}

wrapCombinedActorStruct := func(a *combinedActor) *combinedActor {
prevOnStopFunc := a.onStopFunc

a.onStopFunc = func() {
if prevOnStopFunc != nil {
prevOnStopFunc()
}

onStopFunc()
}

return a
}

wrapActorInterface := func(a Actor) Actor {
return &wrappedActor{
actor: a,
onStopFn: onStop,
actor: a,
onStopFunc: onStopFunc,
}
}

for i, a := range actors {
switch v := a.(type) {
case *actor:
actors[i] = wrapActorStruct(v)
case *combinedActor:
actors[i] = wrapCombinedActorStruct(v)
default:
actors[i] = wrapActorInterface(v)
}
Expand All @@ -95,8 +163,8 @@ func wrapActors(actors []Actor, onStop func()) []Actor {
}

type wrappedActor struct {
actor Actor
onStopFn func()
actor Actor
onStopFunc func()
}

func (a *wrappedActor) Start() {
Expand All @@ -105,5 +173,5 @@ func (a *wrappedActor) Start() {

func (a *wrappedActor) Stop() {
a.actor.Stop()
go a.onStopFn()
a.onStopFunc()
}
141 changes: 111 additions & 30 deletions actor/combine_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
package actor_test

import (
"math/rand"
"testing"

"github.com/stretchr/testify/assert"

. "github.com/vladopajic/go-actor/actor"
)

func Test_Combine_TestSuite(t *testing.T) {
t.Parallel()

const actorsCount = 10

TestSuite(t, func() Actor {
actors := createActors(actorsCount)

return Combine(actors...).Build()
})
}

// Test asserts that all Start and Stop is
// delegated to all combined actors.
func Test_Combine(t *testing.T) {
Expand All @@ -18,17 +29,10 @@ func Test_Combine(t *testing.T) {

onStartC := make(chan any, actorsCount)
onStopC := make(chan any, actorsCount)
actors := make([]Actor, actorsCount)

for i := 0; i < actorsCount; i++ {
actors[i] = New(newWorker(),
OptOnStart(func(Context) { onStartC <- `🌞` }),
OptOnStop(func() { onStopC <- `🌚` }),
)
}
onStart := OptOnStart(func(Context) { onStartC <- `🌞` })
onStop := OptOnStop(func() { onStopC <- `🌚` })
actors := createActors(actorsCount, onStart, onStop)

// Assert that starting and stopping combined actors
// will start and stop all individual actors
a := Combine(actors...).Build()

// Start combined actor and wait for all actors to be started
Expand All @@ -42,34 +46,111 @@ func Test_Combine(t *testing.T) {
assert.Len(t, onStopC, actorsCount)
}

// Test_Combine_StopTogether asserts that all actors will end as soon
// Test_Combine_OptStopTogether asserts that all actors will end as soon
// as first actors ends.
func Test_Combine_StopTogether(t *testing.T) {
func Test_Combine_OptStopTogether(t *testing.T) {
t.Parallel()

const actorsCount = 5
const actorsCount = 5 * 2

onStartC := make(chan any, actorsCount)
onStopC := make(chan any, actorsCount)
actors := make([]Actor, actorsCount)
for i := 0; i < actorsCount/2+1; i++ {
onStartC := make(chan any, actorsCount)
onStopC := make(chan any, actorsCount)
onStart := OptOnStart(func(Context) { onStartC <- `🌞` })
onStop := OptOnStop(func() { onStopC <- `🌚` })
actors := createActors(actorsCount/2, onStart, onStop)

onStart := OptOnStart(func(Context) { onStartC <- `🌞` })
onStop := OptOnStop(func() { onStopC <- `🌚` })
// append one more actor to actors list
cmb := Combine(createActors(actorsCount/2, onStart, onStop)...).Build()
actors = append(actors, cmb)

for i := 0; i < actorsCount; i++ {
if i%2 == 0 { // case with actorStruct wrapper
actors[i] = New(newWorker(), onStart, onStop)
} else { // case with actorInterface wrapper
actors[i] = Idle(onStart, onStop)
}
a := Combine(actors...).WithOptions(OptStopTogether()).Build()

a.Start()
drainC(onStartC, actorsCount)

// stop actor and assert that all actors will be stopped
actors[i].Stop()
drainC(onStopC, actorsCount)
}
}

a := Combine(actors...).WithOptions(OptStopTogether()).Build()
func Test_Combine_OptOnStop(t *testing.T) {
t.Parallel()

const actorsCount = 5

onStopC, onStopOpt := createCombinedOnStopOption(t, 1)
actors := createActors(actorsCount)

a := Combine(actors...).
WithOptions(onStopOpt).
Build()

a.Start()
drainC(onStartC, actorsCount)

// Stop random actor and assert that all actors will be stopped
actors[rand.Int31n(actorsCount)].Stop() //nolint:gosec // weak random is fine
drainC(onStopC, actorsCount)
a.Stop()
a.Stop() // should have no effect
a.Stop() // should have no effect
a.Stop() // should have no effect
assert.Equal(t, `🌚`, <-onStopC)
}

func Test_Combine_OptOnStop_AfterActorStops(t *testing.T) {
t.Parallel()

const actorsCount = 5 * 2

for i := 0; i < actorsCount/2+1; i++ {
onStopC, onStopOpt := createCombinedOnStopOption(t, 2)
actors := createActors(actorsCount / 2)

// append one more actor to actors list
cmb := Combine(createActors(actorsCount / 2)...).WithOptions(onStopOpt).Build()
actors = append(actors, cmb)

a := Combine(actors...).
WithOptions(onStopOpt, OptStopTogether()).
Build()

a.Start()

actors[i].Stop()
assert.Equal(t, `🌚`, <-onStopC)
assert.Equal(t, `🌚`, <-onStopC)
a.Stop() // should have no effect
}
}

func createActors(count int, opts ...Option) []Actor {
actors := make([]Actor, count)

for i := 0; i < count; i++ {
actors[i] = createActor(i, opts...)
}

return actors
}

func createActor(i int, opts ...Option) Actor {
if i%2 == 0 {
return New(newWorker(), opts...)
}

return Idle(opts...)
}

func createCombinedOnStopOption(t *testing.T, count int) (<-chan any, CombinedOption) {
t.Helper()

onStopC := make(chan any, count)
onStopFunc := func() {
select {
case onStopC <- `🌚`:
default:
t.Fatal("onStopFunc should be called only once")
}
}

return onStopC, OptOnStopCombined(onStopFunc)
}
8 changes: 8 additions & 0 deletions actor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func OptStopTogether() CombinedOption {
}
}

// OptOnStopCombined will is called after all combined actors are stopped.
func OptOnStopCombined(f func()) CombinedOption {
return func(o *options) {
o.Combined.OnStopFunc = f
}
}

type (
option func(o *options)

Expand All @@ -78,6 +85,7 @@ type options struct {

Combined struct {
StopTogether bool
OnStopFunc func()
}

Mailbox struct {
Expand Down

0 comments on commit 7a4cb1c

Please sign in to comment.