fix: removed neighborhood check for replication #2526

Merged · 4 commits · Sep 27, 2021
14 changes: 0 additions & 14 deletions pkg/pushsync/metrics.go
@@ -29,8 +29,6 @@ type metrics struct {
PushToPeerTime prometheus.HistogramVec
TotalReplicationFromDistantPeer prometheus.Counter
TotalReplicationFromClosestPeer prometheus.Counter
TotalReplication prometheus.Counter
TotalOutsideReplication prometheus.Counter
}

func newMetrics() metrics {
@@ -157,18 +155,6 @@ func newMetrics() metrics {
Name: "total_closest_replications",
Help: "Total no of replication requests received from closest peer to chunk",
}),
TotalReplication: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_replications",
Help: "Total no of replication requests received",
}),
TotalOutsideReplication: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_out_of_neighborhood_replications",
Help: "Total no of replication requests received that do not fall in neighborhood",
}),
}
}

103 changes: 47 additions & 56 deletions pkg/pushsync/pushsync.go
@@ -50,7 +50,7 @@ var (

defaultTTL = 20 * time.Second // request time to live
sanctionWait = 5 * time.Minute
timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
timeToWaitForPushsyncToNeighbor = 5 * time.Second // time to wait to get a receipt for a chunk
nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
)

@@ -170,64 +170,62 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
// if the peer is closer to the chunk, AND it's a full node, we were selected for replication. Return early.
if p.FullNode {
if closer, _ := p.Address.Closer(chunkAddress, ps.address); closer {
if ps.topologyDriver.IsWithinDepth(chunkAddress) {

ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer canceld()
ps.metrics.HandlerReplication.Inc()

span, _, ctxd := ps.tracer.StartSpanFromContext(ctxd, "pushsync-replication-storage", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()})
defer span.Finish()
ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer canceld()

ps.metrics.HandlerReplication.Inc()
realClosestPeer, err := ps.topologyDriver.ClosestPeer(chunk.Address(), false, swarm.ZeroAddress)
if err == nil {
if !realClosestPeer.Equal(p.Address) {
ps.metrics.TotalReplicationFromDistantPeer.Inc()
} else {
ps.metrics.TotalReplicationFromClosestPeer.Inc()
}
}
span, _, ctxd := ps.tracer.StartSpanFromContext(ctxd, "pushsync-replication-storage", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()})
defer span.Finish()

chunk, err = ps.validStamp(chunk, ch.Stamp)
if err != nil {
ps.metrics.InvalidStampErrors.Inc()
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("pushsync replication valid stamp: %w", err)
realClosestPeer, err := ps.topologyDriver.ClosestPeer(chunk.Address(), false)
if err == nil {
if !realClosestPeer.Equal(p.Address) {
ps.metrics.TotalReplicationFromDistantPeer.Inc()
} else {
ps.metrics.TotalReplicationFromClosestPeer.Inc()
}
}

_, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk)
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("chunk store: %w", err)
}
chunk, err = ps.validStamp(chunk, ch.Stamp)
if err != nil {
ps.metrics.InvalidStampErrors.Inc()
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("pushsync replication valid stamp: %w", err)
}

debit, err := ps.accounting.PrepareDebit(p.Address, price)
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("prepare debit to peer %s before writeback: %w", p.Address.String(), err)
}
defer debit.Cleanup()
_, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk)
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("chunk store: %w", err)
}

// return back receipt
signature, err := ps.signer.Sign(chunkAddress.Bytes())
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("receipt signature: %w", err)
}
receipt := pb.Receipt{Address: chunkAddress.Bytes(), Signature: signature, BlockHash: ps.blockHash}
if err := w.WriteMsgWithContext(ctxd, &receipt); err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
debit, err := ps.accounting.PrepareDebit(p.Address, price)
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("prepare debit to peer %s before writeback: %w", p.Address.String(), err)
}
defer debit.Cleanup()

err = debit.Apply()
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
}
return err
// return back receipt
signature, err := ps.signer.Sign(chunkAddress.Bytes())
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("receipt signature: %w", err)
}
ps.metrics.TotalOutsideReplication.Inc()
return ErrOutOfDepthReplication

receipt := pb.Receipt{Address: chunkAddress.Bytes(), Signature: signature, BlockHash: ps.blockHash}
if err := w.WriteMsgWithContext(ctxd, &receipt); err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}

err = debit.Apply()
if err != nil {
ps.metrics.HandlerReplicationErrors.Inc()
}
return err
}
}

@@ -420,7 +418,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
return nil, err
}

ps.skipList.PruneChunk(ch.Address())
return r, nil
}

@@ -618,12 +615,6 @@ func (l *peerSkipList) ChunkSkipPeers(ch swarm.Address) (peers []swarm.Address)
return peers
}

func (l *peerSkipList) PruneChunk(chunk swarm.Address) {
l.Lock()
defer l.Unlock()
delete(l.skip, chunk.ByteString())
}

func (l *peerSkipList) PruneExpired() {
l.Lock()
defer l.Unlock()
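
What the replication branch of the handler boils down to after this change: any closer full-node peer simply stores the chunk, signs a receipt, and debits the sender, with no IsWithinDepth gate and no out-of-neighborhood error path. The following is a condensed, self-contained sketch of that flow, not the actual bee code: Chunk, Store, Signer, Receipt, and replicate are simplified stand-ins, stamp validation, tracing, metrics, and accounting are omitted, and the 5-second bound mirrors timeToWaitForPushsyncToNeighbor as raised in this PR.

package sketch

import (
	"context"
	"fmt"
	"time"
)

// Simplified stand-ins for the real bee types; for illustration only.
type Chunk struct {
	Address []byte
	Data    []byte
}

type Store interface {
	Put(ctx context.Context, ch Chunk) error
}

type Signer interface {
	Sign(data []byte) ([]byte, error)
}

type Receipt struct {
	Address   []byte
	Signature []byte
}

// replicate mirrors the handler's replication branch after this change:
// store the chunk and answer with a signed receipt, bounded by a timeout.
// The neighborhood-depth check that used to guard this path is gone.
func replicate(store Store, signer Signer, ch Chunk) (*Receipt, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := store.Put(ctx, ch); err != nil {
		return nil, fmt.Errorf("chunk store: %w", err)
	}

	sig, err := signer.Sign(ch.Address)
	if err != nil {
		return nil, fmt.Errorf("receipt signature: %w", err)
	}
	return &Receipt{Address: ch.Address, Signature: sig}, nil
}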
96 changes: 0 additions & 96 deletions pkg/pushsync/pushsync_test.go
@@ -211,102 +211,6 @@ func TestReplicateBeforeReceipt(t *testing.T) {
}
}

func TestFailToReplicateBeforeReceipt(t *testing.T) {

// chunk data to upload
chunk := testingc.FixtureChunk("7000") // base 0111

// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110
secondPeer := swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000") // binary 0100
emptyPeer := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000") // binary 0101, this peer should not get the chunk

// node that is connected to secondPeer
// its address is closer to the chunk than secondPeer but it will not receive the chunk
_, storerEmpty, _, _ := createPushSyncNode(t, emptyPeer, defaultPrices, nil, nil, defaultSigner)
defer storerEmpty.Close()

wFunc := func(addr swarm.Address) bool {
return false
}

// node that is connected to closestPeer
// will receive chunk from closestPeer
psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, nil, nil, defaultSigner, mock.WithPeers(emptyPeer), mock.WithIsWithinFunc(wFunc))
defer storerSecond.Close()
secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer))

psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode))

// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(closestPeer))
defer storerPivot.Close()

// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}

if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}

// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data())

// this intercepts the incoming receipt message
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)

// this intercepts the outgoing delivery message from storer node to second storer node
waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), chunk.Data())

// this intercepts the incoming receipt message
waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), nil)

_, err = storerEmpty.Get(context.Background(), storage.ModeGetSync, chunk.Address())
if !errors.Is(err, storage.ErrNotFound) {
t.Fatal(err)
}

balance, err := pivotAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != -int64(fixedPrice) {
t.Fatalf("unexpected balance on storer node. want %d got %d", int64(fixedPrice), balance)
}

balance, err = storerAccounting.Balance(pivotNode)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != int64(fixedPrice) {
t.Fatalf("unexpected balance on storer node. want %d got %d", int64(fixedPrice), balance)
}

balance, err = secondAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}

if balance.Int64() != int64(0) {
t.Fatalf("unexpected balance on second storer. want %d got %d", int64(0), balance)
}

balance, err = storerAccounting.Balance(secondPeer)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != -int64(0) {
t.Fatalf("unexpected balance on storer node. want %d got %d", -int64(0), balance)
}
}

// PushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective.
// it also checks whether the tags are incremented properly if they are present
func TestPushChunkToClosest(t *testing.T) {
8 changes: 3 additions & 5 deletions pkg/topology/kademlia/kademlia.go
@@ -1092,17 +1092,15 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, skipPeers ...swa

if closest.IsZero() {
closest = peer
return false, false, nil
}

closer, err := peer.Closer(addr, closest)
if err != nil {
return false, false, err
}
if closer {
if closer, _ := peer.Closer(addr, closest); closer {
closest = peer
}
return false, false, nil
})

if err != nil {
return swarm.Address{}, err
}
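
For context on the simplified loop above: the first peer seen becomes the running closest, each later peer replaces it only if it is strictly closer to the target address, and comparison errors are now ignored instead of aborting the iteration. A minimal, self-contained sketch of that selection logic follows; bytesCloser is an assumed stand-in for swarm.Address.Closer, comparing byte-wise XOR distances.

package sketch

import (
	"bytes"
	"errors"
)

// bytesCloser reports whether a is closer to target than b under XOR distance.
func bytesCloser(a, b, target []byte) (bool, error) {
	if len(a) != len(target) || len(b) != len(target) {
		return false, errors.New("address length mismatch")
	}
	da := make([]byte, len(target))
	db := make([]byte, len(target))
	for i := range target {
		da[i] = a[i] ^ target[i]
		db[i] = b[i] ^ target[i]
	}
	return bytes.Compare(da, db) < 0, nil
}

// closestPeer mirrors the iteration pattern in Kad.ClosestPeer after this
// change: seed with the first candidate, then keep whichever peer compares
// strictly closer, dropping comparison errors along the way.
func closestPeer(target []byte, peers [][]byte) []byte {
	var closest []byte
	for _, p := range peers {
		if closest == nil {
			closest = p
			continue
		}
		if closer, _ := bytesCloser(p, closest, target); closer {
			closest = p
		}
	}
	return closest
}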