Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : autoshard relay api #807

Merged
merged 30 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5476cfd
chore: remove duplicate import
chaitanyaprem Oct 13, 2023
71cbbe1
fix: using relay without bcaster should consume and drop messages
chaitanyaprem Oct 13, 2023
e0e4ccd
update relay api usage
chaitanyaprem Oct 16, 2023
c15da7e
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 16, 2023
5fa8512
move subscription to broadcaster
chaitanyaprem Oct 16, 2023
f4ffbbc
move filter logic under subscription
chaitanyaprem Oct 16, 2023
fc35d3a
fix error with test
chaitanyaprem Oct 16, 2023
de83bbf
fix lint error
chaitanyaprem Oct 16, 2023
5285e55
fix examples
chaitanyaprem Oct 16, 2023
06dd50a
fix rln test
chaitanyaprem Oct 16, 2023
b4b9e3d
remove notification from outside broadcaster in relay
chaitanyaprem Oct 16, 2023
7a4db68
fix: handle context deadline exceed error
chaitanyaprem Oct 16, 2023
4bcd91f
handle unregister for pubSubTopic in broadcaster
chaitanyaprem Oct 16, 2023
d732304
clarify TODO's and rename SubSimulation
chaitanyaprem Oct 17, 2023
83adec1
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 17, 2023
84cbd85
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 17, 2023
6009fae
address review comments
chaitanyaprem Oct 17, 2023
6dc99b0
Support more than 1 relay subscription for a pubSubTopic
chaitanyaprem Oct 18, 2023
460716d
modify relay Publish API to derive pubSubTopic based on autosharding
chaitanyaprem Oct 18, 2023
a435055
implement relay RPC methods for autosharding
chaitanyaprem Oct 18, 2023
edf5cd2
remove relay msgChannel and relay on pubsub buffersize for subscription
chaitanyaprem Oct 18, 2023
cf0c1a3
handle relay topic subscriptions during node init via cmd
chaitanyaprem Oct 18, 2023
4fc5134
fix: issues with autosharding relay RPC API
chaitanyaprem Oct 18, 2023
b4426b2
added unit tests for relay autoshard changes
chaitanyaprem Oct 19, 2023
d7610b9
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 19, 2023
4c22e60
Apply suggestions from code review
chaitanyaprem Oct 20, 2023
9b3a0b8
handle relay subscribe with noConsumer and address issue reported in …
chaitanyaprem Oct 20, 2023
b1f1b0b
address issue with relay test code
chaitanyaprem Oct 20, 2023
277721a
chore: reorg relay code
chaitanyaprem Oct 20, 2023
7f9eb84
chore: address codeclimate issue
chaitanyaprem Oct 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,15 +382,13 @@ func Execute(options NodeOptions) error {
var wg sync.WaitGroup

if options.Relay.Enable {
for nodeTopic := range pubSubTopicMap {
for nodeTopic, cTopics := range pubSubTopicMap {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer())
if err != nil {
return err
}

sub.Unsubscribe()

if len(options.Rendezvous.Nodes) != 0 {
// Register the node in rendezvous point
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)
Expand Down Expand Up @@ -549,17 +547,18 @@ func Execute(options NodeOptions) error {
return nil
}

func processTopics(options NodeOptions) (map[string]struct{}, error) {
func processTopics(options NodeOptions) (map[string][]string, error) {

//Using a map to avoid duplicate pub-sub topics that can result from autosharding
// or same-topic being passed twice.
pubSubTopicMap := make(map[string]struct{})
pubSubTopicMap := make(map[string][]string)

for _, topic := range options.Relay.Topics.Value() {
pubSubTopicMap[topic] = struct{}{}
pubSubTopicMap[topic] = []string{}
}

for _, topic := range options.Relay.PubSubTopics.Value() {
pubSubTopicMap[topic] = struct{}{}
pubSubTopicMap[topic] = []string{}
}

//Get pubSub topics from contentTopics if they are as per autosharding
Expand All @@ -569,11 +568,14 @@ func processTopics(options NodeOptions) (map[string]struct{}, error) {
return nil, err
}
pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount)
pubSubTopicMap[pTopic.String()] = struct{}{}
if _, ok := pubSubTopicMap[pTopic.String()]; !ok {
pubSubTopicMap[pTopic.String()] = []string{}
}
pubSubTopicMap[pTopic.String()] = append(pubSubTopicMap[pTopic.String()], cTopic)
}
//If no topics are passed, then use default waku topic.
if len(pubSubTopicMap) == 0 {
pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{}
pubSubTopicMap[relay.DefaultWakuTopic] = []string{}
}

return pubSubTopicMap, nil
Expand Down
8 changes: 5 additions & 3 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re

var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), topic)
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
Expand All @@ -129,18 +129,20 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ

var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
sub, err = r.node.Relay().Subscribe(req.Context())
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
sub, err = r.node.Relay().SubscribeToTopic(req.Context(), topic)
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/server/rest/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Adder func(msg *protocol.Envelope)

type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
cancel context.CancelFunc
adder Adder
}
Expand All @@ -26,7 +26,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService
func (r *runnerService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.sub = r.broadcaster.RegisterForAll(1024)
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
for {
select {
case <-ctx.Done():
Expand Down
10 changes: 6 additions & 4 deletions cmd/waku/server/rpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
Expand All @@ -37,9 +38,9 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
require.NoError(t, err)

if isFullNode {
sub, err := n.Relay().SubscribeToTopic(context.Background(), testTopic)
sub, err := n.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
go func() {
for range sub.Ch {
for range sub[0].Ch {
}
}()
require.NoError(t, err)
Expand All @@ -62,14 +63,15 @@ func TestFilterSubscription(t *testing.T) {
err = node.Start(context.Background())
require.NoError(t, err)

_, err = node.SubscribeToTopic(context.Background(), testTopic)
_, err = node.Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)

b2 := relay.NewBroadcaster(10)
require.NoError(t, b2.Start(context.Background()))
f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
f.SetHost(host)
err = f.Start(context.Background(), relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = f.Start(context.Background(), sub)
require.NoError(t, err)

d := makeFilterService(t, true)
Expand Down
98 changes: 86 additions & 12 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type RelayMessageArgs struct {
Message *RPCWakuMessage `json:"message,omitempty"`
}

// RelayAutoMessageArgs represents the requests used for posting messages
type RelayAutoMessageArgs struct {
Message *RPCWakuMessage `json:"message,omitempty"`
}

// TopicsArgs represents the lists of topics to use when subscribing / unsubscribing
type TopicsArgs struct {
Topics []string `json:"topics,omitempty"`
Expand Down Expand Up @@ -120,28 +125,97 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return nil
}

// PostV1AutoSubscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {

_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.

*reply = true
return nil
}

// DeleteV1AutoSubscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("unsubscribing from topics", zap.Strings("topic", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.
*reply = true
return nil
}

// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message
func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error {
var err error
msg := args.Message.toProto()
if msg == nil {
err := fmt.Errorf("invalid message format received")
r.log.Error("publishing message", zap.Error(err))
return err
}
if msg.ContentTopic == "" {
err := fmt.Errorf("content-topic cannot be empty")
r.log.Error("publishing message", zap.Error(err))
return err
}
if err = server.AppendRLNProof(r.node, msg); err != nil {
return err
}

_, err = r.node.Relay().Publish(req.Context(), msg)
if err != nil {
r.log.Error("publishing message", zap.Error(err))
return err
}

*reply = true
return nil
}

// GetV1AutoMessages is invoked when the json rpc request uses the get_waku_v2_relay_v1_auto_messages method
// Note that this method takes contentTopic as an argument instead of pubSubtopic and uses autosharding.
func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
sub, err := r.node.Relay().GetSubscription(args.Topic)
if err != nil {
return err
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
default:
break
}
return nil
}

// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

for _, topic := range args.Topics {
var err error
if topic == "" {
var sub *relay.Subscription
sub, err = r.node.Relay().Subscribe(ctx)
sub.Unsubscribe()
} else {
var sub *relay.Subscription
sub, err = r.node.Relay().SubscribeToTopic(ctx, topic)
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub.Unsubscribe()
topic = relay.DefaultWakuTopic
}
var sub *relay.Subscription
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = make([]*pb.WakuMessage, 0)
r.messagesMutex.Unlock()
Expand All @@ -155,7 +229,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
err := r.node.Relay().Unsubscribe(ctx, topic)
err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/waku/server/rpc/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Adder func(msg *protocol.Envelope)

type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
adder Adder
}

Expand All @@ -21,7 +21,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService
}

func (r *runnerService) Start() {
r.sub = r.broadcaster.RegisterForAll(1024)
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
for envelope := range r.sub.Ch {
r.adder(envelope)
}
Expand Down
5 changes: 3 additions & 2 deletions examples/basic2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -116,13 +117,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string
}

func readLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string) {
sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}

for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic {
continue
}
Expand Down
12 changes: 2 additions & 10 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"chat2/pb"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-zerokit-rln/rln"
"golang.org/x/crypto/pbkdf2"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -84,13 +82,13 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
} else {

for _, topic := range topics {
sub, err := node.Relay().SubscribeToTopic(ctx, topic)
sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = make(chan *protocol.Envelope)
go func() {
for e := range sub.Ch {
for e := range sub[0].Ch {
chat.C <- e
}
}()
Expand Down Expand Up @@ -356,12 +354,6 @@ func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Mess
return msg, nil
}

func generateSymKey(password string) []byte {
// AesKeyLength represents the length (in bytes) of an private key
AESKeyLength := 256 / 8
return pbkdf2.Key([]byte(password), nil, 65356, AESKeyLength, sha256.New)
}

func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
defer c.wg.Done()

Expand Down
4 changes: 2 additions & 2 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {

func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
pubsubTopic := pubSubTopic.String()
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic))
if err != nil {
log.Error("Could not subscribe: ", err)
return
}

for value := range sub.Ch {
for value := range sub[0].Ch {
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
if err != nil {
fmt.Println(err)
Expand Down
4 changes: 2 additions & 2 deletions examples/rln/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
}

func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic.String())
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic.String()))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}

for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic.String() {
continue
}
Expand Down
Loading