Skip to content

Commit

Permalink
feat(rx): add repeat operator (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 12, 2024
1 parent 9984e1b commit 8ddd6db
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 0 deletions.
89 changes: 89 additions & 0 deletions rx/observable-operator-retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Retry", func() {
When("principle", func() {
It("🧪 should: retry", func() {
// rxgo: Test_Observable_Retry
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

i := 0
obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) {
next <- rx.Of(1)
next <- rx.Of(2)
if i == 2 {
next <- rx.Of(3)
} else {
i++
next <- rx.Error[int](errFoo)
}
}}).Retry(3, func(_ error) bool {
return true
})
rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2, 1, 2, 1, 2, 3},
},
rx.HasNoError[int]{},
)
})
})

Context("Errors", func() {
When("retry error", func() {
It("🧪 should: retry", func() {
// rxgo: Test_Observable_Retry_Error_ShouldRetry
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) {
next <- rx.Of(1)
next <- rx.Of(2)
next <- rx.Error[int](errFoo)
}}).Retry(3, func(_ error) bool {
return true
})
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{1, 2, 1, 2, 1, 2, 1, 2},
}, rx.HasError[int]{
Expected: []error{errFoo},
})
})
})

When("retry error", func() {
It("🧪 should: not retry", func() {
// rxgo: Test_Observable_Retry_Error_ShouldNotRetry
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) {
next <- rx.Of(1)
next <- rx.Of(2)
next <- rx.Error[int](errFoo)
}}).Retry(3, func(_ error) bool {
return false
})
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{1, 2},
}, rx.HasError[int]{
Expected: []error{errFoo},
})
})
})
})
})
})
41 changes: 41 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,47 @@ func (op *repeatOperator[T]) gatherNext(_ context.Context, _ Item[T],
) {
}

// Retry retries if a source Observable sends an error, resubscribe to
// it in the hopes that it will complete without error. Cannot be run in parallel.
func (o *ObservableImpl[T]) Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T] {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext(o.parent)

go func() {
observe := o.Observe(opts...)
loop:
for {
select {
case <-ctx.Done():
break loop
case i, ok := <-observe:
if !ok {
break loop
}

if i.IsError() {
count--

if count < 0 || !shouldRetry(i.E) {
i.SendContext(ctx, next)
break loop
}

observe = o.Observe(opts...)
} else {
i.SendContext(ctx, next)
}
}
}
close(next)
}()

return &ObservableImpl[T]{
iterable: newChannelIterable(next),
}
}

// !!!

// ToSlice collects all items from an Observable and emit them in a slice and
Expand Down
1 change: 1 addition & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Observable[T any] interface {
Run(opts ...Option[T]) Disposed
Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T]
Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]
Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T]
ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
}

Expand Down
2 changes: 2 additions & 0 deletions rx/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type (
Unmarshaller[T any] func([]byte, T) error
// Producer defines a producer implementation.
Producer[T any] func(ctx context.Context, next chan<- Item[T])
// ShouldRetryFunc as used by Retry operator
ShouldRetryFunc func(error) bool
// Supplier defines a function that supplies a result from nothing.
Supplier[T any] func(ctx context.Context) Item[T]
// Disposed is a notification channel indicating when an Observable is closed.
Expand Down

0 comments on commit 8ddd6db

Please sign in to comment.