Skip to content

Commit

Permalink
feat(rx): add on error xxx and observe operators (#184)
Browse files Browse the repository at this point in the history
feat(rx): add observe operator (#184)

feat(rx): add on error operators (#184)
  • Loading branch information
plastikfan committed Apr 11, 2024
1 parent cd2dffd commit 7b9765d
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 8 deletions.
29 changes: 29 additions & 0 deletions rx/observable-operator-observe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok
)

var _ = Describe("Observable operator", func() {
Context("Observe", func() {
When("observable", func() {
It("🧪 should: receive all emitted items", func() {
// rxgo: Test_Observable_Observe
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
got := make([]int, 0)
ch := testObservable[int](ctx, 1, 2, 3).Observe()
for item := range ch {
got = append(got, item.V)
}
Expect(got).To(ContainElements([]int{1, 2, 3}))
})
})
})
})
79 changes: 79 additions & 0 deletions rx/observable-operator-on-error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok

"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("OnErrorResumeNext", func() {
When("error occurs", func() {
It("🧪 should: continue with next iterable", func() {
// rxgo: Test_Observable_OnErrorResumeNext
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := testObservable[int](ctx, 1, 2, errFoo, 4).OnErrorResumeNext(
func(_ error) rx.Observable[int] {
return testObservable[int](ctx, 10, 20)
},
)
rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2, 10, 20},
},
rx.HasNoError[int]{},
)
})
})
})

Context("OnErrorReturn", func() {
When("error occurs", func() {
It("🧪 should: emit translated error and continue", func() {
// rxgo: Test_Observable_OnErrorReturn
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, errFoo, 4, errBar, 6).OnErrorReturn(
func(_ error) int {
return -1
},
)
rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2, -1, 4, -1, 6},
},
rx.HasNoError[int]{},
)
})
})
})

Context("OnErrorReturnItem", func() {
When("error occurs", func() {
It("🧪 should: emit translated error and continue", func() {
// rxgo: Test_Observable_OnErrorReturnItem
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, errFoo, 4, errBar, 6).OnErrorReturnItem(-1)
rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{1, 2, -1, 4, -1, 6},
},
rx.HasNoError[int]{},
)
})
})
})
})
116 changes: 109 additions & 7 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,6 @@ import (
"github.com/cenkalti/backoff/v4"
)

// func isZero[T any](limit T) bool {
// val := reflect.ValueOf(limit).Interface()
// zero := reflect.Zero(reflect.TypeOf(limit)).Interface()

// return val != zero
// }

// All determines whether all items emitted by an Observable meet some criteria.
func (o *ObservableImpl[T]) All(predicate Predicate[T], opts ...Option[T]) Single[T] {
const (
Expand Down Expand Up @@ -1359,6 +1352,115 @@ func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] {
return o.iterable.Observe(opts...)
}

// OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking
// onError if it encounters an error.
func (o *ObservableImpl[T]) OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T]) Observable[T] {
const (
forceSeq = true
bypassGather = false
)

return observable(o.parent, o, func() operator[T] {
return &onErrorResumeNextOperator[T]{
resumeSequence: resumeSequence,
}
}, forceSeq, bypassGather, opts...)
}

type onErrorResumeNextOperator[T any] struct {
resumeSequence ErrorToObservable[T]
}

func (op *onErrorResumeNextOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], _ operatorOptions[T]) {
item.SendContext(ctx, dst)
}

func (op *onErrorResumeNextOperator[T]) err(_ context.Context, item Item[T],
_ chan<- Item[T], operatorOptions operatorOptions[T],
) {
operatorOptions.resetIterable(op.resumeSequence(item.E))
}

func (op *onErrorResumeNextOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *onErrorResumeNextOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T]) {
}

func (o *ObservableImpl[T]) OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T] {
const (
forceSeq = true
bypassGather = false
)

return observable(o.parent, o, func() operator[T] {
return &onErrorReturnOperator[T]{
resumeFunc: resumeFunc,
}
}, forceSeq, bypassGather, opts...)
}

type onErrorReturnOperator[T any] struct {
resumeFunc ErrorFunc[T]
}

func (op *onErrorReturnOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], _ operatorOptions[T]) {
item.SendContext(ctx, dst)
}

func (op *onErrorReturnOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], _ operatorOptions[T],
) {
Of[T](op.resumeFunc(item.E)).SendContext(ctx, dst)
}

func (op *onErrorReturnOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *onErrorReturnOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T]) {
}

func (o *ObservableImpl[T]) OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T] {
const (
forceSeq = true
bypassGather = false
)

return observable(o.parent, o, func() operator[T] {
return &onErrorReturnItemOperator[T]{
resume: resume,
}
}, true, false, opts...)
}

type onErrorReturnItemOperator[T any] struct {
resume T
}

func (op *onErrorReturnItemOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], _ operatorOptions[T],
) {
item.SendContext(ctx, dst)
}

func (op *onErrorReturnItemOperator[T]) err(ctx context.Context, _ Item[T],
dst chan<- Item[T], _ operatorOptions[T],
) {
Of(op.resume).SendContext(ctx, dst)
}

func (op *onErrorReturnItemOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *onErrorReturnItemOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T],
) {
}

// ToSlice collects all items from an Observable and emit them in a slice and
// an optional error. Cannot be run in parallel.
func (o *ObservableImpl[T]) ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error) {
Expand Down
4 changes: 3 additions & 1 deletion rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ type Observable[T any] interface {
Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Map(apply Func[T], opts ...Option[T]) Observable[T]
Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]

OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T]) Observable[T]
OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T]
OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T]
Run(opts ...Option[T]) Disposed

ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
Expand Down

0 comments on commit 7b9765d

Please sign in to comment.