Skip to content

Commit

Permalink
refactor: closer func (#2527)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Sep 21, 2021
1 parent e7a3ede commit 356759e
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 35 deletions.
10 changes: 4 additions & 6 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)

// if the peer is closer to the chunk, AND it's a full node, we were selected for replication. Return early.
if p.FullNode {
bytes := chunkAddress.Bytes()
if dcmp, _ := swarm.DistanceCmp(bytes, p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 {
ps.metrics.TotalReplication.Inc()
if closer, _ := p.Address.Closer(chunkAddress, ps.address); closer {
if ps.topologyDriver.IsWithinDepth(chunkAddress) {

ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
Expand Down Expand Up @@ -211,12 +209,12 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
defer debit.Cleanup()

// return back receipt
signature, err := ps.signer.Sign(bytes)
signature, err := ps.signer.Sign(chunkAddress.Bytes())
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("receipt signature: %w", err)
}
receipt := pb.Receipt{Address: bytes, Signature: signature, BlockHash: ps.blockHash}
receipt := pb.Receipt{Address: chunkAddress.Bytes(), Signature: signature, BlockHash: ps.blockHash}
if err := w.WriteMsgWithContext(ctxd, &receipt); err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
Expand Down Expand Up @@ -368,7 +366,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo

// here we skip the peer if the peer is closer to the chunk than us
// we replicate with peers that are further away than us because we are the storer
if dcmp, _ := swarm.DistanceCmp(ch.Address().Bytes(), peer.Bytes(), ps.address.Bytes()); dcmp == 1 {
if closer, _ := peer.Closer(ch.Address(), ps.address); closer {
return false, false, nil
}

Expand Down
15 changes: 4 additions & 11 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,19 +362,12 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, all
closest = peer
return false, false, nil
}
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
closer, err := peer.Closer(addr, closest)
if err != nil {
return false, false, fmt.Errorf("distance compare error. addr %s closest %s peer %s: %w", addr.String(), closest.String(), peer.String(), err)
}
switch dcmp {
case 0:
// do nothing
case -1:
// current peer is closer
if closer {
closest = peer
case 1:
// closest is already closer to chunk
// do nothing
}
return false, false, nil
})
Expand All @@ -390,11 +383,11 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, all
return closest, nil
}

dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), s.addr.Bytes())
closer, err := closest.Closer(addr, s.addr)
if err != nil {
return swarm.Address{}, fmt.Errorf("distance compare addr %s closest %s base address %s: %w", addr.String(), closest.String(), s.addr.String(), err)
}
if dcmp != 1 {
if closer {
return swarm.Address{}, topology.ErrNotFound
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ func (a Address) MarshalJSON() ([]byte, error) {
return json.Marshal(a.String())
}

// Closer returns if x is closer to a than y
func (x Address) Closer(a Address, y Address) (bool, error) {
cmp, err := DistanceCmp(a.b, x.b, y.b)
return cmp == 1, err
}

// ZeroAddress is the address that has no value.
var ZeroAddress = NewAddress(nil)

Expand Down
10 changes: 10 additions & 0 deletions pkg/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,13 @@ func TestAddress_MemberOf(t *testing.T) {
}

}

func TestCloser(t *testing.T) {
a := swarm.MustParseHexAddress("9100000000000000000000000000000000000000000000000000000000000000")
x := swarm.MustParseHexAddress("8200000000000000000000000000000000000000000000000000000000000000")
y := swarm.MustParseHexAddress("1200000000000000000000000000000000000000000000000000000000000000")

if cmp, _ := x.Closer(a, y); !cmp {
t.Fatal("x is closer")
}
}
23 changes: 5 additions & 18 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,19 +1058,13 @@ func closestPeerFunc(closest *swarm.Address, addr swarm.Address, spf sanctionedP
*closest = peer
return false, false, nil
}
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())

closer, err := peer.Closer(addr, *closest)
if err != nil {
return false, false, err
}
switch dcmp {
case 0:
// do nothing
case -1:
// current peer is closer
if closer {
*closest = peer
case 1:
// closest is already closer to chunk
// do nothing
}
return false, false, nil
}
Expand Down Expand Up @@ -1100,19 +1094,12 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, skipPeers ...swa
closest = peer
}

dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
closer, err := peer.Closer(addr, closest)
if err != nil {
return false, false, err
}
switch dcmp {
case 0:
// do nothing
case -1:
// current peer is closer
if closer {
closest = peer
case 1:
// closest is already closer to chunk
// do nothing
}
return false, false, nil
})
Expand Down

0 comments on commit 356759e

Please sign in to comment.