Skip to content

Commit

Permalink
feat(CommonService): add channel and use commonService in discv5 (#735)
Browse files Browse the repository at this point in the history
* feat(CommonService): add channel and use commonService in discv5

* fix: add mutex to PushToChan

* fix: remove generic functionality
  • Loading branch information
harsh-98 authored Sep 18, 2023
1 parent fb2df14 commit a5f9ee5
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 127 deletions.
134 changes: 32 additions & 102 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -34,23 +32,20 @@ type PeerConnector interface {
}

type DiscoveryV5 struct {
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics
peerChannel *peerChannel
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics

peerConnector PeerConnector
NAT nat.Interface

log *zap.Logger

started atomic.Bool
cancel context.CancelFunc
wg *sync.WaitGroup
*peermanager.CommonDiscoveryService
}

type discV5Parameters struct {
Expand Down Expand Up @@ -132,13 +127,12 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
}

return &DiscoveryV5{
params: params,
peerConnector: peerConnector,
NAT: NAT,
wg: &sync.WaitGroup{},
peerChannel: &peerChannel{},
localnode: localnode,
metrics: newMetrics(reg),
params: params,
peerConnector: peerConnector,
NAT: NAT,
CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
localnode: localnode,
metrics: newMetrics(reg),
config: discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
Expand Down Expand Up @@ -167,9 +161,9 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
d.udpAddr = conn.LocalAddr().(*net.UDPAddr)

if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.wg.Add(1)
d.WaitGroup().Add(1)
go func() {
defer d.wg.Done()
defer d.WaitGroup().Done()
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
}()

Expand Down Expand Up @@ -197,74 +191,24 @@ func (d *DiscoveryV5) SetHost(h host.Host) {
d.host = h
}

type peerChannel struct {
mutex sync.Mutex
channel chan peermanager.PeerData
started bool
ctx context.Context
}

func (p *peerChannel) Start(ctx context.Context) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.started = true
p.ctx = ctx
p.channel = make(chan peermanager.PeerData)
}

func (p *peerChannel) Stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return
}
p.started = false
close(p.channel)
}

func (p *peerChannel) Subscribe() chan peermanager.PeerData {
return p.channel
}

func (p *peerChannel) Publish(peer peermanager.PeerData) bool {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return false
}
select {
case p.channel <- peer:
case <-p.ctx.Done():
return false

}
return true
}

// only works if the discovery v5 hasn't been started yet.
func (d *DiscoveryV5) Start(ctx context.Context) error {
// compare and swap sets the discovery v5 to `started` state
// and prevents multiple calls to the start method by being atomic.
if !d.started.CompareAndSwap(false, true) {
return nil
}

ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
return d.CommonDiscoveryService.Start(ctx, d.start)
}

d.peerChannel.Start(ctx)
d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe())
func (d *DiscoveryV5) start() error {
d.peerConnector.Subscribe(d.Context(), d.GetListeningChan())

err := d.listen(ctx)
err := d.listen(d.Context())
if err != nil {
return err
}

if d.params.autoFindPeers {
d.wg.Add(1)
d.WaitGroup().Add(1)
go func() {
defer d.wg.Done()
d.runDiscoveryV5Loop(ctx)
defer d.WaitGroup().Done()
d.runDiscoveryV5Loop(d.Context())
}()
}

Expand All @@ -284,27 +228,18 @@ func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
// only works if the discovery v5 is in running state
// so we can assume that cancel method is set
func (d *DiscoveryV5) Stop() {
if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false
return
}

d.cancel()

if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}

d.wg.Wait()

defer func() {
if r := recover(); r != nil {
d.log.Info("recovering from panic and quitting")
}
}()

d.peerChannel.Stop()
d.CommonDiscoveryService.Stop(func() {
if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}
})
}

