Skip to content

Commit

Permalink
Merge branch 'master' into test-utils-string-generators
Browse files Browse the repository at this point in the history
  • Loading branch information
romanzac committed Nov 8, 2023
2 parents 4a3e49e + fab51be commit 22d7352
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 58 deletions.
13 changes: 11 additions & 2 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
} else {
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))

if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
Expand Down Expand Up @@ -126,7 +126,16 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
case msg, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte("consume channel is closed for subscription"))
if err != nil {
r.log.Error("writing response", zap.Error(err))
}
return
}
response = append(response, msg.Message())
default:
break
Expand Down
20 changes: 15 additions & 5 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rpc

import (
"errors"
"fmt"
"net/http"

Expand All @@ -12,6 +13,8 @@ import (
"go.uber.org/zap"
)

var errChannelClosed = errors.New("consume channel is closed for subscription")

// RelayService represents the JSON RPC service for WakuRelay
type RelayService struct {
node *node.WakuNode
Expand Down Expand Up @@ -93,7 +96,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {

_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...))
_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...), relay.WithCacheSize(uint(r.cacheCapacity)))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err))
return err
Expand Down Expand Up @@ -150,7 +153,11 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep
return err
}
select {
case msg := <-sub.Ch:
case msg, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic))
return errChannelClosed
}
rpcMsg, err := ProtoToRPC(msg.Message())
if err != nil {
r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err))
Expand All @@ -165,14 +172,13 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep

// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

