Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ContextCloserify #208

Merged
merged 6 commits into from
Oct 26, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 46 additions & 66 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)

var log = u.Logger("core")
Expand Down Expand Up @@ -71,119 +72,98 @@ type IpfsNode struct {

// the pinning manager
Pinning pin.Pinner

ctxc.ContextCloser
}

// NewIpfsNode constructs a new IpfsNode based on the given config.
func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
// derive this from a higher context.
// cancel if we need to fail early.
ctx, cancel := context.WithCancel(context.TODO())
func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
cancel()
if !success && n != nil {
n.Close()
}
}()

if cfg == nil {
return nil, fmt.Errorf("configuration required")
}

d, err := makeDatastore(cfg.Datastore)
if err != nil {
// derive this from a higher context.
ctx := context.TODO()
n = &IpfsNode{
Config: cfg,
ContextCloser: ctxc.NewContextCloser(ctx, nil),
}

// setup datastore.
if n.Datastore, err = makeDatastore(cfg.Datastore); err != nil {
return nil, err
}

peerstore := peer.NewPeerstore()
local, err := initIdentity(cfg, peerstore, online)
// setup peerstore + local peer identity
n.Peerstore = peer.NewPeerstore()
n.Identity, err = initIdentity(n.Config, n.Peerstore, online)
if err != nil {
return nil, err
}

// FIXME(brian): This is a bit dangerous. If any of the vars declared in
// this block are assigned inside of the "if online" block using the ":="
// declaration syntax, the compiler permits re-declaration. This is rather
// undesirable
var (
net inet.Network
// TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific
route *dht.IpfsDHT
exchangeSession exchange.Interface
diagnostics *diag.Diagnostics
network inet.Network
)

// setup online services
if online {

dhtService := netservice.NewService(nil) // nil handler for now, need to patch it
exchangeService := netservice.NewService(nil) // nil handler for now, need to patch it
diagService := netservice.NewService(nil)

if err := dhtService.Start(ctx); err != nil {
return nil, err
}
if err := exchangeService.Start(ctx); err != nil {
return nil, err
}
if err := diagService.Start(ctx); err != nil {
return nil, err
}
dhtService := netservice.NewService(ctx, nil) // nil handler for now, need to patch it
exchangeService := netservice.NewService(ctx, nil) // nil handler for now, need to patch it
diagService := netservice.NewService(ctx, nil) // nil handler for now, need to patch it

net, err = inet.NewIpfsNetwork(ctx, local, peerstore, &mux.ProtocolMap{
muxMap := &mux.ProtocolMap{
mux.ProtocolID_Routing: dhtService,
mux.ProtocolID_Exchange: exchangeService,
mux.ProtocolID_Diagnostic: diagService,
// add protocol services here.
})
}

// setup the network
n.Network, err = inet.NewIpfsNetwork(ctx, n.Identity, n.Peerstore, muxMap)
if err != nil {
return nil, err
}
network = net
n.AddCloserChild(n.Network)

diagnostics = diag.NewDiagnostics(local, net, diagService)
diagService.SetHandler(diagnostics)
// setup diagnostics service
n.Diagnostics = diag.NewDiagnostics(n.Identity, n.Network, diagService)
diagService.SetHandler(n.Diagnostics)

route = dht.NewDHT(ctx, local, peerstore, net, dhtService, d)
// setup routing service
dhtRouting := dht.NewDHT(ctx, n.Identity, n.Peerstore, n.Network, dhtService, n.Datastore)
// TODO(brian): perform this inside NewDHT factory method
dhtService.SetHandler(route) // wire the handler to the service.
dhtService.SetHandler(dhtRouting) // wire the handler to the service.
n.Routing = dhtRouting
n.AddCloserChild(dhtRouting)

// setup exchange service
const alwaysSendToPeer = true // use YesManStrategy
exchangeSession = bitswap.NetMessageSession(ctx, local, net, exchangeService, route, d, alwaysSendToPeer)
n.Exchange = bitswap.NetMessageSession(ctx, n.Identity, n.Network, exchangeService, n.Routing, n.Datastore, alwaysSendToPeer)
// ok, this function call is ridiculous o/ consider making it simpler.

// TODO(brian): pass a context to initConnections
go initConnections(ctx, cfg, peerstore, route)
go initConnections(ctx, n.Config, n.Peerstore, dhtRouting)
}

// TODO(brian): when offline instantiate the BlockService with a bitswap
// session that simply doesn't return blocks
bs, err := bserv.NewBlockService(d, exchangeSession)
n.Blocks, err = bserv.NewBlockService(n.Datastore, n.Exchange)
if err != nil {
return nil, err
}

dag := merkledag.NewDAGService(bs)
ns := namesys.NewNameSystem(route)
p, err := pin.LoadPinner(d, dag)
n.DAG = merkledag.NewDAGService(n.Blocks)
n.Namesys = namesys.NewNameSystem(n.Routing)
n.Pinning, err = pin.LoadPinner(n.Datastore, n.DAG)
if err != nil {
p = pin.NewPinner(d, dag)
n.Pinning = pin.NewPinner(n.Datastore, n.DAG)
}

success = true
return &IpfsNode{
Config: cfg,
Peerstore: peerstore,
Datastore: d,
Blocks: bs,
DAG: dag,
Resolver: &path.Resolver{DAG: dag},
Exchange: exchangeSession,
Identity: local,
Routing: route,
Namesys: ns,
Diagnostics: diagnostics,
Network: network,
Pinning: p,
}, nil
return n, nil
}

