Skip to content

Commit

Permalink
Merge pull request #1266 from libp2p/remove-goprocess
Browse files Browse the repository at this point in the history
remove goprocess from the mock package
  • Loading branch information
marten-seemann authored Dec 18, 2021
2 parents bfee9f5 + e7ea19f commit e014b96
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 110 deletions.
1 change: 0 additions & 1 deletion examples/pubsub/chat/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log/v2 v2.4.0
github.com/jbenet/goprocess v0.1.4
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-addr-util v0.1.0
Expand Down
3 changes: 2 additions & 1 deletion p2p/net/mock/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
)

type Mocknet interface {

// GenPeer generates a peer and its network.Network in the Mocknet
GenPeer() (host.Host, error)

Expand Down Expand Up @@ -63,6 +62,8 @@ type Mocknet interface {
DisconnectNets(network.Network, network.Network) error
LinkAll() error
ConnectAllButSelf() error

io.Closer
}

// LinkOptions are used to change aspects of the links.
Expand Down
18 changes: 7 additions & 11 deletions p2p/net/mock/mock.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package mocknet

import (
"context"

logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("mocknet")

// WithNPeers constructs a Mocknet with N peers.
func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
m := New(ctx)
func WithNPeers(n int) (Mocknet, error) {
m := New()
for i := 0; i < n; i++ {
if _, err := m.GenPeer(); err != nil {
return nil, err
Expand All @@ -22,30 +20,28 @@ func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
// FullMeshLinked constructs a Mocknet with full mesh of Links.
// This means that all the peers **can** connect to each other
// (not that they already are connected. you can use m.ConnectAll())
func FullMeshLinked(ctx context.Context, n int) (Mocknet, error) {
m, err := WithNPeers(ctx, n)
func FullMeshLinked(n int) (Mocknet, error) {
m, err := WithNPeers(n)
if err != nil {
return nil, err
}

if err := m.LinkAll(); err != nil {
return nil, err
}

return m, nil
}

// FullMeshConnected constructs a Mocknet with full mesh of Connections.
// This means that all the peers have dialed and are ready to talk to
// each other.
func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) {
m, err := FullMeshLinked(ctx, n)
func FullMeshConnected(n int) (Mocknet, error) {
m, err := FullMeshLinked(n)
if err != nil {
return nil, err
}

err = m.ConnectAllButSelf()
if err != nil {
if err := m.ConnectAllButSelf(); err != nil {
return nil, err
}
return m, nil
Expand Down
18 changes: 8 additions & 10 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"sync/atomic"

process "github.com/jbenet/goprocess"
ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -40,13 +39,13 @@ type conn struct {
streams list.List
stat network.ConnStats

pairProc, connProc process.Process
closeOnce sync.Once

sync.RWMutex
}

func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l, pairProc: p}
func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l}
c.local = ln.peer
c.remote = rn.peer
c.stat.Direction = dir
Expand All @@ -65,7 +64,6 @@ func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction)

c.localPrivKey = ln.ps.PrivKey(ln.peer)
c.remotePubKey = rn.ps.PubKey(rn.peer)
c.connProc = process.WithParent(c.pairProc)
return c
}

Expand All @@ -74,11 +72,11 @@ func (c *conn) ID() string {
}

func (c *conn) Close() error {
return c.pairProc.Close()
}

func (c *conn) setup() {
c.connProc.SetTeardown(c.teardown)
c.closeOnce.Do(func() {
go c.rconn.Close()
c.teardown()
})
return nil
}

func (c *conn) teardown() error {
Expand Down
7 changes: 2 additions & 5 deletions p2p/net/mock/mock_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

process "github.com/jbenet/goprocess"
)

// link implements mocknet.Link
Expand All @@ -33,13 +31,12 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
l.RLock()
defer l.RUnlock()

parent := process.WithTeardown(func() error { return nil })
target := l.nets[0]
if target == dialer {
target = l.nets[1]
}
dc := newConn(parent, dialer, target, l, network.DirOutbound)
tc := newConn(parent, target, dialer, l, network.DirInbound)
dc := newConn(dialer, target, l, network.DirOutbound)
tc := newConn(target, dialer, l, network.DirInbound)
dc.rconn = tc
tc.rconn = dc
return dc, tc
Expand Down
39 changes: 20 additions & 19 deletions p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (

bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

p2putil "github.com/libp2p/go-libp2p-netutil"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -30,7 +27,7 @@ var blackholeIP6 = net.ParseIP("100::")
// mocknet implements mocknet.Mocknet
type mocknet struct {
nets map[peer.ID]*peernet
hosts map[peer.ID]*bhost.BasicHost
hosts map[peer.ID]host.Host

// links make it possible to connect two peers.
// think of links as the physical medium.
Expand All @@ -40,22 +37,30 @@ type mocknet struct {

linkDefaults LinkOptions

proc goprocess.Process // for Context closing
ctx context.Context
ctxCancel context.CancelFunc
ctx context.Context
sync.Mutex
}

func New(ctx context.Context) Mocknet {
proc := goprocessctx.WithContext(ctx)
ctx = goprocessctx.WithProcessClosing(ctx, proc)

return &mocknet{
func New() Mocknet {
mn := &mocknet{
nets: map[peer.ID]*peernet{},
hosts: map[peer.ID]*bhost.BasicHost{},
hosts: map[peer.ID]host.Host{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
proc: proc,
ctx: ctx,
}
mn.ctx, mn.ctxCancel = context.WithCancel(context.Background())
return mn
}

func (mn *mocknet) Close() error {
mn.ctxCancel()
for _, h := range mn.hosts {
h.Close()
}
for _, n := range mn.nets {
n.Close()
}
return nil
}

func (mn *mocknet) GenPeer() (host.Host, error) {
Expand Down Expand Up @@ -104,7 +109,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
}

func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host.Host, error) {
n, err := newPeernet(mn.ctx, mn, p, ps)
n, err := newPeernet(mn, p, ps)
if err != nil {
return nil, err
}
Expand All @@ -119,10 +124,6 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host
return nil, err
}

// Ensure we close the hoset when we close the mock network.
// Otherwise, tests leak memory.
mn.proc.AddChild(goprocess.WithTeardown(h.Close))

mn.Lock()
mn.nets[n.peer] = n
mn.hosts[n.peer] = h
Expand Down
8 changes: 3 additions & 5 deletions p2p/net/mock/mock_notif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import (

func TestNotifications(t *testing.T) {
const swarmSize = 5
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const timeout = 10 * time.Second

mn, err := FullMeshLinked(ctx, swarmSize)
mn, err := FullMeshLinked(swarmSize)
if err != nil {
t.Fatal(err)
}

timeout := 10 * time.Second
defer mn.Close()

// signup notifs
nets := mn.Nets()
Expand Down
23 changes: 2 additions & 21 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"

"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -36,12 +33,11 @@ type peernet struct {
notifmu sync.Mutex
notifs map[network.Notifiee]struct{}

proc goprocess.Process
sync.RWMutex
}

// newPeernet constructs a new peernet
func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
n := &peernet{
mocknet: m,
peer: p,
Expand All @@ -53,12 +49,10 @@ func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peersto
notifs: make(map[network.Notifiee]struct{}),
}

n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown)
return n, nil
}

func (pn *peernet) teardown() error {

func (pn *peernet) Close() error {
// close the connections
for _, c := range pn.allConns() {
c.Close()
Expand All @@ -79,11 +73,6 @@ func (pn *peernet) allConns() []*conn {
return cs
}

// Close calls the ContextCloser func
func (pn *peernet) Close() error {
return pn.proc.Close()
}

func (pn *peernet) Peerstore() peerstore.Peerstore {
return pn.ps
}
Expand Down Expand Up @@ -199,9 +188,6 @@ func (pn *peernet) remoteOpenedConn(c *conn) {
// to given remote peer over given link
func (pn *peernet) addConn(c *conn) {
defer c.notifLk.Unlock()
// Call this after unlocking as it might cause us to immediately close
// the connection and remove it from the swarm.
c.setup()

pn.notifyAll(func(n network.Notifiee) {
n.Connected(pn, c)
Expand All @@ -226,11 +212,6 @@ func (pn *peernet) removeConn(c *conn) {
delete(cs, c)
}

// Process returns the network's Process
func (pn *peernet) Process() goprocess.Process {
return pn.proc
}

// LocalPeer the network's LocalPeer
func (pn *peernet) LocalPeer() peer.ID {
return pn.peer
Expand Down
Loading

0 comments on commit e014b96

Please sign in to comment.