From 5514e6a5a55308d62021de5ab3742de0ed52055a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 20 Oct 2023 15:34:50 -0700 Subject: [PATCH 1/6] fix: REST API endpoint register for version --- cmd/waku/server/rest/debug.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/waku/server/rest/debug.go b/cmd/waku/server/rest/debug.go index b0e188054..6606f5d0d 100644 --- a/cmd/waku/server/rest/debug.go +++ b/cmd/waku/server/rest/debug.go @@ -21,7 +21,7 @@ type InfoReply struct { } const routeDebugInfoV1 = "/debug/v1/info" -const routeDebugVersionV1 = "/debug/v1/info" +const routeDebugVersionV1 = "/debug/v1/version" func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService { d := &DebugService{ From 4810ccf6895068f8aa3728862626786e7109a6e6 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 20 Oct 2023 16:05:45 -0700 Subject: [PATCH 2/6] feat: add new autosharding relay REST API support --- cmd/waku/server/rest/relay.go | 106 ++++++++++++++++++++++++++++ cmd/waku/server/rest/relay_api.yaml | 99 +++++++++++++++++++++++++- 2 files changed, 204 insertions(+), 1 deletion(-) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index af4f5d417..03d16d85c 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "net/http" + "net/url" "strings" "sync" @@ -20,6 +21,9 @@ import ( const routeRelayV1Subscriptions = "/relay/v1/subscriptions" const routeRelayV1Messages = "/relay/v1/messages/{topic}" +const routeRelayV1AutoSubscriptions = "/relay/v1/auto/subscriptions" +const routeRelayV1AutoMessages = "/relay/v1/auto/messages" + // RelayService represents the REST service for WakuRelay type RelayService struct { node *node.WakuNode @@ -50,6 +54,14 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za m.Get(routeRelayV1Messages, s.getV1Messages) m.Post(routeRelayV1Messages, s.postV1Message) + m.Post(routeRelayV1AutoSubscriptions, s.postV1AutoSubscriptions) + m.Delete(routeRelayV1AutoSubscriptions, s.deleteV1AutoSubscriptions) + + m.Route(routeRelayV1AutoMessages, func(r chi.Router) { + r.Get("/{contentTopic}", s.getV1AutoMessages) + r.Post("/", s.postV1AutoMessage) + }) + return s } @@ -215,3 +227,97 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { writeErrOrResponse(w, err, true) } + +func (r *RelayService) deleteV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) { + var cTopics []string + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&cTopics); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + + r.messagesMutex.Lock() + defer r.messagesMutex.Unlock() + + err := r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter("", cTopics...)) + if err != nil { + r.log.Error("unsubscribing from topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) + } + + writeErrOrResponse(w, err, true) +} + +func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) { + var cTopics []string + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&cTopics); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + + var err error + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...)) + if err != nil { + r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) + } + + writeErrOrResponse(w, err, true) +} + +func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { + cTopic := chi.URLParam(req, "contentTopic") + if cTopic == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + cTopic, err := url.QueryUnescape(cTopic) + if err != nil { + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("invalid contentTopic format")) + r.log.Error("writing response", zap.Error(err)) + return + } + + sub, err := r.node.Relay().GetSubscription(cTopic) + if err != nil { + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("not subscribed to topic")) + r.log.Error("writing response", zap.Error(err)) + return + } + var response []*pb.WakuMessage + select { + case msg := <-sub.Ch: + response = append(response, msg.Message()) + default: + break + } + + writeErrOrResponse(w, nil, response) +} + +func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) { + + var message *pb.WakuMessage + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&message); err != nil { + r.log.Error("decoding message failure", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + var err error + if err = server.AppendRLNProof(r.node, message); err != nil { + writeErrOrResponse(w, err, nil) + return + } + + _, err = r.node.Relay().Publish(req.Context(), message) + if err != nil { + r.log.Error("publishing message", zap.Error(err)) + } + + writeErrOrResponse(w, err, true) +} diff --git a/cmd/waku/server/rest/relay_api.yaml b/cmd/waku/server/rest/relay_api.yaml index 0cf1edd06..d9e8e59cc 100644 --- a/cmd/waku/server/rest/relay_api.yaml +++ b/cmd/waku/server/rest/relay_api.yaml @@ -106,6 +106,103 @@ paths: '5XX': description: Unexpected error. + /relay/v1/auto/messages/{contentTopic}: # Note the plural in messages + get: # get_waku_v2_relay_v1_auto_messages + summary: Get the latest messages on the polled topic + description: Get a list of messages that were received on a subscribed Content topic after the last time this method was called. + operationId: getMessagesByTopic + tags: + - relay + parameters: + - in: path + name: contentTopic # Note the name is the same as in the path + required: true + schema: + type: string + description: The user ID + responses: + '200': + description: The latest messages on the polled topic. + content: + application/json: + schema: + $ref: '#/components/schemas/RelayGetMessagesResponse' + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + + /relay/v1/auto/messages/{contentTopic}: # Note the plural in messages + post: # post_waku_v2_relay_v1_auto_message + summary: Publish a message to be relayed + description: Publishes a message to be relayed on a Content topic. + operationId: postMessagesToTopic + tags: + - relay + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RelayPostMessagesRequest' + responses: + '200': + description: OK + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + + /relay/v1/auto/subscriptions: + post: # post_waku_v2_relay_v1_auto_subscriptions + summary: Subscribe a node to an array of topics + description: Subscribe a node to an array of Content topics. + operationId: postSubscriptions + tags: + - relay + requestBody: + content: + application/json: + schema: + type array: + items: + $ref: '#/components/schemas/ContentTopic' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + + delete: # delete_waku_v2_relay_v1_auto_subscriptions + summary: Unsubscribe a node from an array of topics + description: Unsubscribe a node from an array of Content topics. + operationId: deleteSubscriptions + tags: + - relay + requestBody: + content: + application/json: + schema: + type array: + items: + $ref: '#/components/schemas/ContentTopic' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + components: schemas: PubSubTopic: @@ -145,4 +242,4 @@ components: type: array items: $ref: '#/components/schemas/PubSubTopic' - \ No newline at end of file + From 3b138688942668e0ae5ec7a918d7c81b4aae456d Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 20 Oct 2023 16:15:45 -0700 Subject: [PATCH 3/6] chore: removed unnecessary duplicate check --- cmd/waku/server/rpc/relay.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index ee0679fda..8c7d10a4f 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -164,11 +164,7 @@ func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessa r.log.Error("publishing message", zap.Error(err)) return err } - if msg.ContentTopic == "" { - err := fmt.Errorf("content-topic cannot be empty") - r.log.Error("publishing message", zap.Error(err)) - return err - } + if err = server.AppendRLNProof(r.node, msg); err != nil { return err } From 37ad04cf7f6522029711ddefc4d53bc9b0cdb279 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 23 Oct 2023 11:29:07 -0700 Subject: [PATCH 4/6] chore: address review comments --- cmd/waku/server/rest/relay.go | 5 +---- cmd/waku/server/rest/relay_api.yaml | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index 03d16d85c..bf0c7be9c 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -237,9 +237,6 @@ func (r *RelayService) deleteV1AutoSubscriptions(w http.ResponseWriter, req *htt } defer req.Body.Close() - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - err := r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter("", cTopics...)) if err != nil { r.log.Error("unsubscribing from topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) @@ -274,7 +271,7 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques } cTopic, err := url.QueryUnescape(cTopic) if err != nil { - w.WriteHeader(http.StatusNotFound) + w.WriteHeader(http.StatusBadRequest) _, err = w.Write([]byte("invalid contentTopic format")) r.log.Error("writing response", zap.Error(err)) return diff --git a/cmd/waku/server/rest/relay_api.yaml b/cmd/waku/server/rest/relay_api.yaml index d9e8e59cc..e42432a13 100644 --- a/cmd/waku/server/rest/relay_api.yaml +++ b/cmd/waku/server/rest/relay_api.yaml @@ -132,7 +132,7 @@ paths: '5XX': description: Unexpected error. - /relay/v1/auto/messages/{contentTopic}: # Note the plural in messages + /relay/v1/auto/messages: # Note the plural in messages post: # post_waku_v2_relay_v1_auto_message summary: Publish a message to be relayed description: Publishes a message to be relayed on a Content topic. From 0b04ab1693905e4ba2a9bb1cce07584680eaec27 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 23 Oct 2023 12:20:13 -0700 Subject: [PATCH 5/6] chore: fix responses in relay rest api --- cmd/waku/server/rest/relay.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index bf0c7be9c..b79b9006a 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -260,7 +260,14 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) } - writeErrOrResponse(w, err, true) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte(err.Error())) + r.log.Error("writing response", zap.Error(err)) + } else { + w.WriteHeader(http.StatusOK) + } + } func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { @@ -316,5 +323,12 @@ func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Reques r.log.Error("publishing message", zap.Error(err)) } - writeErrOrResponse(w, err, true) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte(err.Error())) + r.log.Error("writing response", zap.Error(err)) + } else { + w.WriteHeader(http.StatusOK) + } + } From e6ec939bcc37fdefb35cd40a98dd98066bca10f4 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 23 Oct 2023 13:26:07 -0700 Subject: [PATCH 6/6] address review comment --- cmd/waku/server/rest/relay.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index b79b9006a..b788ed99f 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -274,6 +274,8 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques cTopic := chi.URLParam(req, "contentTopic") if cTopic == "" { w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte("contentTopic is required")) + r.log.Error("writing response", zap.Error(err)) return } cTopic, err := url.QueryUnescape(cTopic)