diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 6ded3fa27..967dfb944 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -483,3 +483,17 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, } return } + +func (pm *PeerManager) HandlePeerSelection(selectionType utils.PeerSelection, proto protocol.ID, + pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { + + switch selectionType { + case utils.Automatic: + return pm.SelectPeer(proto, pubSubTopic, specificPeers...) + case utils.LowestRTT: + //TODO: Move this to peer-manager + return utils.SelectPeerWithLowestRTT(context.Background(), pm.host, proto, specificPeers, pm.logger) + default: + return "", errors.New("unknown peer selection type specified") + } +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8570ce818..1eef2f92c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -150,11 +151,6 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p return nil, errors.New("lightpush params are mandatory") } - if params.selectedPeer == "" { - wakuLP.metrics.RecordError(peerNotFoundFailure) - return nil, ErrNoPeersAvailable - } - if len(params.requestID) == 0 { return nil, ErrInvalidID } @@ -210,16 +206,12 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol -// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. -func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { - if message == nil { - return nil, errors.New("message can't be null") - } +func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) { params := new(lightPushParameters) params.host = wakuLP.h params.log = wakuLP.log params.pm = wakuLP.pm + var err error optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { @@ -227,12 +219,38 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa } if params.pubsubTopic == "" { - var err error params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic) if err != nil { return nil, err } } + + //This condition is hacky and will be fixed later, + // once RTT based monitoring is also available in peer-manager + if params.pm == nil || params.peerSelectionType == utils.LowestRTT { + params.selectedPeer, err = utils.HandlePeerSelection(params.peerSelectionType, params.host, LightPushID_v20beta1, params.preferredPeers, params.log) + } else { + params.selectedPeer, err = wakuLP.pm.HandlePeerSelection(params.peerSelectionType, LightPushID_v20beta1, params.pubsubTopic, params.preferredPeers...) + } + if err != nil { + params.log.Error("selecting peer", zap.Error(err)) + wakuLP.metrics.RecordError(peerNotFoundFailure) + return nil, ErrNoPeersAvailable + } + return params, nil +} + +// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol +// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. +func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { + if message == nil { + return nil, errors.New("message can't be null") + } + + params, err := wakuLP.handleOpts(ctx, message, opts...) + if err != nil { + return nil, err + } req := new(pb.PushRequest) req.Message = message req.PubsubTopic = params.pubsubTopic diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index f0d496d62..8d4263892 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -12,12 +12,14 @@ import ( ) type lightPushParameters struct { - host host.Host - selectedPeer peer.ID - requestID []byte - pm *peermanager.PeerManager - log *zap.Logger - pubsubTopic string + host host.Host + selectedPeer peer.ID + requestID []byte + pm *peermanager.PeerManager + log *zap.Logger + pubsubTopic string + peerSelectionType utils.PeerSelection + preferredPeers peer.IDSlice } // Option is the type of options accepted when performing LightPush protocol requests @@ -36,18 +38,8 @@ func WithPeer(p peer.ID) Option { // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - var p peer.ID - var err error - if params.pm == nil { - p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) - } else { - p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = utils.Automatic + params.preferredPeers = fromThesePeers } } @@ -63,12 +55,7 @@ func WithPubSubTopic(pubsubTopic string) Option { // from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, LightPushID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = utils.LowestRTT } } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index e2173b307..5dae987e4 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -15,6 +15,14 @@ import ( "go.uber.org/zap" ) +type PeerSelection int + +const ( + Unknown PeerSelection = iota + Automatic + LowestRTT +) + // ErrNoPeersAvailable is emitted when no suitable peers are found for // some protocol var ErrNoPeersAvailable = errors.New("no suitable peers found") @@ -161,3 +169,15 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolID pro return "", ErrNoPeersAvailable } } + +func HandlePeerSelection(selectionType PeerSelection, host host.Host, proto protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { + switch selectionType { + case Automatic: + return SelectPeer(host, proto, specificPeers, log) + case LowestRTT: + //TODO: Move this to peer-manager + return SelectPeerWithLowestRTT(context.Background(), host, proto, specificPeers, log) + default: + return "", errors.New("unknown peer selection type specified") + } +}