Skip to content

Commit

Permalink
revert Peers change
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Jan 18, 2023
1 parent f24c74f commit 986a21f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 31 deletions.
13 changes: 2 additions & 11 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func NewDiscovery(
}
}

// WithOnPeersUpdate adds OnPeersUpdate callback call on every update of discovered peers list.
func (d *Discovery) WithOnPeersUpdate(f OnPeersUpdate) {
d.onPeersUpdate = func(info updateInfo) {
d.onPeersUpdate(info)
Expand Down Expand Up @@ -200,15 +201,5 @@ func (d *Discovery) Advertise(ctx context.Context) {
// Peers provides a list of discovered peers in the "full" topic.
// If Discovery hasn't found any peers, it blocks until at least one peer is found.
func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
peers := d.set.ListPeers()
if len(peers) > 0 {
return peers, nil
}

// block until a new peer will be discovered
p, err := d.set.WaitPeer(ctx)
if err != nil {
return nil, err
}
return []peer.ID{p}, nil
return d.set.Peers(ctx)
}
33 changes: 17 additions & 16 deletions share/availability/discovery/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (ps *limitedSet) TryAdd(p peer.ID) error {
LOOP:
for {
// peer will be pushed to the channel only when somebody is reading from it.
// this is done to handle case when WaitPeers() was called on empty set.
// this is done to handle case when Peers() was called on empty set.
select {
case ps.waitPeer <- p:
default:
Expand All @@ -74,23 +74,24 @@ func (ps *limitedSet) Remove(id peer.ID) {
}
}

// WaitPeer blocks until first peer discovered.
func (ps *limitedSet) WaitPeer(ctx context.Context) (peer.ID, error) {
// Peers returns all discovered peers from the set.
func (ps *limitedSet) Peers(ctx context.Context) ([]peer.ID, error) {
ps.lk.Lock()
if len(ps.ps) > 0 {
out := make([]peer.ID, 0, len(ps.ps))
for p := range ps.ps {
out = append(out, p)
}
ps.lk.Unlock()
return out, nil
}
ps.lk.Unlock()

// block until a new peer will be discovered
select {
case <-ctx.Done():
return "", ctx.Err()
return nil, ctx.Err()
case p := <-ps.waitPeer:
return p, nil
}
}

// ListPeers returns all discovered peers from the set.
func (ps *limitedSet) ListPeers() []peer.ID {
ps.lk.Lock()
defer ps.lk.Unlock()
out := make([]peer.ID, 0, len(ps.ps))
for p := range ps.ps {
out = append(out, p)
return []peer.ID{p}, nil
}
return out
}
12 changes: 8 additions & 4 deletions share/availability/discovery/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ func TestSet_Peers(t *testing.T) {
require.NoError(t, set.TryAdd(h1.ID()))
require.NoError(t, set.TryAdd(h2.ID()))

peers := set.ListPeers()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
t.Cleanup(cancel)

peers, err := set.Peers(ctx)
require.NoError(t, err)
require.True(t, len(peers) == 2)
}

// TestSet_WaitPeers ensures that `WaitPeers` will be unblocked once
// TestSet_WaitPeers ensures that `Peers` will be unblocked once
// a new peer was discovered.
func TestSet_WaitPeers(t *testing.T) {
m := mocknet.New()
Expand All @@ -73,8 +77,8 @@ func TestSet_WaitPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
t.Cleanup(cancel)

// call `WaitPeer` on empty set will block until a new peer will be discovered
peers, err := set.WaitPeer(ctx)
// call `Peers` on empty set will block until a new peer will be discovered
peers, err := set.Peers(ctx)
require.NoError(t, err)
require.True(t, len(peers) == 1)
}
Expand Down

0 comments on commit 986a21f

Please sign in to comment.