Skip to content

Commit

Permalink
add OptOnStartCombined (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic authored Jan 20, 2024
1 parent ef89e36 commit 8c04049
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 7 deletions.
21 changes: 21 additions & 0 deletions actor/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (b *CombineBuilder) Build() Actor {
a := &combinedActor{
actors: b.actors,
onStopFunc: b.options.Combined.OnStopFunc,
onStartFunc: b.options.Combined.OnStartFunc,
stopTogether: b.options.Combined.StopTogether,
stopping: &atomic.Bool{},
}
Expand All @@ -44,8 +45,10 @@ func (b *CombineBuilder) WithOptions(opt ...CombinedOption) *CombineBuilder {
type combinedActor struct {
actors []Actor
onStopFunc func()
onStartFunc func(Context)
stopTogether bool

ctx *context
runningCount atomic.Int32
running bool
runningLock sync.Mutex
Expand All @@ -70,6 +73,11 @@ func (a *combinedActor) onActorStopped() {
if a.stopTogether && a.stopping.CompareAndSwap(false, true) {
// Run stop in goroutine because wrapped actor
// should not wait for other actors to stop.
//
// Also if a.Stop() is called from same gorutine it would
// be recoursive call without exit condition. Therfore
// it is need to call a.Stop() from other goroutine,
// regardless of first invariant.
go a.Stop()
}
}
Expand All @@ -82,6 +90,12 @@ func (a *combinedActor) Stop() {
return
}

if ctx := a.ctx; ctx != nil {
ctx.end()

a.ctx = nil
}

a.running = false
a.runningLock.Unlock()

Expand All @@ -98,11 +112,18 @@ func (a *combinedActor) Start() {
return
}

ctx := newContext()
a.ctx = ctx

a.stopping.Store(false)
a.running = true

a.runningLock.Unlock()

if fn := a.onStartFunc; fn != nil {
fn(ctx)
}

for _, actor := range a.actors {
a.runningCount.Add(1)
actor.Start()
Expand Down
29 changes: 23 additions & 6 deletions actor/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ func Test_Combine_OptStopTogether(t *testing.T) {
}
}

func Test_Combine_OptOnStop(t *testing.T) {
func Test_Combine_OptOnStopOptOnStart(t *testing.T) {
t.Parallel()

const actorsCount = 5

onStatC, onStartOpt := createCombinedOnStartOption(t, 1)
onStopC, onStopOpt := createCombinedOnStopOption(t, 1)
actors := createActors(actorsCount)

a := Combine(actors...).
WithOptions(onStopOpt).
WithOptions(onStopOpt, onStartOpt).
Build()

a.Start()
Expand All @@ -94,6 +95,7 @@ func Test_Combine_OptOnStop(t *testing.T) {
a.Stop() // should have no effect
a.Stop() // should have no effect
assert.Equal(t, `🌚`, <-onStopC)
assert.Equal(t, `🌞`, <-onStatC)
}

func Test_Combine_OptOnStop_AfterActorStops(t *testing.T) {
Expand Down Expand Up @@ -143,14 +145,29 @@ func createActor(i int, opts ...Option) Actor {
func createCombinedOnStopOption(t *testing.T, count int) (<-chan any, CombinedOption) {
t.Helper()

onStopC := make(chan any, count)
onStopFunc := func() {
c := make(chan any, count)
fn := func() {
select {
case onStopC <- `🌚`:
case c <- `🌚`:
default:
t.Fatal("onStopFunc should be called only once")
}
}

return onStopC, OptOnStopCombined(onStopFunc)
return c, OptOnStopCombined(fn)
}

func createCombinedOnStartOption(t *testing.T, count int) (<-chan any, CombinedOption) {
t.Helper()

c := make(chan any, count)
fn := func(_ Context) {
select {
case c <- `🌞`:
default:
t.Fatal("onStart should be called only once")
}
}

return c, OptOnStartCombined(fn)
}
10 changes: 9 additions & 1 deletion actor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,20 @@ func OptStopTogether() CombinedOption {
}
}

// OptOnStopCombined will is called after all combined actors are stopped.
// OptOnStopCombined is called after all combined actors are stopped.
func OptOnStopCombined(f func()) CombinedOption {
return func(o *options) {
o.Combined.OnStopFunc = f
}
}

// OptOnStartCombined is called before all.
func OptOnStartCombined(f func(Context)) CombinedOption {
return func(o *options) {
o.Combined.OnStartFunc = f
}
}

type (
option func(o *options)

Expand All @@ -99,6 +106,7 @@ type optionsActor struct {
type optionsCombined struct {
StopTogether bool
OnStopFunc func()
OnStartFunc func(Context)
}

type optionsMailbox struct {
Expand Down
16 changes: 16 additions & 0 deletions actor/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,20 @@ func testCombinedOptions(t *testing.T) {
assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Mailbox)
}

{ // Assert that OnStartCombined will be set
opts := NewOptions(OptOnStartCombined(func(Context) {}))
assert.NotNil(t, opts.Combined.OnStartFunc)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Mailbox)
}

{ // Assert that OnStopCombined will be set
opts := NewOptions(OptOnStopCombined(func() {}))
assert.NotNil(t, opts.Combined.OnStopFunc)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Mailbox)
}
}

0 comments on commit 8c04049

Please sign in to comment.