Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm/network/stream: refactor bootstrapping of tests #1176

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import (
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
Expand Down Expand Up @@ -66,6 +69,80 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}

// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())

netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

return addr, netStore, delivery, cleanup, nil
}

// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

return netStore, delivery, cleanup, nil
}

// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())

netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New

return addr, netStore, delivery, cleanup, nil
}

func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
n := ctx.Config.Node()

store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if *useMockStore {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janos I am pretty sure this doesn't work, but haven't tried it just yet. Not sure how we cast a mock store to *storage.LocalStore further down, but this is how it is in the current code as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have found that mock store in stream tests were broken after some rewrite. I am not sure if they were fixed. Maybe @holisticode have more details, but I will have a look also.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it is important that mock store is not casted to LocalStore, there were numerous discussion on this, but injected down to ldbstore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least now we have to fix it at one place, and not at multiple :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MockStore is working correctly here, but naming is confusing. createMockStore does not create mock store, it creates new localstore with injected mockstore. It is similar to createTestLocalStorageForID, so maybe to rename this function to createTestLocalStoreWithMockStore, or newLocalStoreWithMock, and to move it from syncer_test.go to common_test.go. BTW, the description of this flag is completely wrong, it is false by default.

store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
}
if err != nil {
return nil, nil, nil, err
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, nil, err
}

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())

kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)

bucket.Store(bucketKeyStore, store)
bucket.Store(bucketKeyDB, netStore)
bucket.Store(bucketKeyDelivery, delivery)
bucket.Store(bucketKeyFileStore, fileStore)

cleanup := func() {
netStore.Close()
os.RemoveAll(datadir)
}

return netStore, delivery, cleanup, nil
}

func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
Expand Down
51 changes: 11 additions & 40 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -457,39 +456,24 @@ func TestDeliveryFromNodes(t *testing.T) {
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down Expand Up @@ -644,38 +628,25 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down
28 changes: 6 additions & 22 deletions swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand All @@ -31,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -62,26 +60,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
externalStreamMaxKeys := uint64(100)

sim := simulation.New(map[string]simulation.ServiceFunc{
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Expand All @@ -97,11 +80,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})

fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup := func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down
52 changes: 16 additions & 36 deletions swarm/network/stream/snapshot_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package stream
import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -105,43 +103,25 @@ func TestRetrieval(t *testing.T) {
}

var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
"streamer": retrievalStreamerFunc,
}

func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)

localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we'd just add a RegistryOptions object to the newNetStoreAndDeliveryWithRequestFunc, which then also would contain this NewRegistry code? I know it starts to look ugly with so many params...but it could further simplify simulation creation

Copy link
Contributor Author

@nonsense nonsense Jan 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holisticode we override the registry in some of the tests, this is why I didn't do it - its not just the RegistryOptions that change.

Personally I think even the current constructors are a bit too complicated, but this is because of the high coupling between the interfaces...

The current solution is far from ideal, I just hope it bring slightly better defaults.

Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil
return r, cleanup, nil
},
}

/*
Expand Down
51 changes: 18 additions & 33 deletions swarm/network/stream/snapshot_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,42 +107,27 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}

var simServiceMap = map[string]simulation.ServiceFunc{
"streamer": streamerFunc,
}
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}

func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

bucket.Store(bucketKeyRegistry, r)

cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

return r, cleanup, nil
bucket.Store(bucketKeyRegistry, r)

cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil
},
}

func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
Expand Down
Loading