Skip to content

Commit

Permalink
[Observables] chore: add `ReplayObservable#SubscribeFromLatestBuffere…
Browse files Browse the repository at this point in the history
…dOffset()` (#647)

Co-authored-by: Daniel Olshansky <olshansky.daniel@gmail.com>
  • Loading branch information
bryanchriswhite and Olshansk authored Jul 12, 2024
1 parent 4ad0ab0 commit 208a228
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 2 deletions.
28 changes: 26 additions & 2 deletions pkg/observable/channel/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,42 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
// It replays the values stored in the replay buffer in the order of their arrival
// before emitting new values.
func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
return ro.SubscribeFromLatestBufferedOffset(ctx, ro.replayBufferSize)
}

// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of
// values in the replay buffer, starting from the latest buffered value at index 'offset'.
//
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a new value.
//
// If offset is greater than replayBufferSize or the number of elements it currently contains,
// the observer is notified of all elements in the replayBuffer, starting from the beginning.
//
// Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable.
func (ro *replayObservable[V]) SubscribeFromLatestBufferedOffset(
ctx context.Context,
endOffset int,
) observable.Observer[V] {
obs, ch := NewObservable[V]()
ctx, cancel := context.WithCancel(ctx)

go func() {
// Replay the values stored in the buffer form the oldest to the newest.
ro.replayBufferMu.RLock()
for i := len(ro.replayBuffer) - 1; i >= 0; i-- {

// Ensure that the offset is within the bounds of the replay buffer.
if endOffset > len(ro.replayBuffer) {
endOffset = len(ro.replayBuffer)
}

// Replay the values stored in the buffer form the oldest to the newest.
for i := endOffset - 1; i >= 0; i-- {
ch <- ro.replayBuffer[i]
}

bufferedValuesCh := ro.bufferingObsvbl.Subscribe(ctx).Ch()
ro.replayBufferMu.RUnlock()

// Since bufferingObsvbl emits all buffered values in one notification
// and the replay buffer has already been replayed, only the most recent
// value needs to be published
Expand Down
76 changes: 76 additions & 0 deletions pkg/observable/channel/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/testutil/testerrors"
)
Expand Down Expand Up @@ -300,3 +301,78 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) {
require.ElementsMatch(t, []int{5, 4, 3, 2}, replayObsvbl.Last(ctx, 4))
require.ElementsMatch(t, []int{5, 4, 3, 2, 1}, replayObsvbl.Last(ctx, 5))
}

func TestReplayObservable_SubscribeFromLatestBufferedOffset(t *testing.T) {
receiveTimeout := 100 * time.Millisecond
values := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

tests := []struct {
name string
replayBufferSize int
endOffset int
expectedValues []int
}{
{
name: "endOffset = replayBufferSize",
replayBufferSize: 8,
endOffset: 8,
expectedValues: values[2:], // []int{2, 3, 4, 5, ..., 9},
},
{
name: "endOffset < replayBufferSize",
replayBufferSize: 10,
endOffset: 2,
expectedValues: values[8:], // []int{8, 9},
},
{
name: "endOffset > replayBufferSize",
replayBufferSize: 8,
endOffset: 10,
expectedValues: values[2:],
},
{
name: "replayBufferSize > endOffset > numBufferedValues ",
replayBufferSize: 20,
endOffset: 15,
expectedValues: values,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var ctx = context.Background()

replayObsvbl, publishCh :=
channel.NewReplayObservable[int](ctx, test.replayBufferSize)

for _, value := range values {
publishCh <- value
time.Sleep(time.Millisecond)
}

observer := replayObsvbl.SubscribeFromLatestBufferedOffset(ctx, test.endOffset)
// Assumes all values will be received within receiveTimeout.
actualValues := accumulateValues(observer, receiveTimeout)
require.EqualValues(t, test.expectedValues, actualValues)
})
}
}

func accumulateValues[V any](
observer observable.Observer[V],
timeout time.Duration,
) (values []V) {
for {
select {
case value, ok := <-observer.Ch():
if !ok {
return
}

values = append(values, value)
continue
case <-time.After(timeout):
return
}
}
}
11 changes: 11 additions & 0 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import "context"
// to new observers, before publishing new values to observers.
type ReplayObservable[V any] interface {
Observable[V]
// SubscribeFromLatestBufferedOffset returns an observer which is initially notified of
// values in the replay buffer, starting from the latest buffered value at index 'offset'.
//
// After this range of the replay buffer is notified, the observer continues to be notified,
// in real-time, when the publishCh channel receives a new value.
//
// If offset is greater than replayBufferSize or the number of elements it currently contains,
// the observer is notified of all elements in the replayBuffer, starting from the beginning.
//
// Passing 0 for offset is equivalent to calling Subscribe() on a non-replay observable.
SubscribeFromLatestBufferedOffset(ctx context.Context, offset int) Observer[V]
// Last synchronously returns the last n values from the replay buffer with
// LIFO ordering
Last(ctx context.Context, n int) []V
Expand Down

0 comments on commit 208a228

Please sign in to comment.