func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (peer.Peer, error) {
Expand Down
5 changes: 2 additions & 3 deletions net/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
mux "github.com/jbenet/go-ipfs/net/mux"
srv "github.com/jbenet/go-ipfs/net/service"
peer "github.com/jbenet/go-ipfs/peer"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)

// Network is the interface IPFS uses for connecting to the world.
type Network interface {
ctxc.ContextCloser

// Listen handles incoming connections on given Multiaddr.
// Listen(*ma.Muliaddr) error
Expand All @@ -35,9 +37,6 @@ type Network interface {

// SendMessage sends given Message out
SendMessage(msg.NetMessage) error

// Close terminates all network operation
Close() error
}

// Sender interface for network services.
Expand Down
90 changes: 32 additions & 58 deletions net/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
msg "github.com/jbenet/go-ipfs/net/message"
pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)

var log = u.Logger("muxer")

// ProtocolIDs used to identify each protocol.
// These should probably be defined elsewhere.
var (
ProtocolID_Routing = pb.ProtocolID_Routing
ProtocolID_Exchange = pb.ProtocolID_Exchange
Expand All @@ -38,57 +41,40 @@ type Muxer struct {
// Protocols are the multiplexed services.
Protocols ProtocolMap

// cancel is the function to stop the Muxer
cancel context.CancelFunc
ctx context.Context
wg sync.WaitGroup

bwiLock sync.Mutex
bwIn uint64

bwoLock sync.Mutex
bwOut uint64

*msg.Pipe
ctxc.ContextCloser
}

// NewMuxer constructs a muxer given a protocol map.
func NewMuxer(mp ProtocolMap) *Muxer {
return &Muxer{
Protocols: mp,
Pipe: msg.NewPipe(10),
func NewMuxer(ctx context.Context, mp ProtocolMap) *Muxer {
m := &Muxer{
Protocols: mp,
Pipe: msg.NewPipe(10),
ContextCloser: ctxc.NewContextCloser(ctx, nil),
}
}

// GetPipe implements the Protocol interface
func (m *Muxer) GetPipe() *msg.Pipe {
return m.Pipe
}

// Start kicks off the Muxer goroutines.
func (m *Muxer) Start(ctx context.Context) error {
if m == nil {
panic("nix muxer")
}

if m.cancel != nil {
return errors.New("Muxer already started.")
}

// make a cancellable context.
m.ctx, m.cancel = context.WithCancel(ctx)
m.wg = sync.WaitGroup{}

m.wg.Add(1)
m.Children().Add(1)
go m.handleIncomingMessages()
for pid, proto := range m.Protocols {
m.wg.Add(1)
m.Children().Add(1)
go m.handleOutgoingMessages(pid, proto)
}

return nil
return m
}

// GetPipe implements the Protocol interface
func (m *Muxer) GetPipe() *msg.Pipe {
return m.Pipe
}

// GetBandwidthTotals return the in/out bandwidth measured over this muxer.
func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
m.bwiLock.Lock()
in = m.bwIn
Expand All @@ -100,19 +86,6 @@ func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
return
}

// Stop stops muxer activity.
func (m *Muxer) Stop() {
if m.cancel == nil {
panic("muxer stopped twice.")
}
// issue cancel, and wipe func.
m.cancel()
m.cancel = context.CancelFunc(nil)

// wait for everything to wind down.
m.wg.Wait()
}

// AddProtocol adds a Protocol with given ProtocolID to the Muxer.
func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error {
if _, found := m.Protocols[pid]; found {
Expand All @@ -126,28 +99,26 @@ func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error {
// handleIncoming consumes the messages on the m.Incoming channel and
// routes them appropriately (to the protocols).
func (m *Muxer) handleIncomingMessages() {
defer m.wg.Done()
defer m.Children().Done()

for {
if m == nil {
panic("nil muxer")
}

select {
case <-m.Closing():
return

case msg, more := <-m.Incoming:
if !more {
return
}
m.Children().Add(1)
go m.handleIncomingMessage(msg)

case <-m.ctx.Done():
return
}
}
}

// handleIncomingMessage routes message to the appropriate protocol.
func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
defer m.Children().Done()

m.bwiLock.Lock()
// TODO: compensate for overhead
Expand All @@ -169,33 +140,35 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {

select {
case proto.GetPipe().Incoming <- m2:
case <-m.ctx.Done():
log.Error(m.ctx.Err())
case <-m.Closing():
return
}
}

// handleOutgoingMessages consumes the messages on the proto.Outgoing channel,
// wraps them and sends them out.
func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) {
defer m.wg.Done()
defer m.Children().Done()

for {
select {
case msg, more := <-proto.GetPipe().Outgoing:
if !more {
return
}
m.Children().Add(1)
go m.handleOutgoingMessage(pid, msg)

case <-m.ctx.Done():
case <-m.Closing():
return
}
}
}

// handleOutgoingMessage wraps out a message and sends it out the
func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {
defer m.Children().Done()

data, err := wrapData(m1.Data(), pid)
if err != nil {
log.Errorf("muxer serializing error: %v", err)
Expand All @@ -204,13 +177,14 @@ func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {

m.bwoLock.Lock()
// TODO: compensate for overhead
// TODO(jbenet): switch this to a goroutine to prevent sync waiting.
m.bwOut += uint64(len(data))
m.bwoLock.Unlock()

m2 := msg.New(m1.Peer(), data)
select {
case m.GetPipe().Outgoing <- m2:
case <-m.ctx.Done():
case <-m.Closing():
return
}
}
Expand Down
Loading