Skip to content

Commit

Permalink
break: refreshable/v2 with Generic type handling
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoylan committed Jun 1, 2022
1 parent 64ab9f8 commit 3cfdc4c
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 531 deletions.
2 changes: 1 addition & 1 deletion refreshable/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/palantir/pkg/refreshable
module github.com/palantir/pkg/refreshable/v2

go 1.18

Expand Down
23 changes: 15 additions & 8 deletions refreshable/refreshable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@

package refreshable

type Refreshable interface {
// Current returns the most recent value of this Refreshable.
Current() interface{}
type Refreshable[T any] interface {
// Current returns the most recent value of this Supplier.
// If the value has not been initialized, returns T's zero value.
Current() T

// Subscribe subscribes to changes of this Refreshable. The provided function is called with the value of Current()
// whenever the value changes.
Subscribe(consumer func(interface{})) (unsubscribe func())
// Subscribe subscribes to changes of this Supplier.
// The provided function is called with the value of Get() whenever the value changes.
Subscribe(consumer func(T)) (unsubscribe func())
}

// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable.
Map(func(interface{}) interface{}) Refreshable
// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable.
func Map[T any, M any](t Refreshable[T], mapFn func(T) M) (m Refreshable[M], unsubscribe func()) {
out := New(mapFn(t.Current()))
unsubscribe = t.Subscribe(func(v T) {
out.Update(mapFn(v))
})
return out, unsubscribe
}
43 changes: 14 additions & 29 deletions refreshable/refreshable_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,47 @@
package refreshable

import (
"fmt"
"reflect"
"sync"
"sync/atomic"
)

type DefaultRefreshable struct {
typ reflect.Type
type DefaultRefreshable[T any] struct {
current *atomic.Value

sync.Mutex // protects subscribers
subscribers []*func(interface{})
subscribers []*func(T)
}

func NewDefaultRefreshable(val interface{}) *DefaultRefreshable {
func New[T any](val T) *DefaultRefreshable[T] {
current := atomic.Value{}
current.Store(val)
current.Store(&val)

return &DefaultRefreshable{
return &DefaultRefreshable[T]{
current: &current,
typ: reflect.TypeOf(val),
}
}

func (d *DefaultRefreshable) Update(val interface{}) error {
// Update changes the value of the Refreshable, then blocks while subscribers are executed.
func (d *DefaultRefreshable[T]) Update(val T) {
d.Lock()
defer d.Unlock()

if valType := reflect.TypeOf(val); valType != d.typ {
return fmt.Errorf("new refreshable value must be type %s: got %s", d.typ, valType)
if reflect.DeepEqual(d.Current(), val) {
return
}

if reflect.DeepEqual(d.current.Load(), val) {
return nil
}
d.current.Store(val)
d.current.Store(&val)

for _, sub := range d.subscribers {
(*sub)(val)
}
return nil
}

func (d *DefaultRefreshable) Current() interface{} {
return d.current.Load()
func (d *DefaultRefreshable[T]) Current() T {
return *(d.current.Load().(*T))
}

func (d *DefaultRefreshable) Subscribe(consumer func(interface{})) (unsubscribe func()) {
func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) (unsubscribe func()) {
d.Lock()
defer d.Unlock()

Expand All @@ -63,7 +56,7 @@ func (d *DefaultRefreshable) Subscribe(consumer func(interface{})) (unsubscribe
}
}

func (d *DefaultRefreshable) unsubscribe(consumerFnPtr *func(interface{})) {
func (d *DefaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) {
d.Lock()
defer d.Unlock()

Expand All @@ -78,11 +71,3 @@ func (d *DefaultRefreshable) unsubscribe(consumerFnPtr *func(interface{})) {
d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...)
}
}

func (d *DefaultRefreshable) Map(mapFn func(interface{}) interface{}) Refreshable {
newRefreshable := NewDefaultRefreshable(mapFn(d.Current()))
d.Subscribe(func(updatedVal interface{}) {
_ = newRefreshable.Update(mapFn(updatedVal))
})
return newRefreshable
}
32 changes: 13 additions & 19 deletions refreshable/refreshable_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,51 @@ package refreshable_test
import (
"testing"

"github.com/palantir/pkg/refreshable"
"github.com/palantir/pkg/refreshable/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDefaultRefreshable(t *testing.T) {
type container struct{ Value string }

v := &container{Value: "original"}
r := refreshable.NewDefaultRefreshable(v)
r := refreshable.New(v)
assert.Equal(t, r.Current(), v)

t.Run("Update", func(t *testing.T) {
v2 := &container{Value: "updated"}
err := r.Update(v2)
require.NoError(t, err)
r.Update(v2)
assert.Equal(t, r.Current(), v2)
})

t.Run("Subscribe", func(t *testing.T) {
var v1, v2 container
unsub1 := r.Subscribe(func(i interface{}) {
v1 = *(i.(*container))
unsub1 := r.Subscribe(func(i *container) {
v1 = *i
})
_ = r.Subscribe(func(i interface{}) {
v2 = *(i.(*container))
_ = r.Subscribe(func(i *container) {
v2 = *i
})
assert.Equal(t, v1.Value, "")
assert.Equal(t, v2.Value, "")
err := r.Update(&container{Value: "value"})
require.NoError(t, err)
r.Update(&container{Value: "value"})
assert.Equal(t, v1.Value, "value")
assert.Equal(t, v2.Value, "value")

unsub1()
err = r.Update(&container{Value: "value2"})
require.NoError(t, err)
r.Update(&container{Value: "value2"})
assert.Equal(t, v1.Value, "value", "should be unchanged after unsubscribing")
assert.Equal(t, v2.Value, "value2", "should be updated after unsubscribing other")
})

t.Run("Map", func(t *testing.T) {
err := r.Update(&container{Value: "value"})
require.NoError(t, err)
m := r.Map(func(i interface{}) interface{} {
return len(i.(*container).Value)
r.Update(&container{Value: "value"})
m, _ := refreshable.Map[*container, int](r, func(i *container) int {
return len(i.Value)
})
assert.Equal(t, m.Current(), 5)

err = r.Update(&container{Value: "updated"})
require.NoError(t, err)
r.Update(&container{Value: "updated"})
assert.Equal(t, m.Current(), 7)
})

Expand Down
Loading

0 comments on commit 3cfdc4c

Please sign in to comment.