-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Yuri Shkuro
committed
Jun 4, 2017
1 parent
2896a2e
commit 7f9fba4
Showing
5 changed files
with
141 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package sd_test | ||
|
||
import ( | ||
"io" | ||
"testing" | ||
"time" | ||
|
||
"github.com/go-kit/kit/endpoint" | ||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/sd" | ||
"github.com/go-kit/kit/sd/internal/instance" | ||
) | ||
|
||
func TestDefaultEndpointer(t *testing.T) { | ||
var ( | ||
ca = make(closer) | ||
cb = make(closer) | ||
c = map[string]io.Closer{"a": ca, "b": cb} | ||
f = func(instance string) (endpoint.Endpoint, io.Closer, error) { | ||
return endpoint.Nop, c[instance], nil | ||
} | ||
instancer = &mockInstancer{ | ||
cache: instance.NewCache(), | ||
} | ||
) | ||
// set initial state | ||
instancer.Update(sd.Event{Instances: []string{"a", "b"}}) | ||
|
||
endpointer := sd.NewEndpointer(instancer, f, log.NewNopLogger(), sd.InvalidateOnError(time.Minute)) | ||
if endpoints, err := endpointer.Endpoints(); err != nil { | ||
t.Errorf("unepected error %v", err) | ||
} else if want, have := 2, len(endpoints); want != have { | ||
t.Errorf("want %d, have %d", want, have) | ||
} | ||
|
||
instancer.Update(sd.Event{Instances: []string{}}) | ||
select { | ||
case <-ca: | ||
t.Logf("endpoint a closed, good") | ||
case <-time.After(time.Millisecond): | ||
t.Errorf("didn't close the deleted instance in time") | ||
} | ||
select { | ||
case <-cb: | ||
t.Logf("endpoint b closed, good") | ||
case <-time.After(time.Millisecond): | ||
t.Errorf("didn't close the deleted instance in time") | ||
} | ||
if endpoints, err := endpointer.Endpoints(); err != nil { | ||
t.Errorf("unepected error %v", err) | ||
} else if want, have := 0, len(endpoints); want != have { | ||
t.Errorf("want %d, have %d", want, have) | ||
} | ||
|
||
endpointer.Close() | ||
instancer.Update(sd.Event{Instances: []string{"a"}}) | ||
// TODO verify that on Close the endpointer fully disconnects from the instancer. | ||
// Unfortunately, because we use instance.Cache, this test cannot be in the sd package, | ||
// and therefore does not have access to the endpointer's private members. | ||
} | ||
|
||
type mockInstancer struct { | ||
cache *instance.Cache | ||
} | ||
|
||
func (m *mockInstancer) Update(event sd.Event) { | ||
m.cache.Update(event) | ||
} | ||
|
||
func (m *mockInstancer) Register(ch chan<- sd.Event) { | ||
m.cache.Register(ch) | ||
} | ||
|
||
func (m *mockInstancer) Deregister(ch chan<- sd.Event) { | ||
m.cache.Deregister(ch) | ||
} | ||
|
||
type closer chan struct{} | ||
|
||
func (c closer) Close() error { close(c); return nil } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,78 +1,79 @@ | ||
package instance | ||
|
||
import ( | ||
"sync" | ||
"reflect" | ||
"testing" | ||
"time" | ||
|
||
"github.com/go-kit/kit/sd" | ||
) | ||
|
||
var _ sd.Instancer = &Cache{} // API check | ||
|
||
// The test verifies the following: | ||
// registering causes initial notification of the current state | ||
// notifications delivered to two receivers | ||
// identical notifications cause no updates | ||
// different update causes new notification | ||
// instances are sorted | ||
// no updates after de-registering | ||
func TestCache(t *testing.T) { | ||
// TODO this test is not finished yet | ||
e1 := sd.Event{Instances: []string{"y", "x"}} // not sorted | ||
e2 := sd.Event{Instances: []string{"c", "a", "b"}} | ||
|
||
c := NewCache() | ||
|
||
{ | ||
state := c.State() | ||
if want, have := 0, len(state.Instances); want != have { | ||
t.Fatalf("want %v instances, have %v", want, have) | ||
} | ||
if want, have := 0, len(c.State().Instances); want != have { | ||
t.Fatalf("want %v instances, have %v", want, have) | ||
} | ||
|
||
notification1 := sd.Event{Instances: []string{"x", "y"}} | ||
notification2 := sd.Event{Instances: []string{"a", "b", "c"}} | ||
|
||
c.Update(notification1) | ||
|
||
// times 2 because we have two observers | ||
expectedInstances := 2 * (len(notification1.Instances) + len(notification2.Instances)) | ||
c.Update(e1) // sets initial state | ||
if want, have := 2, len(c.State().Instances); want != have { | ||
t.Fatalf("want %v instances, have %v", want, have) | ||
} | ||
|
||
wg := sync.WaitGroup{} | ||
wg.Add(expectedInstances) | ||
r1 := make(chan sd.Event) | ||
go c.Register(r1) | ||
expectUpdate(t, r1, []string{"x", "y"}) | ||
|
||
r2 := make(chan sd.Event) | ||
go c.Register(r2) | ||
expectUpdate(t, r2, []string{"x", "y"}) | ||
|
||
// send the same instances but in different order. | ||
// because it's a duplicate it should not cause new notification. | ||
// if it did, this call would deadlock trying to send to channels with no readers | ||
c.Update(sd.Event{Instances: []string{"x", "y"}}) | ||
expectNoUpdate(t, r1) | ||
expectNoUpdate(t, r2) | ||
|
||
go c.Update(e2) // different set | ||
expectUpdate(t, r1, []string{"a", "b", "c"}) | ||
expectUpdate(t, r2, []string{"a", "b", "c"}) | ||
|
||
c.Deregister(r1) | ||
c.Deregister(r2) | ||
close(r1) | ||
close(r2) | ||
// if deregister didn't work, Update would panic on closed channels | ||
c.Update(e1) | ||
} | ||
|
||
receiver := func(ch chan sd.Event) { | ||
for state := range ch { | ||
// count total number of instances received | ||
for range state.Instances { | ||
wg.Done() | ||
} | ||
func expectUpdate(t *testing.T, r chan sd.Event, expect []string) { | ||
select { | ||
case e := <-r: | ||
if want, have := expect, e.Instances; !reflect.DeepEqual(want, have) { | ||
t.Fatalf("want: %v, have: %v", want, have) | ||
} | ||
case <-time.After(time.Second): | ||
t.Fatalf("did not receive expected update") | ||
} | ||
} | ||
|
||
f1 := make(chan sd.Event) | ||
f2 := make(chan sd.Event) | ||
go receiver(f1) | ||
go receiver(f2) | ||
|
||
c.Register(f1) | ||
c.Register(f2) | ||
|
||
c.Update(notification1) | ||
c.Update(notification2) | ||
|
||
// if state := c.State(); instances == nil { | ||
// if want, have := len(notification2), len(instances); want != have { | ||
// t.Errorf("want length %v, have %v", want, have) | ||
// } else { | ||
// for i := range notification2 { | ||
// if want, have := notification2[i], instances[i]; want != have { | ||
// t.Errorf("want instance %v, have %v", want, have) | ||
// } | ||
// } | ||
// } | ||
// } | ||
|
||
close(f1) | ||
close(f2) | ||
|
||
wg.Wait() | ||
|
||
// d.Deregister(f1) | ||
|
||
// d.Unregister(f2) | ||
// if want, have := 0, len(d.observers); want != have { | ||
// t.Fatalf("want %v observers, have %v", want, have) | ||
// } | ||
func expectNoUpdate(t *testing.T, r chan sd.Event) { | ||
select { | ||
case e := <-r: | ||
t.Errorf("received unexpected update %v", e) | ||
case <-time.After(time.Millisecond): | ||
return // as expected | ||
} | ||
} |