diff --git a/rx/observable-operator-run_test.go b/rx/observable-operator-run_test.go new file mode 100644 index 0000000..ee16bba --- /dev/null +++ b/rx/observable-operator-run_test.go @@ -0,0 +1,57 @@ +package rx_test + +import ( + "context" + + "github.com/fortytw2/leaktest" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok +) + +var _ = Describe("Observable operator", func() { + Context("Run", func() { + When("principle", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_Run + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := make([]int, 0) + <-testObservable[int](ctx, 1, 2, 3).Map( + func(_ context.Context, i int) (int, error) { + s = append(s, i) + + return i, nil + }, + ).Run() + + Expect(s).To(ContainElements([]int{1, 2, 3})) + }) + }) + + Context("Errors", func() { + When("error", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_Run_Error + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := make([]int, 0) + <-testObservable[int](ctx, 1, errFoo).Map( + func(_ context.Context, i int) (int, error) { + s = append(s, i) + + return i, nil + }, + ).Run() + + Expect(s).To(ContainElements([]int{1})) + }) + }) + }) + }) +}) diff --git a/rx/observable-operator.go b/rx/observable-operator.go index cd7933d..541107f 100644 --- a/rx/observable-operator.go +++ b/rx/observable-operator.go @@ -183,32 +183,6 @@ func (o *ObservableImpl[T]) Connect(ctx context.Context) (context.Context, Dispo return ctx, Disposable(cancel) } -// Run creates an Observer without consuming the emitted items. -func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed { - dispose := make(chan struct{}) - option := parseOptions(opts...) - ctx := option.buildContext(o.parent) - - go func() { - defer close(dispose) - - observe := o.Observe(opts...) - - for { - select { - case <-ctx.Done(): - return - case _, ok := <-observe: - if !ok { - return - } - } - } - }() - - return dispose -} - func (o *ObservableImpl[T]) Contains(equal Predicate[T], opts ...Option[T]) Single[T] { const ( forceSeq = false @@ -1641,6 +1615,32 @@ func (o *ObservableImpl[T]) Retry(count int, shouldRetry ShouldRetryFunc, opts . } } +// Run creates an Observer without consuming the emitted items. +func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed { + dispose := make(chan struct{}) + option := parseOptions(opts...) + ctx := option.buildContext(o.parent) + + go func() { + defer close(dispose) + + observe := o.Observe(opts...) + + for { + select { + case <-ctx.Done(): + return + case _, ok := <-observe: + if !ok { + return + } + } + } + }() + + return dispose +} + // !!! // ToSlice collects all items from an Observable and emit them in a slice and diff --git a/rx/observable.go b/rx/observable.go index 9d3dc52..dee6c3f 100644 --- a/rx/observable.go +++ b/rx/observable.go @@ -43,10 +43,10 @@ type Observable[T any] interface { OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T]) Observable[T] OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T] OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T] - Run(opts ...Option[T]) Disposed Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T] Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T] Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T] + Run(opts ...Option[T]) Disposed ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error) }