Skip to content

Commit

Permalink
ref(rx): rationalise item aux fields (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 5, 2024
1 parent c56df52 commit 2d5e522
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 63 deletions.
30 changes: 30 additions & 0 deletions enums/item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package enums

type ItemDiscriminator uint32

const (
// ItemDiscNative enum value that represents the native type T.
//
ItemDiscNative ItemDiscriminator = 0

// ItemDiscError enum value that represents an error
//
ItemDiscError ItemDiscriminator = 1 << (iota - 1)

// ItemDiscPulse enum value that represents a Tick value.
//
ItemDiscPulse

// ItemDiscTick enum value that represents a TickValue value.
//
ItemDiscTickValue

// ItemDiscNumeric enum value that represents a general numeric value
// typically used by range operations that require a number.
//
ItemDiscNumeric

// ItemDiscChan enum value that represents a channel of T
//
ItemDiscChan
)
4 changes: 2 additions & 2 deletions rx/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

// Amb takes several Observables, emit all of the items from only the first of these Observables
// to emit an item or notification.
// to emit an item or notification. (What the hell is an Amb, WTF)
func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T] {
option := parseOptions(opts...)
ctx := option.buildContext(emptyContext)
Expand Down Expand Up @@ -242,7 +242,7 @@ func Interval[T any](interval Duration, opts ...Option[T]) Observable[T] {
for {
select {
case <-time.After(interval.duration()):
if !Tv[T](i).SendContext(ctx, next) {
if !TV[T](i).SendContext(ctx, next) {
return
}

Expand Down
140 changes: 79 additions & 61 deletions rx/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import (
"context"
"reflect"
"time"

"github.com/snivilised/lorax/enums"
)

type (
// Item is a wrapper having either a value, error or channel.
//
Item[T any] struct {
V T
E error
Disc enums.ItemDiscriminator
V T
E error
//
C chan<- Item[T]
tick bool
tickV bool
numeric bool
TV int
N int
C chan<- Item[T]
N int
}

// TimestampItem attach a timestamp to an item.
Expand All @@ -41,45 +40,57 @@ const (

// Of creates an item from a value.
func Of[T any](v T) Item[T] {
return Item[T]{V: v}
return Item[T]{
V: v,
Disc: enums.ItemDiscNative,
}
}

// Ch creates an item from a channel
func Ch[T any](ch any) Item[T] {
if c, ok := ch.(chan<- Item[T]); ok {
return Item[T]{C: c}
return Item[T]{
C: c,
Disc: enums.ItemDiscChan,
}
}

panic("temp: invalid ch type")
}

// Tick creates a type safe tick instance
func Tick[T any]() Item[T] {
return Item[T]{tick: true}
// Error creates an item from an error.
func Error[T any](err error) Item[T] {
return Item[T]{
E: err,
Disc: enums.ItemDiscError,
}
}

// Tv creates a type safe tick value instance
func Tv[T any](tv int) Item[T] {
// Pulse creates a type safe tick instance that doesn't contain a value
// thats acts like a heartbeat.
func Pulse[T any]() Item[T] {
return Item[T]{
TV: tv,
tickV: true,
Disc: enums.ItemDiscPulse,
}
}

// Num creates a type safe tick value instance
func Num[T any](n int) Item[T] {
// TV creates a type safe tick value instance
func TV[T any](tv int) Item[T] {
return Item[T]{
N: n,
numeric: true,
N: tv,
Disc: enums.ItemDiscTickValue,
}
}

// Error creates an item from an error.
func Error[T any](err error) Item[T] {
return Item[T]{E: err}
// Num creates a type safe tick value instance
func Num[T any](n int) Item[T] {
return Item[T]{
N: n,
Disc: enums.ItemDiscNumeric,
}
}

// SendItems is an utility function that send a list of items and indicate a
// SendItems is a utility function that sends a list of items and indicates a
// strategy on whether to close the channel once the function completes.
func SendItems[T any](ctx context.Context,
ch chan<- Item[T], strategy CloseChannelStrategy, items ...any,
Expand All @@ -92,52 +103,59 @@ func SendItems[T any](ctx context.Context,
}

func send[T any](ctx context.Context, ch chan<- Item[T], items ...any) {
// can we revert items to be Item[T]?
for _, current := range items {
switch item := current.(type) {
default:
rt := reflect.TypeOf(item)
sendItemByType(ctx, ch, rt, item)

switch rt.Kind() { //nolint:exhaustive // foo
default:
switch v := item.(type) {
case error:
Error[T](v).SendContext(ctx, ch)
case error:
Error[T](item).SendContext(ctx, ch)
}
}
}

case Item[T]:
v.SendContext(ctx, ch)
func sendItemByType[T any](ctx context.Context, ch chan<- Item[T], rt reflect.Type, item any) {
switch rt.Kind() { //nolint:exhaustive // foo
default:
switch v := item.(type) {
case error:
Error[T](v).SendContext(ctx, ch)

case T:
Of(v).SendContext(ctx, ch)
}
case Item[T]:
v.SendContext(ctx, ch)

case reflect.Chan:
inCh := reflect.ValueOf(current)
case T:
Of(v).SendContext(ctx, ch)
}

for {
v, ok := inCh.Recv()
case reflect.Chan:
inCh := reflect.ValueOf(item) // current
sendViaRefCh(ctx, inCh, ch)

if !ok {
return
}
case reflect.Slice:
s := reflect.ValueOf(item) // current

vItem := v.Interface()
for i := 0; i < s.Len(); i++ {
send(ctx, ch, s.Index(i).Interface())
}
}
}

switch item := vItem.(type) {
default:
Ch[T](item).SendContext(ctx, ch)
func sendViaRefCh[T any](ctx context.Context, inCh reflect.Value, ch chan<- Item[T]) {
for {
v, ok := inCh.Recv()

case error:
Error[T](item).SendContext(ctx, ch)
}
}
if !ok {
return
}

case reflect.Slice:
s := reflect.ValueOf(current)
vItem := v.Interface()

for i := 0; i < s.Len(); i++ {
send(ctx, ch, s.Index(i).Interface())
}
}
switch item := vItem.(type) {
default:
Ch[T](item).SendContext(ctx, ch)

case error:
Error[T](item).SendContext(ctx, ch)
Expand All @@ -147,27 +165,27 @@ func send[T any](ctx context.Context, ch chan<- Item[T], items ...any) {

// IsCh checks if an item is an error.
func (i Item[T]) IsCh() bool {
return i.C != nil
return (i.Disc & enums.ItemDiscChan) > 0
}

// IsError checks if an item is an error.
func (i Item[T]) IsError() bool {
return i.E != nil
return (i.Disc & enums.ItemDiscError) > 0
}

// IsTick checks if an item is a tick instance.
func (i Item[T]) IsTick() bool {
return i.tick
return (i.Disc & enums.ItemDiscPulse) > 0
}

// IsTickValue checks if an item is a tick instance.
func (i Item[T]) IsTickValue() bool {
return i.tickV
return (i.Disc & enums.ItemDiscTickValue) > 0
}

// IsTickValue checks if an item is a tick instance.
func (i Item[T]) IsNumeric() bool {
return i.numeric
return (i.Disc & enums.ItemDiscNumeric) > 0
}

// SendBlocking sends an item and blocks until it is sent.
Expand Down
4 changes: 4 additions & 0 deletions rx/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func convertAllItemsToAny[T any](values []T) []any {
}

func testObservable[T any](ctx context.Context, items ...any) rx.Observable[T] {
// items is a collection of any because we need the ability to send a stream
// of events that may include errors; 1, 2, err, 4, ..., without enforcing
// that the client should manufacture Item[T]s; Of(1), Of(2), Error(err), Of(4).
//
return rx.FromChannel(channelValue[T](ctx, convertAllItemsToAny(items)...))
}

Expand Down

0 comments on commit 2d5e522

Please sign in to comment.