Skip to content

Commit

Permalink
Change Process interface into object variable
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
  • Loading branch information
rht committed Jun 21, 2015
1 parent 137ea69 commit 19d1744
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 58 deletions.
43 changes: 30 additions & 13 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ type IpfsNode struct {

IpnsFs *ipnsfs.Filesystem

goprocess.Process
proc goprocess.Process
ctx context.Context

mode mode
}
Expand All @@ -120,23 +121,24 @@ type Mounts struct {

type ConfigOption func(ctx context.Context) (*IpfsNode, error)

func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) {
procctx := goprocessctx.WithContext(parent)
ctx := parent
func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
node, err := option(ctx)
if err != nil {
return nil, err
}

proc := goprocessctx.WithContext(ctx)
proc.SetTeardown(node.teardown)
node.proc = proc
node.ctx = ctx

success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
procctx.Close()
proc.Close()
}
}()

node, err := option(ctx)
if err != nil {
return nil, err
}
node.Process = procctx
ctxg.SetTeardown(node.teardown)

// Need to make sure it's perfectly clear 1) which variables are expected
// to be initialized at this point, and 2) which variables will be
// initialized after this point.
Expand Down Expand Up @@ -334,6 +336,21 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
return nil
}

// Process returns the Process object
func (n *IpfsNode) Process() goprocess.Process {
return n.proc
}

// Close calls Close() on the Process object
func (n *IpfsNode) Close() error {
return n.proc.Close()
}

// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
return n.ctx
}

// teardown closes owned children. If any errors occur, this function returns
// the first error.
func (n *IpfsNode) teardown() error {
Expand All @@ -360,7 +377,7 @@ func (n *IpfsNode) teardown() error {
}

if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht)
closers = append(closers, dht.Process())
}

if n.PeerHost != nil {
Expand Down
10 changes: 4 additions & 6 deletions core/corehttp/corehttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
core "github.com/ipfs/go-ipfs/core"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
)
Expand Down Expand Up @@ -78,20 +79,17 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
var serverError error
serverExited := make(chan struct{})

node.Children().Add(1)
defer node.Children().Done()

go func() {
node.Process().Go(func(p goprocess.Process) {
serverError = http.Serve(lis, handler)
close(serverExited)
}()
})

// wait for server to exit.
select {
case <-serverExited:

// if node being closed before server exits, close server
case <-node.Closing():
case <-node.Process().Closing():
log.Infof("server at %s terminating...", addr)

lis.Close()
Expand Down
4 changes: 2 additions & 2 deletions core/coreunix/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

func Cat(n *core.IpfsNode, pstr string) (io.Reader, error) {
p := path.FromString(pstr)
dagNode, err := n.Resolver.ResolvePath(n.ContextGroup.Context(), p)
dagNode, err := n.Resolver.ResolvePath(n.Context(), p)
if err != nil {
return nil, err
}
return uio.NewDagReader(n.ContextGroup.Context(), dagNode, n.DAG)
return uio.NewDagReader(n.Context(), dagNode, n.DAG)
}
16 changes: 7 additions & 9 deletions core/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package coremock
import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/ipfs/go-ipfs/blocks/blockstore"
blockservice "github.com/ipfs/go-ipfs/blockservice"
Expand All @@ -28,7 +27,13 @@ import (
// NewMockNode constructs an IpfsNode for use in tests.
func NewMockNode() (*core.IpfsNode, error) {
ctx := context.TODO()
nd := new(core.IpfsNode)
nd, err := core.Offline(&repo.Mock{
// TODO C: conf,
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
})(ctx)
if err != nil {
return nil, err
}

// Generate Identity
ident, err := testutil.RandIdentity()
Expand All @@ -42,19 +47,12 @@ func NewMockNode() (*core.IpfsNode, error) {
nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.AddPrivKey(p, ident.PrivateKey())
nd.Peerstore.AddPubKey(p, ident.PublicKey())
nd.Process = goprocessctx.WithContext(ctx)

nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline
if err != nil {
return nil, err
}

// Temp Datastore
nd.Repo = &repo.Mock{
// TODO C: conf,
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
}

// Routing
nd.Routing = offrt.NewOfflineRouter(nd.Repo.Datastore(), nd.PrivateKey)

Expand Down
2 changes: 1 addition & 1 deletion fuse/ipns/mount_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) {
return nil, err
}

return mount.NewMount(ipfs, fsys, ipnsmp, allow_other)
return mount.NewMount(ipfs.Process(), fsys, ipnsmp, allow_other)
}
2 changes: 1 addition & 1 deletion fuse/readonly/mount_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) {
cfg := ipfs.Repo.Config()
allow_other := cfg.Mounts.FuseAllowOther
fsys := NewFileSystem(ipfs)
return mount.NewMount(ipfs, fsys, mountpoint, allow_other)
return mount.NewMount(ipfs.Process(), fsys, mountpoint, allow_other)
}
10 changes: 6 additions & 4 deletions p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type mocknet struct {
linkDefaults LinkOptions

proc goprocess.Process // for Context closing
ctx context.Context
sync.RWMutex
}

Expand All @@ -42,6 +43,7 @@ func New(ctx context.Context) Mocknet {
hosts: map[peer.ID]*bhost.BasicHost{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
proc: goprocessctx.WithContext(ctx),
ctx: ctx,
}
}

Expand All @@ -62,15 +64,15 @@ func (mn *mocknet) GenPeer() (host.Host, error) {
}

func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
n, err := newPeernet(mn.cg.Context(), mn, k, a)
n, err := newPeernet(mn.ctx, mn, k, a)
if err != nil {
return nil, err
}

h := bhost.New(n)
log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a)

