Skip to content

Commit

Permalink
feat(rx): add first/first-or-default operators (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 10, 2024
1 parent b302d80 commit e087538
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 0 deletions.
135 changes: 135 additions & 0 deletions rx/observable-operator-first_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("First", func() {
When("not empty", func() {
It("🧪 should: return first item", func() {
// rxgo: Test_Observable_First_NotEmpty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3).First()
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 1,
})
})
})

When("empty", func() {
It("🧪 should: return nothing", func() {
// rxgo: Test_Observable_First_Empty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Empty[int]().First()
rx.Assert(ctx, obs, rx.IsEmpty[int]{})
})
})

Context("Parallel", func() {
When("not empty", func() {
It("🧪 should: return first item", func() {
// rxgo: Test_Observable_First_Parallel_NotEmpty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3).First(rx.WithCPUPool[int]())
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 1,
})
})
})

When("empty", func() {
It("🧪 should: return nothing", func() {
// rxgo: Test_Observable_First_Parallel_Empty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Empty[int]().First(rx.WithCPUPool[int]())
rx.Assert(ctx, obs, rx.IsEmpty[int]{})
})
})
})
})

Context("FirstOrDefault", func() {
When("not empty", func() {
It("🧪 should: return first item", func() {
// rxgo: Test_Observable_First_NotEmpty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3).FirstOrDefault(10)
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 1,
})
})
})

When("empty", func() {
It("🧪 should: return default", func() {
// rxgo: Test_Observable_First_Empty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Empty[int]().FirstOrDefault(10)
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 10,
})
})
})

Context("Parallel", func() {
When("not empty", func() {
It("🧪 should: return first item", func() {
// rxgo: Test_Observable_First_Parallel_NotEmpty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3).FirstOrDefault(10, rx.WithCPUPool[int]())
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 1,
})
})
})

When("empty", func() {
It("🧪 should: return default ", func() {
// rxgo: Test_Observable_First_Parallel_Empty
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Empty[int]().FirstOrDefault(10, rx.WithCPUPool[int]())
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 10,
})
})
})
})
})
})
84 changes: 84 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,90 @@ func (op *findOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T]) {
}

// First returns new Observable which emit only first item.
// Cannot be run in parallel.
func (o *ObservableImpl[T]) First(opts ...Option[T]) OptionalSingle[T] {
const (
forceSeq = true
bypassGather = false
)

return optionalSingle(o.parent, o, func() operator[T] {
return &firstOperator[T]{}
}, forceSeq, bypassGather, opts...)
}

type firstOperator[T any] struct{}

func (op *firstOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
item.SendContext(ctx, dst)
operatorOptions.stop()
}

func (op *firstOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *firstOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *firstOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T],
) {
}

// FirstOrDefault returns new Observable which emit only first item.
// If the observable fails to emit any items, it emits a default value.
// Cannot be run in parallel.
func (o *ObservableImpl[T]) FirstOrDefault(defaultValue T, opts ...Option[T]) Single[T] {
const (
forceSeq = true
bypassGather = false
)

return single(o.parent, o, func() operator[T] {
return &firstOrDefaultOperator[T]{
defaultValue: defaultValue,
}
}, forceSeq, bypassGather, opts...)
}

type firstOrDefaultOperator[T any] struct {
defaultValue T
sent bool
}

func (op *firstOrDefaultOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
item.SendContext(ctx, dst)

op.sent = true

operatorOptions.stop()
}

func (op *firstOrDefaultOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *firstOrDefaultOperator[T]) end(ctx context.Context, dst chan<- Item[T]) {
if !op.sent {
Of(op.defaultValue).SendContext(ctx, dst)
}
}

func (op *firstOrDefaultOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T],
) {
}

// !!!

// Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
Expand Down
2 changes: 2 additions & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Observable[T any] interface {
Errors(opts ...Option[T]) []error
Filter(apply Predicate[T], opts ...Option[T]) Observable[T]
Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T]
First(opts ...Option[T]) OptionalSingle[T]
FirstOrDefault(defaultValue T, opts ...Option[T]) Single[T]
Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Map(apply Func[T], opts ...Option[T]) Observable[T]
Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Expand Down

0 comments on commit e087538

Please sign in to comment.