Skip to content

Commit

Permalink
integrate the event bus into host implementations (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored and marten-seemann committed Apr 19, 2022
1 parent f0aeaa7 commit e6824a7
Showing 1 changed file with 34 additions and 9 deletions.
43 changes: 34 additions & 9 deletions p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"io"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/libp2p/go-eventbus"

logging "github.com/ipfs/go-log"

ma "github.com/multiformats/go-multiaddr"
Expand All @@ -21,16 +24,26 @@ var log = logging.Logger("blankhost")

// BlankHost is the thinnest implementation of the host.Host interface
type BlankHost struct {
n network.Network
mux *mstream.MultistreamMuxer
cmgr connmgr.ConnManager
n network.Network
mux *mstream.MultistreamMuxer
cmgr connmgr.ConnManager
eventbus event.Bus
emitters struct {
evtLocalProtocolsUpdated event.Emitter
}
}

func NewBlankHost(n network.Network) *BlankHost {
bh := &BlankHost{
n: n,
cmgr: &connmgr.NullConnMgr{},
mux: mstream.NewMultistreamMuxer(),
n: n,
cmgr: &connmgr.NullConnMgr{},
mux: mstream.NewMultistreamMuxer(),
eventbus: eventbus.NewBus(),
}

var err error
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil
}

n.SetStreamHandler(bh.newStreamHandler)
Expand Down Expand Up @@ -98,8 +111,11 @@ func (bh *BlankHost) NewStream(ctx context.Context, p peer.ID, protos ...protoco
return s, nil
}

func (bh *BlankHost) RemoveStreamHandler(p protocol.ID) {
bh.Mux().RemoveHandler(string(p))
func (bh *BlankHost) RemoveStreamHandler(pid protocol.ID) {
bh.Mux().RemoveHandler(string(pid))
bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
Removed: []protocol.ID{pid},
})
}

func (bh *BlankHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
Expand All @@ -109,6 +125,9 @@ func (bh *BlankHost) SetStreamHandler(pid protocol.ID, handler network.StreamHan
handler(is)
return nil
})
bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
Added: []protocol.ID{pid},
})
}

func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler network.StreamHandler) {
Expand All @@ -118,11 +137,13 @@ func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool,
handler(is)
return nil
})
bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
Added: []protocol.ID{pid},
})
}

// newStreamHandler is the remote-opened stream handler for network.Network
func (bh *BlankHost) newStreamHandler(s network.Stream) {

protoID, handle, err := bh.Mux().Negotiate(s)
if err != nil {
log.Warning("protocol mux failed: %s", err)
Expand All @@ -148,3 +169,7 @@ func (bh *BlankHost) Network() network.Network {
func (bh *BlankHost) ConnManager() connmgr.ConnManager {
return bh.cmgr
}

func (bh *BlankHost) EventBus() event.Bus {
return bh.eventbus
}

0 comments on commit e6824a7

Please sign in to comment.