Skip to content

Commit

Permalink
feat(rx): add all operator (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 8, 2024
1 parent 269aaa6 commit 7857fc0
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 18 deletions.
5 changes: 5 additions & 0 deletions enums/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ const (
//
ItemDiscNumeric

// ItemDiscBoolean enum value that represents a general boolean value
// typically used by predicate based operations eg All.
//
ItemDiscBoolean

// ItemDiscChan enum value that represents a channel of T
//
ItemDiscChan
Expand Down
82 changes: 76 additions & 6 deletions rx/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ type AssertResources[T any] interface {
Values() []T
Numbers() []int
Errors() []error
Booleans() []bool
}

type actualResources[T any] struct {
values []T
numbers []int
errors []error
values []T
numbers []int
errors []error
booleans []bool
}

func (r *actualResources[T]) Values() []T {
Expand All @@ -39,6 +41,10 @@ func (r *actualResources[T]) Errors() []error {
return r.errors
}

func (r *actualResources[T]) Booleans() []bool {
return r.booleans
}

func Assert[T any](ctx context.Context, iterable Iterable[T], asserters ...Asserter[T]) {
resources := assertObserver(ctx, iterable)

Expand All @@ -49,9 +55,10 @@ func Assert[T any](ctx context.Context, iterable Iterable[T], asserters ...Asser

func assertObserver[T any](ctx context.Context, iterable Iterable[T]) *actualResources[T] {
resources := &actualResources[T]{
values: make([]T, 0),
numbers: make([]int, 0),
errors: make([]error, 0),
values: make([]T, 0),
numbers: make([]int, 0),
errors: make([]error, 0),
booleans: make([]bool, 0),
}

observe := iterable.Observe()
Expand All @@ -73,6 +80,9 @@ loop:
case item.IsNumeric():
resources.numbers = append(resources.numbers, item.N)

case item.IsBoolean():
resources.booleans = append(resources.booleans, item.B)

default:
resources.values = append(resources.values, item.V)
}
Expand Down Expand Up @@ -266,3 +276,63 @@ type CustomPredicate[T any] struct {
func (a CustomPredicate[T]) Check(actual AssertResources[T]) {
Expect(a.Expected(actual)).To(Succeed(), reason("CustomPredicate"))
}

// HasTrue
type HasTrue[T any] struct {
}

// Check HasTrue checks boolean values contains at least 1 true value
func (a HasTrue[T]) Check(actual AssertResources[T]) {
values := actual.Booleans()

if len(values) == 0 {
Fail("HasTrue: no values found")
}

Expect(values).To(ContainElements(true), reason("HasTrue"))
}

// HasFalse
type HasFalse[T any] struct {
}

// Check HasFalse checks boolean values contains at least 1 true false
func (a HasFalse[T]) Check(actual AssertResources[T]) {
values := actual.Booleans()

if len(values) == 0 {
Fail("HasFalse: no values found")
}

Expect(values).To(ContainElements(false), reason("HasFalse"))
}

// IsTrue
type IsTrue[T any] struct {
}

// Check IsTrue checks boolean value is true
func (a IsTrue[T]) Check(actual AssertResources[T]) {
values := actual.Booleans()

if len(values) == 0 {
Fail("IsTrue: no value found")
}

Expect(values[0]).To(BeTrue(), reason("IsTrue"))
}

// IsFalse
type IsFalse[T any] struct {
}

// Check IsFalse checks boolean value is false
func (a IsFalse[T]) Check(actual AssertResources[T]) {
values := actual.Booleans()

if len(values) == 0 {
Fail("IsFalse: no value found")
}

Expect(values[0]).To(BeFalse(), reason("IsFalse"))
}
34 changes: 32 additions & 2 deletions rx/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type (
//
C chan<- Item[T]
N int
B bool
disc enums.ItemDiscriminator
}

Expand Down Expand Up @@ -90,6 +91,30 @@ func Num[T any](n int) Item[T] {
}
}

// Bool creates a type safe boolean instance
func Bool[T any](b bool) Item[T] {
return Item[T]{
B: b,
disc: enums.ItemDiscBoolean,
}
}

// True creates a type safe boolean instance set to true
func True[T any]() Item[T] {
return Item[T]{
B: true,
disc: enums.ItemDiscBoolean,
}
}

// False creates a type safe boolean instance set to false
func False[T any]() Item[T] {
return Item[T]{
B: false,
disc: enums.ItemDiscBoolean,
}
}

// SendItems is a utility function that sends a list of items and indicates a
// strategy on whether to close the channel once the function completes.
func SendItems[T any](ctx context.Context,
Expand Down Expand Up @@ -178,16 +203,21 @@ func (i Item[T]) IsTick() bool {
return (i.disc & enums.ItemDiscPulse) > 0
}

// IsTickValue checks if an item is a tick instance.
// IsTickValue checks if an item is a tick value instance.
func (i Item[T]) IsTickValue() bool {
return (i.disc & enums.ItemDiscTickValue) > 0
}

// IsTickValue checks if an item is a tick instance.
// IsTickValue checks if an item is a numeric instance.
func (i Item[T]) IsNumeric() bool {
return (i.disc & enums.ItemDiscNumeric) > 0
}

// IsBoolean checks if an item is a boolean instance.
func (i Item[T]) IsBoolean() bool {
return (i.disc & enums.ItemDiscBoolean) > 0
}

// SendBlocking sends an item and blocks until it is sent.
func (i Item[T]) SendBlocking(ch chan<- Item[T]) {
ch <- i
Expand Down
66 changes: 64 additions & 2 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rx

import (
"context"
"fmt"
"reflect"
)

Expand All @@ -12,8 +13,65 @@ func isZero[T any](limit T) bool {
return val != zero
}

func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] {
return o.iterable.Observe(opts...)
// All determines whether all items emitted by an Observable meet some criteria.
func (o *ObservableImpl[T]) All(predicate Predicate[T], opts ...Option[T]) Single[T] {
const (
forceSeq = false
bypassGather = false
)

return single(o.parent, o, func() operator[T] {
return &allOperator[T]{
predicate: predicate,
all: true,
}
}, forceSeq, bypassGather, opts...)
}

type allOperator[T any] struct {
predicate Predicate[T]
all bool
}

func (op *allOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
if !op.predicate(item) {
False[T]().SendContext(ctx, dst)

op.all = false

operatorOptions.stop()
}
}

func (op *allOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *allOperator[T]) end(ctx context.Context, dst chan<- Item[T]) {
if op.all {
True[T]().SendContext(ctx, dst)
}
}

func (op *allOperator[T]) gatherNext(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
if !item.IsBoolean() {
// This panic is temporary
panic(fmt.Sprintf("item: '%+v' is not a Boolean", item))
}

if !item.B {
False[T]().SendContext(ctx, dst)

op.all = false

operatorOptions.stop()
}
}

// Connect instructs a connectable Observable to begin emitting items to its subscribers.
Expand Down Expand Up @@ -238,3 +296,7 @@ func (op *minOperator[T]) gatherNext(ctx context.Context,
// TODO(check): op.next(ctx, Of(item.V.(*minOperator).min), dst, operatorOptions)
op.next(ctx, Of(item.V), dst, operatorOptions)
}

func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] {
return o.iterable.Observe(opts...)
}
2 changes: 1 addition & 1 deletion rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Observable[T any] interface {
Iterable[T]

All(predicate Predicate[T], opts ...Option[T]) Single[T]
Connect(ctx context.Context) (context.Context, Disposable)

Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Expand Down
Loading

0 comments on commit 7857fc0

Please sign in to comment.