Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoylan committed Sep 4, 2022
1 parent dfa88d4 commit 615f9a9
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 191 deletions.
100 changes: 100 additions & 0 deletions refreshable/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2022 Palantir Technologies. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package refreshable

import (
"context"
"time"
)

type ready[T any] struct {
in Updatable[T]
readyC <-chan struct{}
cancel context.CancelFunc
}

func newReady[T any](in Updatable[T]) *ready[T] {
ctx, cancel := context.WithCancel(context.Background())
return &ready[T]{
in: in,
readyC: ctx.Done(),
cancel: cancel,
}
}

func (r *ready[T]) Current() T {
return r.in.Current()
}

func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
return r.in.Subscribe(consumer)
}

func (r *ready[T]) ReadyC() <-chan struct{} {
return r.readyC
}

func (r *ready[T]) Update(val T) {
r.cancel()
r.in.Update(val)
}

// NewFromChannel populates an Updatable with the values channel.
// If an element is already available, the returned Value is guaranteed to be populated.
// The channel should be closed when no longer used to avoid leaking resources.
func NewFromChannel[T any](values <-chan T) Ready[T] {
out := newReady(newZero[T]())
select {
case initial, ok := <-values:
if !ok {
return out // channel already closed
}
out.Update(initial)
default:
}
go func() {
for value := range values {
out.Update(value)
}
}()
return out
}

// NewFromTickerFunc returns a Ready Refreshable populated by the result of the provider called each interval.
// If the providers bool return is false, the value is ignored.
// The result's ReadyC channel is closed when a new value is populated.
func NewFromTickerFunc[T any](interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) {
out := newReady(newZero[T]())
ctx, cancel := context.WithCancel(context.Background())
values := make(chan T)
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
defer close(values)
for {
if value, ok := provider(); ok {
out.Update(value)
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
}
}
}()
return out, UnsubscribeFunc(cancel)
}

// Wait waits until the Ready has a current value or the context expires.
func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) {
select {
case <-ready.ReadyC():
return ready.Current(), true
case <-ctx.Done():
var zero T
return zero, false
}
}
126 changes: 24 additions & 102 deletions refreshable/refreshable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@

package refreshable

import (
"context"
"time"
)

// A Refreshable is a generic container type for a volatile underlying value.
// It supports atomic access and user-provided callback "subscriptions" on updates.
type Refreshable[T any] interface {
Expand All @@ -17,7 +12,8 @@ type Refreshable[T any] interface {
Current() T

// Subscribe calls the consumer function when Value updates until stop is closed.
// The consumer should be relatively fast: Updatable.Set blocks until all subscribers have returned.
// The consumer must be relatively fast: Updatable.Set blocks until all subscribers have returned.
// Expensive or error-prone responses to refreshed values should be asynchronous.
// Updates considered no-ops by reflect.DeepEqual may be skipped.
Subscribe(consumer func(T)) UnsubscribeFunc
}
Expand All @@ -37,7 +33,7 @@ type Validated[T any] interface {
Refreshable[T]
// Validation returns the result of the most recent validation.
// If the last value was valid, Validation returns the same value as Current and a nil error.
// If the last value was invalid, it and the validation error are returned. Current returns the newest valid value.
// If the last value was invalid, it and the error are returned. Current returns the most recent valid value.
Validation() (T, error)
}

Expand All @@ -49,110 +45,36 @@ type Ready[T any] interface {
ReadyC() <-chan struct{}
}

// UnsubscribeFunc removes a subscription from a refreshable's internal tracking and/or stops its update routine.
// It is safe to call multiple times.
type UnsubscribeFunc func()

func New[T any](val T) *DefaultRefreshable[T] {
d := new(DefaultRefreshable[T])
d.current.Store(&val)
return d
func New[T any](val T) Updatable[T] {
return newDefault(val)
}

// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable.
func Map[T any, M any](t Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) {
out := New(mapFn(t.Current()))
unsubscribe := t.Subscribe(func(v T) {
func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) {
out := New(mapFn(original.Current()))
stop := original.Subscribe(func(v T) {
out.Update(mapFn(v))
})
return out, unsubscribe
}

func SubscribeWithCurrent[T any](r Refreshable[T], consumer func(T)) UnsubscribeFunc {
unsubscribe := r.Subscribe(consumer)
consumer(r.Current())
return unsubscribe
}

// UpdateFromChannel populates an Updatable with the values channel.
// If an element is already available, the returned Value is guaranteed to be populated.
// The channel should be closed when no longer used to avoid leaking resources.
func UpdateFromChannel[T any](in Updatable[T], values <-chan T) Ready[T] {
out := newReady(in)
select {
case initial, ok := <-values:
if !ok {
return out // channel already closed
}
out.Update(initial)
default:
}

go func() {
for value := range values {
out.Update(value)
}
}()

return out
}

// UpdateFromTickerFunc returns a Refreshable populated by the result of the provider called each interval.
// If the providers bool return is false, the value is ignored.
func UpdateFromTickerFunc[T any](in Updatable[T], interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) {
out := newReady(in)
ctx, cancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
if value, ok := provider(); ok {
out.Update(value)
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
}
}
}()
return out, UnsubscribeFunc(cancel)
}

// Wait waits until the Ready has a current value or the context expires.
func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) {
select {
case <-ready.ReadyC():
return ready.Current(), true
case <-ctx.Done():
var zero T
return zero, false
}
}

type ready[T any] struct {
in Updatable[T]
readyC <-chan struct{}
cancel context.CancelFunc
}

func newReady[T any](in Updatable[T]) *ready[T] {
ctx, cancel := context.WithCancel(context.Background())
return &ready[T]{in: in, readyC: ctx.Done(), cancel: cancel}
}

func (r *ready[T]) Current() T {
return r.in.Current()
}

func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
return r.in.Subscribe(consumer)
out.Update(mapFn(original.Current()))
return out, stop
}

