From fab51beadf681b81dcc8bb812289f9b2ca63ac1e Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 8 Nov 2023 18:46:24 +0530 Subject: [PATCH] fix : issues with get messages API (#878) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix issues with get messages API --------- Co-authored-by: richΛrd --- cmd/waku/server/rest/relay.go | 13 +++++++++++-- cmd/waku/server/rpc/relay.go | 20 +++++++++++++++----- waku/v2/protocol/relay/waku_relay.go | 6 ++++++ 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index d80e00733..0da7cc0f6 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -91,7 +91,7 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ } else { topicToSubscribe = topic } - _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) @@ -126,7 +126,16 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { } var response []*pb.WakuMessage select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("consume channel is closed for subscription")) + if err != nil { + r.log.Error("writing response", zap.Error(err)) + } + return + } response = append(response, msg.Message()) default: break diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 2e24c7c15..5c7245b41 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -1,6 +1,7 @@ package rpc import ( + "errors" "fmt" "net/http" @@ -12,6 +13,8 @@ import ( "go.uber.org/zap" ) +var errChannelClosed = errors.New("consume channel is closed for subscription") + // RelayService represents the JSON RPC service for WakuRelay type RelayService struct { node *node.WakuNode @@ -93,7 +96,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, // Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics. func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...)) + _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...), relay.WithCacheSize(uint(r.cacheCapacity))) if err != nil { r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err)) return err @@ -150,7 +153,11 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep return err } select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) + return errChannelClosed + } rpcMsg, err := ProtoToRPC(msg.Message()) if err != nil { r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err)) @@ -165,14 +172,13 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep // PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - ctx := req.Context() for _, topic := range args.Topics { var err error if topic == "" { topic = relay.DefaultWakuTopic } - _, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topic), relay.WithCacheSize(uint(r.cacheCapacity))) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err @@ -207,7 +213,11 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply * return err } select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) + return errChannelClosed + } m, err := ProtoToRPC(msg.Message()) if err == nil { *reply = append(*reply, m) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 2bbef61aa..887260287 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -292,6 +292,9 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { + if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned + continue + } return sub, nil } } @@ -308,6 +311,9 @@ func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { + if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned + continue + } return sub, nil } }