Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: only in neighborhood replication #2237

Merged
merged 3 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
return nil, ErrWarmup
}

if !ps.topologyDriver.IsWithinDepth(ch.Address()) {
return nil, ErrNoPush
}

count := 0
// Push the chunk to some peers in the neighborhood in parallel for replication.
// Any errors here should NOT impact the rest of the handler.
Expand Down
198 changes: 98 additions & 100 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ var (
defaultSigner = cryptomock.New(cryptomock.WithSignFunc(func([]byte) ([]byte, error) {
return nil, nil
}))
WithinDepthMock = mock.WithIsWithinFunc(func(addr swarm.Address) bool {
return true
})
)

// TestPushClosest inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node
Expand All @@ -66,7 +69,7 @@ func TestPushClosest(t *testing.T) {
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to

psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close()

recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
Expand Down Expand Up @@ -129,23 +132,19 @@ func TestReplicateBeforeReceipt(t *testing.T) {
defer storerEmpty.Close()
emptyRecorder := streamtest.New(streamtest.WithProtocols(psEmpty.Protocol()), streamtest.WithBaseAddr(secondPeer))

wFunc := func(addr swarm.Address) bool {
return true
}

// node that is connected to closestPeer
// will receive chunk from closestPeer
psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), mock.WithIsWithinFunc(wFunc))
psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), WithinDepthMock)
defer storerSecond.Close()
secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer))

psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf))
psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode))

// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer))
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(closestPeer))
defer storerPivot.Close()

// Trigger the sending of chunk to the closest node
Expand Down Expand Up @@ -238,13 +237,13 @@ func TestFailToReplicateBeforeReceipt(t *testing.T) {
defer storerSecond.Close()
secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer))

psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf))
psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode))

// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer))
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(closestPeer))
defer storerPivot.Close()

// Trigger the sending of chunk to the closest node
Expand Down Expand Up @@ -324,7 +323,7 @@ func TestPushChunkToClosest(t *testing.T) {
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to

psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, chanFunc(callbackC), defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, chanFunc(callbackC), defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close()

recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
Expand Down Expand Up @@ -415,7 +414,7 @@ func TestPushChunkToNextClosest(t *testing.T) {
psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer1.Close()

psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer2.Close()

var fail = true
Expand Down Expand Up @@ -546,7 +545,7 @@ func TestPushChunkToClosestFailedAttemptRetry(t *testing.T) {
psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer3.Close()

psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer4.Close()

recorder := streamtest.New(
Expand Down Expand Up @@ -665,19 +664,19 @@ func TestHandler(t *testing.T) {
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")

// Create the closest peer
psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer closestStorerPeerDB.Close()

closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))

// creating the pivot peer
psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer))
psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, defaultSigner, mock.WithPeers(closestPeer))
defer storerPivotDB.Close()

pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer))

// Creating the trigger peer
psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, pivotRecorder, nil, defaultSigner, mock.WithClosestPeer(pivotPeer))
psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, pivotRecorder, nil, defaultSigner, mock.WithPeers(pivotPeer))
defer triggerStorerDB.Close()

receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
Expand Down Expand Up @@ -753,13 +752,13 @@ func TestSignsReceipt(t *testing.T) {
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")

// Create the closest peer
psClosestPeer, closestStorerPeerDB, _, _ := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, signer, mock.WithClosestPeerErr(topology.ErrWantSelf))
psClosestPeer, closestStorerPeerDB, _, _ := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, signer, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer closestStorerPeerDB.Close()

closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))

// creating the pivot peer who will act as a forwarder node with a higher price (17)
psPivot, storerPivotDB, _, _ := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, signer, mock.WithClosestPeer(closestPeer))
psPivot, storerPivotDB, _, _ := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, signer, mock.WithPeers(closestPeer))
defer storerPivotDB.Close()

receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
Expand All @@ -783,87 +782,6 @@ func TestSignsReceipt(t *testing.T) {
t.Fatal("receipt block hash do not match")
}
}

// createPushSyncNode builds a PushSync node for tests, backed by a fresh
// mock accounting instance, and returns that accounting alongside the node,
// its storer and its tags so callers can inspect balances after a push.
func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) {
	t.Helper()
	acct := accountingmock.NewAccounting()
	ps, storer, tagger := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, acct, mockOpts...)
	return ps, storer, tagger, acct
}

// createPushSyncNodeWithAccounting wires up a PushSync node with all of its
// mocked collaborators (storer, topology, tags, pricer, stream recorder) and
// the caller-supplied accounting interface. A nil unwrap callback is replaced
// by a no-op so the node never dereferences it.
func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) {
	t.Helper()

	if unwrap == nil {
		unwrap = func(swarm.Chunk) {}
	}

	logger := logging.New(ioutil.Discard, 0)
	storer := mocks.NewStorer()
	tagger := tags.NewTags(statestore.NewStateStore(), logger)
	topo := mock.NewTopologyDriver(mockOpts...)
	pricer := pricermock.NewMockService(prices.price, prices.peerPrice)
	streamer := streamtest.NewRecorderDisconnecter(recorder)

	// Stamp validation is not under test here: accept every chunk unchanged.
	noopStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
		return ch, nil
	}

	ps := pushsync.New(addr, blockHash.Bytes(), streamer, storer, topo, tagger, true, unwrap, noopStamp, logger, acct, pricer, signer, nil, 0)
	return ps, storer, tagger
}

