Skip to content

Commit

Permalink
fix: stream close/reset (#823)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 24, 2023
1 parent a3c3aab commit fa51d10
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 117 deletions.
48 changes: 34 additions & 14 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,48 +106,60 @@ func (wf *WakuFilterLightNode) Stop() {
})
}

func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
if !wf.subscriptions.IsSubscribedTo(s.Conn().RemotePeer()) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", s.Conn().RemotePeer()))
func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
logger := wf.log.With(logging.HostID("peer", peerID))
if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
wf.metrics.RecordError(unknownPeerMessagePush)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}

reader := pbio.NewDelimitedReader(s, math.MaxInt32)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

messagePush := &pb.MessagePushV2{}
err := reader.ReadMsg(messagePush)
if err != nil {
logger.Error("reading message push", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}

stream.Close()

pubSubTopic := ""
//For now returning failure, this will get addressed with autosharding changes for filter.
if messagePush.PubsubTopic == nil {
pubSubTopic, err = protocol.GetPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic)
if err != nil {
logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}
} else {
pubSubTopic = *messagePush.PubsubTopic
}
if !wf.subscriptions.Has(s.Conn().RemotePeer(), protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
logger.Warn("received messagepush with invalid subscription parameters",
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
zap.String("topic", pubSubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
wf.metrics.RecordError(invalidSubscriptionMessage)
return
}

wf.metrics.RecordMessage()

wf.notify(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage)
wf.notify(peerID, pubSubTopic, messagePush.WakuMessage)

logger.Info("received message push")
}
Expand All @@ -166,15 +178,14 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,

func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
return err
}
defer conn.Close()

writer := pbio.NewDelimitedWriter(conn)
reader := pbio.NewDelimitedReader(conn, math.MaxInt32)
writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
Expand All @@ -188,6 +199,9 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
if err != nil {
wf.metrics.RecordError(writeRequestFailure)
wf.log.Error("sending FilterSubscribeRequest", zap.Error(err))
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return err
}

Expand All @@ -196,8 +210,14 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
if err != nil {
wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return err
}

stream.Close()

if filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
Expand Down
87 changes: 49 additions & 38 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,20 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error {
return nil
}

func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))

reader := pbio.NewDelimitedReader(s, math.MaxInt32)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

subscribeRequest := &pb.FilterSubscribeRequest{}
err := reader.ReadMsg(subscribeRequest)
if err != nil {
wf.metrics.RecordError(decodeRPCFailure)
logger.Error("reading request", zap.Error(err))
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}

Expand All @@ -104,22 +106,24 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stre

switch subscribeRequest.FilterSubscribeType {
case pb.FilterSubscribeRequest_SUBSCRIBE:
wf.subscribe(ctx, s, subscribeRequest)
wf.subscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
wf.ping(ctx, s, subscribeRequest)
wf.ping(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
wf.unsubscribe(ctx, s, subscribeRequest)
wf.unsubscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
wf.unsubscribeAll(ctx, s, subscribeRequest)
wf.unsubscribeAll(ctx, stream, subscribeRequest)
}

stream.Close()

wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start))

logger.Info("received request", zap.String("requestType", subscribeRequest.FilterSubscribeType.String()))
}
}

