Skip to content

Commit

Permalink
add combine actor builder (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic authored Oct 24, 2023
1 parent 21715ec commit 200a727
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 37 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
actor.New(pw),
actor.New(cw1),
actor.New(cw2),
)
).Build()

// Finally all actors are started and stopped at once
a.Start()
Expand Down
37 changes: 25 additions & 12 deletions actor/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,42 @@ package actor

import "sync/atomic"

// Combine returns single Actor which combines all specified actors into one.
// Calling Start or Stop function on this Actor will invoke respective function
// on all Actors provided to this function.
func Combine(actors ...Actor) Actor {
return &combinedActor{
stopping: &atomic.Bool{},
actors: actors,
// Combine returns builder which is used to create single Actor that combines all
// specified actors into one.
//
// Calling Start or Stop function on combined Actor will invoke respective
// function on all underlying Actors.
func Combine(actors ...Actor) *CombineBuilder {
return &CombineBuilder{
actors: actors,
}
}

// CombineAndStopTogether returns single Actor which combines all specified actors into one,
// just like Combine function, and additionally actors combined with this function will end
// together as soon as any of supplied actors ends.
func CombineAndStopTogether(actors ...Actor) Actor {
type CombineBuilder struct {
actors []Actor
options options
}

// Build returns combined Actor.
func (b *CombineBuilder) Build() Actor {
combined := &combinedActor{
stopping: &atomic.Bool{},
actors: b.actors,
}

combined.actors = wrapActors(actors, combined.Stop)
if b.options.Combined.StopTogether {
combined.actors = wrapActors(combined.actors, combined.Stop)
}

return combined
}

// WithOptions adds options for combined actor.
func (b *CombineBuilder) WithOptions(opt ...CombinedOption) *CombineBuilder {
b.options = newOptions(opt)
return b
}

type combinedActor struct {
stopping *atomic.Bool
actors []Actor
Expand Down
8 changes: 4 additions & 4 deletions actor/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Test_Combine(t *testing.T) {

// Assert that starting and stopping combined actors
// will start and stop all individual actors
a := Combine(actors...)
a := Combine(actors...).Build()

// Start combined actor and wait for all actors to be started
a.Start()
Expand All @@ -42,9 +42,9 @@ func Test_Combine(t *testing.T) {
assert.Len(t, onStopC, actorsCount)
}

// Test_CombineAndStopTogether asserts that all actors will end as soon
// Test_Combine_StopTogether asserts that all actors will end as soon
// as first actors ends.
func Test_CombineAndStopTogether(t *testing.T) {
func Test_Combine_StopTogether(t *testing.T) {
t.Parallel()

const actorsCount = 5
Expand All @@ -64,7 +64,7 @@ func Test_CombineAndStopTogether(t *testing.T) {
}
}

a := CombineAndStopTogether(actors...)
a := Combine(actors...).WithOptions(OptStopTogether()).Build()

a.Start()
drainC(onStartC, actorsCount)
Expand Down
2 changes: 1 addition & 1 deletion actor/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *context) End() {
c.end()
}

func NewOptions(opts ...Option) options {
func NewOptions[T ~func(o *options)](opts ...T) options {
return newOptions(opts)
}

Expand Down
6 changes: 3 additions & 3 deletions actor/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func FromMailboxes[T any](mm []Mailbox[T]) Actor {
a[i] = m
}

return Combine(a...)
return Combine(a...).Build()
}

// FanOut spawns new goroutine in which messages received by receiveC are forwarded
Expand All @@ -46,7 +46,7 @@ func FanOut[T any, MS MailboxSender[T]](receiveC <-chan T, senders []MS) {
}

// NewMailboxes returns slice of new Mailbox instances with specified count.
func NewMailboxes[T any](count int, opt ...Option) []Mailbox[T] {
func NewMailboxes[T any](count int, opt ...MailboxOption) []Mailbox[T] {
mm := make([]Mailbox[T], count)
for i := 0; i < count; i++ {
mm[i] = NewMailbox[T](opt...)
Expand All @@ -59,7 +59,7 @@ func NewMailboxes[T any](count int, opt ...Option) []Mailbox[T] {
// Mailbox is much like native go channel, except that writing to the Mailbox
// will never block, all messages are going to be queued and Actors on
// receiving end of the Mailbox will get all messages in FIFO order.
func NewMailbox[T any](opt ...Option) Mailbox[T] {
func NewMailbox[T any](opt ...MailboxOption) Mailbox[T] {
mOpts := newOptions(opt).Mailbox

if mOpts.AsChan {
Expand Down
2 changes: 1 addition & 1 deletion actor/mailbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Test_FanOut(t *testing.T) {
// Fan out inMbx
FanOut(inMbx.ReceiveC(), fanMbxx)

a := Combine(inMbx, FromMailboxes(fanMbxx))
a := Combine(inMbx, FromMailboxes(fanMbxx)).Build()

a.Start()
defer a.Stop()
Expand Down
37 changes: 26 additions & 11 deletions actor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ func OptOnStop(f func()) Option {

// OptCapacity sets initial Mailbox queue capacity.
// Value must be power of 2.
func OptCapacity(capacity int) Option {
func OptCapacity(capacity int) MailboxOption {
return func(o *options) {
o.Mailbox.Capacity = capacity
}
}

// OptMinCapacity sets minimum Mailbox queue capacity.
// Value must be power of 2.
func OptMinCapacity(minCapacity int) Option {
func OptMinCapacity(minCapacity int) MailboxOption {
return func(o *options) {
o.Mailbox.MinCapacity = minCapacity
}
}

// OptMailbox sets all Mailbox capacity options at once.
func OptMailbox(capacity, minCapacity int) Option {
func OptMailbox(capacity, minCapacity int) MailboxOption {
return func(o *options) {
o.Mailbox.Capacity = capacity
o.Mailbox.MinCapacity = minCapacity
Expand All @@ -48,36 +48,51 @@ func OptMailbox(capacity, minCapacity int) Option {

// OptAsChan makes Mailbox to function as wrapper for
// native go channel.
func OptAsChan() Option {
func OptAsChan() MailboxOption {
return func(o *options) {
o.Mailbox.AsChan = true
}
}

type Option func(o *options)
// OptStopTogether will stop all actors when any of combined
// actors is stopped.
func OptStopTogether() CombinedOption {
return func(o *options) {
o.Combined.StopTogether = true
}
}

type (
option func(o *options)

Option option
MailboxOption option
CombinedOption option
)

type options struct {
Actor struct {
OnStartFunc func(Context)
OnStopFunc func()
}

Combined struct {
StopTogether bool
}

Mailbox struct {
AsChan bool
Capacity int
MinCapacity int
}
}

func (o *options) apply(opts []Option) {
func newOptions[T ~func(o *options)](opts []T) options {
o := &options{}

for _, opt := range opts {
opt(o)
}
}

func newOptions(opts []Option) options {
o := &options{}
o.apply(opts)

return *o
}
54 changes: 50 additions & 4 deletions actor/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,101 @@ import (
func TestOptions(t *testing.T) {
t.Parallel()

{ // NewOptions with no options should equal to zero options
opts := NewOptions()
assert.Equal(t, NewZeroOptions(), opts)
}
// NewOptions with no options should equal to zero options
assert.Equal(t, NewZeroOptions(), NewOptions[Option]())

testActorOptions(t)
testMailboxOptions(t)
testCombinedOptions(t)
}

func testActorOptions(t *testing.T) {
t.Helper()

{ // Assert that OnStartFunc will be set
opts := NewOptions(OptOnStart(func(Context) {}))
assert.NotNil(t, opts.Actor.OnStartFunc)
assert.Nil(t, opts.Actor.OnStopFunc)

assert.Empty(t, opts.Mailbox)
assert.Empty(t, opts.Combined)
}

{ // Assert that OnStopFunc will be set
opts := NewOptions(OptOnStop(func() {}))
assert.NotNil(t, opts.Actor.OnStopFunc)
assert.Nil(t, opts.Actor.OnStartFunc)

assert.Empty(t, opts.Mailbox)
assert.Empty(t, opts.Combined)
}

{ // Assert that OnStartFunc and OnStopFunc will be set
opts := NewOptions(OptOnStart(func(Context) {}), OptOnStop(func() {}))
assert.NotNil(t, opts.Actor.OnStartFunc)
assert.NotNil(t, opts.Actor.OnStopFunc)

assert.Empty(t, opts.Mailbox)
assert.Empty(t, opts.Combined)
}
}

func testMailboxOptions(t *testing.T) {
t.Helper()

{ // Assert that OptCapacity will be set
opts := NewOptions(OptCapacity(16))
assert.Equal(t, 16, opts.Mailbox.Capacity)
assert.Equal(t, 0, opts.Mailbox.MinCapacity)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Combined)
}

{ // Assert that OptMinCapacity will be set
opts := NewOptions(OptMinCapacity(32))
assert.Equal(t, 0, opts.Mailbox.Capacity)
assert.Equal(t, 32, opts.Mailbox.MinCapacity)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Combined)
}

{ // Assert that OptCapacity and OptMinCapacity will be set
opts := NewOptions(OptCapacity(16), OptMinCapacity(32))
assert.Equal(t, 16, opts.Mailbox.Capacity)
assert.Equal(t, 32, opts.Mailbox.MinCapacity)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Combined)
}

{ // Assert that OptCapacity and OptMinCapacity will be set
opts := NewOptions(OptMailbox(16, 32))
assert.Equal(t, 16, opts.Mailbox.Capacity)
assert.Equal(t, 32, opts.Mailbox.MinCapacity)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Combined)
}

{ // Assert that OptAsChan will be set
opts := NewOptions(OptAsChan())
assert.True(t, opts.Mailbox.AsChan)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Combined)
}
}

func testCombinedOptions(t *testing.T) {
t.Helper()

{ // Assert that StopTogether will be set
opts := NewOptions(OptStopTogether())
assert.True(t, opts.Combined.StopTogether)

assert.Empty(t, opts.Actor)
assert.Empty(t, opts.Mailbox)
}
}

0 comments on commit 200a727

Please sign in to comment.