Skip to content

Commit

Permalink
feat(rx): add scan operator (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 12, 2024
1 parent 310c245 commit ac4a77b
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
55 changes: 55 additions & 0 deletions rx/observable-operator-scan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Scan", func() {
When("principle", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Scan
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3, 4, 5).Scan(
func(_ context.Context, x, y rx.Item[int]) (int, error) {
return x.V + y.V, nil
},
)
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{1, 3, 6, 10, 15},
})
})
})

Context("Parallel", func() {
When("foo", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Scan_Parallel
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3, 4, 5).Scan(
func(_ context.Context, x, y rx.Item[int]) (int, error) {
return x.V + y.V, nil
},
rx.WithCPUPool[int](),
)

rx.Assert(ctx, obs, rx.HasItemsNoOrder[int]{
Expected: []int{1, 3, 6, 10, 15},
})
})
})
})
})
})
51 changes: 51 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,57 @@ func (o *ObservableImpl[T]) Sample(iterable Iterable[T], opts ...Option[T]) Obse
}
}

// Scan apply a Func2 to each item emitted by an Observable, sequentially, and
// emit each successive value. Cannot be run in parallel.
func (o *ObservableImpl[T]) Scan(apply Func2[T], opts ...Option[T]) Observable[T] {
const (
forceSeq = true
bypassGather = false
)

return observable(o.parent, o, func() operator[T] {
return &scanOperator[T]{
apply: apply,
}
}, forceSeq, bypassGather, opts...)
}

type scanOperator[T any] struct {
apply Func2[T]
current Item[T]
}

func (op *scanOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
v, err := op.apply(ctx, op.current, item)

if err != nil {
Error[T](err).SendContext(ctx, dst)
operatorOptions.stop()

return
}

it := Of(v)
it.SendContext(ctx, dst)
op.current = it
}

func (op *scanOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *scanOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *scanOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T],
) {
}

// !!!

// ToSlice collects all items from an Observable and emit them in a slice and
Expand Down
1 change: 1 addition & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Observable[T any] interface {
Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T]
Run(opts ...Option[T]) Disposed
Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]
Scan(apply Func2[T], opts ...Option[T]) Observable[T]
ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
}

Expand Down

0 comments on commit ac4a77b

Please sign in to comment.