func (wf *WakuFilterFullNode) reply(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest, statusCode int, description ...string) {
func (wf *WakuFilterFullNode) reply(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest, statusCode int, description ...string) {
response := &pb.FilterSubscribeResponse{
RequestId: request.RequestId,
StatusCode: uint32(statusCode),
Expand All @@ -131,45 +135,48 @@ func (wf *WakuFilterFullNode) reply(ctx context.Context, s network.Stream, reque
response.StatusDesc = http.StatusText(statusCode)
}

writer := pbio.NewDelimitedWriter(s)
writer := pbio.NewDelimitedWriter(stream)
err := writer.WriteMsg(response)
if err != nil {
wf.metrics.RecordError(writeResponseFailure)
wf.log.Error("sending response", zap.Error(err))
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
}
}

func (wf *WakuFilterFullNode) ping(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
func (wf *WakuFilterFullNode) ping(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
exists := wf.subscriptions.Has(stream.Conn().RemotePeer())

if exists {
wf.reply(ctx, s, request, http.StatusOK)
wf.reply(ctx, stream, request, http.StatusOK)
} else {
wf.reply(ctx, s, request, http.StatusNotFound, peerHasNoSubscription)
wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription)
}
}

func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFullNode) subscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == nil {
wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty")
wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}

if len(request.ContentTopics) == 0 {
wf.reply(ctx, s, request, http.StatusBadRequest, "at least one contenttopic should be specified")
wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}

if len(request.ContentTopics) > MaxContentTopicsPerRequest {
wf.reply(ctx, s, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}

if wf.subscriptions.Count() >= wf.maxSubscriptions {
wf.reply(ctx, s, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
wf.reply(ctx, stream, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
return
}

peerID := s.Conn().RemotePeer()
peerID := stream.Conn().RemotePeer()

if totalSubs, exists := wf.subscriptions.Get(peerID); exists {
ctTotal := 0
Expand All @@ -178,48 +185,48 @@ func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, r
}

if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription {
wf.reply(ctx, s, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria")
wf.reply(ctx, stream, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria")
return
}
}

wf.subscriptions.Set(peerID, *request.PubsubTopic, request.ContentTopics)

wf.metrics.RecordSubscriptions(wf.subscriptions.Count())
wf.reply(ctx, s, request, http.StatusOK)
wf.reply(ctx, stream, request, http.StatusOK)
}

func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == nil {
wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty")
wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}

if len(request.ContentTopics) == 0 {
wf.reply(ctx, s, request, http.StatusBadRequest, "at least one contenttopic should be specified")
wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}

if len(request.ContentTopics) > MaxContentTopicsPerRequest {
wf.reply(ctx, s, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}

err := wf.subscriptions.Delete(s.Conn().RemotePeer(), *request.PubsubTopic, request.ContentTopics)
err := wf.subscriptions.Delete(stream.Conn().RemotePeer(), *request.PubsubTopic, request.ContentTopics)
if err != nil {
wf.reply(ctx, s, request, http.StatusNotFound, peerHasNoSubscription)
wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription)
} else {
wf.metrics.RecordSubscriptions(wf.subscriptions.Count())
wf.reply(ctx, s, request, http.StatusOK)
wf.reply(ctx, stream, request, http.StatusOK)
}
}

func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(stream.Conn().RemotePeer())
if err != nil {
wf.reply(ctx, s, request, http.StatusNotFound, peerHasNoSubscription)
wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription)
} else {
wf.metrics.RecordSubscriptions(wf.subscriptions.Count())
wf.reply(ctx, s, request, http.StatusOK)
wf.reply(ctx, stream, request, http.StatusOK)
}
}

Expand Down Expand Up @@ -279,7 +286,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
ctx, cancel := context.WithTimeout(ctx, MessagePushTimeout)
defer cancel()

conn, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1)
stream, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1)
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)
if errors.Is(context.DeadlineExceeded, err) {
Expand All @@ -291,8 +298,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
return err
}

defer conn.Close()
writer := pbio.NewDelimitedWriter(conn)
writer := pbio.NewDelimitedWriter(stream)
err = writer.WriteMsg(messagePush)
if err != nil {
if errors.Is(context.DeadlineExceeded, err) {
Expand All @@ -302,12 +308,17 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
}
logger.Error("pushing messages to peer", zap.Error(err))
wf.subscriptions.FlagAsFailure(peerID)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return nil
}

stream.Close()

wf.subscriptions.FlagAsSuccess(peerID)

logger.Info("message pushed succesfully") // TODO: remove or change to debug once dogfooding of filter is complete
logger.Debug("message pushed succesfully")

return nil
}
Expand Down
Loading

0 comments on commit fa51d10

Please sign in to comment.