diff --git a/rx/assert.go b/rx/assert.go index 3c0c437..c7fdc5a 100644 --- a/rx/assert.go +++ b/rx/assert.go @@ -9,32 +9,32 @@ import ( ) // AssertPredicate is a custom predicate based on the items. -type AssertPredicate[I any] func(items []I) error +type AssertPredicate[T any] func(items []T) error // RxAssert lists the Observable assertions. -type RxAssert[I any] interface { //nolint:revive // foo - apply(*rxAssert[I]) - itemsToBeChecked() (bool, []I) - itemsNoOrderedToBeChecked() (bool, []I) +type RxAssert[T any] interface { //nolint:revive // foo + apply(*rxAssert[T]) + itemsToBeChecked() (bool, []T) + itemsNoOrderedToBeChecked() (bool, []T) noItemsToBeChecked() bool someItemsToBeChecked() bool raisedErrorToBeChecked() (bool, error) raisedErrorsToBeChecked() (bool, []error) raisedAnErrorToBeChecked() (bool, error) notRaisedErrorToBeChecked() bool - itemToBeChecked() (bool, I) - noItemToBeChecked() (bool, I) - customPredicatesToBeChecked() (bool, []AssertPredicate[I]) + itemToBeChecked() (bool, T) + noItemToBeChecked() (bool, T) + customPredicatesToBeChecked() (bool, []AssertPredicate[T]) } -type rxAssert[I any] struct { - f func(*rxAssert[I]) +type rxAssert[T any] struct { + f func(*rxAssert[T]) checkHasItems bool checkHasNoItems bool checkHasSomeItems bool - items []I + items []T checkHasItemsNoOrder bool - itemsNoOrder []I + itemsNoOrder []T checkHasRaisedError bool err error checkHasRaisedErrors bool @@ -42,67 +42,67 @@ type rxAssert[I any] struct { checkHasRaisedAnError bool checkHasNotRaisedError bool checkHasItem bool - item I + item T checkHasNoItem bool checkHasCustomPredicate bool - customPredicates []AssertPredicate[I] + customPredicates []AssertPredicate[T] } -func (ass *rxAssert[I]) apply(do *rxAssert[I]) { +func (ass *rxAssert[T]) apply(do *rxAssert[T]) { ass.f(do) } -func (ass *rxAssert[I]) itemsToBeChecked() (b bool, i []I) { +func (ass *rxAssert[T]) itemsToBeChecked() (b bool, i []T) { return ass.checkHasItems, ass.items } -func (ass *rxAssert[I]) itemsNoOrderedToBeChecked() (b bool, i []I) { +func (ass *rxAssert[T]) itemsNoOrderedToBeChecked() (b bool, i []T) { return ass.checkHasItemsNoOrder, ass.itemsNoOrder } -func (ass *rxAssert[I]) noItemsToBeChecked() bool { +func (ass *rxAssert[T]) noItemsToBeChecked() bool { return ass.checkHasNoItems } -func (ass *rxAssert[I]) someItemsToBeChecked() bool { +func (ass *rxAssert[T]) someItemsToBeChecked() bool { return ass.checkHasSomeItems } -func (ass *rxAssert[I]) raisedErrorToBeChecked() (bool, error) { +func (ass *rxAssert[T]) raisedErrorToBeChecked() (bool, error) { return ass.checkHasRaisedError, ass.err } -func (ass *rxAssert[I]) raisedErrorsToBeChecked() (bool, []error) { +func (ass *rxAssert[T]) raisedErrorsToBeChecked() (bool, []error) { return ass.checkHasRaisedErrors, ass.errs } -func (ass *rxAssert[I]) raisedAnErrorToBeChecked() (bool, error) { +func (ass *rxAssert[T]) raisedAnErrorToBeChecked() (bool, error) { return ass.checkHasRaisedAnError, ass.err } -func (ass *rxAssert[I]) notRaisedErrorToBeChecked() bool { +func (ass *rxAssert[T]) notRaisedErrorToBeChecked() bool { return ass.checkHasNotRaisedError } -func (ass *rxAssert[I]) itemToBeChecked() (b bool, i I) { +func (ass *rxAssert[T]) itemToBeChecked() (b bool, i T) { return ass.checkHasItem, ass.item } -func (ass *rxAssert[I]) noItemToBeChecked() (b bool, i I) { +func (ass *rxAssert[T]) noItemToBeChecked() (b bool, i T) { return ass.checkHasNoItem, ass.item } -func (ass *rxAssert[I]) customPredicatesToBeChecked() (bool, []AssertPredicate[I]) { +func (ass *rxAssert[T]) customPredicatesToBeChecked() (bool, []AssertPredicate[T]) { return ass.checkHasCustomPredicate, ass.customPredicates } -func newAssertion[I any](f func(*rxAssert[I])) *rxAssert[I] { - return &rxAssert[I]{ +func newAssertion[T any](f func(*rxAssert[T])) *rxAssert[T] { + return &rxAssert[T]{ f: f, } } -func parseAssertions[I any](assertions ...RxAssert[I]) RxAssert[I] { - ass := new(rxAssert[I]) +func parseAssertions[T any](assertions ...RxAssert[T]) RxAssert[T] { + ass := new(rxAssert[T]) for _, assertion := range assertions { assertion.apply(ass) @@ -111,9 +111,9 @@ func parseAssertions[I any](assertions ...RxAssert[I]) RxAssert[I] { return ass } -func Assert[I any](ctx context.Context, iterable Iterable[I], assertions ...RxAssert[I]) { +func Assert[T any](ctx context.Context, iterable Iterable[T], assertions ...RxAssert[T]) { ass := parseAssertions(assertions...) - got := make([]I, 0) + got := make([]T, 0) errs := make([]error, 0) observe := iterable.Observe() @@ -211,51 +211,51 @@ loop: } } -func HasItems[I any](expectedItems []I) RxAssert[I] { - return newAssertion(func(ra *rxAssert[I]) { +func HasItems[T any](expectedItems []T) RxAssert[T] { + return newAssertion(func(ra *rxAssert[T]) { ra.checkHasItems = true ra.items = expectedItems }) } // HasItem checks if a single or optional single has a specific item. -func HasItem[I any](i I) RxAssert[I] { - return newAssertion(func(a *rxAssert[I]) { +func HasItem[T any](i T) RxAssert[T] { + return newAssertion(func(a *rxAssert[T]) { a.checkHasItem = true a.item = i }) } // IsNotEmpty checks that the observable produces some items. -func IsNotEmpty[I any]() RxAssert[I] { - return newAssertion(func(a *rxAssert[I]) { +func IsNotEmpty[T any]() RxAssert[T] { + return newAssertion(func(a *rxAssert[T]) { a.checkHasSomeItems = true }) } // IsEmpty checks that the observable has not produce any item. -func IsEmpty[I any]() RxAssert[I] { - return newAssertion(func(a *rxAssert[I]) { +func IsEmpty[T any]() RxAssert[T] { + return newAssertion(func(a *rxAssert[T]) { a.checkHasNoItems = true }) } -func HasError[I any](err error) RxAssert[I] { - return newAssertion(func(a *rxAssert[I]) { +func HasError[T any](err error) RxAssert[T] { + return newAssertion(func(a *rxAssert[T]) { a.checkHasRaisedError = true a.err = err }) } // HasAnError checks that the observable has produce an error. -func HasAnError[I any]() RxAssert[I] { - return newAssertion(func(a *rxAssert[I]) { +func HasAnError[T any]() RxAssert[T] { + return newAssertion(func(a *rxAssert[T]) { a.checkHasRaisedAnError = true }) } -func HasNoError[I any]() RxAssert[I] { - return newAssertion(func(ra *rxAssert[I]) { +func HasNoError[T any]() RxAssert[T] { + return newAssertion(func(ra *rxAssert[T]) { ra.checkHasNotRaisedError = true }) } diff --git a/rx/factory.go b/rx/factory.go index cc81ee2..cc660e6 100644 --- a/rx/factory.go +++ b/rx/factory.go @@ -2,37 +2,37 @@ package rx // Amb takes several Observables, emit all of the items from only the first of these Observables // to emit an item or notification. -func Amb[I any](observables []Observable[I], opts ...Option[I]) Observable[I] { +func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T] { _, _ = observables, opts panic("Amb: NOT-IMPL") } // Empty creates an Observable with no item and terminate immediately. -func Empty[I any]() Observable[I] { - next := make(chan Item[I]) +func Empty[T any]() Observable[T] { + next := make(chan Item[T]) close(next) - return &ObservableImpl[I]{ + return &ObservableImpl[T]{ iterable: newChannelIterable(next), } } // FromChannel creates a cold observable from a channel. -func FromChannel[I any](next <-chan Item[I], opts ...Option[I]) Observable[I] { +func FromChannel[T any](next <-chan Item[T], opts ...Option[T]) Observable[T] { option := parseOptions(opts...) ctx := option.buildContext(emptyContext) - return &ObservableImpl[I]{ + return &ObservableImpl[T]{ parent: ctx, iterable: newChannelIterable(next, opts...), } } // Just creates an Observable with the provided items. -func Just[I any](values ...I) func(opts ...Option[I]) Observable[I] { - return func(opts ...Option[I]) Observable[I] { - return &ObservableImpl[I]{ +func Just[T any](values ...T) func(opts ...Option[T]) Observable[T] { + return func(opts ...Option[T]) Observable[T] { + return &ObservableImpl[T]{ iterable: newJustIterable(values...)(opts...), } } @@ -41,28 +41,28 @@ func Just[I any](values ...I) func(opts ...Option[I]) Observable[I] { // JustSingle is like JustItem in that it is defined for a single item iterable // but behaves like Just in that it returns a func. // This is probably not required, just defined for experimental purposes for now. -func JustSingle[I any](value I, opts ...Option[I]) func(opts ...Option[I]) Single[I] { - return func(_ ...Option[I]) Single[I] { - return &SingleImpl[I]{ +func JustSingle[T any](value T, opts ...Option[T]) func(opts ...Option[T]) Single[T] { + return func(_ ...Option[T]) Single[T] { + return &SingleImpl[T]{ iterable: newJustIterable(value)(opts...), } } } // JustItem creates a single from one item. -func JustItem[I any](value I, opts ...Option[I]) Single[I] { +func JustItem[T any](value T, opts ...Option[T]) Single[T] { // Why does this not return a func, but Just does? // - return &SingleImpl[I]{ + return &SingleImpl[T]{ iterable: newJustIterable(value)(opts...), } } // Never creates an Observable that emits no items and does not terminate. -func Never[I any]() Observable[I] { - next := make(chan Item[I]) +func Never[T any]() Observable[T] { + next := make(chan Item[T]) - return &ObservableImpl[I]{ + return &ObservableImpl[T]{ iterable: newChannelIterable(next), } } diff --git a/rx/item.go b/rx/item.go index 99d80a7..821c5ff 100644 --- a/rx/item.go +++ b/rx/item.go @@ -8,16 +8,16 @@ import ( type ( // Item is a wrapper having either a value or an error. // - Item[I any] struct { - V I + Item[T any] struct { + V T E error } // TimestampItem attach a timestamp to an item. // - TimestampItem[I any] struct { + TimestampItem[T any] struct { Timestamp time.Time - V I + V T } // CloseChannelStrategy indicates a strategy on whether to close a channel. @@ -32,21 +32,21 @@ const ( ) // Of creates an item from a value. -func Of[I any](v I) Item[I] { - return Item[I]{V: v} +func Of[T any](v T) Item[T] { + return Item[T]{V: v} } // Error creates an item from an error. -func Error[I any](err error) Item[I] { - return Item[I]{E: err} +func Error[T any](err error) Item[T] { + return Item[T]{E: err} } // SendItems is an utility function that send a list of items and indicate a // strategy on whether to close the channel once the function completes. // This method has been derived from the original SendItems. // (does not support channels or slice) -func SendItems[I any](ctx context.Context, - ch chan<- Item[I], strategy CloseChannelStrategy, items ...Item[I], +func SendItems[T any](ctx context.Context, + ch chan<- Item[T], strategy CloseChannelStrategy, items ...Item[T], ) { if strategy == CloseChannel { defer close(ch) @@ -55,25 +55,25 @@ func SendItems[I any](ctx context.Context, sendItems(ctx, ch, items...) } -func sendItems[I any](ctx context.Context, ch chan<- Item[I], items ...Item[I]) { +func sendItems[T any](ctx context.Context, ch chan<- Item[T], items ...Item[T]) { for _, item := range items { item.SendContext(ctx, ch) } } // IsError checks if an item is an error. -func (i Item[I]) IsError() bool { +func (i Item[T]) IsError() bool { return i.E != nil } // SendBlocking sends an item and blocks until it is sent. -func (i Item[I]) SendBlocking(ch chan<- Item[I]) { +func (i Item[T]) SendBlocking(ch chan<- Item[T]) { ch <- i } // SendContext sends an item and blocks until it is sent or a context canceled. // It returns a boolean to indicate whether the item was sent. -func (i Item[I]) SendContext(ctx context.Context, ch chan<- Item[I]) bool { +func (i Item[T]) SendContext(ctx context.Context, ch chan<- Item[T]) bool { select { case <-ctx.Done(): // Context's done channel has the highest priority return false @@ -87,7 +87,7 @@ func (i Item[I]) SendContext(ctx context.Context, ch chan<- Item[I]) bool { } } -func (i Item[I]) SendOpContext(ctx context.Context, ch any) bool { // Item[operator[I]] +func (i Item[T]) SendOpContext(ctx context.Context, ch any) bool { // Item[operator[T]] _ = ctx _ = ch @@ -96,7 +96,7 @@ func (i Item[I]) SendOpContext(ctx context.Context, ch any) bool { // Item[opera // SendNonBlocking sends an item without blocking. // It returns a boolean to indicate whether the item was sent. -func (i Item[I]) SendNonBlocking(ch chan<- Item[I]) bool { +func (i Item[T]) SendNonBlocking(ch chan<- Item[T]) bool { select { default: return false diff --git a/rx/iterable-channel.go b/rx/iterable-channel.go index e6f1442..b155c53 100644 --- a/rx/iterable-channel.go +++ b/rx/iterable-channel.go @@ -5,24 +5,24 @@ import ( "sync" ) -type channelIterable[I any] struct { - next <-chan Item[I] - opts []Option[I] - subscribers []chan Item[I] +type channelIterable[T any] struct { + next <-chan Item[T] + opts []Option[T] + subscribers []chan Item[T] mutex sync.RWMutex producerAlreadyCreated bool } -func newChannelIterable[I any](next <-chan Item[I], opts ...Option[I]) Iterable[I] { - return &channelIterable[I]{ +func newChannelIterable[T any](next <-chan Item[T], opts ...Option[T]) Iterable[T] { + return &channelIterable[T]{ next: next, - subscribers: make([]chan Item[I], 0), + subscribers: make([]chan Item[T], 0), opts: opts, } } -func (i *channelIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] { - mergedOptions := make([]Option[I], 0, len(opts)) +func (i *channelIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { + mergedOptions := make([]Option[T], 0, len(opts)) copy(mergedOptions, opts) mergedOptions = append(mergedOptions, opts...) @@ -46,7 +46,7 @@ func (i *channelIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] { return ch } -func (i *channelIterable[I]) connect(ctx context.Context) { +func (i *channelIterable[T]) connect(ctx context.Context) { i.mutex.Lock() if !i.producerAlreadyCreated { go i.produce(ctx) @@ -55,7 +55,7 @@ func (i *channelIterable[I]) connect(ctx context.Context) { i.mutex.Unlock() } -func (i *channelIterable[I]) produce(ctx context.Context) { +func (i *channelIterable[T]) produce(ctx context.Context) { defer func() { i.mutex.RLock() diff --git a/rx/iterable-factory.go b/rx/iterable-factory.go index b03bf97..e8daf06 100644 --- a/rx/iterable-factory.go +++ b/rx/iterable-factory.go @@ -1,13 +1,13 @@ package rx -type factoryIterable[I any] struct { - factory func(opts ...Option[I]) <-chan Item[I] +type factoryIterable[T any] struct { + factory func(opts ...Option[T]) <-chan Item[T] } -func newFactoryIterable[I any](factory func(opts ...Option[I]) <-chan Item[I]) Iterable[I] { - return &factoryIterable[I]{factory: factory} +func newFactoryIterable[T any](factory func(opts ...Option[T]) <-chan Item[T]) Iterable[T] { + return &factoryIterable[T]{factory: factory} } -func (i *factoryIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] { +func (i *factoryIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { return i.factory(opts...) } diff --git a/rx/iterable-just.go b/rx/iterable-just.go index 20727f2..25cbdb6 100644 --- a/rx/iterable-just.go +++ b/rx/iterable-just.go @@ -1,23 +1,23 @@ package rx -type justIterable[I any] struct { - items []I - opts []Option[I] +type justIterable[T any] struct { + items []T + opts []Option[T] } -func newJustIterable[I any](items ...I) func(opts ...Option[I]) Iterable[I] { - return func(opts ...Option[I]) Iterable[I] { - return &justIterable[I]{ +func newJustIterable[T any](items ...T) func(opts ...Option[T]) Iterable[T] { + return func(opts ...Option[T]) Iterable[T] { + return &justIterable[T]{ items: items, opts: opts, } } } -func (i *justIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] { +func (i *justIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() - items := make([]Item[I], 0, len(i.items)) + items := make([]Item[T], 0, len(i.items)) for _, item := range i.items { items = append(items, Of(item)) diff --git a/rx/iterable.go b/rx/iterable.go index e51b8c0..79981d5 100644 --- a/rx/iterable.go +++ b/rx/iterable.go @@ -1,6 +1,6 @@ package rx // Iterable is the basic type that can be observed. -type Iterable[I any] interface { - Observe(opts ...Option[I]) <-chan Item[I] +type Iterable[T any] interface { + Observe(opts ...Option[T]) <-chan Item[T] } diff --git a/rx/observable-operator.go b/rx/observable-operator.go index 651722c..3b0ffea 100644 --- a/rx/observable-operator.go +++ b/rx/observable-operator.go @@ -5,42 +5,42 @@ import ( "reflect" ) -func (o *ObservableImpl[I]) Observe(opts ...Option[I]) <-chan Item[I] { +func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] { return o.iterable.Observe(opts...) } // Max determines and emits the maximum-valued item emitted by an Observable according to a comparator. -func (o *ObservableImpl[I]) Max(comparator Comparator[I], - opts ...Option[I], -) OptionalSingle[I] { +func (o *ObservableImpl[T]) Max(comparator Comparator[T], + opts ...Option[T], +) OptionalSingle[T] { const ( forceSeq = false bypassGather = false ) - return optionalSingle(o.parent, o, func() operator[I] { - return &maxOperator[I]{ + return optionalSingle(o.parent, o, func() operator[T] { + return &maxOperator[T]{ comparator: comparator, empty: true, } }, forceSeq, bypassGather, opts...) } -func isLimitDefined[I any](limit I) bool { +func isLimitDefined[T any](limit T) bool { val := reflect.ValueOf(limit).Interface() zero := reflect.Zero(reflect.TypeOf(limit)).Interface() return val != zero } -type maxOperator[I any] struct { - comparator Comparator[I] +type maxOperator[T any] struct { + comparator Comparator[T] empty bool - max I + max T } -func (op *maxOperator[I]) next(_ context.Context, - item Item[I], _ chan<- Item[I], _ operatorOptions[I], +func (op *maxOperator[T]) next(_ context.Context, + item Item[T], _ chan<- Item[T], _ operatorOptions[T], ) { op.empty = false @@ -56,20 +56,20 @@ func (op *maxOperator[I]) next(_ context.Context, } } -func (op *maxOperator[I]) err(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *maxOperator[T]) err(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { defaultErrorFuncOperator(ctx, item, dst, operatorOptions) } -func (op *maxOperator[I]) end(ctx context.Context, dst chan<- Item[I]) { +func (op *maxOperator[T]) end(ctx context.Context, dst chan<- Item[T]) { if !op.empty { Of(op.max).SendContext(ctx, dst) } } -func (op *maxOperator[I]) gatherNext(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *maxOperator[T]) gatherNext(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { // TODO(check): op.next(ctx, Of(item.V.(*maxOperator).max), dst, operatorOptions)รท op.next(ctx, Of(item.V), dst, operatorOptions) @@ -77,29 +77,29 @@ func (op *maxOperator[I]) gatherNext(ctx context.Context, // Min determines and emits the minimum-valued item emitted by an Observable // according to a comparator. -func (o *ObservableImpl[I]) Min(comparator Comparator[I], opts ...Option[I]) OptionalSingle[I] { +func (o *ObservableImpl[T]) Min(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] { const ( forceSeq = false bypassGather = false ) - return optionalSingle(o.parent, o, func() operator[I] { - return &minOperator[I]{ + return optionalSingle(o.parent, o, func() operator[T] { + return &minOperator[T]{ comparator: comparator, empty: true, } }, forceSeq, bypassGather, opts...) } -type minOperator[I any] struct { - comparator Comparator[I] +type minOperator[T any] struct { + comparator Comparator[T] empty bool - min I - limit func(value I) bool // represents min or max + min T + limit func(value T) bool // represents min or max } -func (op *minOperator[I]) next(_ context.Context, - item Item[I], _ chan<- Item[I], _ operatorOptions[I], +func (op *minOperator[T]) next(_ context.Context, + item Item[T], _ chan<- Item[T], _ operatorOptions[T], ) { op.empty = false @@ -115,20 +115,20 @@ func (op *minOperator[I]) next(_ context.Context, } } -func (op *minOperator[I]) err(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *minOperator[T]) err(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { defaultErrorFuncOperator(ctx, item, dst, operatorOptions) } -func (op *minOperator[I]) end(ctx context.Context, dst chan<- Item[I]) { +func (op *minOperator[T]) end(ctx context.Context, dst chan<- Item[T]) { if !op.empty { Of(op.min).SendContext(ctx, dst) } } -func (op *minOperator[I]) gatherNext(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *minOperator[T]) gatherNext(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { // TODO(check): op.next(ctx, Of(item.V.(*minOperator).min), dst, operatorOptions) op.next(ctx, Of(item.V), dst, operatorOptions) diff --git a/rx/observable.go b/rx/observable.go index 46f673b..4948807 100644 --- a/rx/observable.go +++ b/rx/observable.go @@ -19,36 +19,36 @@ func LimitComparator[T constraints.Ordered](a, b T) int { return 1 } -type Observable[I any] interface { - Iterable[I] +type Observable[T any] interface { + Iterable[T] - Max(comparator Comparator[I], opts ...Option[I]) OptionalSingle[I] - Min(comparator Comparator[I], opts ...Option[I]) OptionalSingle[I] + Max(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] + Min(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] } // ObservableImpl implements Observable. -type ObservableImpl[I any] struct { +type ObservableImpl[T any] struct { parent context.Context - iterable Iterable[I] + iterable Iterable[T] } -func defaultErrorFuncOperator[I any](ctx context.Context, - item Item[I], dst chan<- Item[I], options operatorOptions[I], +func defaultErrorFuncOperator[T any](ctx context.Context, + item Item[T], dst chan<- Item[T], options operatorOptions[T], ) { item.SendContext(ctx, dst) options.stop() } -type operator[I any] interface { - next(ctx context.Context, item Item[I], dst chan<- Item[I], options operatorOptions[I]) - err(ctx context.Context, item Item[I], dst chan<- Item[I], options operatorOptions[I]) - end(ctx context.Context, dst chan<- Item[I]) - gatherNext(ctx context.Context, item Item[I], dst chan<- Item[I], options operatorOptions[I]) +type operator[T any] interface { + next(ctx context.Context, item Item[T], dst chan<- Item[T], options operatorOptions[T]) + err(ctx context.Context, item Item[T], dst chan<- Item[T], options operatorOptions[T]) + end(ctx context.Context, dst chan<- Item[T]) + gatherNext(ctx context.Context, item Item[T], dst chan<- Item[T], options operatorOptions[T]) } -func single[I any](parent context.Context, - iterable Iterable[I], operatorFactory func() operator[I], forceSeq, bypassGather bool, opts ...Option[I], -) Single[I] { +func single[T any](parent context.Context, + iterable Iterable[T], operatorFactory func() operator[T], forceSeq, bypassGather bool, opts ...Option[T], +) Single[T] { option := parseOptions(opts...) parallel, _ := option.getPool() next := option.buildChannel() @@ -61,11 +61,11 @@ func single[I any](parent context.Context, runParallel(ctx, next, iterable.Observe(opts...), operatorFactory, bypassGather, option, opts...) } - return &SingleImpl[I]{iterable: newChannelIterable(next)} + return &SingleImpl[T]{iterable: newChannelIterable(next)} } - return &SingleImpl[I]{ - iterable: newFactoryIterable(func(propagatedOptions ...Option[I]) <-chan Item[I] { + return &SingleImpl[T]{ + iterable: newFactoryIterable(func(propagatedOptions ...Option[T]) <-chan Item[T] { mergedOptions := append(opts, propagatedOptions...) //nolint:gocritic // foo option = parseOptions(mergedOptions...) @@ -80,11 +80,11 @@ func single[I any](parent context.Context, } } -func optionalSingle[I any](parent context.Context, - iterable Iterable[I], operatorFactory func() operator[I], +func optionalSingle[T any](parent context.Context, + iterable Iterable[T], operatorFactory func() operator[T], forceSeq, bypassGather bool, - opts ...Option[I], -) OptionalSingle[I] { + opts ...Option[T], +) OptionalSingle[T] { option := parseOptions(opts...) ctx := option.buildContext(parent) parallel, _ := option.getPool() @@ -102,12 +102,12 @@ func optionalSingle[I any](parent context.Context, ) } - return &OptionalSingleImpl[I]{iterable: newChannelIterable(next)} + return &OptionalSingleImpl[T]{iterable: newChannelIterable(next)} } - return &OptionalSingleImpl[I]{ + return &OptionalSingleImpl[T]{ parent: ctx, - iterable: newFactoryIterable(func(propagatedOptions ...Option[I]) <-chan Item[I] { + iterable: newFactoryIterable(func(propagatedOptions ...Option[T]) <-chan Item[T] { mergedOptions := append(opts, propagatedOptions...) //nolint:gocritic // foo option = parseOptions(mergedOptions...) @@ -129,22 +129,22 @@ func optionalSingle[I any](parent context.Context, } } -func runSequential[I any](ctx context.Context, - next chan Item[I], iterable Iterable[I], operatorFactory func() operator[I], - option Option[I], opts ...Option[I], +func runSequential[T any](ctx context.Context, + next chan Item[T], iterable Iterable[T], operatorFactory func() operator[T], + option Option[T], opts ...Option[T], ) { observe := iterable.Observe(opts...) go func() { op := operatorFactory() stopped := false - operator := operatorOptions[I]{ + operator := operatorOptions[T]{ stop: func() { if option.getErrorStrategy() == StopOnError { stopped = true } }, - resetIterable: func(newIterable Iterable[I]) { + resetIterable: func(newIterable Iterable[T]) { observe = newIterable.Observe(opts...) }, } @@ -171,31 +171,31 @@ func runSequential[I any](ctx context.Context, }() } -func runParallel[I any](ctx context.Context, - next chan Item[I], observe <-chan Item[I], operatorFactory func() operator[I], - bypassGather bool, option Option[I], opts ...Option[I], +func runParallel[T any](ctx context.Context, + next chan Item[T], observe <-chan Item[T], operatorFactory func() operator[T], + bypassGather bool, option Option[T], opts ...Option[T], ) { wg := sync.WaitGroup{} _, pool := option.getPool() wg.Add(pool) - var gather chan Item[I] + var gather chan Item[T] if bypassGather { gather = next } else { - gather = make(chan Item[I], 1) + gather = make(chan Item[T], 1) // Gather go func() { op := operatorFactory() stopped := false - operator := operatorOptions[I]{ + operator := operatorOptions[T]{ stop: func() { if option.getErrorStrategy() == StopOnError { stopped = true } }, - resetIterable: func(newIterable Iterable[I]) { + resetIterable: func(newIterable Iterable[T]) { observe = newIterable.Observe(opts...) }, } @@ -222,13 +222,13 @@ func runParallel[I any](ctx context.Context, go func() { op := operatorFactory() stopped := false - operator := operatorOptions[I]{ + operator := operatorOptions[T]{ stop: func() { if option.getErrorStrategy() == StopOnError { stopped = true } }, - resetIterable: func(newIterable Iterable[I]) { + resetIterable: func(newIterable Iterable[T]) { observe = newIterable.Observe(opts...) }, } @@ -243,12 +243,12 @@ func runParallel[I any](ctx context.Context, if !ok { if !bypassGather { // TODO: - // cannot use gather (variable of type chan Item[I]) as chan<- Item[operator[I]] + // cannot use gather (variable of type chan Item[T]) as chan<- Item[operator[T]] // value in argument to Of(op).SendContext // - // op = operator[I] / Item[operator[I]] - // gather = chan Item[I] - // can we send I down the channel, then apply the operator on the other + // op = operator[T] / Item[operator[T]] + // gather = chan Item[T] + // can we send T down the channel, then apply the operator on the other // end of the channel? // // or can we define another method on Item, such as SendOp ==> this looks diff --git a/rx/optional-single.go b/rx/optional-single.go index 13145d0..970781a 100644 --- a/rx/optional-single.go +++ b/rx/optional-single.go @@ -5,40 +5,40 @@ import ( ) // OptionalSingle is an optional single. -type OptionalSingle[I any] interface { - Iterable[I] - Get(opts ...Option[I]) (Item[I], error) - Map(apply Func[I], opts ...Option[I]) OptionalSingle[I] - Run(opts ...Option[I]) Disposed +type OptionalSingle[T any] interface { + Iterable[T] + Get(opts ...Option[T]) (Item[T], error) + Map(apply Func[T], opts ...Option[T]) OptionalSingle[T] + Run(opts ...Option[T]) Disposed } // OptionalSingleImpl implements OptionalSingle. -type OptionalSingleImpl[I any] struct { +type OptionalSingleImpl[T any] struct { parent context.Context - iterable Iterable[I] + iterable Iterable[T] } // NewOptionalSingleImpl create OptionalSingleImpl -func NewOptionalSingleImpl[I any](iterable Iterable[I]) OptionalSingleImpl[I] { +func NewOptionalSingleImpl[T any](iterable Iterable[T]) OptionalSingleImpl[T] { // this is new functionality due to iterable not being exported // - return OptionalSingleImpl[I]{ + return OptionalSingleImpl[T]{ iterable: iterable, } } // Get returns the item or rxgo.OptionalEmpty. The error returned is if the context has been cancelled. // This method is blocking. -func (o *OptionalSingleImpl[I]) Get(opts ...Option[I]) (Item[I], error) { +func (o *OptionalSingleImpl[T]) Get(opts ...Option[T]) (Item[T], error) { option := parseOptions(opts...) ctx := option.buildContext(o.parent) - optionalSingleEmpty := Item[I]{} + optionalSingleEmpty := Item[T]{} observe := o.Observe(opts...) for { select { case <-ctx.Done(): - return Item[I]{}, ctx.Err() + return Item[T]{}, ctx.Err() case v, ok := <-observe: if !ok { return optionalSingleEmpty, nil @@ -50,27 +50,27 @@ func (o *OptionalSingleImpl[I]) Get(opts ...Option[I]) (Item[I], error) { } // Map transforms the items emitted by an OptionalSingle by applying a function to each item. -func (o *OptionalSingleImpl[I]) Map(apply Func[I], opts ...Option[I]) OptionalSingle[I] { - return optionalSingle(o.parent, o, func() operator[I] { - return &mapOperatorOptionalSingle[I]{apply: apply} +func (o *OptionalSingleImpl[T]) Map(apply Func[T], opts ...Option[T]) OptionalSingle[T] { + return optionalSingle(o.parent, o, func() operator[T] { + return &mapOperatorOptionalSingle[T]{apply: apply} }, false, true, opts...) } // Observe observes an OptionalSingle by returning its channel. -func (o *OptionalSingleImpl[I]) Observe(opts ...Option[I]) <-chan Item[I] { +func (o *OptionalSingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] { return o.iterable.Observe(opts...) } -type mapOperatorOptionalSingle[I any] struct { - apply Func[I] +type mapOperatorOptionalSingle[T any] struct { + apply Func[T] } -func (op *mapOperatorOptionalSingle[I]) next(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *mapOperatorOptionalSingle[T]) next(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { res, err := op.apply(ctx, item.V) if err != nil { - dst <- Error[I](err) + dst <- Error[T](err) operatorOptions.stop() @@ -79,20 +79,20 @@ func (op *mapOperatorOptionalSingle[I]) next(ctx context.Context, dst <- Of(res) } -func (op *mapOperatorOptionalSingle[I]) err(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *mapOperatorOptionalSingle[T]) err(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { defaultErrorFuncOperator(ctx, item, dst, operatorOptions) } -func (op *mapOperatorOptionalSingle[I]) end(_ context.Context, _ chan<- Item[I]) { +func (op *mapOperatorOptionalSingle[T]) end(_ context.Context, _ chan<- Item[T]) { } -func (op *mapOperatorOptionalSingle[I]) gatherNext(_ context.Context, - item Item[I], dst chan<- Item[I], _ operatorOptions[I], +func (op *mapOperatorOptionalSingle[T]) gatherNext(_ context.Context, + item Item[T], dst chan<- Item[T], _ operatorOptions[T], ) { // --> switch item.V.(type) { - // case *mapOperatorOptionalSingle[I]: + // case *mapOperatorOptionalSingle[T]: // return // } dst <- item @@ -101,7 +101,7 @@ func (op *mapOperatorOptionalSingle[I]) gatherNext(_ context.Context, } // Run creates an observer without consuming the emitted items. -func (o *OptionalSingleImpl[I]) Run(opts ...Option[I]) Disposed { +func (o *OptionalSingleImpl[T]) Run(opts ...Option[T]) Disposed { dispose := make(chan struct{}) option := parseOptions(opts...) ctx := option.buildContext(o.parent) diff --git a/rx/options.go b/rx/options.go index 44d54e3..07d0f9b 100644 --- a/rx/options.go +++ b/rx/options.go @@ -9,22 +9,22 @@ import ( var emptyContext context.Context -type Option[I any] interface { - apply(*funcOption[I]) +type Option[T any] interface { + apply(*funcOption[T]) toPropagate() bool isEagerObservation() bool getPool() (bool, int) - buildChannel() chan Item[I] + buildChannel() chan Item[T] buildContext(parent context.Context) context.Context getBackPressureStrategy() BackPressureStrategy getErrorStrategy() OnErrorStrategy isConnectable() bool isConnectOperation() bool - isSerialized() (bool, func(I) int) + isSerialized() (bool, func(T) int) } -type funcOption[I any] struct { - f func(*funcOption[I]) +type funcOption[T any] struct { + f func(*funcOption[T]) isBuffer bool buffer int ctx context.Context @@ -35,30 +35,30 @@ type funcOption[I any] struct { propagate bool connectable bool connectOperation bool - serialized func(I) int + serialized func(T) int } -func (fdo *funcOption[I]) toPropagate() bool { +func (fdo *funcOption[T]) toPropagate() bool { return fdo.propagate } -func (fdo *funcOption[I]) isEagerObservation() bool { +func (fdo *funcOption[T]) isEagerObservation() bool { return fdo.observation == Eager } -func (fdo *funcOption[I]) getPool() (b bool, p int) { +func (fdo *funcOption[T]) getPool() (b bool, p int) { return fdo.pool > 0, fdo.pool } -func (fdo *funcOption[I]) buildChannel() chan Item[I] { +func (fdo *funcOption[T]) buildChannel() chan Item[T] { if fdo.isBuffer { - return make(chan Item[I], fdo.buffer) + return make(chan Item[T], fdo.buffer) } - return make(chan Item[I]) + return make(chan Item[T]) } -func (fdo *funcOption[I]) buildContext(parent context.Context) context.Context { +func (fdo *funcOption[T]) buildContext(parent context.Context) context.Context { if fdo.ctx != nil && parent != nil { ctx, _ := onecontext.Merge(fdo.ctx, parent) @@ -76,27 +76,27 @@ func (fdo *funcOption[I]) buildContext(parent context.Context) context.Context { return context.Background() } -func (fdo *funcOption[I]) getBackPressureStrategy() BackPressureStrategy { +func (fdo *funcOption[T]) getBackPressureStrategy() BackPressureStrategy { return fdo.backPressureStrategy } -func (fdo *funcOption[I]) getErrorStrategy() OnErrorStrategy { +func (fdo *funcOption[T]) getErrorStrategy() OnErrorStrategy { return fdo.onErrorStrategy } -func (fdo *funcOption[I]) isConnectable() bool { +func (fdo *funcOption[T]) isConnectable() bool { return fdo.connectable } -func (fdo *funcOption[I]) isConnectOperation() bool { +func (fdo *funcOption[T]) isConnectOperation() bool { return fdo.connectOperation } -func (fdo *funcOption[I]) apply(do *funcOption[I]) { +func (fdo *funcOption[T]) apply(do *funcOption[T]) { fdo.f(do) } -func (fdo *funcOption[I]) isSerialized() (b bool, f func(I) int) { +func (fdo *funcOption[T]) isSerialized() (b bool, f func(T) int) { if fdo.serialized == nil { return false, nil } @@ -104,14 +104,14 @@ func (fdo *funcOption[I]) isSerialized() (b bool, f func(I) int) { return true, fdo.serialized } -func newFuncOption[I any](f func(*funcOption[I])) *funcOption[I] { - return &funcOption[I]{ +func newFuncOption[T any](f func(*funcOption[T])) *funcOption[T] { + return &funcOption[T]{ f: f, } } -func parseOptions[I any](opts ...Option[I]) Option[I] { - o := new(funcOption[I]) +func parseOptions[T any](opts ...Option[T]) Option[T] { + o := new(funcOption[T]) for _, opt := range opts { opt.apply(o) } @@ -120,72 +120,72 @@ func parseOptions[I any](opts ...Option[I]) Option[I] { } // WithBufferedChannel allows to configure the capacity of a buffered channel. -func WithBufferedChannel[I any](capacity int) Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithBufferedChannel[T any](capacity int) Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.isBuffer = true options.buffer = capacity }) } // WithContext allows to pass a context. -func WithContext[I any](ctx context.Context) Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithContext[T any](ctx context.Context) Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.ctx = ctx }) } // WithObservationStrategy uses the eager observation mode meaning consuming the items even without subscription. -func WithObservationStrategy[I any](strategy ObservationStrategy) Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithObservationStrategy[T any](strategy ObservationStrategy) Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.observation = strategy }) } // WithPool allows to specify an execution pool. -func WithPool[I any](pool int) Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithPool[T any](pool int) Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.pool = pool }) } // WithCPUPool allows to specify an execution pool based on the number of logical CPUs. -func WithCPUPool[I any]() Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithCPUPool[T any]() Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.pool = runtime.NumCPU() }) } // WithBackPressureStrategy sets the back pressure strategy: drop or block. -func WithBackPressureStrategy[I any](strategy BackPressureStrategy) Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithBackPressureStrategy[T any](strategy BackPressureStrategy) Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.backPressureStrategy = strategy }) } // WithErrorStrategy defines how an observable should deal with error. // This strategy is propagated to the parent observable. -func WithErrorStrategy[I any](strategy OnErrorStrategy) Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithErrorStrategy[T any](strategy OnErrorStrategy) Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.onErrorStrategy = strategy }) } // WithPublishStrategy converts an ordinary Observable into a connectable Observable. -func WithPublishStrategy[I any]() Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func WithPublishStrategy[T any]() Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.connectable = true }) } // Serialize forces an Observable to make serialized calls and to be well-behaved. -func Serialize[I any](identifier func(I) int) Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func Serialize[T any](identifier func(T) int) Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.serialized = identifier }) } -func connect[I any]() Option[I] { - return newFuncOption(func(options *funcOption[I]) { +func connect[T any]() Option[T] { + return newFuncOption(func(options *funcOption[T]) { options.connectOperation = true }) } diff --git a/rx/single.go b/rx/single.go index 4c72cbb..ca4c7dc 100644 --- a/rx/single.go +++ b/rx/single.go @@ -3,30 +3,30 @@ package rx import "context" // Single is a observable with a single element. -type Single[I any] interface { - Iterable[I] - Filter(apply Predicate[I], opts ...Option[I]) OptionalSingle[I] - Get(opts ...Option[I]) (Item[I], error) - Map(apply Func[I], opts ...Option[I]) Single[I] - Run(opts ...Option[I]) Disposed +type Single[T any] interface { + Iterable[T] + Filter(apply Predicate[T], opts ...Option[T]) OptionalSingle[T] + Get(opts ...Option[T]) (Item[T], error) + Map(apply Func[T], opts ...Option[T]) Single[T] + Run(opts ...Option[T]) Disposed } // SingleImpl implements Single. -type SingleImpl[I any] struct { +type SingleImpl[T any] struct { parent context.Context - iterable Iterable[I] + iterable Iterable[T] } // Filter emits only those items from an Observable that pass a predicate test. -func (s *SingleImpl[I]) Filter(apply Predicate[I], opts ...Option[I]) OptionalSingle[I] { - return optionalSingle(s.parent, s, func() operator[I] { - return &filterOperatorSingle[I]{apply: apply} +func (s *SingleImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) OptionalSingle[T] { + return optionalSingle(s.parent, s, func() operator[T] { + return &filterOperatorSingle[T]{apply: apply} }, true, true, opts...) } // Get returns the item. The error returned is if the context has been cancelled. // This method is blocking. -func (s *SingleImpl[I]) Get(opts ...Option[I]) (Item[I], error) { +func (s *SingleImpl[T]) Get(opts ...Option[T]) (Item[T], error) { option := parseOptions(opts...) ctx := option.buildContext(s.parent) @@ -35,7 +35,7 @@ func (s *SingleImpl[I]) Get(opts ...Option[I]) (Item[I], error) { for { select { case <-ctx.Done(): - return Item[I]{}, ctx.Err() + return Item[T]{}, ctx.Err() case v := <-observe: return v, nil } @@ -43,23 +43,23 @@ func (s *SingleImpl[I]) Get(opts ...Option[I]) (Item[I], error) { } // Map transforms the items emitted by a Single by applying a function to each item. -func (s *SingleImpl[I]) Map(apply Func[I], opts ...Option[I]) Single[I] { - return single(s.parent, s, func() operator[I] { - return &mapOperatorSingle[I]{apply: apply} +func (s *SingleImpl[T]) Map(apply Func[T], opts ...Option[T]) Single[T] { + return single(s.parent, s, func() operator[T] { + return &mapOperatorSingle[T]{apply: apply} }, false, true, opts...) } -type mapOperatorSingle[I any] struct { - apply Func[I] +type mapOperatorSingle[T any] struct { + apply Func[T] } -func (op *mapOperatorSingle[I]) next(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *mapOperatorSingle[T]) next(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { res, err := op.apply(ctx, item.V) if err != nil { - Error[I](err).SendContext(ctx, dst) + Error[T](err).SendContext(ctx, dst) operatorOptions.stop() return @@ -68,17 +68,17 @@ func (op *mapOperatorSingle[I]) next(ctx context.Context, Of(res).SendContext(ctx, dst) } -func (op *mapOperatorSingle[I]) err(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *mapOperatorSingle[T]) err(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { defaultErrorFuncOperator(ctx, item, dst, operatorOptions) } -func (op *mapOperatorSingle[I]) end(_ context.Context, _ chan<- Item[I]) { +func (op *mapOperatorSingle[T]) end(_ context.Context, _ chan<- Item[T]) { } -func (op *mapOperatorSingle[I]) gatherNext(ctx context.Context, - item Item[I], dst chan<- Item[I], _ operatorOptions[I], +func (op *mapOperatorSingle[T]) gatherNext(ctx context.Context, + item Item[T], dst chan<- Item[T], _ operatorOptions[T], ) { // TODO: switch item.V.(type) { // case *mapOperatorSingle: @@ -90,38 +90,38 @@ func (op *mapOperatorSingle[I]) gatherNext(ctx context.Context, } // Observe observes a Single by returning its channel. -func (s *SingleImpl[I]) Observe(opts ...Option[I]) <-chan Item[I] { +func (s *SingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] { return s.iterable.Observe(opts...) } -type filterOperatorSingle[I any] struct { - apply Predicate[I] +type filterOperatorSingle[T any] struct { + apply Predicate[T] } -func (op *filterOperatorSingle[I]) next(ctx context.Context, - item Item[I], dst chan<- Item[I], _ operatorOptions[I], +func (op *filterOperatorSingle[T]) next(ctx context.Context, + item Item[T], dst chan<- Item[T], _ operatorOptions[T], ) { if op.apply(item.V) { item.SendContext(ctx, dst) } } -func (op *filterOperatorSingle[I]) err(ctx context.Context, - item Item[I], dst chan<- Item[I], operatorOptions operatorOptions[I], +func (op *filterOperatorSingle[T]) err(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], ) { defaultErrorFuncOperator(ctx, item, dst, operatorOptions) } -func (op *filterOperatorSingle[I]) end(_ context.Context, _ chan<- Item[I]) { +func (op *filterOperatorSingle[T]) end(_ context.Context, _ chan<- Item[T]) { } -func (op *filterOperatorSingle[I]) gatherNext(_ context.Context, - _ Item[I], _ chan<- Item[I], _ operatorOptions[I], +func (op *filterOperatorSingle[T]) gatherNext(_ context.Context, + _ Item[T], _ chan<- Item[T], _ operatorOptions[T], ) { } // Run creates an observer without consuming the emitted items. -func (s *SingleImpl[I]) Run(opts ...Option[I]) Disposed { +func (s *SingleImpl[T]) Run(opts ...Option[T]) Disposed { dispose := make(chan struct{}) option := parseOptions(opts...) ctx := option.buildContext(s.parent) diff --git a/rx/types.go b/rx/types.go index fe2b63e..3b38102 100644 --- a/rx/types.go +++ b/rx/types.go @@ -3,45 +3,45 @@ package rx import "context" type ( - operatorOptions[I any] struct { + operatorOptions[T any] struct { stop func() - resetIterable func(Iterable[I]) + resetIterable func(Iterable[T]) } // Comparator defines a func that returns an int: // - 0 if two elements are equals // - A negative value if the first argument is less than the second // - A positive value if the first argument is greater than the second - Comparator[I any] func(I, I) int + Comparator[T any] func(T, T) int // ItemToObservable defines a function that computes an observable from an item. - ItemToObservable[I any] func(Item[I]) Observable[I] + ItemToObservable[T any] func(Item[T]) Observable[T] // ErrorToObservable defines a function that transforms an observable from an error. - ErrorToObservable[I any] func(error) Observable[I] + ErrorToObservable[T any] func(error) Observable[T] // Func defines a function that computes a value from an input value. - Func[I any] func(context.Context, I) (I, error) + Func[T any] func(context.Context, T) (T, error) // Func2 defines a function that computes a value from two input values. - Func2[I any] func(context.Context, I, I) (I, error) + Func2[T any] func(context.Context, T, T) (T, error) // FuncN defines a function that computes a value from N input values. - FuncN[I any] func(...I) I + FuncN[T any] func(...T) T // ErrorFunc defines a function that computes a value from an error. - ErrorFunc[I any] func(error) I + ErrorFunc[T any] func(error) T // Predicate defines a func that returns a bool from an input value. - Predicate[I any] func(I) bool - // Marshaller defines a marshaller type (ItemValue[I] to []byte). - Marshaller[I any] func(I) ([]byte, error) + Predicate[T any] func(T) bool + // Marshaller defines a marshaller type (ItemValue[T] to []byte). + Marshaller[T any] func(T) ([]byte, error) // Unmarshaller defines an unmarshaller type ([]byte to interface). - Unmarshaller[I any] func([]byte, I) error + Unmarshaller[T any] func([]byte, T) error // Producer defines a producer implementation. - Producer[I any] func(ctx context.Context, next chan<- Item[I]) + Producer[T any] func(ctx context.Context, next chan<- Item[T]) // Supplier defines a function that supplies a result from nothing. - Supplier[I any] func(ctx context.Context) Item[I] + Supplier[T any] func(ctx context.Context) Item[T] // Disposed is a notification channel indicating when an Observable is closed. Disposed <-chan struct{} // Disposable is a function to be called in order to dispose a subscription. Disposable context.CancelFunc // NextFunc handles a next item in a stream. - NextFunc[I any] func(I) + NextFunc[T any] func(T) // ErrFunc handles an error in a stream. ErrFunc func(error) // CompletedFunc handles the end of a stream.