Skip to content

Commit

Permalink
feat(rx): add zip from iterable operator (#220)
Browse files Browse the repository at this point in the history
feat(rx): add zip from iterable operator (#220)

ref(rx): minor refactorings (#220)
  • Loading branch information
plastikfan committed Apr 15, 2024
1 parent ad4607c commit 43ebec2
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 73 deletions.
20 changes: 10 additions & 10 deletions rx/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,47 +37,47 @@ type execution struct {
isTick bool
}

func timeCausality[T any](elems ...any) (context.Context, Observable[T], Duration) {
func timeCausality[T any](values ...any) (context.Context, Observable[T], Duration) {
ch := make(chan Item[T], 1)
fs := make([]execution, len(elems)+1)
fs := make([]execution, len(values)+1)
ctx, cancel := context.WithCancel(context.Background())

for i, elem := range elems {
for i, value := range values {
i := i
elem := elem
value := value

if el, ok := elem.(Item[T]); ok && el.IsTick() {
if el, ok := value.(Item[T]); ok && el.IsTick() {
fs[i] = execution{
f: func() {},
isTick: true,
}
} else {
switch elem := elem.(type) {
switch value := value.(type) {
case Item[T]:
fs[i] = execution{
f: func() {
ch <- elem
ch <- value
},
}

case error:
fs[i] = execution{
f: func() {
ch <- Error[T](elem)
ch <- Error[T](value)
},
}

case T:
fs[i] = execution{
f: func() {
ch <- Of(elem)
ch <- Of(value)
},
}
}
}
}

fs[len(elems)] = execution{
fs[len(values)] = execution{
f: func() {
cancel()
},
Expand Down
2 changes: 1 addition & 1 deletion rx/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ var _ = Describe("Factory", func() {
defer leaktest.Check(GinkgoT())()

/*
needs to accommodate item.N, ie the numeric aux value
TODO: needs to accommodate item.N, ie the numeric aux value
and also should be modified to support all the other
new ways of interpreting an item (Ch, Tick, Tv)
*/
Expand Down
18 changes: 9 additions & 9 deletions rx/observable-operator-distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ var _ = Describe("Observable operator", func() {
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, 1, 3).Distinct(
func(_ context.Context, item int) (int, error) {
return item, nil
func(_ context.Context, value int) (int, error) {
return value, nil
},
)

Expand All @@ -44,8 +44,8 @@ var _ = Describe("Observable operator", func() {
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, errFoo, 3).Distinct(
func(_ context.Context, item int) (int, error) {
return item, nil
func(_ context.Context, value int) (int, error) {
return value, nil
},
)

Expand All @@ -69,12 +69,12 @@ var _ = Describe("Observable operator", func() {
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, 2, 3, 4).Distinct(
func(_ context.Context, v int) (int, error) {
if v == 3 {
func(_ context.Context, value int) (int, error) {
if value == 3 {
return 0, errFoo
}

return v, nil
return value, nil
},
)

Expand Down Expand Up @@ -195,8 +195,8 @@ var _ = Describe("Observable operator", func() {
defer cancel()

obs := testObservable[int](ctx, 1, 2, 2, 1, 3).DistinctUntilChanged(
func(_ context.Context, item int) (int, error) {
return item, nil
func(_ context.Context, value int) (int, error) {
return value, nil
}, rx.NativeItemLimitComparator, rx.WithCPUPool[int]())

rx.Assert(ctx, obs, rx.HasItems[int]{
Expand Down
73 changes: 73 additions & 0 deletions rx/observable-operator-zip-from-iterable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("ZipFromIterable", func() {
When("source and other observers same length", func() {
It("🧪 should: emit zipped elements", func() {
// rxgo: Test_Observable_ZipFromObservable
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs1 := testObservable[int](ctx, 1, 2, 3)
obs2 := testObservable[int](ctx, 10, 20, 30)
zipper := func(_ context.Context, a, b rx.Item[int]) (int, error) {
return a.V + b.V, nil
}
zip := obs1.ZipFromIterable(obs2, zipper)
rx.Assert(ctx, zip, rx.HasItems[int]{
Expected: []int{11, 22, 33},
})
})
})

When("source observer longer than other", func() {
It("🧪 should: omit zip from trailing items", func() {
// rxgo: Test_Observable_ZipFromObservable_DifferentLength1
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs1 := testObservable[int](ctx, 1, 2, 3)
obs2 := testObservable[int](ctx, 10, 20)
zipper := func(_ context.Context, a, b rx.Item[int]) (int, error) {
return a.V + b.V, nil
}
zip := obs1.ZipFromIterable(obs2, zipper)
rx.Assert(ctx, zip, rx.HasItems[int]{
Expected: []int{11, 22},
})
})
})

When("source observer shorter than other", func() {
It("🧪 should: omit zip from trailing items", func() {
// rxgo: Test_Observable_ZipFromObservable_DifferentLength2
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs1 := testObservable[int](ctx, 1, 2)
obs2 := testObservable[int](ctx, 10, 20, 30)
zipper := func(_ context.Context, a, b rx.Item[int]) (int, error) {
return a.V + b.V, nil
}
zip := obs1.ZipFromIterable(obs2, zipper)
rx.Assert(ctx, zip, rx.HasItems[int]{
Expected: []int{11, 22},
})
})
})
})
})
Loading

0 comments on commit 43ebec2

Please sign in to comment.