Skip to content

Commit

Permalink
Update peer selection options for lightPush
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Sep 29, 2023
1 parent 47c961d commit 4a18aa3
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 36 deletions.
14 changes: 14 additions & 0 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,17 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string,
}
return
}

func (pm *PeerManager) HandlePeerSelection(selectionType utils.PeerSelection, proto protocol.ID,
pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) {

switch selectionType {
case utils.Automatic:
return pm.SelectPeer(proto, pubSubTopic, specificPeers...)
case utils.LowestRTT:
//TODO: Move this to peer-manager
return utils.SelectPeerWithLowestRTT(context.Background(), pm.host, proto, specificPeers, pm.logger)
default:
return "", errors.New("unknown peer selection type specified")
}
}
42 changes: 30 additions & 12 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -150,11 +151,6 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
return nil, errors.New("lightpush params are mandatory")
}

if params.selectedPeer == "" {
wakuLP.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}

if len(params.requestID) == 0 {
return nil, ErrInvalidID
}
Expand Down Expand Up @@ -210,29 +206,51 @@ func (wakuLP *WakuLightPush) Stop() {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
}

// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol
// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) {
params := new(lightPushParameters)
params.host = wakuLP.h
params.log = wakuLP.log
params.pm = wakuLP.pm
var err error

optList := append(DefaultOptions(wakuLP.h), opts...)
for _, opt := range optList {
opt(params)
}

if params.pubsubTopic == "" {
var err error
params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
}

//This condition is hacky and will be fixed later,
// once RTT based monitoring is also available in peer-manager
if params.pm == nil || params.peerSelectionType == utils.LowestRTT {
params.selectedPeer, err = utils.HandlePeerSelection(params.peerSelectionType, params.host, LightPushID_v20beta1, params.preferredPeers, params.log)
} else {
params.selectedPeer, err = wakuLP.pm.HandlePeerSelection(params.peerSelectionType, LightPushID_v20beta1, params.pubsubTopic, params.preferredPeers...)
}
if err != nil {
params.log.Error("selecting peer", zap.Error(err))
wakuLP.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}
return params, nil
}

// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol
// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}

params, err := wakuLP.handleOpts(ctx, message, opts...)
if err != nil {
return nil, err
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = params.pubsubTopic
Expand Down
35 changes: 11 additions & 24 deletions waku/v2/protocol/lightpush/waku_lightpush_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
)

type lightPushParameters struct {
host host.Host
selectedPeer peer.ID
requestID []byte
pm *peermanager.PeerManager
log *zap.Logger
pubsubTopic string
host host.Host
selectedPeer peer.ID
requestID []byte
pm *peermanager.PeerManager
log *zap.Logger
pubsubTopic string
peerSelectionType utils.PeerSelection
preferredPeers peer.IDSlice
}

// Option is the type of options accepted when performing LightPush protocol requests
Expand All @@ -36,18 +38,8 @@ func WithPeer(p peer.ID) Option {
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
return func(params *lightPushParameters) {
var p peer.ID
var err error
if params.pm == nil {
p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = utils.Automatic
params.preferredPeers = fromThesePeers
}
}

Expand All @@ -63,12 +55,7 @@ func WithPubSubTopic(pubsubTopic string) Option {
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option {
return func(params *lightPushParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, LightPushID_v20beta1, fromThesePeers, params.log)
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = utils.LowestRTT
}
}

Expand Down
20 changes: 20 additions & 0 deletions waku/v2/utils/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ import (
"go.uber.org/zap"
)

type PeerSelection int

const (
Unknown PeerSelection = iota
Automatic
LowestRTT
)

// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")
Expand Down Expand Up @@ -161,3 +169,15 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolID pro
return "", ErrNoPeersAvailable
}
}

func HandlePeerSelection(selectionType PeerSelection, host host.Host, proto protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
switch selectionType {
case Automatic:
return SelectPeer(host, proto, specificPeers, log)
case LowestRTT:
//TODO: Move this to peer-manager
return SelectPeerWithLowestRTT(context.Background(), host, proto, specificPeers, log)
default:
return "", errors.New("unknown peer selection type specified")
}
}

0 comments on commit 4a18aa3

Please sign in to comment.