Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rx): reset reduce acc with native 0 (#231) #237

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rx/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ var _ = Describe("Factory", func() {
})

When("given: custom structure", func() {
It("🧪 should: ", func() {
It("🧪 should: create observable without error", func() {
// Test_Just_CustomStructure
defer leaktest.Check(GinkgoT())()

Expand All @@ -745,7 +745,7 @@ var _ = Describe("Factory", func() {
})

When("given: channel", func() {
XIt("🧪 should: ???", func() {
XIt("🧪 should: ???", decorators.Label("sending chan not supported yet"), func() {
// Test_Just_Channel
defer leaktest.Check(GinkgoT())()

Expand Down Expand Up @@ -1039,7 +1039,7 @@ var _ = Describe("Factory", func() {

Context("NominatedRangeIterator", func() {
When("positive count", func() {
It("🧪 should: create observable", decorators.Label("need pointer receiver on T"), func() {
It("🧪 should: create observable", func() {
// Test_Range
defer leaktest.Check(GinkgoT())()

Expand Down
5 changes: 2 additions & 3 deletions rx/observable-operator-average_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/onsi/ginkgo/v2/dsl/decorators"

"github.com/snivilised/lorax/rx"
)
Expand Down Expand Up @@ -146,8 +145,8 @@ var _ = Describe("Observable operator", func() {
})

Context("Parallel/Error", func() {
Context("given: foo", func() {
XIt("🧪 should: ", decorators.Label("broken average.gatherNext"), func() {
Context("given: invalid input", func() {
It("🧪 should: result in error", func() {
// rxgo: Test_Observable_AverageFloat32_Parallel_Error
defer leaktest.Check(GinkgoT())()

Expand Down
34 changes: 17 additions & 17 deletions rx/observable-operator-reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/onsi/ginkgo/v2/dsl/decorators"
"github.com/snivilised/lorax/enums"
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
XContext("Reduce", decorators.Label("broken by reduce acc"), func() {
Context("Reduce", func() {
When("using Range", func() {
It("🧪 should: compute reduction ok", func() {
// rxgo: Test_Observable_Reduce
Expand All @@ -45,9 +44,9 @@ var _ = Describe("Observable operator", func() {
obs := rx.Range(&rx.NumericRangeIterator[int]{
StartAt: 1,
Whilst: rx.LessThan(10001),
}).Reduce( // 1, 10000
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
return acc.V + num.Num(), nil
}).Reduce(
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
return acc.V + item.V, nil
},
)
rx.Assert(ctx, obs,
Expand Down Expand Up @@ -104,7 +103,7 @@ var _ = Describe("Observable operator", func() {

Context("Parallel", func() {
When("using Range", func() {
XIt("🧪 should: compute reduction ok", decorators.Label("repairing"), func() {
It("🧪 should: compute reduction ok", func() {
// rxgo: Test_Observable_Reduce_Parallel
defer leaktest.Check(GinkgoT())()

Expand All @@ -113,10 +112,10 @@ var _ = Describe("Observable operator", func() {

obs := rx.Range(&rx.NumericRangeIterator[int]{
StartAt: 1,
Whilst: rx.LessThan(6),
Whilst: rx.LessThan(10001),
}).Reduce(
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
return acc.Num() + num.Num(), nil
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
return acc.V + item.V, nil
}, rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs,
Expand All @@ -131,7 +130,7 @@ var _ = Describe("Observable operator", func() {

Context("Parallel/Error", func() {
When("using Range", func() {
XIt("🧪 should: result in error", func() {
It("🧪 should: result in error", func() {
// rxgo: Test_Observable_Reduce_Parallel_Error
defer leaktest.Check(GinkgoT())()

Expand All @@ -141,11 +140,11 @@ var _ = Describe("Observable operator", func() {
StartAt: 1,
Whilst: rx.LessThan(10001),
}).Reduce(
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
if num.Num() == 1000 {
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
if item.V == 1000 {
return 0, errFoo
}
return acc.Num() + num.Num(), nil
return acc.V + item.V, nil
}, rx.WithContext[int](ctx), rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs,
Expand All @@ -157,21 +156,22 @@ var _ = Describe("Observable operator", func() {
})

When("error with error strategy", func() {
XIt("🧪 should: result in error", func() {
It("🧪 should: result in error", func() {
// rxgo: Test_Observable_Reduce_Parallel_WithErrorStrategy
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Range(&rx.NumericRangeIterator[int]{
StartAt: 1,
Whilst: rx.LessThan(10001),
}).Reduce(
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
if num.Num() == 1 {
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
if item.V == 1 {
return 0, errFoo
}
return acc.Num() + num.Num(), nil
return acc.V + item.V, nil
}, rx.WithCPUPool[int](), rx.WithErrorStrategy[int](enums.ContinueOnError),
)
rx.Assert(ctx, obs,
Expand Down
3 changes: 1 addition & 2 deletions rx/observable-operator-sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/onsi/ginkgo/v2/dsl/decorators"

"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Sum", func() {
When("principle", func() {
XIt("🧪 should: return sum", decorators.Label("broken by reduce acc"), func() {
It("🧪 should: return sum", func() {
// rxgo: Test_Observable_SumFloat32_OnlyFloat32
defer leaktest.Check(GinkgoT())()

Expand Down
8 changes: 4 additions & 4 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (op *allOperator[T]) gatherNext(ctx context.Context, item Item[T],
}
}

// Average calculates the average of numbers emitted by an Observable and emits the result.
func (o *ObservableImpl[T]) Average(opts ...Option[T],
) Single[T] {
const (
Expand Down Expand Up @@ -1496,7 +1497,7 @@ func (o *ObservableImpl[T]) Reduce(apply Func2[T], opts ...Option[T]) OptionalSi
return optionalSingle(o.parent, o, func() operator[T] {
return &reduceOperator[T]{
apply: apply,
acc: Num[T](0), // acc needs to be a Num: bomb!!!
acc: Zero[T](),
empty: true,
}
}, forceSeq, bypassGather, opts...)
Expand All @@ -1512,7 +1513,7 @@ func (op *reduceOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
op.empty = false
v, err := op.apply(ctx, op.acc, item) // bomb!!!
v, err := op.apply(ctx, op.acc, item)

if err != nil {
Error[T](err).SendContext(ctx, dst)
Expand Down Expand Up @@ -2233,10 +2234,9 @@ func (o *ObservableImpl[T]) StartWith(iterable Iterable[T], opts ...Option[T]) O

// Sum calculates the average emitted by an Observable and emits the result
func (o *ObservableImpl[T]) Sum(opts ...Option[T]) OptionalSingle[T] {
options := parseOptions[T]()
options := parseOptions(opts...)
calc := options.calc()

// TODO: bomb!!!: do we use Num?
return o.Reduce(func(_ context.Context, acc, item Item[T]) (T, error) {
if calc == nil {
var (
Expand Down
Loading