diff --git a/rx/factory-connectable_test.go b/rx/factory-connectable_test.go index bea62f4..d721ed8 100644 --- a/rx/factory-connectable_test.go +++ b/rx/factory-connectable_test.go @@ -80,11 +80,10 @@ func testConnectableSingle[T any](obs Observable[T]) { defer cancel() eg, _ := errgroup.WithContext(ctx) - expected := []interface{}{1, 2, 3} - nbConsumers := 3 wg := sync.WaitGroup{} + wg.Add(nbConsumers) // Before Connect() is called we create multiple observers // We check all observers receive the same items @@ -114,9 +113,9 @@ func testConnectableSingle[T any](obs Observable[T]) { Expect(eg.Wait()).Error().To(BeNil()) } -func testConnectableComposed[T any](obs Observable[T]) { - obs = obs.Map(func(_ context.Context, v int) (int, error) { - return v + 1, nil +func testConnectableComposed[T any](obs Observable[T], increment Func[T]) { + obs = obs.Map(func(ctx context.Context, v T) (T, error) { + return increment(ctx, v) // expect client to implement with with v+1 }, WithPublishStrategy[T]()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) diff --git a/rx/factory.go b/rx/factory.go index b5c2e9e..f93f14c 100644 --- a/rx/factory.go +++ b/rx/factory.go @@ -270,15 +270,6 @@ func Just[T any](values ...T) func(opts ...Option[T]) Observable[T] { } } -// JustN creates an Observable with the provided numbers. -func JustN[T any](numbers ...int) func(opts ...Option[T]) Observable[T] { - return func(opts ...Option[T]) Observable[T] { - return &ObservableImpl[T]{ - iterable: newJustNIterable[T](numbers...)(opts...), - } - } -} - // JustSingle is like JustItem in that it is defined for a single item iterable // but behaves like Just in that it returns a func. // This is probably not required, just defined for experimental purposes for now. diff --git a/rx/factory_test.go b/rx/factory_test.go index ea64993..567da94 100644 --- a/rx/factory_test.go +++ b/rx/factory_test.go @@ -325,16 +325,16 @@ var _ = Describe("Factory", func() { defer leaktest.Check(GinkgoT())() obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { - next <- rx.Num[int](1) - next <- rx.Num[int](2) - next <- rx.Num[int](3) + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) }}).Map(func(_ context.Context, i int) (_ int, _ error) { return i + 1, nil }).Map(func(_ context.Context, i int) (_ int, _ error) { return i + 1, nil }) - rx.Assert(context.Background(), obs, rx.HasNumbers[int]([]int{3, 4, 5}), rx.HasNoError[int]()) - rx.Assert(context.Background(), obs, rx.HasNumbers[int]([]int{3, 4, 5}), rx.HasNoError[int]()) + rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]()) + rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]()) }) }) @@ -343,15 +343,15 @@ var _ = Describe("Factory", func() { defer leaktest.Check(GinkgoT())() obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { - next <- rx.Num[int](1) - next <- rx.Num[int](2) - next <- rx.Num[int](3) + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) }}).Map(func(_ context.Context, i int) (_ int, _ error) { return i + 1, nil }, rx.WithObservationStrategy[int](rx.Eager)).Map(func(_ context.Context, i int) (_ int, _ error) { return i + 1, nil }) - rx.Assert(context.Background(), obs, rx.HasNumbers[int]([]int{3, 4, 5}), rx.HasNoError[int]()) + rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]()) // In the case of an eager observation, we already consumed the items produced by Defer // So if we create another subscription, it will be empty rx.Assert(context.Background(), obs, rx.IsEmpty[int](), rx.HasNoError[int]()) diff --git a/rx/item.go b/rx/item.go index cc60438..0e9eb85 100644 --- a/rx/item.go +++ b/rx/item.go @@ -50,7 +50,7 @@ func Ch[T any](ch any) Item[T] { return Item[T]{C: c} } - panic("invalid ch type") + panic("temp: invalid ch type") } // Tick creates a type safe tick instance diff --git a/rx/iterable-justn.go b/rx/iterable-justn.go deleted file mode 100644 index 5b24eb8..0000000 --- a/rx/iterable-justn.go +++ /dev/null @@ -1,48 +0,0 @@ -package rx - -import ( - "context" -) - -type justNIterable[T any] struct { - items []int - opts []Option[T] -} - -func newJustNIterable[T any](numbers ...int) func(opts ...Option[T]) Iterable[T] { - // do we turn items back into ints? - // - return func(opts ...Option[T]) Iterable[T] { - return &justNIterable[T]{ - items: numbers, - opts: opts, - } - } -} - -func (i *justNIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { - option := parseOptions(append(i.opts, opts...)...) - next := option.buildChannel() - items := make([]int, 0, len(i.items)) - items = append(items, i.items...) - - go SendNumbers(option.buildContext(emptyContext), next, CloseChannel, - items..., - ) - - return next -} - -// SendItems is an utility function that send a list of items and indicate a -// strategy on whether to close the channel once the function completes. -func SendNumbers[T any](ctx context.Context, - ch chan<- Item[T], strategy CloseChannelStrategy, numbers ...int, -) { - if strategy == CloseChannel { - defer close(ch) - } - - for _, current := range numbers { - Num[T](current).SendContext(ctx, ch) - } -} diff --git a/rx/limiters.go b/rx/limiters.go new file mode 100644 index 0000000..7151a19 --- /dev/null +++ b/rx/limiters.go @@ -0,0 +1,27 @@ +package rx + +import ( + "math" + + "golang.org/x/exp/constraints" +) + +func LimitComparator[T constraints.Ordered](a, b T) int { + if a == b { + return 0 + } + + if a < b { + return -1 + } + + return 1 +} + +func MaxInitLimitInt() int { + return math.MinInt +} + +func MinInitLimitInt() int { + return math.MaxInt +} diff --git a/rx/observable-operator.go b/rx/observable-operator.go index 11c2ecc..daa44e9 100644 --- a/rx/observable-operator.go +++ b/rx/observable-operator.go @@ -2,11 +2,16 @@ package rx import ( "context" - "fmt" - "math" "reflect" ) +func isZero[T any](limit T) bool { + val := reflect.ValueOf(limit).Interface() + zero := reflect.Zero(reflect.TypeOf(limit)).Interface() + + return val != zero +} + func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] { return o.iterable.Observe(opts...) } @@ -46,7 +51,7 @@ func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed { } // Max determines and emits the maximum-valued item emitted by an Observable according to a comparator. -func (o *ObservableImpl[T]) Max(comparator Comparator[T], +func (o *ObservableImpl[T]) Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T], ) OptionalSingle[T] { const ( @@ -57,24 +62,16 @@ func (o *ObservableImpl[T]) Max(comparator Comparator[T], return optionalSingle(o.parent, o, func() operator[T] { return &maxOperator[T]{ comparator: comparator, + max: initLimit(), empty: true, - maxN: math.MinInt, } }, forceSeq, bypassGather, opts...) } -func isZero[T any](limit T) bool { - val := reflect.ValueOf(limit).Interface() - zero := reflect.Zero(reflect.TypeOf(limit)).Interface() - - return val != zero -} - type maxOperator[T any] struct { comparator Comparator[T] empty bool max T - maxN int } func (op *maxOperator[T]) next(_ context.Context, @@ -82,23 +79,7 @@ func (op *maxOperator[T]) next(_ context.Context, ) { op.empty = false - // TODO(check): if op.max == nil { - // op.max = item.V - // } else { - // if op.comparator(op.max, item.V) < 0 { - // op.max = item.V - // } - // } - - if item.IsNumeric() { - if item.N > op.maxN { - op.maxN = item.N - } - - return - } - - if isZero(op.max) || (op.comparator(op.max, item.V) < 0) { + if op.comparator(op.max, item.V) < 0 { op.max = item.V } } @@ -135,7 +116,7 @@ func (op *maxOperator[T]) end(ctx context.Context, dst chan<- Item[T]) { // there should be a way for the client to implement these types of checks // themselves, probably by passing in a new function like comparator. // - Num[T](op.maxN).SendContext(ctx, dst) + Of(op.max).SendContext(ctx, dst) } } @@ -143,12 +124,14 @@ func (op *maxOperator[T]) gatherNext(ctx context.Context, item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { // TODO(check): op.next(ctx, Of(item.V.(*maxOperator).max), dst, operatorOptions)÷ - op.next(ctx, Num[T](item.N), dst, operatorOptions) + op.next(ctx, Of(item.V), dst, operatorOptions) } // Min determines and emits the minimum-valued item emitted by an Observable // according to a comparator. -func (o *ObservableImpl[T]) Min(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] { +func (o *ObservableImpl[T]) Min(comparator Comparator[T], initLimit InitLimit[T], + opts ...Option[T], +) OptionalSingle[T] { const ( forceSeq = false bypassGather = false @@ -156,15 +139,15 @@ func (o *ObservableImpl[T]) Min(comparator Comparator[T], opts ...Option[T]) Opt return optionalSingle(o.parent, o, func() operator[T] { return &minOperator[T]{ + min: initLimit(), comparator: comparator, empty: true, - minN: math.MaxInt, } }, forceSeq, bypassGather, opts...) } // Map transforms the items emitted by an Observable by applying a function to each item. -func (o *ObservableImpl[T]) Map(apply FuncIntM[T], opts ...Option[T]) Observable[T] { +func (o *ObservableImpl[T]) Map(apply Func[T], opts ...Option[T]) Observable[T] { const ( forceSeq = false bypassGather = true @@ -178,17 +161,17 @@ func (o *ObservableImpl[T]) Map(apply FuncIntM[T], opts ...Option[T]) Observable } type mapOperator[T any] struct { - apply FuncIntM[T] + apply Func[T] } func (op *mapOperator[T]) next(ctx context.Context, item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { - if !item.IsNumeric() { - panic(fmt.Errorf("not a number (%v)", item)) - } - - res, err := op.apply(ctx, item.N) + // no longer needed: if !item.IsNumeric() { + // panic(fmt.Errorf("not a number (%v)", item)) + // } + // + res, err := op.apply(ctx, item.V) if err != nil { Error[T](err).SendContext(ctx, dst) @@ -197,7 +180,7 @@ func (op *mapOperator[T]) next(ctx context.Context, return } - Num[T](res).SendContext(ctx, dst) + Of(res).SendContext(ctx, dst) } func (op *mapOperator[T]) err(ctx context.Context, @@ -224,7 +207,6 @@ type minOperator[T any] struct { comparator Comparator[T] empty bool min T - minN int limit func(value T) bool // represents min or max } @@ -233,23 +215,7 @@ func (op *minOperator[T]) next(_ context.Context, ) { op.empty = false - // TODO(check): if op.min == nil { - // op.min = item.V - // } else { - // if op.comparator(op.min, item.V) > 0 { - // op.min = item.V - // } - // } - - if item.IsNumeric() { - if item.N < op.minN { - op.minN = item.N - } - - return - } - - if !isZero(op.min) || (op.comparator(op.min, item.V) > 0) { + if op.comparator(op.min, item.V) > 0 { op.min = item.V } } @@ -262,7 +228,7 @@ func (op *minOperator[T]) err(ctx context.Context, func (op *minOperator[T]) end(ctx context.Context, dst chan<- Item[T]) { if !op.empty { - Num[T](op.minN).SendContext(ctx, dst) + Of(op.min).SendContext(ctx, dst) } } @@ -270,5 +236,5 @@ func (op *minOperator[T]) gatherNext(ctx context.Context, item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { // TODO(check): op.next(ctx, Of(item.V.(*minOperator).min), dst, operatorOptions) - op.next(ctx, Num[T](item.N), dst, operatorOptions) + op.next(ctx, Of(item.V), dst, operatorOptions) } diff --git a/rx/observable.go b/rx/observable.go index 479bc20..4a2e836 100644 --- a/rx/observable.go +++ b/rx/observable.go @@ -6,29 +6,16 @@ import ( "sync/atomic" "github.com/emirpasic/gods/trees/binaryheap" - "golang.org/x/exp/constraints" ) -func LimitComparator[T constraints.Ordered](a, b T) int { - if a == b { - return 0 - } - - if a < b { - return -1 - } - - return 1 -} - type Observable[T any] interface { Iterable[T] Connect(ctx context.Context) (context.Context, Disposable) - Max(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] - Map(apply FuncIntM[T], opts ...Option[T]) Observable[T] - Min(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] + Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T] + Map(apply Func[T], opts ...Option[T]) Observable[T] + Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T] Run(opts ...Option[T]) Disposed } diff --git a/rx/option-single_test.go b/rx/option-single_test.go index 130e47c..ee45119 100644 --- a/rx/option-single_test.go +++ b/rx/option-single_test.go @@ -82,7 +82,7 @@ var _ = Describe("OptionSingle", func() { }) Context("Map", Ordered, func() { - var increment rx.FuncIntM[int] + var increment rx.Func[int] BeforeAll(func() { increment = func(_ context.Context, i int) (int, error) { @@ -91,26 +91,12 @@ var _ = Describe("OptionSingle", func() { }) When("Just", func() { - Context("foo ???", func() { - XIt("🧪 should: Map the single entity iterator", func() { - defer leaktest.Check(GinkgoT())() - - // This can't work because Max has been modified to work only - // with numbers (JustN, FuncN, ...) - // - single := rx.Just(42)().Max(rx.LimitComparator).Map(increment) - rx.Assert(context.Background(), single, rx.HasItem(43), rx.HasNoError[int]()) - }) - }) - }) - - When("JustN", func() { Context("foo ???", func() { It("🧪 should: Map the single entity iterator", func() { defer leaktest.Check(GinkgoT())() - single := rx.JustN[int](42)().Max(rx.LimitComparator).Map(increment) - rx.Assert(context.Background(), single, rx.HasNumber[int](43), rx.HasNoError[int]()) + single := rx.Just(42)().Max(rx.LimitComparator, rx.MaxInitLimitInt).Map(increment) + rx.Assert(context.Background(), single, rx.HasItem(43), rx.HasNoError[int]()) }) }) @@ -118,8 +104,8 @@ var _ = Describe("OptionSingle", func() { It("🧪 should: turn the sequence into a Single iterable", func() { defer leaktest.Check(GinkgoT())() - single := rx.JustN[int](42, 48)().Max(rx.LimitComparator) - rx.Assert(context.Background(), single, rx.HasNumber[int](48), rx.HasNoError[int]()) + single := rx.Just(42, 48)().Max(rx.LimitComparator, rx.MaxInitLimitInt) + rx.Assert(context.Background(), single, rx.HasItem(48), rx.HasNoError[int]()) }) }) @@ -127,8 +113,8 @@ var _ = Describe("OptionSingle", func() { It("🧪 should: turn the sequence into a Single iterable", func() { defer leaktest.Check(GinkgoT())() - single := rx.JustN[int](42, 48)().Min(rx.LimitComparator) - rx.Assert(context.Background(), single, rx.HasNumber[int](42), rx.HasNoError[int]()) + single := rx.Just(42, 48)().Min(rx.LimitComparator, rx.MinInitLimitInt) + rx.Assert(context.Background(), single, rx.HasItem(42), rx.HasNoError[int]()) }) }) }) diff --git a/rx/optional-single.go b/rx/optional-single.go index f8ff0e6..6c52950 100644 --- a/rx/optional-single.go +++ b/rx/optional-single.go @@ -2,14 +2,13 @@ package rx import ( "context" - "fmt" ) // OptionalSingle is an optional single. type OptionalSingle[T any] interface { Iterable[T] Get(opts ...Option[T]) (Item[T], error) - Map(apply FuncIntM[T], opts ...Option[T]) OptionalSingle[T] + Map(apply Func[T], opts ...Option[T]) OptionalSingle[T] Run(opts ...Option[T]) Disposed } @@ -39,7 +38,7 @@ func (o *OptionalSingleImpl[T]) Get(opts ...Option[T]) (Item[T], error) { for { select { case <-ctx.Done(): - return Item[T]{}, ctx.Err() + return optionalSingleEmpty, ctx.Err() case v, ok := <-observe: if !ok { return optionalSingleEmpty, nil @@ -51,7 +50,7 @@ func (o *OptionalSingleImpl[T]) Get(opts ...Option[T]) (Item[T], error) { } // Map transforms the items emitted by an OptionalSingle by applying a function to each item. -func (o *OptionalSingleImpl[T]) Map(apply FuncIntM[T], opts ...Option[T]) OptionalSingle[T] { +func (o *OptionalSingleImpl[T]) Map(apply Func[T], opts ...Option[T]) OptionalSingle[T] { const ( forceSeq = false bypassGather = true @@ -68,17 +67,17 @@ func (o *OptionalSingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] { } type mapOperatorOptionalSingle[T any] struct { - apply FuncIntM[T] + apply Func[T] } func (op *mapOperatorOptionalSingle[T]) next(ctx context.Context, item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { - if !item.IsNumeric() { - panic(fmt.Errorf("not a number (%v)", item)) - } - - res, err := op.apply(ctx, item.N) + // no longer needed: if !item.IsNumeric() { + // panic(fmt.Errorf("not a number (%v)", item)) + // } + // + res, err := op.apply(ctx, item.V) if err != nil { dst <- Error[T](err) @@ -87,7 +86,7 @@ func (op *mapOperatorOptionalSingle[T]) next(ctx context.Context, return } - dst <- Num[T](res) + dst <- Of(res) } func (op *mapOperatorOptionalSingle[T]) err(ctx context.Context, diff --git a/rx/types.go b/rx/types.go index aca323a..9396131 100644 --- a/rx/types.go +++ b/rx/types.go @@ -13,6 +13,12 @@ type ( // - A negative value if the first argument is less than the second // - A positive value if the first argument is greater than the second Comparator[T any] func(T, T) int + + // InitLimit defines a function to be used with Min and Max operators that defines + // a limit initialiser, that is to say, for Max we need to initialise the internal + // maximum reference point to be minimum value for type T and the reverse for the + // Min operator. + InitLimit[T any] func() T // ItemToObservable defines a function that computes an observable from an item. ItemToObservable[T any] func(Item[T]) Observable[T] // ErrorToObservable defines a function that transforms an observable from an error. @@ -25,7 +31,7 @@ type ( // To solve the problem of being able to map values across different // types, the FuncIntM type will be modified to take an extra type // parameter 'O' which represents the 'Other' type, ie we map from - // a value of type 'T' to a value of type 'O' (FuncIntM[T, O any]). With + // a value of type 'T' to a value of type 'O' (Func[T, O any]). With // this in place, we should be able to define a pipeline that starts // off with values of type T, and end up with values of type O via a // Map operator. We'll have to make sure that any intermediate @@ -40,7 +46,7 @@ type ( // With generics, Map is a very awkward operator that needs special attention. // In the short term, what we can say is that the base functionality only // allows mapping to different values within the same type. - FuncIntM[T any] func(context.Context, int) (int, error) + // FuncIntM[T any] func(context.Context, int) (int, error) // FuncN defines a function that computes a value from N input values. FuncN[T any] func(...T) T // ErrorFunc defines a function that computes a value from an error.