Skip to content

Commit

Permalink
ref(rx): complete factory impl (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 2, 2024
1 parent fa839c4 commit 9691cfd
Show file tree
Hide file tree
Showing 16 changed files with 1,824 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
],
"cSpell.words": [
"Assistable",
"binaryheap",
"bodyclose",
"cmds",
"coverpkg",
Expand All @@ -15,6 +16,7 @@
"dogsled",
"dotenv",
"dupl",
"emirpasic",
"errcheck",
"exportloopref",
"extendio",
Expand Down Expand Up @@ -46,6 +48,7 @@
"nakedret",
"nolint",
"nolintlint",
"notif",
"onecontext",
"onsi",
"outdir",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
)

require (
github.com/emirpasic/gods v1.18.1
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/go-cmp v0.6.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
Expand Down
25 changes: 25 additions & 0 deletions rx/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ loop:
break loop
}

// TODO: needs to accommodate item.N, ie the numeric aux value
// and also should be modified to support all the other
// new ways of interpreting an item (Ch, Tick, Tv), possibly
// with new assertions, ie: HasCh, HasTick, HasTv.
//
if item.IsError() {
errs = append(errs, item.E)
} else {
Expand Down Expand Up @@ -226,6 +231,14 @@ func HasItem[T any](i T) RxAssert[T] {
})
}

// HasItemsNoOrder checks that an observable produces the corresponding items regardless of the order.
func HasItemsNoOrder[T any](items ...T) RxAssert[T] {
return newAssertion(func(a *rxAssert[T]) {
a.checkHasItemsNoOrder = true
a.itemsNoOrder = items
})
}

// IsNotEmpty checks that the observable produces some items.
func IsNotEmpty[T any]() RxAssert[T] {
return newAssertion(func(a *rxAssert[T]) {
Expand Down Expand Up @@ -259,3 +272,15 @@ func HasNoError[T any]() RxAssert[T] {
ra.checkHasNotRaisedError = true
})
}

// CustomPredicate checks a custom predicate.
func CustomPredicate[T any](predicate AssertPredicate[T]) RxAssert[T] {
return newAssertion(func(a *rxAssert[T]) {
if !a.checkHasCustomPredicate {
a.checkHasCustomPredicate = true
a.customPredicates = make([]AssertPredicate[T], 0)
}

a.customPredicates = append(a.customPredicates, predicate)
})
}
110 changes: 110 additions & 0 deletions rx/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package rx

import (
"context"
"time"
)

// Infinite represents an infinite wait time
var Infinite int64 = -1

// Duration represents a duration
type Duration interface {
duration() time.Duration
}

type duration struct {
d time.Duration
}

func (d *duration) duration() time.Duration {
return d.d
}

// WithDuration is a duration option
func WithDuration(d time.Duration) Duration {
return &duration{
d: d,
}
}

type causalityDuration struct {
fs []execution
}

type execution struct {
f func()
isTick bool
}

func timeCausality[T any](elems ...any) (context.Context, Observable[T], Duration) {
ch := make(chan Item[T], 1)
fs := make([]execution, len(elems)+1)
ctx, cancel := context.WithCancel(context.Background())

for i, elem := range elems {
i := i
elem := elem

if el, ok := elem.(Item[T]); ok && el.IsTick() {
fs[i] = execution{
f: func() {},
isTick: true,
}
} else {
switch elem := elem.(type) {
case Item[T]:
fs[i] = execution{
f: func() {
ch <- elem
},
}

case error:
fs[i] = execution{
f: func() {
ch <- Error[T](elem)
},
}

case T:
fs[i] = execution{
f: func() {
ch <- Of(elem)
},
}
}
}
}

fs[len(elems)] = execution{
f: func() {
cancel()
},
isTick: false,
}

return ctx, FromChannel(ch), &causalityDuration{fs: fs}
}

func (d *causalityDuration) duration() time.Duration {
pop := d.fs[0]
pop.f()

d.fs = d.fs[1:]

if pop.isTick {
return time.Nanosecond
}

return time.Minute
}

// type mockDuration struct {
// mock.Mock
// }

// func (m *mockDuration) duration() time.Duration {
// args := m.Called()
// return args.Get(0).(time.Duration)
// }
19 changes: 19 additions & 0 deletions rx/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package rx

// IllegalInputError is triggered when the observable receives an illegal input.
type IllegalInputError struct {
error string
}

func (e IllegalInputError) Error() string {
return "illegal input: " + e.error
}

// IndexOutOfBoundError is triggered when the observable cannot access to the specified index.
type IndexOutOfBoundError struct {
error string
}

func (e IndexOutOfBoundError) Error() string {
return "index out of bound: " + e.error
}
Loading

0 comments on commit 9691cfd

Please sign in to comment.