Skip to content

Commit

Permalink
feat : autoshard relay api (#807)
Browse files Browse the repository at this point in the history
* fix: using relay without bcaster should consume and drop messages

* update relay api usage

* move subscription to broadcaster

* move filter logic under subscription

* Support more than 1 relay subscription for a pubSubTopic

* modify relay Publish API to derive pubSubTopic based on autosharding

* implement relay RPC methods for autosharding

* remove relay msgChannel and relay on pubsub buffersize for subscription

Co-authored-by: richΛrd <info@richardramos.me>

* handle relay subscribe with noConsumer and address issue reported in code review

* chore: reorg relay code

---------

Co-authored-by: richΛrd <info@richardramos.me>
  • Loading branch information
chaitanyaprem and richard-ramos authored Oct 20, 2023
1 parent 4af7e7a commit b5be83a
Show file tree
Hide file tree
Showing 37 changed files with 979 additions and 484 deletions.
22 changes: 12 additions & 10 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,15 +382,13 @@ func Execute(options NodeOptions) error {
var wg sync.WaitGroup

if options.Relay.Enable {
for nodeTopic := range pubSubTopicMap {
for nodeTopic, cTopics := range pubSubTopicMap {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer())
if err != nil {
return err
}

sub.Unsubscribe()

if len(options.Rendezvous.Nodes) != 0 {
// Register the node in rendezvous point
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)
Expand Down Expand Up @@ -549,17 +547,18 @@ func Execute(options NodeOptions) error {
return nil
}

func processTopics(options NodeOptions) (map[string]struct{}, error) {
func processTopics(options NodeOptions) (map[string][]string, error) {

//Using a map to avoid duplicate pub-sub topics that can result from autosharding
// or same-topic being passed twice.
pubSubTopicMap := make(map[string]struct{})
pubSubTopicMap := make(map[string][]string)

for _, topic := range options.Relay.Topics.Value() {
pubSubTopicMap[topic] = struct{}{}
pubSubTopicMap[topic] = []string{}
}

for _, topic := range options.Relay.PubSubTopics.Value() {
pubSubTopicMap[topic] = struct{}{}
pubSubTopicMap[topic] = []string{}
}

//Get pubSub topics from contentTopics if they are as per autosharding
Expand All @@ -569,11 +568,14 @@ func processTopics(options NodeOptions) (map[string]struct{}, error) {
return nil, err
}
pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount)
pubSubTopicMap[pTopic.String()] = struct{}{}
if _, ok := pubSubTopicMap[pTopic.String()]; !ok {
pubSubTopicMap[pTopic.String()] = []string{}
}
pubSubTopicMap[pTopic.String()] = append(pubSubTopicMap[pTopic.String()], cTopic)
}
//If no topics are passed, then use default waku topic.
if len(pubSubTopicMap) == 0 {
pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{}
pubSubTopicMap[relay.DefaultWakuTopic] = []string{}
}

return pubSubTopicMap, nil
Expand Down
8 changes: 5 additions & 3 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re

var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), topic)
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
Expand All @@ -129,18 +129,20 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ

var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
sub, err = r.node.Relay().Subscribe(req.Context())
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
sub, err = r.node.Relay().SubscribeToTopic(req.Context(), topic)
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/server/rest/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Adder func(msg *protocol.Envelope)

type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
cancel context.CancelFunc
adder Adder
}
Expand All @@ -26,7 +26,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService
func (r *runnerService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.sub = r.broadcaster.RegisterForAll(1024)
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
for {
select {
case <-ctx.Done():
Expand Down
10 changes: 6 additions & 4 deletions cmd/waku/server/rpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
Expand All @@ -37,9 +38,9 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
require.NoError(t, err)

if isFullNode {
sub, err := n.Relay().SubscribeToTopic(context.Background(), testTopic)
sub, err := n.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
go func() {
for range sub.Ch {
for range sub[0].Ch {
}
}()
require.NoError(t, err)
Expand All @@ -62,14 +63,15 @@ func TestFilterSubscription(t *testing.T) {
err = node.Start(context.Background())
require.NoError(t, err)

_, err = node.SubscribeToTopic(context.Background(), testTopic)
_, err = node.Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)

b2 := relay.NewBroadcaster(10)
require.NoError(t, b2.Start(context.Background()))
f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
f.SetHost(host)
err = f.Start(context.Background(), relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = f.Start(context.Background(), sub)
require.NoError(t, err)

d := makeFilterService(t, true)
Expand Down
98 changes: 86 additions & 12 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type RelayMessageArgs struct {
Message *RPCWakuMessage `json:"message,omitempty"`
}

// RelayAutoMessageArgs represents the requests used for posting messages
type RelayAutoMessageArgs struct {
Message *RPCWakuMessage `json:"message,omitempty"`
}

// TopicsArgs represents the lists of topics to use when subscribing / unsubscribing
type TopicsArgs struct {
Topics []string `json:"topics,omitempty"`
Expand Down Expand Up @@ -120,28 +125,97 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return nil
}

// PostV1AutoSubscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {

_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.

*reply = true
return nil
}

// DeleteV1AutoSubscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("unsubscribing from topics", zap.Strings("topic", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.
*reply = true
return nil
}

// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message
func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error {
var err error
msg := args.Message.toProto()
if msg == nil {
err := fmt.Errorf("invalid message format received")
r.log.Error("publishing message", zap.Error(err))
return err
}
if msg.ContentTopic == "" {
err := fmt.Errorf("content-topic cannot be empty")
r.log.Error("publishing message", zap.Error(err))
return err
}
if err = server.AppendRLNProof(r.node, msg); err != nil {
return err
}

_, err = r.node.Relay().Publish(req.Context(), msg)
if err != nil {
r.log.Error("publishing message", zap.Error(err))
return err
}

*reply = true
return nil
}

// GetV1AutoMessages is invoked when the json rpc request uses the get_waku_v2_relay_v1_auto_messages method
// Note that this method takes contentTopic as an argument instead of pubSubtopic and uses autosharding.
func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
sub, err := r.node.Relay().GetSubscription(args.Topic)
if err != nil {
return err
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
default:
break
}
return nil
}

// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

for _, topic := range args.Topics {
var err error
if topic == "" {
var sub *relay.Subscription
sub, err = r.node.Relay().Subscribe(ctx)
sub.Unsubscribe()
} else {
var sub *relay.Subscription
sub, err = r.node.Relay().SubscribeToTopic(ctx, topic)
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub.Unsubscribe()
topic = relay.DefaultWakuTopic
}
var sub *relay.Subscription
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = make([]*pb.WakuMessage, 0)
r.messagesMutex.Unlock()
Expand All @@ -155,7 +229,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
err := r.node.Relay().Unsubscribe(ctx, topic)
err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/server/rpc/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Adder func(msg *protocol.Envelope)

type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
adder Adder
}

Expand All @@ -21,7 +21,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService
}

func (r *runnerService) Start() {
r.sub = r.broadcaster.RegisterForAll(1024)
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
for envelope := range r.sub.Ch {
r.adder(envelope)
}
Expand Down
5 changes: 3 additions & 2 deletions examples/basic2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -116,13 +117,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string
}

func readLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string) {
sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}

for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic {
continue
}
Expand Down
12 changes: 2 additions & 10 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"chat2/pb"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-zerokit-rln/rln"
"golang.org/x/crypto/pbkdf2"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -84,13 +82,13 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
} else {

for _, topic := range topics {
sub, err := node.Relay().SubscribeToTopic(ctx, topic)
sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = make(chan *protocol.Envelope)
go func() {
for e := range sub.Ch {
for e := range sub[0].Ch {
chat.C <- e
}
}()
Expand Down Expand Up @@ -356,12 +354,6 @@ func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Mess
return msg, nil
}

func generateSymKey(password string) []byte {
// AesKeyLength represents the length (in bytes) of an private key
AESKeyLength := 256 / 8
return pbkdf2.Key([]byte(password), nil, 65356, AESKeyLength, sha256.New)
}

func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
defer c.wg.Done()

Expand Down
4 changes: 2 additions & 2 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {

func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
pubsubTopic := pubSubTopic.String()
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic))
if err != nil {
log.Error("Could not subscribe: ", err)
return
}

for value := range sub.Ch {
for value := range sub[0].Ch {
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
if err != nil {
fmt.Println(err)
Expand Down
4 changes: 2 additions & 2 deletions examples/rln/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
}

func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic.String())
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic.String()))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}

for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic.String() {
continue
}
Expand Down
Loading

0 comments on commit b5be83a

Please sign in to comment.