/*
Expand Down Expand Up @@ -495,7 +430,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
ENR: n,
}

if d.peerChannel.Publish(peer) {
if d.PushToChan(peer) {
d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
} else {
d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
Expand Down Expand Up @@ -527,8 +462,3 @@ restartLoop:
}
d.log.Warn("Discv5 loop stopped")
}

// IsStarted determines whether discoveryV5 started or not
func (d *DiscoveryV5) IsStarted() bool {
return d.started.Load()
}
81 changes: 81 additions & 0 deletions waku/v2/peermanager/common_discovery_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package peermanager

import (
"context"
"sync"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
)

// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
ENR *enode.Node
PubSubTopics []string
}

type CommonDiscoveryService struct {
commonService *protocol.CommonService
channel chan PeerData
}

func NewCommonDiscoveryService() *CommonDiscoveryService {
return &CommonDiscoveryService{
commonService: protocol.NewCommonService(),
}
}

func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) error {
return sp.commonService.Start(ctx, func() error {
// currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them
// mutex protection for this operation
sp.channel = make(chan PeerData)
return fn()
})
}

func (sp *CommonDiscoveryService) Stop(stopFn func()) {
sp.commonService.Stop(func() {
stopFn()
sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service.
// there is a wait in the CommonService too
close(sp.channel)
})
}
func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData {
return sp.channel
}
func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool {
sp.RLock()
defer sp.RUnlock()
if err := sp.ErrOnNotRunning(); err != nil {
return false
}
select {
case sp.channel <- data:
return true
case <-sp.Context().Done():
return false
}
}

func (sp *CommonDiscoveryService) RLock() {
sp.commonService.RLock()
}
func (sp *CommonDiscoveryService) RUnlock() {
sp.commonService.RUnlock()
}

func (sp *CommonDiscoveryService) Context() context.Context {
return sp.commonService.Context()
}
func (sp *CommonDiscoveryService) ErrOnNotRunning() error {
return sp.commonService.ErrOnNotRunning()
}
func (sp *CommonDiscoveryService) WaitGroup() *sync.WaitGroup {
return sp.commonService.WaitGroup()
}
9 changes: 0 additions & 9 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -25,14 +24,6 @@ import (
lru "github.com/hashicorp/golang-lru"
)

// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
PubSubTopics []string
ENR *enode.Node
}

// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
Expand Down
1 change: 1 addition & 0 deletions waku/v2/peerstore/waku_peer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
PeerExchange
DNSDiscovery
Rendezvous
PeerManager
)

const peerOrigin = "origin"
Expand Down
32 changes: 16 additions & 16 deletions waku/v2/rendezvous/rendezvous.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Rendezvous struct {
peerConnector PeerConnector

log *zap.Logger
*protocol.CommonService
*peermanager.CommonDiscoveryService
}

// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
Expand All @@ -43,10 +43,10 @@ type PeerConnector interface {
func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
return &Rendezvous{
db: db,
peerConnector: peerConnector,
log: logger,
CommonService: protocol.NewCommonService(),
db: db,
peerConnector: peerConnector,
log: logger,
CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
}
}

Expand All @@ -56,14 +56,19 @@ func (r *Rendezvous) SetHost(h host.Host) {
}

func (r *Rendezvous) Start(ctx context.Context) error {
return r.CommonService.Start(ctx, r.start)
return r.CommonDiscoveryService.Start(ctx, r.start)
}

func (r *Rendezvous) start() error {
err := r.db.Start(r.Context())
if err != nil {
return err
if r.db != nil {
if err := r.db.Start(r.Context()); err != nil {
return err
}
}
if r.peerConnector != nil {
r.peerConnector.Subscribe(r.Context(), r.GetListeningChan())
}

r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)

r.log.Info("rendezvous protocol started")
Expand Down Expand Up @@ -98,19 +103,14 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string
if len(addrInfo) != 0 {
rp.SetSuccess(cookie)

peerCh := make(chan peermanager.PeerData)
defer close(peerCh)
r.peerConnector.Subscribe(ctx, peerCh)
for _, p := range addrInfo {
peer := peermanager.PeerData{
Origin: peerstore.Rendezvous,
AddrInfo: p,
PubSubTopics: []string{namespace},
}
select {
case <-ctx.Done():
if !r.PushToChan(peer) {
return
case peerCh <- peer:
}
}
} else {
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string
}

func (r *Rendezvous) Stop() {
r.CommonService.Stop(func() {
r.CommonDiscoveryService.Stop(func() {
r.host.RemoveStreamHandler(rvs.RendezvousProto)
r.rendezvousSvc = nil
})
Expand Down
Loading

0 comments on commit a5f9ee5

Please sign in to comment.