Skip to content

Commit

Permalink
feat(rx): add send operator (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 13, 2024
1 parent 038923a commit 8f328c2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
47 changes: 47 additions & 0 deletions rx/observable-operator-send_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Send", func() {
When("channel is buffered", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Send
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan rx.Item[int], 10)
testObservable[int](ctx, 1, 2, 3, errFoo).Send(ch)
Expect(rx.Of(1)).To(Equal(<-ch))
Expect(rx.Of(2)).To(Equal(<-ch))
Expect(rx.Of(3)).To(Equal(<-ch))
Expect(rx.Error[int](errFoo)).To(Equal(<-ch))
})
})

When("channel is not buffered", func() {
It("🧪 should: ", func() {
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan rx.Item[int])
testObservable[int](ctx, 1, 2, 3, errFoo).Send(ch)
Expect(rx.Of(1)).To(Equal(<-ch))
Expect(rx.Of(2)).To(Equal(<-ch))
Expect(rx.Of(3)).To(Equal(<-ch))
Expect(rx.Error[int](errFoo)).To(Equal(<-ch))
})
})
})
})
1 change: 1 addition & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Observable[T any] interface {
Run(opts ...Option[T]) Disposed
Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]
Scan(apply Func2[T], opts ...Option[T]) Observable[T]
Send(output chan<- Item[T], opts ...Option[T])
SequenceEqual(iterable Iterable[T], comparator Comparator[T], opts ...Option[T]) Single[T]
ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
}
Expand Down

0 comments on commit 8f328c2

Please sign in to comment.