From 208a2288cbbf677f7d87d38c3ea0440ad29d2c64 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 12 Jul 2024 09:11:40 +0200 Subject: [PATCH] [Observables] chore: add `ReplayObservable#SubscribeFromLatestBufferedOffset()` (#647) Co-authored-by: Daniel Olshansky --- pkg/observable/channel/replay.go | 28 +++++++++- pkg/observable/channel/replay_test.go | 76 +++++++++++++++++++++++++++ pkg/observable/interface.go | 11 ++++ 3 files changed, 113 insertions(+), 2 deletions(-) diff --git a/pkg/observable/channel/replay.go b/pkg/observable/channel/replay.go index 5b147ee16..12ff136be 100644 --- a/pkg/observable/channel/replay.go +++ b/pkg/observable/channel/replay.go @@ -122,18 +122,42 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V { // It replays the values stored in the replay buffer in the order of their arrival // before emitting new values. func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { + return ro.SubscribeFromLatestBufferedOffset(ctx, ro.replayBufferSize) +} + +// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of +// values in the replay buffer, starting from the latest buffered value at index 'offset'. +// +// After this range of the replay buffer is notified, the observer continues to be notified, +// in real-time, when the publishCh channel receives a new value. +// +// If offset is greater than replayBufferSize or the number of elements it currently contains, +// the observer is notified of all elements in the replayBuffer, starting from the beginning. +// +// Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable. +func (ro *replayObservable[V]) SubscribeFromLatestBufferedOffset( + ctx context.Context, + endOffset int, +) observable.Observer[V] { obs, ch := NewObservable[V]() ctx, cancel := context.WithCancel(ctx) go func() { - // Replay the values stored in the buffer form the oldest to the newest. ro.replayBufferMu.RLock() - for i := len(ro.replayBuffer) - 1; i >= 0; i-- { + + // Ensure that the offset is within the bounds of the replay buffer. + if endOffset > len(ro.replayBuffer) { + endOffset = len(ro.replayBuffer) + } + + // Replay the values stored in the buffer form the oldest to the newest. + for i := endOffset - 1; i >= 0; i-- { ch <- ro.replayBuffer[i] } bufferedValuesCh := ro.bufferingObsvbl.Subscribe(ctx).Ch() ro.replayBufferMu.RUnlock() + // Since bufferingObsvbl emits all buffered values in one notification // and the replay buffer has already been replayed, only the most recent // value needs to be published diff --git a/pkg/observable/channel/replay_test.go b/pkg/observable/channel/replay_test.go index 5839ced97..e5cb93919 100644 --- a/pkg/observable/channel/replay_test.go +++ b/pkg/observable/channel/replay_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/testutil/testerrors" ) @@ -300,3 +301,78 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) { require.ElementsMatch(t, []int{5, 4, 3, 2}, replayObsvbl.Last(ctx, 4)) require.ElementsMatch(t, []int{5, 4, 3, 2, 1}, replayObsvbl.Last(ctx, 5)) } + +func TestReplayObservable_SubscribeFromLatestBufferedOffset(t *testing.T) { + receiveTimeout := 100 * time.Millisecond + values := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + + tests := []struct { + name string + replayBufferSize int + endOffset int + expectedValues []int + }{ + { + name: "endOffset = replayBufferSize", + replayBufferSize: 8, + endOffset: 8, + expectedValues: values[2:], // []int{2, 3, 4, 5, ..., 9}, + }, + { + name: "endOffset < replayBufferSize", + replayBufferSize: 10, + endOffset: 2, + expectedValues: values[8:], // []int{8, 9}, + }, + { + name: "endOffset > replayBufferSize", + replayBufferSize: 8, + endOffset: 10, + expectedValues: values[2:], + }, + { + name: "replayBufferSize > endOffset > numBufferedValues ", + replayBufferSize: 20, + endOffset: 15, + expectedValues: values, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var ctx = context.Background() + + replayObsvbl, publishCh := + channel.NewReplayObservable[int](ctx, test.replayBufferSize) + + for _, value := range values { + publishCh <- value + time.Sleep(time.Millisecond) + } + + observer := replayObsvbl.SubscribeFromLatestBufferedOffset(ctx, test.endOffset) + // Assumes all values will be received within receiveTimeout. + actualValues := accumulateValues(observer, receiveTimeout) + require.EqualValues(t, test.expectedValues, actualValues) + }) + } +} + +func accumulateValues[V any]( + observer observable.Observer[V], + timeout time.Duration, +) (values []V) { + for { + select { + case value, ok := <-observer.Ch(): + if !ok { + return + } + + values = append(values, value) + continue + case <-time.After(timeout): + return + } + } +} diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go index d5992afc6..e2ee470fc 100644 --- a/pkg/observable/interface.go +++ b/pkg/observable/interface.go @@ -11,6 +11,17 @@ import "context" // to new observers, before publishing new values to observers. type ReplayObservable[V any] interface { Observable[V] + // SubscribeFromLatestBufferedOffset returns an observer which is initially notified of + // values in the replay buffer, starting from the latest buffered value at index 'offset'. + // + // After this range of the replay buffer is notified, the observer continues to be notified, + // in real-time, when the publishCh channel receives a new value. + // + // If offset is greater than replayBufferSize or the number of elements it currently contains, + // the observer is notified of all elements in the replayBuffer, starting from the beginning. + // + // Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable. + SubscribeFromLatestBufferedOffset(ctx context.Context, offset int) Observer[V] // Last synchronously returns the last n values from the replay buffer with // LIFO ordering Last(ctx context.Context, n int) []V