From ad4607c4539af250752d24143211484f816444a8 Mon Sep 17 00:00:00 2001 From: plastikfan Date: Sun, 14 Apr 2024 21:04:17 +0100 Subject: [PATCH] feat(rx): window with operators (#218) --- rx/observable-operator-window-with_test.go | 180 ++++++++++++ rx/observable-operator.go | 319 ++++++++++++++++++++- rx/observable.go | 3 + 3 files changed, 501 insertions(+), 1 deletion(-) create mode 100644 rx/observable-operator-window-with_test.go diff --git a/rx/observable-operator-window-with_test.go b/rx/observable-operator-window-with_test.go new file mode 100644 index 0000000..9182d1f --- /dev/null +++ b/rx/observable-operator-window-with_test.go @@ -0,0 +1,180 @@ +package rx_test + +import ( + "context" + "time" + + "github.com/fortytw2/leaktest" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + + "github.com/snivilised/lorax/rx" +) + +var _ = Describe("Observable operator", func() { + Context("WindowWithCount", func() { + When("principle", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_WindowWithCount + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + observe := testObservable[int](ctx, 1, 2, 3, 4, 5).WindowWithCount(2).Observe() + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItems[int]{ + Expected: []int{1, 2}, + }, + ) + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItems[int]{ + Expected: []int{3, 4}, + }, + ) + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItem[int]{ + Expected: 5, + }, + ) + }) + }) + + When("Zero count", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_WindowWithCount_ZeroCount + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + observe := testObservable[int](ctx, 1, 2, 3, 4, 5).WindowWithCount(0).Observe() + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItems[int]{ + Expected: []int{1, 2, 3, 4, 5}, + }, + ) + }) + }) + + Context("Errors", func() { + When("error", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_WindowWithCount_ObservableError + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + observe := testObservable[int](ctx, 1, 2, errFoo, 4, 5).WindowWithCount(2).Observe() + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItems[int]{ + Expected: []int{1, 2}, + }, + ) + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.IsEmpty[int]{}, + rx.HasError[int]{ + Expected: []error{errFoo}, + }, + ) + }) + }) + + When("error", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_WindowWithCount_InputError + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.Empty[int]().WindowWithCount(-1) + rx.Assert(ctx, obs, + rx.HasAnError[int]{}, + ) + }) + }) + }) + }) + + Context("WindowWithTime", func() { + When("principle", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_WindowWithTime + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan rx.Item[int], 10) + ch <- rx.Of(1) + ch <- rx.Of(2) + obs := rx.FromChannel(ch) + + go func() { + time.Sleep(30 * time.Millisecond) + ch <- rx.Of(3) + close(ch) + }() + + observe := obs.WindowWithTime( + rx.WithDuration(10*time.Millisecond), + rx.WithBufferedChannel[int](10), + ).Observe() + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItems[int]{ + Expected: []int{1, 2}, + }, + ) + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItem[int]{ + Expected: 3, + }, + ) + }) + }) + }) + + Context("WindowWithTimeOrCount", func() { + When("principle", func() { + It("🧪 should: ", func() { + // rxgo: Test_Observable_WindowWithTimeOrCount + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan rx.Item[int], 10) + ch <- rx.Of(1) + ch <- rx.Of(2) + obs := rx.FromChannel(ch) + + go func() { + time.Sleep(30 * time.Millisecond) + ch <- rx.Of(3) + close(ch) + }() + + observe := obs.WindowWithTimeOrCount( + rx.WithDuration(10*time.Millisecond), 1, + rx.WithBufferedChannel[int](10), + ).Observe() + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItem[int]{ + Expected: 1, + }, + ) + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItem[int]{ + Expected: 2, + }, + ) + rx.Assert(ctx, (<-observe).O.(rx.Observable[int]), + rx.HasItem[int]{ + Expected: 3, + }, + ) + }) + }) + }) +}) diff --git a/rx/observable-operator.go b/rx/observable-operator.go index 6c11590..6a32be9 100644 --- a/rx/observable-operator.go +++ b/rx/observable-operator.go @@ -4,6 +4,7 @@ import ( "container/ring" "context" "fmt" + "sync" "sync/atomic" "time" @@ -2465,7 +2466,323 @@ func (op *timestampOperator[T]) gatherNext(_ context.Context, _ Item[T], _ chan<- Item[T], _ operatorOptions[T]) { } -// !!! +// WindowWithCount periodically subdivides items from an Observable into Observable windows of a given size and emit these windows +// rather than emitting the items one at a time. +func (o *ObservableImpl[T]) WindowWithCount(count int, opts ...Option[T]) Observable[T] { + const ( + forceSeq = true + bypassGather = false + ) + + if count < 0 { + return Thrown[T](IllegalInputError{ + error: "count must be positive or nil", + }) + } + + option := parseOptions(opts...) + + return observable(o.parent, o, func() operator[T] { + return &windowWithCountOperator[T]{ + count: count, + option: option, + } + }, forceSeq, bypassGather, opts...) +} + +type windowWithCountOperator[T any] struct { + count int + iCount int + currentChannel chan Item[T] + option Option[T] +} + +func (op *windowWithCountOperator[T]) pre(ctx context.Context, dst chan<- Item[T]) { + if op.currentChannel == nil { + ch := op.option.buildChannel() + op.currentChannel = ch + + Opaque[T](FromChannel(ch)).SendContext(ctx, dst) + } +} + +func (op *windowWithCountOperator[T]) post(ctx context.Context, dst chan<- Item[T]) { + if op.iCount == op.count { + op.iCount = 0 + close(op.currentChannel) + + ch := op.option.buildChannel() + op.currentChannel = ch + Opaque[T](FromChannel(ch)).SendContext(ctx, dst) + } +} + +func (op *windowWithCountOperator[T]) next(ctx context.Context, item Item[T], + dst chan<- Item[T], _ operatorOptions[T], +) { + op.pre(ctx, dst) + op.currentChannel <- item + + op.iCount++ + + op.post(ctx, dst) +} + +func (op *windowWithCountOperator[T]) err(ctx context.Context, item Item[T], + dst chan<- Item[T], operatorOptions operatorOptions[T]) { + op.pre(ctx, dst) + + op.currentChannel <- item + + op.iCount++ + + op.post(ctx, dst) + operatorOptions.stop() +} + +func (op *windowWithCountOperator[T]) end(_ context.Context, _ chan<- Item[T]) { + if op.currentChannel != nil { + close(op.currentChannel) + } +} + +func (op *windowWithCountOperator[T]) gatherNext(_ context.Context, _ Item[T], + _ chan<- Item[T], _ operatorOptions[T], +) { +} + +// WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows +// and emit them rather than emitting the items one at a time. +func (o *ObservableImpl[T]) WindowWithTime(timespan Duration, opts ...Option[T]) Observable[T] { + if timespan == nil { + return Thrown[T](IllegalInputError{ + error: "timespan must no be nil", + }) + } + + f := func(ctx context.Context, next chan Item[T], option Option[T], opts ...Option[T]) { + observe := o.Observe(opts...) + ch := option.buildChannel() + done := make(chan struct{}) + empty := true + mutex := sync.Mutex{} + + if !Opaque[T](FromChannel(ch)).SendContext(ctx, next) { + return + } + + go func() { + defer func() { + mutex.Lock() + close(ch) + mutex.Unlock() + }() + defer close(next) + + for { + select { + case <-ctx.Done(): + return + case <-done: + return + case <-time.After(timespan.duration()): + mutex.Lock() + + if empty { + mutex.Unlock() + continue + } + + close(ch) + + empty = true + + ch = option.buildChannel() + + if !Opaque[T](FromChannel(ch)).SendContext(ctx, next) { + close(done) + + return + } + mutex.Unlock() + } + } + }() + + for { + select { + case <-ctx.Done(): + return + case <-done: + return + case item, ok := <-observe: + if !ok { + close(done) + + return + } + + if item.IsError() { + mutex.Lock() + + if !item.SendContext(ctx, ch) { + mutex.Unlock() + close(done) + + return + } + mutex.Unlock() + + if option.getErrorStrategy() == StopOnError { + close(done) + + return + } + } + + mutex.Lock() + if !item.SendContext(ctx, ch) { + mutex.Unlock() + + return + } + + empty = false + + mutex.Unlock() + } + } + } + + return customObservableOperator(o.parent, f, opts...) +} + +// WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size +// and emit them rather than emitting the items one at a time. +func (o *ObservableImpl[T]) WindowWithTimeOrCount(timespan Duration, count int, opts ...Option[T]) Observable[T] { + if timespan == nil { + return Thrown[T](IllegalInputError{ + error: "timespan must not be nil", + }) + } + + if count < 0 { + return Thrown[T](IllegalInputError{ + error: "count must be positive or nil", + }) + } + + f := func(ctx context.Context, next chan Item[T], option Option[T], opts ...Option[T]) { + observe := o.Observe(opts...) + ch := option.buildChannel() + done := make(chan struct{}) + mutex := sync.Mutex{} + iCount := 0 + + if !Opaque[T](FromChannel(ch)).SendContext(ctx, next) { + return + } + + go func() { + defer func() { + mutex.Lock() + close(ch) + mutex.Unlock() + }() + defer close(next) + + for { + select { + case <-ctx.Done(): + return + case <-done: + return + case <-time.After(timespan.duration()): + mutex.Lock() + if iCount == 0 { + mutex.Unlock() + + continue + } + + close(ch) + + iCount = 0 + ch = option.buildChannel() + + if !Opaque[T](FromChannel(ch)).SendContext(ctx, next) { + close(done) + + return + } + mutex.Unlock() + } + } + }() + + for { + select { + case <-ctx.Done(): + return + case <-done: + return + case item, ok := <-observe: + if !ok { + close(done) + + return + } + + if item.IsError() { + mutex.Lock() + if !item.SendContext(ctx, ch) { + mutex.Unlock() + + close(done) + + return + } + mutex.Unlock() + + if option.getErrorStrategy() == StopOnError { + close(done) + + return + } + } + + mutex.Lock() + + if !item.SendContext(ctx, ch) { + mutex.Unlock() + + return + } + + iCount++ + + if iCount == count { + close(ch) + + iCount = 0 + ch = option.buildChannel() + + if !Opaque[T](FromChannel(ch)).SendContext(ctx, next) { + mutex.Unlock() + close(done) + + return + } + } + mutex.Unlock() + } + } + } + + return customObservableOperator(o.parent, f, opts...) +} + +// <<< // ToSlice collects all items from an Observable and emit them in a slice and // an optional error. Cannot be run in parallel. diff --git a/rx/observable.go b/rx/observable.go index e64df72..c8c11f6 100644 --- a/rx/observable.go +++ b/rx/observable.go @@ -64,6 +64,9 @@ type Observable[T any] interface { TimeInterval(opts ...Option[T]) Observable[T] Timestamp(opts ...Option[T]) Observable[T] ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error) + WindowWithCount(count int, opts ...Option[T]) Observable[T] + WindowWithTime(timespan Duration, opts ...Option[T]) Observable[T] + WindowWithTimeOrCount(timespan Duration, count int, opts ...Option[T]) Observable[T] } // ObservableImpl implements Observable.