Skip to content

Commit

Permalink
test: getMessage filter v2
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Nov 1, 2023
1 parent d569363 commit 4b6b08f
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 77 deletions.
4 changes: 1 addition & 3 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,7 @@ func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *ht
}
pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
s.log.Error("bad content topic", zap.Error(err))
writeResponse(w, "bad content topic", http.StatusBadRequest)
s.writeGetMessageErr(w, fmt.Errorf("bad content topic"), http.StatusBadRequest)
return
}
s.getMessages(w, req, pubsubTopic, contentTopic)
Expand Down Expand Up @@ -362,7 +361,6 @@ func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pu
s.writeGetMessageErr(w, err, http.StatusNotFound)
return
}

writeResponse(w, msgs, http.StatusOK)
}

Expand Down
26 changes: 13 additions & 13 deletions cmd/waku/server/rest/filter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)

type filterCache struct {
Expand All @@ -25,12 +26,17 @@ func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
c.mu.Lock()
defer c.mu.Unlock()

pubsubTopic := contentFilter.PubsubTopic
if c.data[pubsubTopic] == nil {
c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage)
}
for topic := range contentFilter.ContentTopics {
c.data[pubsubTopic][topic] = []*pb.WakuMessage{}
pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.data[pubsubTopic] == nil {
c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage)
}
for _, topic := range contentTopics {
utils.Logger().Info(fmt.Sprintf("pb %s ct %s ", pubsubTopic, topic))
if c.data[pubsubTopic][topic] == nil {
c.data[pubsubTopic][topic] = []*pb.WakuMessage{}
}
}
}
}

Expand Down Expand Up @@ -67,12 +73,6 @@ func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*p
return nil, fmt.Errorf("Not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
}
msgs := c.data[pubsubTopic][contentTopic]
c.data[pubsubTopic][contentTopic] = nil
c.data[pubsubTopic][contentTopic] = []*pb.WakuMessage{}
return msgs, nil
}
func (c *filterCache) clear() {
c.mu.Lock()
defer c.mu.Unlock()

c.data = map[string]map[string][]*pb.WakuMessage{}
}
149 changes: 88 additions & 61 deletions cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peerstore"
Expand All @@ -34,26 +35,24 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
}

// node2 connects to node1
func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) {
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter

node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1)
require.NoError(t, err)

if pubSubTopic != "" {
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic})
require.NoError(t, err)
}
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), pubSubTopics)
require.NoError(t, err)

return node1, node2
}

// test 400, 404 status code for ping rest endpoint
// both requests are not successful
func TestFilterPingFailure(t *testing.T) {
node1, node2 := twoFilterConnectedNodes(t, "")
node1, node2 := twoFilterConnectedNodes(t)
defer func() {
node1.Stop()
node2.Stop()
Expand Down Expand Up @@ -248,6 +247,12 @@ func getFilterResponse(t *testing.T, body *bytes.Buffer) filterSubscriptionRespo
require.NoError(t, err)
return resp
}
func getMessageResponse(t *testing.T, body *bytes.Buffer) []*pb.WakuMessage {
resp := []*pb.WakuMessage{}
err := json.Unmarshal(body.Bytes(), &resp)
require.NoError(t, err)
return resp
}
func toString(t *testing.T, data interface{}) string {
bytes, err := json.Marshal(data)
require.NoError(t, err)
Expand All @@ -259,7 +264,9 @@ func TestFilterGetMessages(t *testing.T) {
contentTopic := "/waku/2/app/1"

// get nodes add connect them
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
require.NoError(t, err)
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic, generatedPubsubTopic)
defer func() {
node1.Stop()
node2.Stop()
Expand All @@ -271,6 +278,25 @@ func TestFilterGetMessages(t *testing.T) {
go service.Start(context.Background())
defer service.Stop()

{ // create subscription so that messages are cached
for _, pubsubTopic := range []string{"", pubsubTopic} {
requestId := protocol.GenerateRequestID()
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestId: requestId,
PubsubTopic: pubsubTopic,
ContentFilters: []string{contentTopic},
}))
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
}
}

// submit messages
messageByContentTopic := []*protocol.Envelope{
genMessage("", contentTopic),
Expand All @@ -279,69 +305,70 @@ func TestFilterGetMessages(t *testing.T) {
}
messageByPubsubTopic := []*protocol.Envelope{
genMessage(pubsubTopic, contentTopic),
genMessage(pubsubTopic, contentTopic),
}
for _, envelope := range append(messageByContentTopic, messageByPubsubTopic...) {
node2.Broadcaster().Submit(envelope)
}
time.Sleep(1 * time.Second)

// with malformed contentTopic
utils.Logger().Info(url.QueryEscape("/waku/2/wrong/topic"))
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape("/waku/2/wrongtopic")),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "bad content topic", rr.Body.String())
{ // with malformed contentTopic
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape("/waku/2/wrongtopic")),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "bad content topic", rr.Body.String())
}

// with check if the cache is working properly
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
req.Body.Close()
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
router.ServeHTTP(rr, req)
checkJSON(t, toMessage(messageByContentTopic[1:]), getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
{ // with check if the cache is working properly
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
checkJSON(t, toMessage(messageByContentTopic[1:]), getMessageResponse(t, rr.Body))
}

// check if pubsubTopic is present in the url
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages//%s", url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
req.Body.Close()
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "missing pubsubTopic", rr.Body.String())
{ // check if pubsubTopic is present in the url
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages//%s", url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "missing pubsubTopic", rr.Body.String())
}

// check messages by pubsub/contentTopic pair
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
req.Body.Close()
require.Equal(t, http.StatusOK, rr.Code)
checkJSON(t, toMessage(messageByPubsubTopic), getFilterResponse(t, rr.Body))
{ // check messages by pubsub/contentTopic pair
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
checkJSON(t, toMessage(messageByPubsubTopic), getMessageResponse(t, rr.Body))
}

// check if pubsubTopic/contentTOpic is subscribed or not.
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", "/waku/2/test2/proto", url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
req.Body.Close()
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t, "missing pubsubTopic", rr.Body.String())
{ // check if pubsubTopic/contentTOpic is subscribed or not.
rr := httptest.NewRecorder()
notSubscibredPubsubTopic := "/waku/2/test2/proto"
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t,
fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
rr.Body.String(),
)
}
}

func toMessage(envs []*protocol.Envelope) []*pb.WakuMessage {
Expand Down

0 comments on commit 4b6b08f

Please sign in to comment.