Skip to content

Commit

Permalink
Merge branch 'master' of github.com:ethersphere/swarm
Browse files Browse the repository at this point in the history
* 'master' of github.com:ethersphere/swarm:
  network: terminate Hive connect goroutine on Stop (ethersphere#1740)
  Incentives rpc test (ethersphere#1733)
  swarm, swap: pass chequebook address at start-up (ethersphere#1718)
  • Loading branch information
chadsr committed Sep 10, 2019
2 parents 7d157a7 + 43f2b87 commit 0f9aceb
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 40 deletions.
41 changes: 25 additions & 16 deletions network/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Hive struct {
lock sync.Mutex
peers map[enode.ID]*BzzPeer
ticker *time.Ticker
done chan struct{}
}

// NewHive constructs a new hive
Expand Down Expand Up @@ -97,6 +98,8 @@ func (h *Hive) Start(server *p2p.Server) error {
h.addPeer = server.AddPeer
// ticker to keep the hive alive
h.ticker = time.NewTicker(h.KeepAliveInterval)
// done channel to signal the connect goroutine to return after Stop
h.done = make(chan struct{})
// this loop is doing bootstrapping and maintains a healthy table
if !h.DisableAutoConnect {
go h.connect()
Expand All @@ -108,6 +111,7 @@ func (h *Hive) Start(server *p2p.Server) error {
func (h *Hive) Stop() error {
log.Info(fmt.Sprintf("%08x hive stopping, saving peers", h.BaseAddr()[:4]))
h.ticker.Stop()
close(h.done)
if h.Store != nil {
if err := h.savePeers(); err != nil {
return fmt.Errorf("could not save peers to persistence store: %v", err)
Expand All @@ -131,24 +135,29 @@ func (h *Hive) Stop() error {
// at each iteration, ask the overlay driver to suggest the most preferred peer to connect to
// as well as advertises saturation depth if needed
func (h *Hive) connect() {
for range h.ticker.C {
loop:
for {
select {
case <-h.ticker.C:
addr, depth, changed := h.SuggestPeer()
if h.Discovery && changed {
NotifyDepth(uint8(depth), h.Kademlia)
}
if addr == nil {
continue loop
}

addr, depth, changed := h.SuggestPeer()
if h.Discovery && changed {
NotifyDepth(uint8(depth), h.Kademlia)
log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
under, err := enode.ParseV4(string(addr.Under()))
if err != nil {
log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
continue loop
}
log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
h.addPeer(under)
case <-h.done:
break loop
}
if addr == nil {
continue
}

log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
under, err := enode.ParseV4(string(addr.Under()))
if err != nil {
log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
continue
}
log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
h.addPeer(under)
}
}

Expand Down
2 changes: 1 addition & 1 deletion swap/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *Swap) verifyHandshake(msg interface{}) error {
return ErrEmptyAddressInSignature
}

return s.verifyContract(context.Background(), handshake.ContractAddress)
return contract.ValidateCode(context.Background(), s.backend, handshake.ContractAddress)
}

// run is the actual swap protocol run method
Expand Down
172 changes: 172 additions & 0 deletions swap/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@ package swap

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
contract "github.com/ethersphere/swarm/contracts/swap"
"github.com/ethersphere/swarm/p2p/protocols"
p2ptest "github.com/ethersphere/swarm/p2p/testing"
)
Expand Down Expand Up @@ -296,3 +305,166 @@ func TestTriggerDisconnectThreshold(t *testing.T) {
t.Fatalf("Expected still no cheque, but there are %d", lenCheques)
}
}

// TestSwapRPC tests some basic things over RPC
// We want this so that we can check the API works
func TestSwapRPC(t *testing.T) {

if runtime.GOOS == "windows" {
t.Skip()
}

var (
ipcPath = ".swap.ipc"
err error
)

swap, clean := newTestSwap(t, ownerKey)
defer clean()

// need to have a dummy contract or the call will fail at `GetParams` due to `NewAPI`
swap.contract, err = contract.InstanceAt(common.Address{}, swap.backend)
if err != nil {
t.Fatal(err)
}

// start a service stack
stack := createAndStartSvcNode(swap, ipcPath, t)
defer func() {
go stack.Stop()
}()

// use unique IPC path on windows
ipcPath = filepath.Join(stack.DataDir(), ipcPath)

// connect to the servicenode RPCs
rpcclient, err := rpc.Dial(ipcPath)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(stack.DataDir())

// create dummy peers so that we can artificially set balances and query
dummyPeer1 := newDummyPeer()
dummyPeer2 := newDummyPeer()
id1 := dummyPeer1.ID()
id2 := dummyPeer2.ID()

// set some fake balances
fakeBalance1 := int64(234)
fakeBalance2 := int64(-100)

// query a first time, should give error
var balance int64
err = rpcclient.Call(&balance, "swap_balance", id1)
// at this point no balance should be there: no peer at address in map...
if err == nil {
t.Fatal("Expected error but no error received")
}
log.Debug("servicenode balance", "balance", balance)

// ...thus balance should be zero
if balance != 0 {
t.Fatalf("Expected balance to be 0 but it is %d", balance)
}

// now artificially assign some balances
swap.balances[id1] = fakeBalance1
swap.balances[id2] = fakeBalance2

// query them, values should coincide
err = rpcclient.Call(&balance, "swap_balance", id1)
if err != nil {
t.Fatal(err)
}
log.Debug("balance1", "balance1", balance)
if balance != fakeBalance1 {
t.Fatalf("Expected balance %d to be equal to fake balance %d, but it is not", balance, fakeBalance1)
}

err = rpcclient.Call(&balance, "swap_balance", id2)
if err != nil {
t.Fatal(err)
}
log.Debug("balance2", "balance2", balance)
if balance != fakeBalance2 {
t.Fatalf("Expected balance %d to be equal to fake balance %d, but it is not", balance, fakeBalance2)
}

// now call all balances
allBalances := make(map[enode.ID]int64)
err = rpcclient.Call(&allBalances, "swap_balances")
if err != nil {
t.Fatal(err)
}
log.Debug("received balances", "allBalances", allBalances)

var sum int64
for _, v := range allBalances {
sum += v
}

fakeSum := fakeBalance1 + fakeBalance2
if sum != fakeSum {
t.Fatalf("Expected total balance to be %d, but it %d", fakeSum, sum)
}

if !reflect.DeepEqual(allBalances, swap.balances) {
t.Fatal("Balances are not deep equal")
}
}

// createAndStartSvcNode setup a p2p service and start it
func createAndStartSvcNode(swap *Swap, ipcPath string, t *testing.T) *node.Node {
stack, err := newServiceNode(ipcPath, 0, 0)
if err != nil {
t.Fatal("Create servicenode #1 fail", "err", err)
}

swapsvc := func(ctx *node.ServiceContext) (node.Service, error) {
return swap, nil
}

err = stack.Register(swapsvc)
if err != nil {
t.Fatal("Register service in servicenode #1 fail", "err", err)
}

// start the nodes
err = stack.Start()
if err != nil {
t.Fatal("servicenode #1 start failed", "err", err)
}

return stack
}

// newServiceNode creates a p2p.Service node stub
func newServiceNode(ipcPath string, httpport int, wsport int, modules ...string) (*node.Node, error) {
var err error
cfg := &node.DefaultConfig
cfg.P2P.EnableMsgEvents = true
cfg.P2P.NoDiscovery = true
cfg.IPCPath = ipcPath
cfg.DataDir, err = ioutil.TempDir("", "test-Service-node")
if err != nil {
return nil, err
}
if httpport > 0 {
cfg.HTTPHost = node.DefaultHTTPHost
cfg.HTTPPort = httpport
}
if wsport > 0 {
cfg.WSHost = node.DefaultWSHost
cfg.WSPort = wsport
cfg.WSOrigins = []string{"*"}
for i := 0; i < len(modules); i++ {
cfg.WSModules = append(cfg.WSModules, modules[i])
}
}
stack, err := node.New(cfg)
if err != nil {
return nil, fmt.Errorf("ServiceNode create fail: %v", err)
}
return stack, nil
}
51 changes: 40 additions & 11 deletions swap/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func NewParams() *Params {
}

// New - swap constructor
func New(stateStore state.Store, prvkey *ecdsa.PrivateKey, contract common.Address, backend contract.Backend) *Swap {
func New(stateStore state.Store, prvkey *ecdsa.PrivateKey, backend contract.Backend) *Swap {
return &Swap{
store: stateStore,
balances: make(map[enode.ID]int64),
cheques: make(map[enode.ID]*Cheque),
peers: make(map[enode.ID]*Peer),
backend: backend,
owner: createOwner(prvkey, contract),
owner: createOwner(prvkey),
params: NewParams(),
paymentThreshold: DefaultPaymentThreshold,
disconnectThreshold: DefaultDisconnectThreshold,
Expand Down Expand Up @@ -127,13 +127,12 @@ func keyToID(key string, prefix string) enode.ID {
}

// createOwner assings keys and addresses
func createOwner(prvkey *ecdsa.PrivateKey, contract common.Address) *Owner {
func createOwner(prvkey *ecdsa.PrivateKey) *Owner {
pubkey := &prvkey.PublicKey
return &Owner{
address: crypto.PubkeyToAddress(*pubkey),
privateKey: prvkey,
publicKey: pubkey,
Contract: contract,
address: crypto.PubkeyToAddress(*pubkey),
}
}

Expand Down Expand Up @@ -522,9 +521,9 @@ func (s *Swap) GetParams() *swap.Params {
return s.contract.ContractParams()
}

// verifyContract checks if the bytecode found at address matches the expected bytecode
func (s *Swap) verifyContract(ctx context.Context, address common.Address) error {
return contract.ValidateCode(ctx, s.backend, address)
// setChequebookAddr sets the chequebook address
func (s *Swap) setChequebookAddr(chequebookAddr common.Address) {
s.owner.Contract = chequebookAddr
}

// getContractOwner retrieve the owner of the chequebook at address from the blockchain
Expand All @@ -537,8 +536,38 @@ func (s *Swap) getContractOwner(ctx context.Context, address common.Address) (co
return contr.Issuer(nil)
}

// Deploy deploys the Swap contract
func (s *Swap) Deploy(ctx context.Context, backend swap.Backend, path string) error {
// StartChequebook deploys a new instance of a chequebook if chequebookAddr is empty, otherwise it wil bind to an existing instance
func (s *Swap) StartChequebook(chequebookAddr common.Address) error {
if chequebookAddr != (common.Address{}) {
if err := s.BindToContractAt(chequebookAddr); err != nil {
return err
}
log.Info("Using the provided chequebook", "chequebookAddr", chequebookAddr)
} else {
if err := s.Deploy(context.Background(), s.backend); err != nil {
return err
}
log.Info("New SWAP contract deployed", "contract info", s.DeploySuccess())
}
return nil
}

// BindToContractAt binds an instance of an already existing chequebook contract at address and sets chequebookAddr
func (s *Swap) BindToContractAt(address common.Address) (err error) {

if err := contract.ValidateCode(context.Background(), s.backend, address); err != nil {
return fmt.Errorf("contract validation for %v failed: %v", address, err)
}
s.contract, err = contract.InstanceAt(address, s.backend)
if err != nil {
return err
}
s.setChequebookAddr(address)
return nil
}

// Deploy deploys the Swap contract and sets the contract address
func (s *Swap) Deploy(ctx context.Context, backend swap.Backend) error {
opts := bind.NewKeyedTransactor(s.owner.privateKey)
// initial topup value
opts.Value = big.NewInt(int64(s.params.InitialDepositAmount))
Expand All @@ -550,7 +579,7 @@ func (s *Swap) Deploy(ctx context.Context, backend swap.Backend, path string) er
log.Error("unable to deploy swap", "error", err)
return err
}
s.owner.Contract = address
s.setChequebookAddr(address)
log.Info("swap deployed", "address", address.Hex(), "owner", opts.From.Hex())

return err
Expand Down
Loading

0 comments on commit 0f9aceb

Please sign in to comment.