mn.cg.AddChild(n.cg)
mn.proc.AddChild(n.proc)

mn.Lock()
mn.nets[n.peer] = n
Expand Down Expand Up @@ -298,11 +300,11 @@ func (mn *mocknet) ConnectAll() error {
}

func (mn *mocknet) ConnectPeers(a, b peer.ID) (inet.Conn, error) {
return mn.Net(a).DialPeer(mn.cg.Context(), b)
return mn.Net(a).DialPeer(mn.ctx, b)
}

func (mn *mocknet) ConnectNets(a, b inet.Network) (inet.Conn, error) {
return a.DialPeer(mn.cg.Context(), b.LocalPeer())
return a.DialPeer(mn.ctx, b.LocalPeer())
}

func (mn *mocknet) DisconnectPeers(p1, p2 peer.ID) error {
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
notifs: make(map[inet.Notifiee]struct{}),
}

n.cg.SetTeardown(n.teardown)
n.proc.SetTeardown(n.teardown)
return n, nil
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func (pn *peernet) allConns() []*conn {

// Close calls the ContextCloser func
func (pn *peernet) Close() error {
return pn.cg.Close()
return pn.proc.Close()
}

func (pn *peernet) Peerstore() peer.Peerstore {
Expand Down
7 changes: 7 additions & 0 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Swarm struct {
notifs map[inet.Notifiee]ps.Notifiee

proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
}

Expand All @@ -69,6 +70,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local: local,
peers: peers,
proc: goprocessctx.WithContext(ctx),
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc,
Expand Down Expand Up @@ -110,6 +112,11 @@ func (s *Swarm) Process() goprocess.Process {
return s.proc
}

// Context returns the context of the swarm
func (s *Swarm) Context() context.Context {
return s.ctx
}

// Close stops the Swarm.
func (s *Swarm) Close() error {
return s.proc.Close()
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
log.Debugf("Swarm Listening at %s", maddr)
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
list, err := conn.Listen(s.Context(), maddr, s.local, sk)
if err != nil {
return err
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
return
}
}
}(s.cg.Context(), sl)
}(s.Context(), sl)

return nil
}
Expand Down
39 changes: 28 additions & 11 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type IpfsDHT struct {

Validator record.Validator // record validator funcs

Context context.Context
goprocess.Process
ctx context.Context
proc goprocess.Process
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
Expand All @@ -73,18 +73,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

procctx = goprocessctx.WithContext(ctx)
procctx.SetTeardown(func() error {
proc := goprocessctx.WithContext(ctx)
proc.SetTeardown(func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.Process = procctx
dht.Context = ctx
dht.proc = proc
dht.ctx = ctx

h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context, dht.self)
dht.AddChild(dht.providers)
dht.providers = NewProviderManager(dht.ctx, dht.self)
dht.proc.AddChild(dht.providers.proc)

dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
dht.birth = time.Now()
Expand All @@ -93,7 +93,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator["pk"] = record.PublicKeyValidator

if doPinging {
dht.Go(func() { dht.PingRoutine(time.Second * 10) })
dht.proc.Go(func(p goprocess.Process) {
dht.PingRoutine(time.Second * 10)
})
}
return dht
}
Expand Down Expand Up @@ -360,15 +362,30 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5)
for _, p := range peers {
ctx, cancel := context.WithTimeout(dht.Context, time.Second*5)
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Debugf("Ping error: %s", err)
}
cancel()
}
case <-dht.Closing():
case <-dht.proc.Closing():
return
}
}
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
}

// Process return dht's process
func (dht *IpfsDHT) Process() goprocess.Process {
return dht.proc
}

// Close calls Process Close
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}
4 changes: 2 additions & 2 deletions routing/dht/notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (nn *netNotifiee) DHT() *IpfsDHT {
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
case <-dht.Process().Closing():
return
default:
}
Expand All @@ -26,7 +26,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
case <-dht.Process().Closing():
return
default:
}
Expand Down
Loading

0 comments on commit 19d1744

Please sign in to comment.