Skip to content

Commit

Permalink
ref(rx): revert numeric map (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 4, 2024
1 parent fe31256 commit 958b895
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 183 deletions.
9 changes: 4 additions & 5 deletions rx/factory-connectable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ func testConnectableSingle[T any](obs Observable[T]) {
defer cancel()

eg, _ := errgroup.WithContext(ctx)

expected := []interface{}{1, 2, 3}

nbConsumers := 3
wg := sync.WaitGroup{}

wg.Add(nbConsumers)
// Before Connect() is called we create multiple observers
// We check all observers receive the same items
Expand Down Expand Up @@ -114,9 +113,9 @@ func testConnectableSingle[T any](obs Observable[T]) {
Expect(eg.Wait()).Error().To(BeNil())
}

func testConnectableComposed[T any](obs Observable[T]) {
obs = obs.Map(func(_ context.Context, v int) (int, error) {
return v + 1, nil
func testConnectableComposed[T any](obs Observable[T], increment Func[T]) {
obs = obs.Map(func(ctx context.Context, v T) (T, error) {
return increment(ctx, v) // expect client to implement with with v+1
}, WithPublishStrategy[T]())

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down
9 changes: 0 additions & 9 deletions rx/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,6 @@ func Just[T any](values ...T) func(opts ...Option[T]) Observable[T] {
}
}

// JustN creates an Observable with the provided numbers.
func JustN[T any](numbers ...int) func(opts ...Option[T]) Observable[T] {
return func(opts ...Option[T]) Observable[T] {
return &ObservableImpl[T]{
iterable: newJustNIterable[T](numbers...)(opts...),
}
}
}

// JustSingle is like JustItem in that it is defined for a single item iterable
// but behaves like Just in that it returns a func.
// This is probably not required, just defined for experimental purposes for now.
Expand Down
18 changes: 9 additions & 9 deletions rx/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,16 @@ var _ = Describe("Factory", func() {
defer leaktest.Check(GinkgoT())()

obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) {
next <- rx.Num[int](1)
next <- rx.Num[int](2)
next <- rx.Num[int](3)
next <- rx.Of(1)
next <- rx.Of(2)
next <- rx.Of(3)
}}).Map(func(_ context.Context, i int) (_ int, _ error) {
return i + 1, nil
}).Map(func(_ context.Context, i int) (_ int, _ error) {
return i + 1, nil
})
rx.Assert(context.Background(), obs, rx.HasNumbers[int]([]int{3, 4, 5}), rx.HasNoError[int]())
rx.Assert(context.Background(), obs, rx.HasNumbers[int]([]int{3, 4, 5}), rx.HasNoError[int]())
rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]())
rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]())
})
})

Expand All @@ -343,15 +343,15 @@ var _ = Describe("Factory", func() {
defer leaktest.Check(GinkgoT())()

obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) {
next <- rx.Num[int](1)
next <- rx.Num[int](2)
next <- rx.Num[int](3)
next <- rx.Of(1)
next <- rx.Of(2)
next <- rx.Of(3)
}}).Map(func(_ context.Context, i int) (_ int, _ error) {
return i + 1, nil
}, rx.WithObservationStrategy[int](rx.Eager)).Map(func(_ context.Context, i int) (_ int, _ error) {
return i + 1, nil
})
rx.Assert(context.Background(), obs, rx.HasNumbers[int]([]int{3, 4, 5}), rx.HasNoError[int]())
rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]())
// In the case of an eager observation, we already consumed the items produced by Defer
// So if we create another subscription, it will be empty
rx.Assert(context.Background(), obs, rx.IsEmpty[int](), rx.HasNoError[int]())
Expand Down
2 changes: 1 addition & 1 deletion rx/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Ch[T any](ch any) Item[T] {
return Item[T]{C: c}
}

panic("invalid ch type")
panic("temp: invalid ch type")
}

// Tick creates a type safe tick instance
Expand Down
48 changes: 0 additions & 48 deletions rx/iterable-justn.go

This file was deleted.

27 changes: 27 additions & 0 deletions rx/limiters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rx

import (
"math"

"golang.org/x/exp/constraints"
)

func LimitComparator[T constraints.Ordered](a, b T) int {
if a == b {
return 0
}

if a < b {
return -1
}

return 1
}

func MaxInitLimitInt() int {
return math.MinInt
}

func MinInitLimitInt() int {
return math.MaxInt
}
88 changes: 27 additions & 61 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package rx

import (
"context"
"fmt"
"math"
"reflect"
)

func isZero[T any](limit T) bool {
val := reflect.ValueOf(limit).Interface()
zero := reflect.Zero(reflect.TypeOf(limit)).Interface()

return val != zero
}

func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] {
return o.iterable.Observe(opts...)
}
Expand Down Expand Up @@ -46,7 +51,7 @@ func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed {
}

// Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
func (o *ObservableImpl[T]) Max(comparator Comparator[T],
func (o *ObservableImpl[T]) Max(comparator Comparator[T], initLimit InitLimit[T],
opts ...Option[T],
) OptionalSingle[T] {
const (
Expand All @@ -57,48 +62,24 @@ func (o *ObservableImpl[T]) Max(comparator Comparator[T],
return optionalSingle(o.parent, o, func() operator[T] {
return &maxOperator[T]{
comparator: comparator,
max: initLimit(),
empty: true,
maxN: math.MinInt,
}
}, forceSeq, bypassGather, opts...)
}

func isZero[T any](limit T) bool {
val := reflect.ValueOf(limit).Interface()
zero := reflect.Zero(reflect.TypeOf(limit)).Interface()

return val != zero
}

type maxOperator[T any] struct {
comparator Comparator[T]
empty bool
max T
maxN int
}

func (op *maxOperator[T]) next(_ context.Context,
item Item[T], _ chan<- Item[T], _ operatorOptions[T],
) {
op.empty = false

// TODO(check): if op.max == nil {
// op.max = item.V
// } else {
// if op.comparator(op.max, item.V) < 0 {
// op.max = item.V
// }
// }

if item.IsNumeric() {
if item.N > op.maxN {
op.maxN = item.N
}

return
}

if isZero(op.max) || (op.comparator(op.max, item.V) < 0) {
if op.comparator(op.max, item.V) < 0 {
op.max = item.V
}
}
Expand Down Expand Up @@ -135,36 +116,38 @@ func (op *maxOperator[T]) end(ctx context.Context, dst chan<- Item[T]) {
// there should be a way for the client to implement these types of checks
// themselves, probably by passing in a new function like comparator.
//
Num[T](op.maxN).SendContext(ctx, dst)
Of(op.max).SendContext(ctx, dst)
}
}

func (op *maxOperator[T]) gatherNext(ctx context.Context,
item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
// TODO(check): op.next(ctx, Of(item.V.(*maxOperator).max), dst, operatorOptions)÷
op.next(ctx, Num[T](item.N), dst, operatorOptions)
op.next(ctx, Of(item.V), dst, operatorOptions)
}

// Min determines and emits the minimum-valued item emitted by an Observable
// according to a comparator.
func (o *ObservableImpl[T]) Min(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] {
func (o *ObservableImpl[T]) Min(comparator Comparator[T], initLimit InitLimit[T],
opts ...Option[T],
) OptionalSingle[T] {
const (
forceSeq = false
bypassGather = false
)

return optionalSingle(o.parent, o, func() operator[T] {
return &minOperator[T]{
min: initLimit(),
comparator: comparator,
empty: true,
minN: math.MaxInt,
}
}, forceSeq, bypassGather, opts...)
}

// Map transforms the items emitted by an Observable by applying a function to each item.
func (o *ObservableImpl[T]) Map(apply FuncIntM[T], opts ...Option[T]) Observable[T] {
func (o *ObservableImpl[T]) Map(apply Func[T], opts ...Option[T]) Observable[T] {
const (
forceSeq = false
bypassGather = true
Expand All @@ -178,17 +161,17 @@ func (o *ObservableImpl[T]) Map(apply FuncIntM[T], opts ...Option[T]) Observable
}

type mapOperator[T any] struct {
apply FuncIntM[T]
apply Func[T]
}

func (op *mapOperator[T]) next(ctx context.Context,
item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
if !item.IsNumeric() {
panic(fmt.Errorf("not a number (%v)", item))
}

res, err := op.apply(ctx, item.N)
// no longer needed: if !item.IsNumeric() {
// panic(fmt.Errorf("not a number (%v)", item))
// }
//
res, err := op.apply(ctx, item.V)

if err != nil {
Error[T](err).SendContext(ctx, dst)
Expand All @@ -197,7 +180,7 @@ func (op *mapOperator[T]) next(ctx context.Context,
return
}

Num[T](res).SendContext(ctx, dst)
Of(res).SendContext(ctx, dst)
}

func (op *mapOperator[T]) err(ctx context.Context,
Expand All @@ -224,7 +207,6 @@ type minOperator[T any] struct {
comparator Comparator[T]
empty bool
min T
minN int
limit func(value T) bool // represents min or max
}

Expand All @@ -233,23 +215,7 @@ func (op *minOperator[T]) next(_ context.Context,
) {
op.empty = false

// TODO(check): if op.min == nil {
// op.min = item.V
// } else {
// if op.comparator(op.min, item.V) > 0 {
// op.min = item.V
// }
// }

if item.IsNumeric() {
if item.N < op.minN {
op.minN = item.N
}

return
}

if !isZero(op.min) || (op.comparator(op.min, item.V) > 0) {
if op.comparator(op.min, item.V) > 0 {
op.min = item.V
}
}
Expand All @@ -262,13 +228,13 @@ func (op *minOperator[T]) err(ctx context.Context,

func (op *minOperator[T]) end(ctx context.Context, dst chan<- Item[T]) {
if !op.empty {
Num[T](op.minN).SendContext(ctx, dst)
Of(op.min).SendContext(ctx, dst)
}
}

func (op *minOperator[T]) gatherNext(ctx context.Context,
item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
// TODO(check): op.next(ctx, Of(item.V.(*minOperator).min), dst, operatorOptions)
op.next(ctx, Num[T](item.N), dst, operatorOptions)
op.next(ctx, Of(item.V), dst, operatorOptions)
}
Loading

0 comments on commit 958b895

Please sign in to comment.