diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 1ea04ee8e5d..2c67482189d 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -322,6 +322,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo return nil, ErrWarmup } + if !ps.topologyDriver.IsWithinDepth(ch.Address()) { + return nil, ErrNoPush + } + count := 0 // Push the chunk to some peers in the neighborhood in parallel for replication. // Any errors here should NOT impact the rest of the handler. diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 84083ed4125..45d6170e75b 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -51,6 +51,9 @@ var ( defaultSigner = cryptomock.New(cryptomock.WithSignFunc(func([]byte) ([]byte, error) { return nil, nil })) + WithinDepthMock = mock.WithIsWithinFunc(func(addr swarm.Address) bool { + return true + }) ) // TestPushClosest inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node @@ -66,7 +69,7 @@ func TestPushClosest(t *testing.T) { // peer is the node responding to the chunk receipt message // mock should return ErrWantSelf since there's no one to forward to - psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer storerPeer.Close() recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) @@ -129,23 +132,19 @@ func TestReplicateBeforeReceipt(t *testing.T) { defer storerEmpty.Close() emptyRecorder := streamtest.New(streamtest.WithProtocols(psEmpty.Protocol()), streamtest.WithBaseAddr(secondPeer)) - wFunc := func(addr swarm.Address) bool { - return true - } - // node that is connected to closestPeer // will receieve chunk from closestPeer - psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), mock.WithIsWithinFunc(wFunc)) + psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), WithinDepthMock) defer storerSecond.Close() secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer)) - psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf)) + psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer storerPeer.Close() recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode)) // pivot node needs the streamer since the chunk is intercepted by // the chunk worker, then gets sent by opening a new stream - psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer)) + psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(closestPeer)) defer storerPivot.Close() // Trigger the sending of chunk to the closest node @@ -238,13 +237,13 @@ func TestFailToReplicateBeforeReceipt(t *testing.T) { defer storerSecond.Close() secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer)) - psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf)) + psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer storerPeer.Close() recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode)) // pivot node needs the streamer since the chunk is intercepted by // the chunk worker, then gets sent by opening a new stream - psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer)) + psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(closestPeer)) defer storerPivot.Close() // Trigger the sending of chunk to the closest node @@ -324,7 +323,7 @@ func TestPushChunkToClosest(t *testing.T) { // peer is the node responding to the chunk receipt message // mock should return ErrWantSelf since there's no one to forward to - psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, chanFunc(callbackC), defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, chanFunc(callbackC), defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer storerPeer.Close() recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) @@ -415,7 +414,7 @@ func TestPushChunkToNextClosest(t *testing.T) { psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) defer storerPeer1.Close() - psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer storerPeer2.Close() var fail = true @@ -546,7 +545,7 @@ func TestPushChunkToClosestFailedAttemptRetry(t *testing.T) { psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) defer storerPeer3.Close() - psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer storerPeer4.Close() recorder := streamtest.New( @@ -665,19 +664,19 @@ func TestHandler(t *testing.T) { closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // Create the closest peer - psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer closestStorerPeerDB.Close() closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer)) // creating the pivot peer - psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer)) + psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, defaultSigner, mock.WithPeers(closestPeer)) defer storerPivotDB.Close() pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer)) // Creating the trigger peer - psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, pivotRecorder, nil, defaultSigner, mock.WithClosestPeer(pivotPeer)) + psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, pivotRecorder, nil, defaultSigner, mock.WithPeers(pivotPeer)) defer triggerStorerDB.Close() receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk) @@ -753,13 +752,13 @@ func TestSignsReceipt(t *testing.T) { closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // Create the closest peer - psClosestPeer, closestStorerPeerDB, _, _ := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, signer, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psClosestPeer, closestStorerPeerDB, _, _ := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, signer, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer closestStorerPeerDB.Close() closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer)) // creating the pivot peer who will act as a forwarder node with a higher price (17) - psPivot, storerPivotDB, _, _ := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, signer, mock.WithClosestPeer(closestPeer)) + psPivot, storerPivotDB, _, _ := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, signer, mock.WithPeers(closestPeer)) defer storerPivotDB.Close() receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) @@ -783,87 +782,6 @@ func TestSignsReceipt(t *testing.T) { t.Fatal("receipt block hash do not match") } } - -func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) { - t.Helper() - mockAccounting := accountingmock.NewAccounting() - ps, mstorer, ts := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...) - return ps, mstorer, ts, mockAccounting -} - -func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) { - t.Helper() - logger := logging.New(ioutil.Discard, 0) - storer := mocks.NewStorer() - - mockTopology := mock.NewTopologyDriver(mockOpts...) - mockStatestore := statestore.NewStateStore() - mtag := tags.NewTags(mockStatestore, logger) - - mockPricer := pricermock.NewMockService(prices.price, prices.peerPrice) - - recorderDisconnecter := streamtest.NewRecorderDisconnecter(recorder) - if unwrap == nil { - unwrap = func(swarm.Chunk) {} - } - - validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) { - return ch, nil - } - - return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag -} - -func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) { - t.Helper() - records := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5) - - if data != nil { - messages, err := protobuf.ReadMessages( - bytes.NewReader(records[0].In()), - func() protobuf.Message { return new(pb.Delivery) }, - ) - if err != nil { - t.Fatal(err) - } - if messages == nil { - t.Fatal("nil rcvd. for message") - } - if len(messages) > 1 { - t.Fatal("too many messages") - } - delivery := messages[0].(*pb.Delivery) - - if !bytes.Equal(delivery.Address, add.Bytes()) { - t.Fatalf("chunk address mismatch") - } - - if !bytes.Equal(delivery.Data, data) { - t.Fatalf("chunk data mismatch") - } - } else { - messages, err := protobuf.ReadMessages( - bytes.NewReader(records[0].In()), - func() protobuf.Message { return new(pb.Receipt) }, - ) - if err != nil { - t.Fatal(err) - } - if messages == nil { - t.Fatal("nil rcvd. for message") - } - if len(messages) > 1 { - t.Fatal("too many messages") - } - receipt := messages[0].(*pb.Receipt) - receiptAddress := swarm.NewAddress(receipt.Address) - - if !receiptAddress.Equal(add) { - t.Fatalf("receipt address mismatch") - } - } -} - func TestPeerSkipList(t *testing.T) { skipList := pushsync.NewPeerSkipList() @@ -915,7 +833,7 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) { psPeer2, storerPeer2, _, _ := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) defer storerPeer2.Close() - psPeer3, storerPeer3, _, _ := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer3, storerPeer3, _, _ := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) defer storerPeer3.Close() var ( @@ -984,6 +902,86 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) { } } +func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) { + t.Helper() + mockAccounting := accountingmock.NewAccounting() + ps, mstorer, ts := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...) + return ps, mstorer, ts, mockAccounting +} + +func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) { + t.Helper() + logger := logging.New(ioutil.Discard, 0) + storer := mocks.NewStorer() + + mockTopology := mock.NewTopologyDriver(mockOpts...) + mockStatestore := statestore.NewStateStore() + mtag := tags.NewTags(mockStatestore, logger) + + mockPricer := pricermock.NewMockService(prices.price, prices.peerPrice) + + recorderDisconnecter := streamtest.NewRecorderDisconnecter(recorder) + if unwrap == nil { + unwrap = func(swarm.Chunk) {} + } + + validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) { + return ch, nil + } + + return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag +} + +func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) { + t.Helper() + records := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5) + + if data != nil { + messages, err := protobuf.ReadMessages( + bytes.NewReader(records[0].In()), + func() protobuf.Message { return new(pb.Delivery) }, + ) + if err != nil { + t.Fatal(err) + } + if messages == nil { + t.Fatal("nil rcvd. for message") + } + if len(messages) > 1 { + t.Fatal("too many messages") + } + delivery := messages[0].(*pb.Delivery) + + if !bytes.Equal(delivery.Address, add.Bytes()) { + t.Fatalf("chunk address mismatch") + } + + if !bytes.Equal(delivery.Data, data) { + t.Fatalf("chunk data mismatch") + } + } else { + messages, err := protobuf.ReadMessages( + bytes.NewReader(records[0].In()), + func() protobuf.Message { return new(pb.Receipt) }, + ) + if err != nil { + t.Fatal(err) + } + if messages == nil { + t.Fatal("nil rcvd. for message") + } + if len(messages) > 1 { + t.Fatal("too many messages") + } + receipt := messages[0].(*pb.Receipt) + receiptAddress := swarm.NewAddress(receipt.Address) + + if !receiptAddress.Equal(add) { + t.Fatalf("receipt address mismatch") + } + } +} + func chanFunc(c chan<- struct{}) func(swarm.Chunk) { return func(_ swarm.Chunk) { c <- struct{}{}