func (r *ready[T]) ReadyC() <-chan struct{} {
return r.readyC
// MapWithError is similar to Validate but allows for the function to return a mapping/mutation
// of the input object in addition to returning an error. The returned validRefreshable will contain the mapped value.
// An error is returned if the current original value fails to map.
func MapWithError[T any, M any](original Refreshable[T], mapFn func(T) (M, error)) (Validated[M], UnsubscribeFunc, error) {
v, stop := newValidRefreshable(original, mapFn)
_, err := v.Validation()
return v, stop, err
}

func (r *ready[T]) Update(val T) {
r.cancel()
r.in.Update(val)
// Validate returns a new Refreshable that returns the latest original value accepted by the validatingFn.
// If the upstream value results in an error, it is reported by Validation().
// An error is returned if the current original value is invalid.
func Validate[T any](original Refreshable[T], validatingFn func(T) error) (Validated[T], UnsubscribeFunc, error) {
return MapWithError(original, identity(validatingFn))
}
23 changes: 18 additions & 5 deletions refreshable/refreshable_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,27 @@ import (
"sync/atomic"
)

type DefaultRefreshable[T any] struct {
type defaultRefreshable[T any] struct {
mux sync.Mutex
current atomic.Value
subscribers []*func(T)
}

func newDefault[T any](val T) Updatable[T] {
d := new(defaultRefreshable[T])
d.current.Store(&val)
return d
}

func newZero[T any]() Updatable[T] {
d := new(defaultRefreshable[T])
var zero T
d.current.Store(&zero)
return d
}

// Update changes the value of the Refreshable, then blocks while subscribers are executed.
func (d *DefaultRefreshable[T]) Update(val T) {
func (d *defaultRefreshable[T]) Update(val T) {
d.mux.Lock()
defer d.mux.Unlock()
old := d.current.Swap(&val)
Expand All @@ -29,11 +42,11 @@ func (d *DefaultRefreshable[T]) Update(val T) {
}
}

func (d *DefaultRefreshable[T]) Current() T {
func (d *defaultRefreshable[T]) Current() T {
return *(d.current.Load().(*T))
}

func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
func (d *defaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
d.mux.Lock()
defer d.mux.Unlock()

Expand All @@ -42,7 +55,7 @@ func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
return d.unsubscribe(consumerFnPtr)
}

func (d *DefaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc {
func (d *defaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc {
return func() {
d.mux.Lock()
defer d.mux.Unlock()
Expand Down
19 changes: 15 additions & 4 deletions refreshable/refreshable_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
)

func TestDefaultRefreshable(t *testing.T) {
type container struct{ Value string }
type container struct {
Value string
}

v := &container{Value: "original"}
r := refreshable.New(v)
Expand Down Expand Up @@ -46,13 +48,22 @@ func TestDefaultRefreshable(t *testing.T) {

t.Run("Map", func(t *testing.T) {
r.Update(&container{Value: "value"})
m, _ := refreshable.Map[*container, int](r, func(i *container) int {
rLen, _ := refreshable.Map[*container, int](r, func(i *container) int {
return len(i.Value)
})
assert.Equal(t, m.Current(), 5)
assert.Equal(t, 5, rLen.Current())

rLenUpdated := false
rLen.Subscribe(func(int) { rLenUpdated = true })
// update to new value with same length and ensure the
// equality check prevented unnecessary subscriber updates.
r.Update(&container{Value: "VALUE"})
assert.Equal(t, "VALUE", r.Current().Value)
assert.False(t, rLenUpdated)

r.Update(&container{Value: "updated"})
assert.Equal(t, m.Current(), 7)
assert.Equal(t, "updated", r.Current().Value)
assert.Equal(t, 7, rLen.Current())
})

}
Loading

0 comments on commit 615f9a9

Please sign in to comment.