Skip to content

Commit

Permalink
dht ctxcloserify
Browse files Browse the repository at this point in the history
  • Loading branch information
jbenet committed Oct 26, 2014
1 parent 4584bc8 commit d79ebe6
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 31 deletions.
1 change: 1 addition & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
// TODO(brian): perform this inside NewDHT factory method
dhtService.SetHandler(dhtRouting) // wire the handler to the service.
n.Routing = dhtRouting
n.AddCloserChild(dhtRouting)

// setup exchange service
const alwaysSendToPeer = true // use YesManStrategy
Expand Down
15 changes: 10 additions & 5 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Expand Down Expand Up @@ -56,7 +57,7 @@ type IpfsDHT struct {
//lock to make diagnostics work better
diaglock sync.Mutex

ctx context.Context
ctxc.ContextCloser
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
Expand All @@ -67,9 +68,10 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.datastore = dstore
dht.self = p
dht.peerstore = ps
dht.ctx = ctx
dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)

dht.providers = NewProviderManager(p.ID())
dht.providers = NewProviderManager(dht.Context(), p.ID())
dht.AddCloserChild(dht.providers)

dht.routingTables = make([]*kb.RoutingTable, 3)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
Expand All @@ -78,6 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.birth = time.Now()

if doPinging {
dht.Children().Add(1)
go dht.PingRoutine(time.Second * 10)
}
return dht
Expand Down Expand Up @@ -516,6 +519,8 @@ func (dht *IpfsDHT) loadProvidableKeys() error {

// PingRoutine periodically pings nearest neighbors.
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
defer dht.Children().Done()

tick := time.Tick(t)
for {
select {
Expand All @@ -524,13 +529,13 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers {
ctx, _ := context.WithTimeout(dht.ctx, time.Second*5)
ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
err := dht.Ping(ctx, p)
if err != nil {
log.Errorf("Ping error: %s", err)
}
}
case <-dht.ctx.Done():
case <-dht.Closing():
return
}
}
Expand Down
20 changes: 10 additions & 10 deletions routing/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func TestPing(t *testing.T) {
dhtA := setupDHT(ctx, t, peerA)
dhtB := setupDHT(ctx, t, peerB)

defer dhtA.Halt()
defer dhtB.Halt()
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()

Expand Down Expand Up @@ -136,8 +136,8 @@ func TestValueGetSet(t *testing.T) {
dhtA := setupDHT(ctx, t, peerA)
dhtB := setupDHT(ctx, t, peerB)

defer dhtA.Halt()
defer dhtB.Halt()
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()

Expand Down Expand Up @@ -179,7 +179,7 @@ func TestProvides(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
}
}()
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestProvidesAsync(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
}
}()
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestLayeredGet(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
defer dhts[i].dialer.(inet.Network).Close()
}
}()
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestFindPeer(t *testing.T) {
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Halt()
dhts[i].Close()
dhts[i].dialer.(inet.Network).Close()
}
}()
Expand Down Expand Up @@ -443,8 +443,8 @@ func TestConnectCollision(t *testing.T) {
t.Fatal("Timeout received!")
}

dhtA.Halt()
dhtB.Halt()
dhtA.Close()
dhtB.Close()
dhtA.dialer.(inet.Network).Close()
dhtB.dialer.(inet.Network).Close()

Expand Down
6 changes: 0 additions & 6 deletions routing/dht/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,3 @@ func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Messag

return pmes, nil // send back same msg as confirmation.
}

// Halt stops all communications from this peer and shut down
// TODO -- remove this in favor of context
func (dht *IpfsDHT) Halt() {
dht.providers.Halt()
}
24 changes: 16 additions & 8 deletions routing/dht/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (

peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

type ProviderManager struct {
Expand All @@ -14,8 +17,8 @@ type ProviderManager struct {
getlocal chan chan []u.Key
newprovs chan *addProv
getprovs chan *getProv
halt chan struct{}
period time.Duration
ctxc.ContextCloser
}

type addProv struct {
Expand All @@ -28,19 +31,24 @@ type getProv struct {
resp chan []peer.Peer
}

func NewProviderManager(local peer.ID) *ProviderManager {
func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm := new(ProviderManager)
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.providers = make(map[u.Key][]*providerInfo)
pm.getlocal = make(chan chan []u.Key)
pm.local = make(map[u.Key]struct{})
pm.halt = make(chan struct{})
pm.ContextCloser = ctxc.NewContextCloser(ctx, nil)

pm.Children().Add(1)
go pm.run()

return pm
}

func (pm *ProviderManager) run() {
defer pm.Children().Done()

tick := time.NewTicker(time.Hour)
for {
select {
Expand All @@ -53,19 +61,22 @@ func (pm *ProviderManager) run() {
pi.Value = np.val
arr := pm.providers[np.k]
pm.providers[np.k] = append(arr, pi)

case gp := <-pm.getprovs:
var parr []peer.Peer
provs := pm.providers[gp.k]
for _, p := range provs {
parr = append(parr, p.Value)
}
gp.resp <- parr

case lc := <-pm.getlocal:
var keys []u.Key
for k, _ := range pm.local {
keys = append(keys, k)
}
lc <- keys

case <-tick.C:
for k, provs := range pm.providers {
var filtered []*providerInfo
Expand All @@ -76,7 +87,8 @@ func (pm *ProviderManager) run() {
}
pm.providers[k] = filtered
}
case <-pm.halt:

case <-pm.Closing():
return
}
}
Expand All @@ -102,7 +114,3 @@ func (pm *ProviderManager) GetLocal() []u.Key {
pm.getlocal <- resp
return <-resp
}

func (pm *ProviderManager) Halt() {
pm.halt <- struct{}{}
}
7 changes: 5 additions & 2 deletions routing/dht/providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import (

"github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

func TestProviderManager(t *testing.T) {
ctx := context.Background()
mid := peer.ID("testing")
p := NewProviderManager(mid)
p := NewProviderManager(ctx, mid)
a := u.Key("test")
p.AddProvider(a, peer.WithIDString("testingprovider"))
resp := p.GetProviders(a)
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")
}
p.Halt()
p.Close()
}

0 comments on commit d79ebe6

Please sign in to comment.