Skip to content

Commit

Permalink
feat(rx): run operator (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 12, 2024
1 parent 8ddd6db commit 0937cae
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 27 deletions.
57 changes: 57 additions & 0 deletions rx/observable-operator-run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok
)

var _ = Describe("Observable operator", func() {
Context("Run", func() {
When("principle", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Run
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s := make([]int, 0)
<-testObservable[int](ctx, 1, 2, 3).Map(
func(_ context.Context, i int) (int, error) {
s = append(s, i)

return i, nil
},
).Run()

Expect(s).To(ContainElements([]int{1, 2, 3}))
})
})

Context("Errors", func() {
When("error", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Run_Error
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s := make([]int, 0)
<-testObservable[int](ctx, 1, errFoo).Map(
func(_ context.Context, i int) (int, error) {
s = append(s, i)

return i, nil
},
).Run()

Expect(s).To(ContainElements([]int{1}))
})
})
})
})
})
52 changes: 26 additions & 26 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,32 +183,6 @@ func (o *ObservableImpl[T]) Connect(ctx context.Context) (context.Context, Dispo
return ctx, Disposable(cancel)
}

// Run creates an Observer without consuming the emitted items.
func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed {
dispose := make(chan struct{})
option := parseOptions(opts...)
ctx := option.buildContext(o.parent)

go func() {
defer close(dispose)

observe := o.Observe(opts...)

for {
select {
case <-ctx.Done():
return
case _, ok := <-observe:
if !ok {
return
}
}
}
}()

return dispose
}

func (o *ObservableImpl[T]) Contains(equal Predicate[T], opts ...Option[T]) Single[T] {
const (
forceSeq = false
Expand Down Expand Up @@ -1641,6 +1615,32 @@ func (o *ObservableImpl[T]) Retry(count int, shouldRetry ShouldRetryFunc, opts .
}
}

// Run creates an Observer without consuming the emitted items.
func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed {
dispose := make(chan struct{})
option := parseOptions(opts...)
ctx := option.buildContext(o.parent)

go func() {
defer close(dispose)

observe := o.Observe(opts...)

for {
select {
case <-ctx.Done():
return
case _, ok := <-observe:
if !ok {
return
}
}
}
}()

return dispose
}

// !!!

// ToSlice collects all items from an Observable and emit them in a slice and
Expand Down
2 changes: 1 addition & 1 deletion rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ type Observable[T any] interface {
OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T]) Observable[T]
OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T]
OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T]
Run(opts ...Option[T]) Disposed
Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T]
Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]
Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T]
Run(opts ...Option[T]) Disposed
ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
}

Expand Down

0 comments on commit 0937cae

Please sign in to comment.