Skip to content

Commit

Permalink
feat(rx): add repeat operator (#189) (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan authored Apr 12, 2024
1 parent d1ebe83 commit 9984e1b
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 1 deletion.
116 changes: 116 additions & 0 deletions rx/observable-operator-repeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package rx_test

import (
"context"
"errors"
"time"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok

"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Repeat", func() {
When("principle", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Repeat
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

repeat := testObservable[int](ctx, 1, 2, 3).Repeat(1, nil)
rx.Assert(ctx, repeat, rx.HasItems[int]{
Expected: []int{1, 2, 3, 1, 2, 3},
})
})
})

When("zero", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Repeat_Zero
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

repeat := testObservable[int](ctx, 1, 2, 3).Repeat(0, nil)
rx.Assert(ctx, repeat, rx.HasItems[int]{
Expected: []int{1, 2, 3},
})
})
})

When("infinite", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Repeat_Infinite
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
repeat := testObservable[int](ctx, 1, 2, 3).Repeat(
rx.Infinite, nil, rx.WithContext[int](ctx),
)

go func() {
time.Sleep(50 * time.Millisecond)
cancel()
}()

rx.Assert(ctx, repeat, rx.HasNoError[int]{},
rx.CustomPredicate[int]{
Expected: func(actual rx.AssertResources[int]) error {
items := actual.Values()
if len(items) == 0 {
return errors.New("no items")
}

return nil
},
})
})
})

When("frequency", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Repeat_Frequency
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
frequency := new(mockDuration)
frequency.On("duration").Return(time.Millisecond)
repeat := testObservable[int](ctx, 1, 2, 3).Repeat(1, frequency)
rx.Assert(ctx, repeat, rx.HasItems[int]{
Expected: []int{1, 2, 3, 1, 2, 3},
})
frequency.AssertNumberOfCalls("duration", 1)
frequency.AssertExpectations()
*/
})
})

Context("Errors", func() {
When("Negative Count", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Repeat_NegativeCount
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

repeat := testObservable[int](ctx, 1, 2, 3).Repeat(-2, nil)
rx.Assert(ctx, repeat,
rx.IsEmpty[int]{},
rx.HasAnError[int]{},
)
})
})
})
})
})
75 changes: 75 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rx
import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
)
Expand Down Expand Up @@ -1525,6 +1526,80 @@ func (op *reduceOperator[T]) gatherNext(ctx context.Context, item Item[T],
op.next(ctx, item.O.(*reduceOperator[T]).acc, dst, operatorOptions)
}

// Repeat returns an Observable that repeats the sequence of items emitted
// by the source Observable at most count times, at a particular frequency.
// Cannot run in parallel.
func (o *ObservableImpl[T]) Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T] {
if count != Infinite {
if count < 0 {
return Thrown[T](IllegalInputError{error: "count must be positive"})
}
}

const (
forceSeq = true
bypassGather = false
)

return observable(o.parent, o, func() operator[T] {
return &repeatOperator[T]{
count: count,
frequency: frequency,
seq: make([]Item[T], 0),
}
}, forceSeq, bypassGather, opts...)
}

type repeatOperator[T any] struct {
count int64
frequency Duration
seq []Item[T]
}

func (op *repeatOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], _ operatorOptions[T],
) {
item.SendContext(ctx, dst)
op.seq = append(op.seq, item)
}

func (op *repeatOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *repeatOperator[T]) end(ctx context.Context, dst chan<- Item[T]) {
for {
select {
default:
case <-ctx.Done():
return
}

if op.count != Infinite {
if op.count == 0 {
break
}
}

if op.frequency != nil {
time.Sleep(op.frequency.duration())
}

for _, v := range op.seq {
v.SendContext(ctx, dst)
}

op.count--
}
}

func (op *repeatOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T],
) {
}

// !!!

// ToSlice collects all items from an Observable and emit them in a slice and
Expand Down
2 changes: 1 addition & 1 deletion rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Observable[T any] interface {
OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T]
Run(opts ...Option[T]) Disposed
Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T]

Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]
ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
}

Expand Down

0 comments on commit 9984e1b

Please sign in to comment.