Skip to content

Commit

Permalink
Merge pull request ethereum#276 from holisticode/swarm-network-rewrit…
Browse files Browse the repository at this point in the history
…e-snapshot-test

Swarm network rewrite snapshot test
  • Loading branch information
holisticode authored Apr 4, 2018
2 parents 74f6037 + 6d556a1 commit b34ec33
Show file tree
Hide file tree
Showing 16 changed files with 734 additions and 25 deletions.
35 changes: 32 additions & 3 deletions swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,20 @@ import (
)

var (
adapter = flag.String("adapter", "sim", "type of simulation: sim|socket|exec|docker")
loglevel = flag.Int("loglevel", 4, "verbosity of logs")
deliveries map[discover.NodeID]*Delivery
stores map[discover.NodeID]storage.ChunkStore
toAddr func(discover.NodeID) *network.BzzAddr
peerCount func(discover.NodeID) int
adapter = flag.String("adapter", "sim", "type of simulation: sim|socket|exec|docker")
loglevel = flag.Int("loglevel", 2, "verbosity of logs")
)

var (
defaultSkipCheck bool
waitPeerErrC chan error
chunkSize = 4096
registries map[discover.NodeID]*TestRegistry
createStoreFunc func(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error)
)

var services = adapters.Services{
Expand All @@ -71,9 +77,14 @@ func init() {

// NewStreamerService
func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
var err error
id := ctx.Config.ID
addr := toAddr(id)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
stores[id], err = createStoreFunc(id, addr)
if err != nil {
return nil, err
}
store := stores[id].(*storage.LocalStore)
db := storage.NewDBAPI(store)
delivery := NewDelivery(kad, db)
Expand All @@ -87,7 +98,25 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
}()
dpa := storage.NewDPA(storage.NewNetStore(store, nil), storage.NewDPAParams())
return &TestRegistry{Registry: r, dpa: dpa}, nil
testRegistry := &TestRegistry{Registry: r, dpa: dpa}
registries[id] = testRegistry
return testRegistry, nil
}

func datadirsCleanup() {
for _, id := range ids {
os.RemoveAll(datadirs[id])
}
}

//local stores need to be cleaned up after the sim is done
func localStoreCleanup() {
log.Info("Cleaning up...")
for _, id := range ids {
registries[id].Close()
stores[id].Close()
}
log.Info("Local store cleanup done")
}

func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
Expand Down
3 changes: 2 additions & 1 deletion swarm/network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type RetrieveRequestMsg struct {
}

func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) error {
log.Debug("received request", "peer", sp.ID(), "hash", req.Key)
log.Trace("received request", "peer", sp.ID(), "hash", req.Key)
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false))
if err != nil {
return err
Expand Down Expand Up @@ -157,6 +157,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e
if req.SkipCheck {
err := sp.Deliver(chunk, s.priority)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
sp.Drop(err)
}
}
Expand Down
15 changes: 7 additions & 8 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)

var (
deliveries map[discover.NodeID]*Delivery
stores map[discover.NodeID]storage.ChunkStore
toAddr func(discover.NodeID) *network.BzzAddr
peerCount func(discover.NodeID) int
)

func TestStreamerRetrieveRequest(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
defer teardown()
Expand Down Expand Up @@ -314,6 +307,7 @@ func TestDeliveryFromNodes(t *testing.T) {
func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) {
defaultSkipCheck = skipCheck
toAddr = network.NewAddrFromNodeID
createStoreFunc = createTestLocalStorageFromSim
conf := &streamTesting.RunConfig{
Adapter: *adapter,
NodeCount: nodes,
Expand All @@ -329,10 +323,11 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
t.Fatal(err.Error())
}
stores = make(map[discover.NodeID]storage.ChunkStore)
deliveries = make(map[discover.NodeID]*Delivery)
for i, id := range sim.IDs {
stores[id] = sim.Stores[i]
}
registries = make(map[discover.NodeID]*TestRegistry)
deliveries = make(map[discover.NodeID]*Delivery)
peerCount = func(id discover.NodeID) int {
if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
return 1
Expand Down Expand Up @@ -674,3 +669,7 @@ Loop:
b.Fatalf("expected no error. got %v", err)
}
}

func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
return stores[id], nil
}
10 changes: 8 additions & 2 deletions swarm/network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) {
}
}()

log.Debug("received subscription", "peer", p.ID(), "stream", req.Stream, "history", req.History)
log.Debug("%s received subscription", "from", p.streamer.addr.ID(), "peer", p.ID(), "stream", req.Stream, "history", req.History)

f, err := p.streamer.GetServerFunc(req.Stream.Name)
if err != nil {
Expand All @@ -110,6 +110,7 @@ func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) {

go func() {
if err := p.SendOfferedHashes(os, from, to); err != nil {
log.Warn("ERROR in SendOfferedHashes, DROPPING peer!", "err", err)
p.Drop(err)
}
}()
Expand All @@ -127,6 +128,7 @@ func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) {
}
go func() {
if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil {
log.Warn("ERROR in SendOfferedHashes, DROPPING peer!", "err", err)
p.Drop(err)
}
}()
Expand Down Expand Up @@ -235,18 +237,21 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
}
go func() {
select {
case <-time.After(30 * time.Second):
case <-time.After(120 * time.Second):
log.Warn("ERROR in handleOfferedHashesMsg, DROPPING peer!", "err", "TIMEOUT")
p.Drop(err)
return
case err := <-c.next:
if err != nil {
log.Warn("ERROR in handleOfferedHashesMsg, DROPPING peer!", "err", err)
p.Drop(err)
return
}
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(msg, c.priority)
if err != nil {
log.Warn("ERROR in handleOfferedHashesMsg, DROPPING peer!", "err", err)
p.Drop(err)
}
}()
Expand Down Expand Up @@ -279,6 +284,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
// launch in go routine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
log.Warn("ERROR in handleWantedHashesMsg, DROPPING peer!", "err", err)
p.Drop(err)
}
}()
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)

var sendTimeout = 5 * time.Second
var sendTimeout = 30 * time.Second

type notFoundError struct {
t string
Expand Down
Loading

0 comments on commit b34ec33

Please sign in to comment.