diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index d80e00733..0da7cc0f6 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -91,7 +91,7 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ } else { topicToSubscribe = topic } - _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) @@ -126,7 +126,16 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { } var response []*pb.WakuMessage select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("consume channel is closed for subscription")) + if err != nil { + r.log.Error("writing response", zap.Error(err)) + } + return + } response = append(response, msg.Message()) default: break diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 2e24c7c15..5c7245b41 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -1,6 +1,7 @@ package rpc import ( + "errors" "fmt" "net/http" @@ -12,6 +13,8 @@ import ( "go.uber.org/zap" ) +var errChannelClosed = errors.New("consume channel is closed for subscription") + // RelayService represents the JSON RPC service for WakuRelay type RelayService struct { node *node.WakuNode @@ -93,7 +96,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, // Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics. func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...)) + _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...), relay.WithCacheSize(uint(r.cacheCapacity))) if err != nil { r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err)) return err @@ -150,7 +153,11 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep return err } select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) + return errChannelClosed + } rpcMsg, err := ProtoToRPC(msg.Message()) if err != nil { r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err)) @@ -165,14 +172,13 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep // PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - ctx := req.Context() for _, topic := range args.Topics { var err error if topic == "" { topic = relay.DefaultWakuTopic } - _, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topic), relay.WithCacheSize(uint(r.cacheCapacity))) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err @@ -207,7 +213,11 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply * return err } select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) + return errChannelClosed + } m, err := ProtoToRPC(msg.Message()) if err == nil { *reply = append(*reply, m) diff --git a/examples/basic2/main.go b/examples/basic2/main.go index c2534b556..5907de368 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -25,7 +25,7 @@ import ( var log = utils.Logger().Named("basic2") func main() { - var cTopic, err = protocol.NewContentTopic("basic2", 1, "test", "proto") + var cTopic, err = protocol.NewContentTopic("basic2", "1", "test", "proto") if err != nil { fmt.Println("Invalid contentTopic") return diff --git a/examples/c-bindings/main.c b/examples/c-bindings/main.c index 71c2af79b..de436a84c 100644 --- a/examples/c-bindings/main.c +++ b/examples/c-bindings/main.c @@ -18,8 +18,10 @@ char *bobPubKey = "0x045eef61a98ba1cf44a2736fac91183ea2bd86e67de20fe4bff467a71249a8a0c05f795d" "d7f28ced7c15eaa69c89d4212cc4f526ca5e9a62e88008f506d850cccd"; -void on_error(int ret, const char *result, void *user_data) { - if (ret == 0) { +void on_error(int ret, const char *result, void *user_data) +{ + if (ret == 0) + { return; } @@ -27,8 +29,10 @@ void on_error(int ret, const char *result, void *user_data) { exit(1); } -void on_response(int ret, const char *result, void *user_data) { - if (ret != 0) { +void on_response(int ret, const char *result, void *user_data) +{ + if (ret != 0) + { printf("function execution failed. Returned code: %d\n", ret); exit(1); } @@ -46,7 +50,8 @@ void on_response(int ret, const char *result, void *user_data) { strcpy(*data_ref, result); } -void callBack(int ret, const char *signal, void *user_data) { +void callBack(int ret, const char *signal, void *user_data) +{ // This callback will be executed each time a new message is received // Example signal: @@ -65,7 +70,8 @@ void callBack(int ret, const char *signal, void *user_data) { } }*/ - if (ret != 0) { + if (ret != 0) + { printf("function execution failed. Returned code: %d\n", ret); exit(1); } @@ -73,13 +79,15 @@ void callBack(int ret, const char *signal, void *user_data) { const nx_json *json = nx_json_parse((char *)signal, 0); const char *type = nx_json_get(json, "type")->text_value; - if (strcmp(type, "message") == 0) { + if (strcmp(type, "message") == 0) + { const nx_json *wakuMsgJson = nx_json_get(nx_json_get(json, "event"), "wakuMessage"); const char *contentTopic = nx_json_get(wakuMsgJson, "contentTopic")->text_value; - if (strcmp(contentTopic, "/example/1/default/rfc26") == 0) { + if (strcmp(contentTopic, "/example/1/default/rfc26") == 0) + { char *msg = utils_extract_wakumessage_from_signal(wakuMsgJson); // Decode a message using asymmetric encryption @@ -109,7 +117,8 @@ void callBack(int ret, const char *signal, void *user_data) { nx_json_free(json); } -int main(int argc, char *argv[]) { +int main(int argc, char *argv[]) +{ // Set callback to be executed each time a message is received waku_set_event_callback(callBack); @@ -134,7 +143,7 @@ int main(int argc, char *argv[]) { // Build a content topic char *contentTopic = NULL; - waku_content_topic("example", 1, "default", "rfc26", on_response, + waku_content_topic("example", "1", "default", "rfc26", on_response, (void *)&contentTopic); printf("Content Topic: %s\n", contentTopic); @@ -166,14 +175,15 @@ int main(int argc, char *argv[]) { // waku_store_query(query, NULL, 0, on_response, (void*)&query_result); // printf("%s\n", query_result); char contentFilter[1000]; - sprintf(contentFilter, - "{\"pubsubTopic\":\"%s\",\"contentTopics\":[\"%s\"]}", - defaultPubsubTopic, contentTopic); + sprintf(contentFilter, + "{\"pubsubTopic\":\"%s\",\"contentTopics\":[\"%s\"]}", + defaultPubsubTopic, contentTopic); waku_relay_subscribe(contentFilter, on_error, NULL); int i = 0; int version = 1; - while (i < 5) { + while (i < 5) + { i++; char wakuMsg[1000]; diff --git a/examples/chat2/flags.go b/examples/chat2/flags.go index 514ae64dc..a15660e08 100644 --- a/examples/chat2/flags.go +++ b/examples/chat2/flags.go @@ -34,7 +34,7 @@ func getFlags() []cli.Flag { // Defaults options.Fleet = fleetProd - testCT, err := protocol.NewContentTopic("toy-chat", 3, "mingde", "proto") + testCT, err := protocol.NewContentTopic("toy-chat", "3", "mingde", "proto") if err != nil { panic("invalid contentTopic") } diff --git a/examples/rln/main.go b/examples/rln/main.go index 2c0e0952a..2d82b358d 100644 --- a/examples/rln/main.go +++ b/examples/rln/main.go @@ -32,7 +32,7 @@ var contractAddress = "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4" var keystorePath = "./rlnKeystore.json" var keystorePassword = "password" var membershipIndex = uint(0) -var contentTopic, _ = protocol.NewContentTopic("rln", 1, "test", "proto") +var contentTopic, _ = protocol.NewContentTopic("rln", "1", "test", "proto") var pubsubTopic = protocol.DefaultPubsubTopic{} // ============================================================================ diff --git a/library/c/api.go b/library/c/api.go index 0ae2c0941..7e0019ba5 100644 --- a/library/c/api.go +++ b/library/c/api.go @@ -194,12 +194,11 @@ func waku_peer_cnt(cb C.WakuCallBack, userData unsafe.Pointer) C.int { // Create a content topic string according to RFC 23 // //export waku_content_topic -func waku_content_topic(applicationName *C.char, applicationVersion C.uint, contentTopicName *C.char, encoding *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - contentTopic, _ := protocol.NewContentTopic(C.GoString(applicationName), uint32(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding)) +func waku_content_topic(applicationName *C.char, applicationVersion *C.char, contentTopicName *C.char, encoding *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + contentTopic, _ := protocol.NewContentTopic(C.GoString(applicationName), C.GoString(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding)) return onSuccesfulResponse(contentTopic.String(), cb, userData) } - // Get the default pubsub topic used in waku2: /waku/2/default-waku/proto // //export waku_default_pubsub_topic diff --git a/library/mobile/api.go b/library/mobile/api.go index d7489e7ff..8a8d3b66b 100644 --- a/library/mobile/api.go +++ b/library/mobile/api.go @@ -73,8 +73,8 @@ func PeerCnt() string { } // ContentTopic creates a content topic string according to RFC 23 -func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string { - contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding) +func ContentTopic(applicationName string, applicationVersion string, contentTopicName string, encoding string) string { + contentTopic, _ := protocol.NewContentTopic(applicationName, applicationVersion, contentTopicName, encoding) return contentTopic.String() } diff --git a/library/node.go b/library/node.go index cebc37975..2596a2a1f 100644 --- a/library/node.go +++ b/library/node.go @@ -335,8 +335,8 @@ func PeerCnt() (int, error) { } // ContentTopic creates a content topic string according to RFC 23 -func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string { - contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding) +func ContentTopic(applicationName string, applicationVersion string, contentTopicName string, encoding string) string { + contentTopic, _ := protocol.NewContentTopic(applicationName, applicationVersion, contentTopicName, encoding) return contentTopic.String() } diff --git a/waku/v2/protocol/content_topic.go b/waku/v2/protocol/content_topic.go index 1f931347e..bfe7df329 100644 --- a/waku/v2/protocol/content_topic.go +++ b/waku/v2/protocol/content_topic.go @@ -15,7 +15,7 @@ var ErrInvalidGeneration = errors.New("generation should be a number") type ContentTopic struct { ContentTopicParams ApplicationName string - ApplicationVersion uint32 + ApplicationVersion string ContentTopicName string Encoding string } @@ -35,12 +35,13 @@ type ContentTopicOption func(*ContentTopicParams) // String formats a content topic in string format as per RFC 23. func (ct ContentTopic) String() string { - return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding) + return fmt.Sprintf("/%s/%s/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding) } // NewContentTopic creates a new content topic based on params specified. // Returns ErrInvalidGeneration if an unsupported generation is specified. -func NewContentTopic(applicationName string, applicationVersion uint32, +// Note that this is recommended to be used for autosharding where contentTopic format is enforced as per https://rfc.vac.dev/spec/51/#content-topics-format-for-autosharding +func NewContentTopic(applicationName string, applicationVersion string, contentTopicName string, encoding string, opts ...ContentTopicOption) (ContentTopic, error) { params := new(ContentTopicParams) @@ -83,18 +84,19 @@ func (ct ContentTopic) Equal(ct2 ContentTopic) bool { } // StringToContentTopic can be used to create a ContentTopic object from a string +// Note that this has to be used only when following the rfc format of contentTopic, which is currently validated only for Autosharding. +// For static and named-sharding, contentTopic can be of any format and hence it is not recommended to use this function. +// This can be updated if required to handle such a case. func StringToContentTopic(s string) (ContentTopic, error) { p := strings.Split(s, "/") switch len(p) { case 5: - vNum, err := strconv.ParseUint(p[2], 10, 32) - if err != nil { + if len(p[1]) == 0 || len(p[2]) == 0 || len(p[3]) == 0 || len(p[4]) == 0 { return ContentTopic{}, ErrInvalidFormat } - return ContentTopic{ ApplicationName: p[1], - ApplicationVersion: uint32(vNum), + ApplicationVersion: p[2], ContentTopicName: p[3], Encoding: p[4], }, nil @@ -106,15 +108,13 @@ func StringToContentTopic(s string) (ContentTopic, error) { if err != nil || generation > 0 { return ContentTopic{}, ErrInvalidGeneration } - vNum, err := strconv.ParseUint(p[3], 10, 32) - if err != nil { + if len(p[2]) == 0 || len(p[3]) == 0 || len(p[4]) == 0 || len(p[5]) == 0 { return ContentTopic{}, ErrInvalidFormat } - return ContentTopic{ ContentTopicParams: ContentTopicParams{Generation: generation}, ApplicationName: p[2], - ApplicationVersion: uint32(vNum), + ApplicationVersion: p[3], ContentTopicName: p[4], Encoding: p[5], }, nil diff --git a/waku/v2/protocol/envelope_test.go b/waku/v2/protocol/envelope_test.go index 5a07d9ce8..f0feaa446 100644 --- a/waku/v2/protocol/envelope_test.go +++ b/waku/v2/protocol/envelope_test.go @@ -1,7 +1,6 @@ package protocol import ( - "fmt" "testing" "github.com/stretchr/testify/require" @@ -22,7 +21,6 @@ func TestEnvelope(t *testing.T) { topic := e.PubsubTopic() require.Equal(t, "test", topic) hash := e.Hash() - fmt.Println(hash) require.Equal( t, diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 2bbef61aa..887260287 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -292,6 +292,9 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { + if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned + continue + } return sub, nil } } @@ -308,6 +311,9 @@ func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { + if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned + continue + } return sub, nil } } diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index c0aaa3f4c..158432f41 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -3,7 +3,6 @@ package relay import ( "context" "crypto/rand" - "fmt" "sync" "testing" "time" @@ -53,7 +52,6 @@ func TestWakuRelay(t *testing.T) { bytesToSend := []byte{1} go func() { defer cancel() - env := <-subs[0].Ch t.Log("received msg", logging.HexString("hash", env.Hash())) }() @@ -114,7 +112,7 @@ func TestGossipsubScore(t *testing.T) { for { _, err := sub.Next(context.Background()) if err != nil { - fmt.Println(err) + t.Log(err) } } }() diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 5a426182e..3f4779d25 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -225,7 +225,7 @@ func FromBitVector(buf []byte) (RelayShards, error) { // This is based on Autosharding algorithm defined in RFC 51 func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticShardingPubsubTopic { bytes := []byte(topic.ApplicationName) - bytes = append(bytes, []byte(fmt.Sprintf("%d", topic.ApplicationVersion))...) + bytes = append(bytes, []byte(topic.ApplicationVersion)...) hash := hash.SHA256(bytes) //We only use the last 64 bits of the hash as having more shards is unlikely. diff --git a/waku/v2/protocol/topic_test.go b/waku/v2/protocol/topic_test.go index e2b02db68..c1708d4a5 100644 --- a/waku/v2/protocol/topic_test.go +++ b/waku/v2/protocol/topic_test.go @@ -9,28 +9,28 @@ import ( ) func TestContentTopicAndSharding(t *testing.T) { - ct, err := NewContentTopic("waku", 2, "test", "proto") + ct, err := NewContentTopic("waku", "2", "test", "proto") require.NoError(t, err) require.Equal(t, ct.String(), "/waku/2/test/proto") _, err = StringToContentTopic("/waku/-1/a/b") - require.Error(t, ErrInvalidFormat, err) + require.NoError(t, err) _, err = StringToContentTopic("waku/1/a/b") - require.Error(t, ErrInvalidFormat, err) + require.Error(t, err, ErrInvalidFormat) _, err = StringToContentTopic("////") - require.Error(t, ErrInvalidFormat, err) + require.Error(t, err, ErrInvalidFormat) _, err = StringToContentTopic("/waku/1/a") - require.Error(t, ErrInvalidFormat, err) + require.Error(t, err, ErrInvalidFormat) ct2, err := StringToContentTopic("/waku/2/test/proto") require.NoError(t, err) require.Equal(t, ct.String(), ct2.String()) require.True(t, ct.Equal(ct2)) - ct3, err := NewContentTopic("waku", 2, "test2", "proto") + ct3, err := NewContentTopic("waku", "2a", "test2", "proto") require.NoError(t, err) require.False(t, ct.Equal(ct3)) @@ -45,12 +45,12 @@ func TestContentTopicAndSharding(t *testing.T) { require.Equal(t, NewStaticShardingPubsubTopic(ClusterIndex, 3), nsPubSubT1) _, err = StringToContentTopic("/abc/toychat/2/huilong/proto") - require.Error(t, ErrInvalidGeneration, err) + require.Error(t, err, ErrInvalidGeneration) _, err = StringToContentTopic("/1/toychat/2/huilong/proto") - require.Error(t, ErrInvalidGeneration, err) + require.Error(t, err, ErrInvalidGeneration) - ct5, err := NewContentTopic("waku", 2, "test2", "proto", WithGeneration(0)) + ct5, err := NewContentTopic("waku", "2b", "test2", "proto", WithGeneration(0)) require.NoError(t, err) require.Equal(t, ct5.Generation, 0) } @@ -65,7 +65,7 @@ func randomContentTopic() (ContentTopic, error) { randomChar := 'a' + rune(rand.Intn(26)) app = app + string(randomChar) } - version := uint32(1) + version := "1" var name = ""