// waitOnRecordAndTest blocks until the recorder has captured one pushsync
// stream record for peer, then decodes the inbound payload. With non-nil data
// it expects a single pb.Delivery matching add/data; with nil data it expects
// a single pb.Receipt whose address equals add.
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
	t.Helper()
	recs := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)
	in := recs[0].In()

	if data == nil {
		// Receipt path: the record should carry exactly one receipt for add.
		msgs, err := protobuf.ReadMessages(
			bytes.NewReader(in),
			func() protobuf.Message { return new(pb.Receipt) },
		)
		if err != nil {
			t.Fatal(err)
		}
		if msgs == nil {
			t.Fatal("nil rcvd. for message")
		}
		if len(msgs) > 1 {
			t.Fatal("too many messages")
		}
		got := swarm.NewAddress(msgs[0].(*pb.Receipt).Address)
		if !got.Equal(add) {
			t.Fatalf("receipt address mismatch")
		}
		return
	}

	// Delivery path: the record should carry exactly one chunk delivery.
	msgs, err := protobuf.ReadMessages(
		bytes.NewReader(in),
		func() protobuf.Message { return new(pb.Delivery) },
	)
	if err != nil {
		t.Fatal(err)
	}
	if msgs == nil {
		t.Fatal("nil rcvd. for message")
	}
	if len(msgs) > 1 {
		t.Fatal("too many messages")
	}
	delivery := msgs[0].(*pb.Delivery)
	if !bytes.Equal(delivery.Address, add.Bytes()) {
		t.Fatalf("chunk address mismatch")
	}
	if !bytes.Equal(delivery.Data, data) {
		t.Fatalf("chunk data mismatch")
	}
}

func TestPeerSkipList(t *testing.T) {

skipList := pushsync.NewPeerSkipList()
Expand Down Expand Up @@ -915,7 +833,7 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
psPeer2, storerPeer2, _, _ := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer2.Close()

psPeer3, storerPeer3, _, _ := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer3, storerPeer3, _, _ := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer3.Close()

var (
Expand Down Expand Up @@ -984,6 +902,86 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
}
}

// createPushSyncNode builds a PushSync node for tests, backed by a fresh
// mock accounting instance, and returns that accounting alongside the node,
// its storer and its tags so callers can inspect balances after a push.
func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) {
	t.Helper()
	acct := accountingmock.NewAccounting()
	ps, storer, tagger := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, acct, mockOpts...)
	return ps, storer, tagger, acct
}

// createPushSyncNodeWithAccounting wires up a PushSync node with all of its
// mocked collaborators (storer, topology, tags, pricer, stream recorder) and
// the caller-supplied accounting interface. A nil unwrap callback is replaced
// by a no-op so the node never dereferences it.
func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) {
	t.Helper()

	if unwrap == nil {
		unwrap = func(swarm.Chunk) {}
	}

	logger := logging.New(ioutil.Discard, 0)
	storer := mocks.NewStorer()
	tagger := tags.NewTags(statestore.NewStateStore(), logger)
	topo := mock.NewTopologyDriver(mockOpts...)
	pricer := pricermock.NewMockService(prices.price, prices.peerPrice)
	streamer := streamtest.NewRecorderDisconnecter(recorder)

	// Stamp validation is not under test here: accept every chunk unchanged.
	noopStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
		return ch, nil
	}

	ps := pushsync.New(addr, blockHash.Bytes(), streamer, storer, topo, tagger, true, unwrap, noopStamp, logger, acct, pricer, signer, nil, 0)
	return ps, storer, tagger
}

// waitOnRecordAndTest blocks until the recorder has captured one pushsync
// stream record for peer, then decodes the inbound payload. With non-nil data
// it expects a single pb.Delivery matching add/data; with nil data it expects
// a single pb.Receipt whose address equals add.
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
	t.Helper()
	recs := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)
	in := recs[0].In()

	if data == nil {
		// Receipt path: the record should carry exactly one receipt for add.
		msgs, err := protobuf.ReadMessages(
			bytes.NewReader(in),
			func() protobuf.Message { return new(pb.Receipt) },
		)
		if err != nil {
			t.Fatal(err)
		}
		if msgs == nil {
			t.Fatal("nil rcvd. for message")
		}
		if len(msgs) > 1 {
			t.Fatal("too many messages")
		}
		got := swarm.NewAddress(msgs[0].(*pb.Receipt).Address)
		if !got.Equal(add) {
			t.Fatalf("receipt address mismatch")
		}
		return
	}

	// Delivery path: the record should carry exactly one chunk delivery.
	msgs, err := protobuf.ReadMessages(
		bytes.NewReader(in),
		func() protobuf.Message { return new(pb.Delivery) },
	)
	if err != nil {
		t.Fatal(err)
	}
	if msgs == nil {
		t.Fatal("nil rcvd. for message")
	}
	if len(msgs) > 1 {
		t.Fatal("too many messages")
	}
	delivery := msgs[0].(*pb.Delivery)
	if !bytes.Equal(delivery.Address, add.Bytes()) {
		t.Fatalf("chunk address mismatch")
	}
	if !bytes.Equal(delivery.Data, data) {
		t.Fatalf("chunk data mismatch")
	}
}

func chanFunc(c chan<- struct{}) func(swarm.Chunk) {
return func(_ swarm.Chunk) {
c <- struct{}{}
Expand Down