From 615f9a934d5b6529e70caa77b085de61696dcfd4 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Sun, 4 Sep 2022 13:46:09 -0700 Subject: [PATCH] updates --- refreshable/async.go | 100 ++++++++++++++++ refreshable/refreshable.go | 126 ++++----------------- refreshable/refreshable_default.go | 23 +++- refreshable/refreshable_default_test.go | 19 +++- refreshable/refreshable_validating.go | 106 +++++++---------- refreshable/refreshable_validating_test.go | 45 +++++--- 6 files changed, 228 insertions(+), 191 deletions(-) create mode 100644 refreshable/async.go diff --git a/refreshable/async.go b/refreshable/async.go new file mode 100644 index 00000000..7ca9a2f8 --- /dev/null +++ b/refreshable/async.go @@ -0,0 +1,100 @@ +// Copyright (c) 2022 Palantir Technologies. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package refreshable + +import ( + "context" + "time" +) + +type ready[T any] struct { + in Updatable[T] + readyC <-chan struct{} + cancel context.CancelFunc +} + +func newReady[T any](in Updatable[T]) *ready[T] { + ctx, cancel := context.WithCancel(context.Background()) + return &ready[T]{ + in: in, + readyC: ctx.Done(), + cancel: cancel, + } +} + +func (r *ready[T]) Current() T { + return r.in.Current() +} + +func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return r.in.Subscribe(consumer) +} + +func (r *ready[T]) ReadyC() <-chan struct{} { + return r.readyC +} + +func (r *ready[T]) Update(val T) { + r.cancel() + r.in.Update(val) +} + +// NewFromChannel populates an Updatable with the values channel. +// If an element is already available, the returned Value is guaranteed to be populated. +// The channel should be closed when no longer used to avoid leaking resources. +func NewFromChannel[T any](values <-chan T) Ready[T] { + out := newReady(newZero[T]()) + select { + case initial, ok := <-values: + if !ok { + return out // channel already closed + } + out.Update(initial) + default: + } + go func() { + for value := range values { + out.Update(value) + } + }() + return out +} + +// NewFromTickerFunc returns a Ready Refreshable populated by the result of the provider called each interval. +// If the providers bool return is false, the value is ignored. +// The result's ReadyC channel is closed when a new value is populated. +func NewFromTickerFunc[T any](interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) { + out := newReady(newZero[T]()) + ctx, cancel := context.WithCancel(context.Background()) + values := make(chan T) + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + defer close(values) + for { + if value, ok := provider(); ok { + out.Update(value) + } + select { + case <-ticker.C: + continue + case <-ctx.Done(): + return + } + } + }() + return out, UnsubscribeFunc(cancel) +} + +// Wait waits until the Ready has a current value or the context expires. +func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) { + select { + case <-ready.ReadyC(): + return ready.Current(), true + case <-ctx.Done(): + var zero T + return zero, false + } +} diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index e59da113..eb5c3499 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -4,11 +4,6 @@ package refreshable -import ( - "context" - "time" -) - // A Refreshable is a generic container type for a volatile underlying value. // It supports atomic access and user-provided callback "subscriptions" on updates. type Refreshable[T any] interface { @@ -17,7 +12,8 @@ type Refreshable[T any] interface { Current() T // Subscribe calls the consumer function when Value updates until stop is closed. - // The consumer should be relatively fast: Updatable.Set blocks until all subscribers have returned. + // The consumer must be relatively fast: Updatable.Set blocks until all subscribers have returned. + // Expensive or error-prone responses to refreshed values should be asynchronous. // Updates considered no-ops by reflect.DeepEqual may be skipped. Subscribe(consumer func(T)) UnsubscribeFunc } @@ -37,7 +33,7 @@ type Validated[T any] interface { Refreshable[T] // Validation returns the result of the most recent validation. // If the last value was valid, Validation returns the same value as Current and a nil error. - // If the last value was invalid, it and the validation error are returned. Current returns the newest valid value. + // If the last value was invalid, it and the error are returned. Current returns the most recent valid value. Validation() (T, error) } @@ -49,110 +45,36 @@ type Ready[T any] interface { ReadyC() <-chan struct{} } +// UnsubscribeFunc removes a subscription from a refreshable's internal tracking and/or stops its update routine. +// It is safe to call multiple times. type UnsubscribeFunc func() -func New[T any](val T) *DefaultRefreshable[T] { - d := new(DefaultRefreshable[T]) - d.current.Store(&val) - return d +func New[T any](val T) Updatable[T] { + return newDefault(val) } // Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable. -func Map[T any, M any](t Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) { - out := New(mapFn(t.Current())) - unsubscribe := t.Subscribe(func(v T) { +func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) { + out := New(mapFn(original.Current())) + stop := original.Subscribe(func(v T) { out.Update(mapFn(v)) }) - return out, unsubscribe -} - -func SubscribeWithCurrent[T any](r Refreshable[T], consumer func(T)) UnsubscribeFunc { - unsubscribe := r.Subscribe(consumer) - consumer(r.Current()) - return unsubscribe -} - -// UpdateFromChannel populates an Updatable with the values channel. -// If an element is already available, the returned Value is guaranteed to be populated. -// The channel should be closed when no longer used to avoid leaking resources. -func UpdateFromChannel[T any](in Updatable[T], values <-chan T) Ready[T] { - out := newReady(in) - select { - case initial, ok := <-values: - if !ok { - return out // channel already closed - } - out.Update(initial) - default: - } - - go func() { - for value := range values { - out.Update(value) - } - }() - - return out -} - -// UpdateFromTickerFunc returns a Refreshable populated by the result of the provider called each interval. -// If the providers bool return is false, the value is ignored. -func UpdateFromTickerFunc[T any](in Updatable[T], interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) { - out := newReady(in) - ctx, cancel := context.WithCancel(context.Background()) - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - if value, ok := provider(); ok { - out.Update(value) - } - select { - case <-ticker.C: - continue - case <-ctx.Done(): - return - } - } - }() - return out, UnsubscribeFunc(cancel) -} - -// Wait waits until the Ready has a current value or the context expires. -func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) { - select { - case <-ready.ReadyC(): - return ready.Current(), true - case <-ctx.Done(): - var zero T - return zero, false - } -} - -type ready[T any] struct { - in Updatable[T] - readyC <-chan struct{} - cancel context.CancelFunc -} - -func newReady[T any](in Updatable[T]) *ready[T] { - ctx, cancel := context.WithCancel(context.Background()) - return &ready[T]{in: in, readyC: ctx.Done(), cancel: cancel} -} - -func (r *ready[T]) Current() T { - return r.in.Current() -} - -func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc { - return r.in.Subscribe(consumer) + out.Update(mapFn(original.Current())) + return out, stop } -func (r *ready[T]) ReadyC() <-chan struct{} { - return r.readyC +// MapWithError is similar to Validate but allows for the function to return a mapping/mutation +// of the input object in addition to returning an error. The returned validRefreshable will contain the mapped value. +// An error is returned if the current original value fails to map. +func MapWithError[T any, M any](original Refreshable[T], mapFn func(T) (M, error)) (Validated[M], UnsubscribeFunc, error) { + v, stop := newValidRefreshable(original, mapFn) + _, err := v.Validation() + return v, stop, err } -func (r *ready[T]) Update(val T) { - r.cancel() - r.in.Update(val) +// Validate returns a new Refreshable that returns the latest original value accepted by the validatingFn. +// If the upstream value results in an error, it is reported by Validation(). +// An error is returned if the current original value is invalid. +func Validate[T any](original Refreshable[T], validatingFn func(T) error) (Validated[T], UnsubscribeFunc, error) { + return MapWithError(original, identity(validatingFn)) } diff --git a/refreshable/refreshable_default.go b/refreshable/refreshable_default.go index eb534e98..fbaf831b 100644 --- a/refreshable/refreshable_default.go +++ b/refreshable/refreshable_default.go @@ -10,14 +10,27 @@ import ( "sync/atomic" ) -type DefaultRefreshable[T any] struct { +type defaultRefreshable[T any] struct { mux sync.Mutex current atomic.Value subscribers []*func(T) } +func newDefault[T any](val T) Updatable[T] { + d := new(defaultRefreshable[T]) + d.current.Store(&val) + return d +} + +func newZero[T any]() Updatable[T] { + d := new(defaultRefreshable[T]) + var zero T + d.current.Store(&zero) + return d +} + // Update changes the value of the Refreshable, then blocks while subscribers are executed. -func (d *DefaultRefreshable[T]) Update(val T) { +func (d *defaultRefreshable[T]) Update(val T) { d.mux.Lock() defer d.mux.Unlock() old := d.current.Swap(&val) @@ -29,11 +42,11 @@ func (d *DefaultRefreshable[T]) Update(val T) { } } -func (d *DefaultRefreshable[T]) Current() T { +func (d *defaultRefreshable[T]) Current() T { return *(d.current.Load().(*T)) } -func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { +func (d *defaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { d.mux.Lock() defer d.mux.Unlock() @@ -42,7 +55,7 @@ func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { return d.unsubscribe(consumerFnPtr) } -func (d *DefaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc { +func (d *defaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc { return func() { d.mux.Lock() defer d.mux.Unlock() diff --git a/refreshable/refreshable_default_test.go b/refreshable/refreshable_default_test.go index bd0692d7..3116dc06 100644 --- a/refreshable/refreshable_default_test.go +++ b/refreshable/refreshable_default_test.go @@ -12,7 +12,9 @@ import ( ) func TestDefaultRefreshable(t *testing.T) { - type container struct{ Value string } + type container struct { + Value string + } v := &container{Value: "original"} r := refreshable.New(v) @@ -46,13 +48,22 @@ func TestDefaultRefreshable(t *testing.T) { t.Run("Map", func(t *testing.T) { r.Update(&container{Value: "value"}) - m, _ := refreshable.Map[*container, int](r, func(i *container) int { + rLen, _ := refreshable.Map[*container, int](r, func(i *container) int { return len(i.Value) }) - assert.Equal(t, m.Current(), 5) + assert.Equal(t, 5, rLen.Current()) + + rLenUpdated := false + rLen.Subscribe(func(int) { rLenUpdated = true }) + // update to new value with same length and ensure the + // equality check prevented unnecessary subscriber updates. + r.Update(&container{Value: "VALUE"}) + assert.Equal(t, "VALUE", r.Current().Value) + assert.False(t, rLenUpdated) r.Update(&container{Value: "updated"}) - assert.Equal(t, m.Current(), 7) + assert.Equal(t, "updated", r.Current().Value) + assert.Equal(t, 7, rLen.Current()) }) } diff --git a/refreshable/refreshable_validating.go b/refreshable/refreshable_validating.go index 32ec9105..da96ed77 100644 --- a/refreshable/refreshable_validating.go +++ b/refreshable/refreshable_validating.go @@ -1,85 +1,57 @@ -// Copyright (c) 2021 Palantir Technologies. All rights reserved. +// Copyright (c) 2022 Palantir Technologies. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package refreshable -import ( - "errors" - "sync/atomic" -) - -type ValidatingRefreshable[T any] struct { - Refreshable[T] - lastValidateErr *atomic.Value +type validRefreshable[T any] struct { + r Updatable[validRefreshableContainer[T]] } -// this is needed to be able to store the absence of an error in an atomic.Value -type errorWrapper struct { - err error +type validRefreshableContainer[T any] struct { + validated T + unvalidated T + lastErr error } -func (v *ValidatingRefreshable[T]) LastValidateErr() error { - return v.lastValidateErr.Load().(errorWrapper).err -} +func (v *validRefreshable[T]) Current() T { return v.r.Current().validated } -// NewValidatingRefreshable returns a new Refreshable whose current value is the latest value that passes the provided -// validatingFn successfully. This returns an error if the current value of the passed in Refreshable does not pass the -// validatingFn or if the validatingFn or Refreshable are nil. -func NewValidatingRefreshable[T any](origRefreshable Refreshable[T], validatingFn func(T) error) (*ValidatingRefreshable[T], error) { - mappingFn := func(i T) (T, error) { - if err := validatingFn(i); err != nil { - var zero T - return zero, err - } - return i, nil - } - return newValidatingRefreshable(origRefreshable, mappingFn) +func (v *validRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return v.r.Subscribe(func(val validRefreshableContainer[T]) { + consumer(val.validated) + }) } -// NewMapValidatingRefreshable is similar to NewValidatingRefreshable but allows for the function to return a mapping/mutation -// of the input object in addition to returning an error. The returned ValidatingRefreshable will contain the mapped value. -// The mapped value must always be of the same type (but not necessarily that of the input type). -func NewMapValidatingRefreshable[T any, M any](origRefreshable Refreshable[T], mappingFn func(T) (M, error)) (*ValidatingRefreshable[M], error) { - return newValidatingRefreshable(origRefreshable, mappingFn) +// Validation returns the most recent upstream Refreshable and its validation result. +// If nil, the validRefreshable is up-to-date with its original. +func (v *validRefreshable[T]) Validation() (T, error) { + c := v.r.Current() + return c.unvalidated, c.lastErr } -func newValidatingRefreshable[T any, M any](origRefreshable Refreshable[T], mappingFn func(T) (M, error)) (*ValidatingRefreshable[M], error) { - if mappingFn == nil { - return nil, errors.New("failed to create validating Refreshable because the validating function was nil") - } - if origRefreshable == nil { - return nil, errors.New("failed to create validating Refreshable because the passed in Refreshable was nil") - } - - var validatedRefreshable *DefaultRefreshable[M] - currentVal := origRefreshable.Current() - mappedVal, err := mappingFn(currentVal) - if err != nil { - return nil, err - } - validatedRefreshable = New(mappedVal) - - var lastValidateErr atomic.Value - lastValidateErr.Store(errorWrapper{}) - v := ValidatingRefreshable[M]{ - Refreshable: validatedRefreshable, - lastValidateErr: &lastValidateErr, - } +func newValidRefreshable[T any, M any](original Refreshable[T], mappingFn func(T) (M, error)) (*validRefreshable[M], UnsubscribeFunc) { + valid := &validRefreshable[M]{r: New(validRefreshableContainer[M]{})} + stop := original.Subscribe(func(valueT T) { + updateValidRefreshable(valid, valueT, mappingFn) + }) + updateValidRefreshable(valid, original.Current(), mappingFn) + return valid, stop +} - updateValueFn := func(i T) { - mappedVal, err := mappingFn(i) - v.lastValidateErr.Store(errorWrapper{err}) - if err == nil { - validatedRefreshable.Update(mappedVal) - } +func updateValidRefreshable[T any, M any](valid *validRefreshable[M], value T, mapFn func(T) (M, error)) { + validated := valid.r.Current().validated + unvalidated, err := mapFn(value) + if err == nil { + validated = unvalidated } + valid.r.Update(validRefreshableContainer[M]{ + validated: validated, + unvalidated: unvalidated, + lastErr: err, + }) +} - origRefreshable.Subscribe(updateValueFn) - - // manually update value after performing subscription. This ensures that, if the current value changed between when - // it was fetched earlier in the function and when the subscription was performed, it is properly captured. - updateValueFn(origRefreshable.Current()) - - return &v, nil +// identity is a validating map function that returns its input argument type. +func identity[T any](validatingFn func(T) error) func(i T) (T, error) { + return func(i T) (T, error) { return i, validatingFn(i) } } diff --git a/refreshable/refreshable_validating_test.go b/refreshable/refreshable_validating_test.go index eb5a9875..221c0d9f 100644 --- a/refreshable/refreshable_validating_test.go +++ b/refreshable/refreshable_validating_test.go @@ -17,48 +17,56 @@ import ( func TestValidatingRefreshable(t *testing.T) { type container struct{ Value string } r := refreshable.New(container{Value: "value"}) - vr, err := refreshable.NewValidatingRefreshable[container](r, func(i container) error { + vr, _, err := refreshable.Validate[container](r, func(i container) error { if len(i.Value) == 0 { return errors.New("empty") } return nil }) require.NoError(t, err) - require.NoError(t, vr.LastValidateErr()) - require.Equal(t, r.Current().Value, "value") - require.Equal(t, vr.Current().Value, "value") + v, err := vr.Validation() + require.NoError(t, err) + require.Equal(t, "value", v.Value) + require.Equal(t, "value", r.Current().Value) + require.Equal(t, "value", vr.Current().Value) // attempt bad update r.Update(container{}) require.Equal(t, r.Current().Value, "") - - require.EqualError(t, vr.LastValidateErr(), "empty", "expected err from validating refreshable") + v, err = vr.Validation() + require.EqualError(t, err, "empty", "expected validation error") + require.Equal(t, "", v.Value, "expected invalid value from Validation") require.Equal(t, vr.Current().Value, "value", "expected unchanged validating refreshable") // attempt good update r.Update(container{Value: "value2"}) - require.NoError(t, vr.LastValidateErr()) + v, err = vr.Validation() + require.NoError(t, err) + require.Equal(t, "value2", v.Value) require.Equal(t, "value2", vr.Current().Value) require.Equal(t, "value2", r.Current().Value) } func TestMapValidatingRefreshable(t *testing.T) { r := refreshable.New("https://palantir.com:443") - vr, err := refreshable.NewMapValidatingRefreshable[string, *url.URL](r, url.Parse) + vr, _, err := refreshable.MapWithError[string, *url.URL](r, url.Parse) + require.NoError(t, err) + _, err = vr.Validation() require.NoError(t, err) - require.NoError(t, vr.LastValidateErr()) require.Equal(t, r.Current(), "https://palantir.com:443") require.Equal(t, vr.Current().Hostname(), "palantir.com") // attempt bad update r.Update(":::error.com") assert.Equal(t, r.Current(), ":::error.com") - require.EqualError(t, vr.LastValidateErr(), "parse \":::error.com\": missing protocol scheme", "expected err from validating refreshable") + _, err = vr.Validation() + require.EqualError(t, err, "parse \":::error.com\": missing protocol scheme", "expected err from validating refreshable") assert.Equal(t, vr.Current().Hostname(), "palantir.com", "expected unchanged validating refreshable") // attempt good update r.Update("https://example.com") - require.NoError(t, vr.LastValidateErr()) + _, err = vr.Validation() + require.NoError(t, err) require.Equal(t, r.Current(), "https://example.com") require.Equal(t, vr.Current().Hostname(), "example.com") } @@ -67,15 +75,26 @@ func TestMapValidatingRefreshable(t *testing.T) { // if the underlying refreshable updates during the creation process. func TestValidatingRefreshable_SubscriptionRaceCondition(t *testing.T) { r := &updateImmediatelyRefreshable{r: refreshable.New(1), newValue: 2} - vr, err := refreshable.NewValidatingRefreshable[int](r, func(i int) error { return nil }) + var seen1, seen2 bool + vr, _, err := refreshable.Validate[int](r, func(i int) error { + switch i { + case 1: + seen1 = true + case 2: + seen2 = true + } + return nil + }) require.NoError(t, err) + assert.True(t, seen1, "expected to process 1 value") + assert.True(t, seen2, "expected to process 2 value") // If this returns 1, it is likely because the VR contains a stale value assert.Equal(t, 2, vr.Current()) } // updateImmediatelyRefreshable is a mock implementation which updates to newValue immediately when Current() is called type updateImmediatelyRefreshable struct { - r *refreshable.DefaultRefreshable[int] + r refreshable.Updatable[int] newValue int }