From 2d5e52203f27ec5c9ef5ade35e2f8b5a74f70ce7 Mon Sep 17 00:00:00 2001 From: plastikfan Date: Fri, 5 Apr 2024 09:27:12 +0100 Subject: [PATCH] ref(rx): rationalise item aux fields (#142) --- enums/item.go | 30 +++++++++++ rx/factory.go | 4 +- rx/item.go | 140 +++++++++++++++++++++++++++--------------------- rx/util_test.go | 4 ++ 4 files changed, 115 insertions(+), 63 deletions(-) create mode 100644 enums/item.go diff --git a/enums/item.go b/enums/item.go new file mode 100644 index 0000000..7dd9c26 --- /dev/null +++ b/enums/item.go @@ -0,0 +1,30 @@ +package enums + +type ItemDiscriminator uint32 + +const ( + // ItemDiscNative enum value that represents the native type T. + // + ItemDiscNative ItemDiscriminator = 0 + + // ItemDiscError enum value that represents an error + // + ItemDiscError ItemDiscriminator = 1 << (iota - 1) + + // ItemDiscPulse enum value that represents a Tick value. + // + ItemDiscPulse + + // ItemDiscTick enum value that represents a TickValue value. + // + ItemDiscTickValue + + // ItemDiscNumeric enum value that represents a general numeric value + // typically used by range operations that require a number. + // + ItemDiscNumeric + + // ItemDiscChan enum value that represents a channel of T + // + ItemDiscChan +) diff --git a/rx/factory.go b/rx/factory.go index f93f14c..49e39eb 100644 --- a/rx/factory.go +++ b/rx/factory.go @@ -11,7 +11,7 @@ import ( ) // Amb takes several Observables, emit all of the items from only the first of these Observables -// to emit an item or notification. +// to emit an item or notification. (What the hell is an Amb, WTF) func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T] { option := parseOptions(opts...) ctx := option.buildContext(emptyContext) @@ -242,7 +242,7 @@ func Interval[T any](interval Duration, opts ...Option[T]) Observable[T] { for { select { case <-time.After(interval.duration()): - if !Tv[T](i).SendContext(ctx, next) { + if !TV[T](i).SendContext(ctx, next) { return } diff --git a/rx/item.go b/rx/item.go index 0e9eb85..10dc514 100644 --- a/rx/item.go +++ b/rx/item.go @@ -4,21 +4,20 @@ import ( "context" "reflect" "time" + + "github.com/snivilised/lorax/enums" ) type ( // Item is a wrapper having either a value, error or channel. // Item[T any] struct { - V T - E error + Disc enums.ItemDiscriminator + V T + E error // - C chan<- Item[T] - tick bool - tickV bool - numeric bool - TV int - N int + C chan<- Item[T] + N int } // TimestampItem attach a timestamp to an item. @@ -41,45 +40,57 @@ const ( // Of creates an item from a value. func Of[T any](v T) Item[T] { - return Item[T]{V: v} + return Item[T]{ + V: v, + Disc: enums.ItemDiscNative, + } } // Ch creates an item from a channel func Ch[T any](ch any) Item[T] { if c, ok := ch.(chan<- Item[T]); ok { - return Item[T]{C: c} + return Item[T]{ + C: c, + Disc: enums.ItemDiscChan, + } } panic("temp: invalid ch type") } -// Tick creates a type safe tick instance -func Tick[T any]() Item[T] { - return Item[T]{tick: true} +// Error creates an item from an error. +func Error[T any](err error) Item[T] { + return Item[T]{ + E: err, + Disc: enums.ItemDiscError, + } } -// Tv creates a type safe tick value instance -func Tv[T any](tv int) Item[T] { +// Pulse creates a type safe tick instance that doesn't contain a value +// thats acts like a heartbeat. +func Pulse[T any]() Item[T] { return Item[T]{ - TV: tv, - tickV: true, + Disc: enums.ItemDiscPulse, } } -// Num creates a type safe tick value instance -func Num[T any](n int) Item[T] { +// TV creates a type safe tick value instance +func TV[T any](tv int) Item[T] { return Item[T]{ - N: n, - numeric: true, + N: tv, + Disc: enums.ItemDiscTickValue, } } -// Error creates an item from an error. -func Error[T any](err error) Item[T] { - return Item[T]{E: err} +// Num creates a type safe tick value instance +func Num[T any](n int) Item[T] { + return Item[T]{ + N: n, + Disc: enums.ItemDiscNumeric, + } } -// SendItems is an utility function that send a list of items and indicate a +// SendItems is a utility function that sends a list of items and indicates a // strategy on whether to close the channel once the function completes. func SendItems[T any](ctx context.Context, ch chan<- Item[T], strategy CloseChannelStrategy, items ...any, @@ -92,52 +103,59 @@ func SendItems[T any](ctx context.Context, } func send[T any](ctx context.Context, ch chan<- Item[T], items ...any) { + // can we revert items to be Item[T]? for _, current := range items { switch item := current.(type) { default: rt := reflect.TypeOf(item) + sendItemByType(ctx, ch, rt, item) - switch rt.Kind() { //nolint:exhaustive // foo - default: - switch v := item.(type) { - case error: - Error[T](v).SendContext(ctx, ch) + case error: + Error[T](item).SendContext(ctx, ch) + } + } +} - case Item[T]: - v.SendContext(ctx, ch) +func sendItemByType[T any](ctx context.Context, ch chan<- Item[T], rt reflect.Type, item any) { + switch rt.Kind() { //nolint:exhaustive // foo + default: + switch v := item.(type) { + case error: + Error[T](v).SendContext(ctx, ch) - case T: - Of(v).SendContext(ctx, ch) - } + case Item[T]: + v.SendContext(ctx, ch) - case reflect.Chan: - inCh := reflect.ValueOf(current) + case T: + Of(v).SendContext(ctx, ch) + } - for { - v, ok := inCh.Recv() + case reflect.Chan: + inCh := reflect.ValueOf(item) // current + sendViaRefCh(ctx, inCh, ch) - if !ok { - return - } + case reflect.Slice: + s := reflect.ValueOf(item) // current - vItem := v.Interface() + for i := 0; i < s.Len(); i++ { + send(ctx, ch, s.Index(i).Interface()) + } + } +} - switch item := vItem.(type) { - default: - Ch[T](item).SendContext(ctx, ch) +func sendViaRefCh[T any](ctx context.Context, inCh reflect.Value, ch chan<- Item[T]) { + for { + v, ok := inCh.Recv() - case error: - Error[T](item).SendContext(ctx, ch) - } - } + if !ok { + return + } - case reflect.Slice: - s := reflect.ValueOf(current) + vItem := v.Interface() - for i := 0; i < s.Len(); i++ { - send(ctx, ch, s.Index(i).Interface()) - } - } + switch item := vItem.(type) { + default: + Ch[T](item).SendContext(ctx, ch) case error: Error[T](item).SendContext(ctx, ch) @@ -147,27 +165,27 @@ func send[T any](ctx context.Context, ch chan<- Item[T], items ...any) { // IsCh checks if an item is an error. func (i Item[T]) IsCh() bool { - return i.C != nil + return (i.Disc & enums.ItemDiscChan) > 0 } // IsError checks if an item is an error. func (i Item[T]) IsError() bool { - return i.E != nil + return (i.Disc & enums.ItemDiscError) > 0 } // IsTick checks if an item is a tick instance. func (i Item[T]) IsTick() bool { - return i.tick + return (i.Disc & enums.ItemDiscPulse) > 0 } // IsTickValue checks if an item is a tick instance. func (i Item[T]) IsTickValue() bool { - return i.tickV + return (i.Disc & enums.ItemDiscTickValue) > 0 } // IsTickValue checks if an item is a tick instance. func (i Item[T]) IsNumeric() bool { - return i.numeric + return (i.Disc & enums.ItemDiscNumeric) > 0 } // SendBlocking sends an item and blocks until it is sent. diff --git a/rx/util_test.go b/rx/util_test.go index 1ce9457..4f5b35b 100644 --- a/rx/util_test.go +++ b/rx/util_test.go @@ -42,6 +42,10 @@ func convertAllItemsToAny[T any](values []T) []any { } func testObservable[T any](ctx context.Context, items ...any) rx.Observable[T] { + // items is a collection of any because we need the ability to send a stream + // of events that may include errors; 1, 2, err, 4, ..., without enforcing + // that the client should manufacture Item[T]s; Of(1), Of(2), Error(err), Of(4). + // return rx.FromChannel(channelValue[T](ctx, convertAllItemsToAny(items)...)) }