diff --git a/core/core.go b/core/core.go index a2dd95ad1c0..36b4d825e16 100644 --- a/core/core.go +++ b/core/core.go @@ -138,6 +138,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { // TODO(brian): perform this inside NewDHT factory method dhtService.SetHandler(dhtRouting) // wire the handler to the service. n.Routing = dhtRouting + n.AddCloserChild(dhtRouting) // setup exchange service const alwaysSendToPeer = true // use YesManStrategy diff --git a/routing/dht/dht.go b/routing/dht/dht.go index fdb9f96f229..76cde7fb54e 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -14,6 +14,7 @@ import ( pb "github.com/jbenet/go-ipfs/routing/dht/pb" kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" + ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -56,7 +57,7 @@ type IpfsDHT struct { //lock to make diagnostics work better diaglock sync.Mutex - ctx context.Context + ctxc.ContextCloser } // NewDHT creates a new DHT object with the given peer as the 'local' host @@ -67,9 +68,10 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia dht.datastore = dstore dht.self = p dht.peerstore = ps - dht.ctx = ctx + dht.ContextCloser = ctxc.NewContextCloser(ctx, nil) - dht.providers = NewProviderManager(p.ID()) + dht.providers = NewProviderManager(dht.Context(), p.ID()) + dht.AddCloserChild(dht.providers) dht.routingTables = make([]*kb.RoutingTable, 3) dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) @@ -78,6 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia dht.birth = time.Now() if doPinging { + dht.Children().Add(1) go dht.PingRoutine(time.Second * 10) } return dht @@ -516,6 +519,8 @@ func (dht *IpfsDHT) loadProvidableKeys() error { // PingRoutine periodically pings nearest neighbors. func (dht *IpfsDHT) PingRoutine(t time.Duration) { + defer dht.Children().Done() + tick := time.Tick(t) for { select { @@ -524,13 +529,13 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { rand.Read(id) peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5) for _, p := range peers { - ctx, _ := context.WithTimeout(dht.ctx, time.Second*5) + ctx, _ := context.WithTimeout(dht.Context(), time.Second*5) err := dht.Ping(ctx, p) if err != nil { log.Errorf("Ping error: %s", err) } } - case <-dht.ctx.Done(): + case <-dht.Closing(): return } } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index ef13f03672a..2b873233813 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -92,8 +92,8 @@ func TestPing(t *testing.T) { dhtA := setupDHT(ctx, t, peerA) dhtB := setupDHT(ctx, t, peerB) - defer dhtA.Halt() - defer dhtB.Halt() + defer dhtA.Close() + defer dhtB.Close() defer dhtA.dialer.(inet.Network).Close() defer dhtB.dialer.(inet.Network).Close() @@ -136,8 +136,8 @@ func TestValueGetSet(t *testing.T) { dhtA := setupDHT(ctx, t, peerA) dhtB := setupDHT(ctx, t, peerB) - defer dhtA.Halt() - defer dhtB.Halt() + defer dhtA.Close() + defer dhtB.Close() defer dhtA.dialer.(inet.Network).Close() defer dhtB.dialer.(inet.Network).Close() @@ -179,7 +179,7 @@ func TestProvides(t *testing.T) { _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { for i := 0; i < 4; i++ { - dhts[i].Halt() + dhts[i].Close() defer dhts[i].dialer.(inet.Network).Close() } }() @@ -239,7 +239,7 @@ func TestProvidesAsync(t *testing.T) { _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { for i := 0; i < 4; i++ { - dhts[i].Halt() + dhts[i].Close() defer dhts[i].dialer.(inet.Network).Close() } }() @@ -302,7 +302,7 @@ func TestLayeredGet(t *testing.T) { _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { for i := 0; i < 4; i++ { - dhts[i].Halt() + dhts[i].Close() defer dhts[i].dialer.(inet.Network).Close() } }() @@ -355,7 +355,7 @@ func TestFindPeer(t *testing.T) { _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { for i := 0; i < 4; i++ { - dhts[i].Halt() + dhts[i].Close() dhts[i].dialer.(inet.Network).Close() } }() @@ -443,8 +443,8 @@ func TestConnectCollision(t *testing.T) { t.Fatal("Timeout received!") } - dhtA.Halt() - dhtB.Halt() + dhtA.Close() + dhtB.Close() dhtA.dialer.(inet.Network).Close() dhtB.dialer.(inet.Network).Close() diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index d5db8d1da9f..fe628eeef57 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -205,9 +205,3 @@ func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Messag return pmes, nil // send back same msg as confirmation. } - -// Halt stops all communications from this peer and shut down -// TODO -- remove this in favor of context -func (dht *IpfsDHT) Halt() { - dht.providers.Halt() -} diff --git a/routing/dht/providers.go b/routing/dht/providers.go index 204fdf7d5da..f7d491d6a71 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -5,6 +5,9 @@ import ( peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) type ProviderManager struct { @@ -14,8 +17,8 @@ type ProviderManager struct { getlocal chan chan []u.Key newprovs chan *addProv getprovs chan *getProv - halt chan struct{} period time.Duration + ctxc.ContextCloser } type addProv struct { @@ -28,19 +31,24 @@ type getProv struct { resp chan []peer.Peer } -func NewProviderManager(local peer.ID) *ProviderManager { +func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager { pm := new(ProviderManager) pm.getprovs = make(chan *getProv) pm.newprovs = make(chan *addProv) pm.providers = make(map[u.Key][]*providerInfo) pm.getlocal = make(chan chan []u.Key) pm.local = make(map[u.Key]struct{}) - pm.halt = make(chan struct{}) + pm.ContextCloser = ctxc.NewContextCloser(ctx, nil) + + pm.Children().Add(1) go pm.run() + return pm } func (pm *ProviderManager) run() { + defer pm.Children().Done() + tick := time.NewTicker(time.Hour) for { select { @@ -53,6 +61,7 @@ func (pm *ProviderManager) run() { pi.Value = np.val arr := pm.providers[np.k] pm.providers[np.k] = append(arr, pi) + case gp := <-pm.getprovs: var parr []peer.Peer provs := pm.providers[gp.k] @@ -60,12 +69,14 @@ func (pm *ProviderManager) run() { parr = append(parr, p.Value) } gp.resp <- parr + case lc := <-pm.getlocal: var keys []u.Key for k, _ := range pm.local { keys = append(keys, k) } lc <- keys + case <-tick.C: for k, provs := range pm.providers { var filtered []*providerInfo @@ -76,7 +87,8 @@ func (pm *ProviderManager) run() { } pm.providers[k] = filtered } - case <-pm.halt: + + case <-pm.Closing(): return } } @@ -102,7 +114,3 @@ func (pm *ProviderManager) GetLocal() []u.Key { pm.getlocal <- resp return <-resp } - -func (pm *ProviderManager) Halt() { - pm.halt <- struct{}{} -} diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go index b37327d2e7e..c4ae53910a8 100644 --- a/routing/dht/providers_test.go +++ b/routing/dht/providers_test.go @@ -5,16 +5,19 @@ import ( "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) func TestProviderManager(t *testing.T) { + ctx := context.Background() mid := peer.ID("testing") - p := NewProviderManager(mid) + p := NewProviderManager(ctx, mid) a := u.Key("test") p.AddProvider(a, peer.WithIDString("testingprovider")) resp := p.GetProviders(a) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") } - p.Halt() + p.Close() }