for _, topic := range args.Topics {
var err error
if topic == "" {
topic = relay.DefaultWakuTopic
}
_, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topic), relay.WithCacheSize(uint(r.cacheCapacity)))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
Expand Down Expand Up @@ -207,7 +213,11 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *
return err
}
select {
case msg := <-sub.Ch:
case msg, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic))
return errChannelClosed
}
m, err := ProtoToRPC(msg.Message())
if err == nil {
*reply = append(*reply, m)
Expand Down
2 changes: 1 addition & 1 deletion examples/basic2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
var log = utils.Logger().Named("basic2")

func main() {
var cTopic, err = protocol.NewContentTopic("basic2", 1, "test", "proto")
var cTopic, err = protocol.NewContentTopic("basic2", "1", "test", "proto")
if err != nil {
fmt.Println("Invalid contentTopic")
return
Expand Down
38 changes: 24 additions & 14 deletions examples/c-bindings/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@ char *bobPubKey =
"0x045eef61a98ba1cf44a2736fac91183ea2bd86e67de20fe4bff467a71249a8a0c05f795d"
"d7f28ced7c15eaa69c89d4212cc4f526ca5e9a62e88008f506d850cccd";

void on_error(int ret, const char *result, void *user_data) {
if (ret == 0) {
void on_error(int ret, const char *result, void *user_data)
{
if (ret == 0)
{
return;
}

printf("function execution failed. Returned code: %d\n", ret);
exit(1);
}

void on_response(int ret, const char *result, void *user_data) {
if (ret != 0) {
void on_response(int ret, const char *result, void *user_data)
{
if (ret != 0)
{
printf("function execution failed. Returned code: %d\n", ret);
exit(1);
}
Expand All @@ -46,7 +50,8 @@ void on_response(int ret, const char *result, void *user_data) {
strcpy(*data_ref, result);
}

void callBack(int ret, const char *signal, void *user_data) {
void callBack(int ret, const char *signal, void *user_data)
{
// This callback will be executed each time a new message is received

// Example signal:
Expand All @@ -65,21 +70,24 @@ void callBack(int ret, const char *signal, void *user_data) {
}
}*/

if (ret != 0) {
if (ret != 0)
{
printf("function execution failed. Returned code: %d\n", ret);
exit(1);
}

const nx_json *json = nx_json_parse((char *)signal, 0);
const char *type = nx_json_get(json, "type")->text_value;

if (strcmp(type, "message") == 0) {
if (strcmp(type, "message") == 0)
{
const nx_json *wakuMsgJson =
nx_json_get(nx_json_get(json, "event"), "wakuMessage");
const char *contentTopic =
nx_json_get(wakuMsgJson, "contentTopic")->text_value;

if (strcmp(contentTopic, "/example/1/default/rfc26") == 0) {
if (strcmp(contentTopic, "/example/1/default/rfc26") == 0)
{
char *msg = utils_extract_wakumessage_from_signal(wakuMsgJson);

// Decode a message using asymmetric encryption
Expand Down Expand Up @@ -109,7 +117,8 @@ void callBack(int ret, const char *signal, void *user_data) {
nx_json_free(json);
}

int main(int argc, char *argv[]) {
int main(int argc, char *argv[])
{
// Set callback to be executed each time a message is received
waku_set_event_callback(callBack);

Expand All @@ -134,7 +143,7 @@ int main(int argc, char *argv[]) {

// Build a content topic
char *contentTopic = NULL;
waku_content_topic("example", 1, "default", "rfc26", on_response,
waku_content_topic("example", "1", "default", "rfc26", on_response,
(void *)&contentTopic);
printf("Content Topic: %s\n", contentTopic);

Expand Down Expand Up @@ -166,14 +175,15 @@ int main(int argc, char *argv[]) {
// waku_store_query(query, NULL, 0, on_response, (void*)&query_result);
// printf("%s\n", query_result);
char contentFilter[1000];
sprintf(contentFilter,
"{\"pubsubTopic\":\"%s\",\"contentTopics\":[\"%s\"]}",
defaultPubsubTopic, contentTopic);
sprintf(contentFilter,
"{\"pubsubTopic\":\"%s\",\"contentTopics\":[\"%s\"]}",
defaultPubsubTopic, contentTopic);
waku_relay_subscribe(contentFilter, on_error, NULL);

int i = 0;
int version = 1;
while (i < 5) {
while (i < 5)
{
i++;

char wakuMsg[1000];
Expand Down
2 changes: 1 addition & 1 deletion examples/chat2/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func getFlags() []cli.Flag {
// Defaults
options.Fleet = fleetProd

testCT, err := protocol.NewContentTopic("toy-chat", 3, "mingde", "proto")
testCT, err := protocol.NewContentTopic("toy-chat", "3", "mingde", "proto")
if err != nil {
panic("invalid contentTopic")
}
Expand Down
2 changes: 1 addition & 1 deletion examples/rln/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var contractAddress = "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4"
var keystorePath = "./rlnKeystore.json"
var keystorePassword = "password"
var membershipIndex = uint(0)
var contentTopic, _ = protocol.NewContentTopic("rln", 1, "test", "proto")
var contentTopic, _ = protocol.NewContentTopic("rln", "1", "test", "proto")
var pubsubTopic = protocol.DefaultPubsubTopic{}

// ============================================================================
Expand Down
5 changes: 2 additions & 3 deletions library/c/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,11 @@ func waku_peer_cnt(cb C.WakuCallBack, userData unsafe.Pointer) C.int {
// Create a content topic string according to RFC 23
//
//export waku_content_topic
func waku_content_topic(applicationName *C.char, applicationVersion C.uint, contentTopicName *C.char, encoding *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
contentTopic, _ := protocol.NewContentTopic(C.GoString(applicationName), uint32(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding))
func waku_content_topic(applicationName *C.char, applicationVersion *C.char, contentTopicName *C.char, encoding *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
contentTopic, _ := protocol.NewContentTopic(C.GoString(applicationName), C.GoString(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding))
return onSuccesfulResponse(contentTopic.String(), cb, userData)
}


// Get the default pubsub topic used in waku2: /waku/2/default-waku/proto
//
//export waku_default_pubsub_topic
Expand Down
4 changes: 2 additions & 2 deletions library/mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func PeerCnt() string {
}

// ContentTopic creates a content topic string according to RFC 23
func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string {
contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding)
func ContentTopic(applicationName string, applicationVersion string, contentTopicName string, encoding string) string {
contentTopic, _ := protocol.NewContentTopic(applicationName, applicationVersion, contentTopicName, encoding)
return contentTopic.String()
}

Expand Down
4 changes: 2 additions & 2 deletions library/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ func PeerCnt() (int, error) {
}

// ContentTopic creates a content topic string according to RFC 23
func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string {
contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding)
func ContentTopic(applicationName string, applicationVersion string, contentTopicName string, encoding string) string {
contentTopic, _ := protocol.NewContentTopic(applicationName, applicationVersion, contentTopicName, encoding)
return contentTopic.String()
}

Expand Down
22 changes: 11 additions & 11 deletions waku/v2/protocol/content_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var ErrInvalidGeneration = errors.New("generation should be a number")
type ContentTopic struct {
ContentTopicParams
ApplicationName string
ApplicationVersion uint32
ApplicationVersion string
ContentTopicName string
Encoding string
}
Expand All @@ -35,12 +35,13 @@ type ContentTopicOption func(*ContentTopicParams)

// String formats a content topic in string format as per RFC 23.
func (ct ContentTopic) String() string {
return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding)
return fmt.Sprintf("/%s/%s/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding)
}

// NewContentTopic creates a new content topic based on params specified.
// Returns ErrInvalidGeneration if an unsupported generation is specified.
func NewContentTopic(applicationName string, applicationVersion uint32,
// Note that this is recommended to be used for autosharding where contentTopic format is enforced as per https://rfc.vac.dev/spec/51/#content-topics-format-for-autosharding
func NewContentTopic(applicationName string, applicationVersion string,
contentTopicName string, encoding string, opts ...ContentTopicOption) (ContentTopic, error) {

params := new(ContentTopicParams)
Expand Down Expand Up @@ -83,18 +84,19 @@ func (ct ContentTopic) Equal(ct2 ContentTopic) bool {
}

// StringToContentTopic can be used to create a ContentTopic object from a string
// Note that this has to be used only when following the rfc format of contentTopic, which is currently validated only for Autosharding.
// For static and named-sharding, contentTopic can be of any format and hence it is not recommended to use this function.
// This can be updated if required to handle such a case.
func StringToContentTopic(s string) (ContentTopic, error) {
p := strings.Split(s, "/")
switch len(p) {
case 5:
vNum, err := strconv.ParseUint(p[2], 10, 32)
if err != nil {
if len(p[1]) == 0 || len(p[2]) == 0 || len(p[3]) == 0 || len(p[4]) == 0 {
return ContentTopic{}, ErrInvalidFormat
}

return ContentTopic{
ApplicationName: p[1],
ApplicationVersion: uint32(vNum),
ApplicationVersion: p[2],
ContentTopicName: p[3],
Encoding: p[4],
}, nil
Expand All @@ -106,15 +108,13 @@ func StringToContentTopic(s string) (ContentTopic, error) {
if err != nil || generation > 0 {
return ContentTopic{}, ErrInvalidGeneration
}
vNum, err := strconv.ParseUint(p[3], 10, 32)
if err != nil {
if len(p[2]) == 0 || len(p[3]) == 0 || len(p[4]) == 0 || len(p[5]) == 0 {
return ContentTopic{}, ErrInvalidFormat
}

return ContentTopic{
ContentTopicParams: ContentTopicParams{Generation: generation},
ApplicationName: p[2],
ApplicationVersion: uint32(vNum),
ApplicationVersion: p[3],
ContentTopicName: p[4],
Encoding: p[5],
}, nil
Expand Down
2 changes: 0 additions & 2 deletions waku/v2/protocol/envelope_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package protocol

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -22,7 +21,6 @@ func TestEnvelope(t *testing.T) {
topic := e.PubsubTopic()
require.Equal(t, "test", topic)
hash := e.Hash()
fmt.Println(hash)

require.Equal(
t,
Expand Down
6 changes: 6 additions & 0 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned
continue
}
return sub, nil
}
}
Expand All @@ -308,6 +311,9 @@ func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error)
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned
continue
}
return sub, nil
}
}
Expand Down
4 changes: 1 addition & 3 deletions waku/v2/protocol/relay/waku_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package relay
import (
"context"
"crypto/rand"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -53,7 +52,6 @@ func TestWakuRelay(t *testing.T) {
bytesToSend := []byte{1}
go func() {
defer cancel()

env := <-subs[0].Ch
t.Log("received msg", logging.HexString("hash", env.Hash()))
}()
Expand Down Expand Up @@ -114,7 +112,7 @@ func TestGossipsubScore(t *testing.T) {
for {
_, err := sub.Next(context.Background())
if err != nil {
fmt.Println(err)
t.Log(err)
}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func FromBitVector(buf []byte) (RelayShards, error) {
// This is based on Autosharding algorithm defined in RFC 51
func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticShardingPubsubTopic {
bytes := []byte(topic.ApplicationName)
bytes = append(bytes, []byte(fmt.Sprintf("%d", topic.ApplicationVersion))...)
bytes = append(bytes, []byte(topic.ApplicationVersion)...)

hash := hash.SHA256(bytes)
//We only use the last 64 bits of the hash as having more shards is unlikely.
Expand Down
Loading

0 comments on commit 22d7352

Please sign in to comment.