Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use waku-org/waku-proto repository for protobuffer definitions #828

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[submodule "libs/waku-rln-contract"]
path = libs/waku-rln-contract
url = https://github.com/waku-org/waku-rln-contract.git
[submodule "waku/v2/protocol/waku-proto"]
path = waku/v2/protocol/waku-proto
url = git@github.com:waku-org/waku-proto.git
3 changes: 2 additions & 1 deletion cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
Expand Down Expand Up @@ -241,7 +242,7 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul
ind := 0
for _, entry := range result.Errors() {
if entry.Err != nil {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
s.log.Error("can't unsubscribe", logging.HostID("peer", entry.PeerID), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope {
&pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: contentTopic,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
},
0,
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/server/rest/lightpush_rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func TestLightpushMessagev1(t *testing.T) {
Message: &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
},
}
Expand Down
17 changes: 7 additions & 10 deletions cmd/waku/server/rest/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService {
Expand All @@ -38,7 +39,6 @@ func TestPostV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
Expand Down Expand Up @@ -70,16 +70,16 @@ func TestRelaySubscription(t *testing.T) {
require.Equal(t, "true", rr.Body.String())

// Test max messages in subscription
now := utils.GetUnixEpoch()
now := *utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test"))
tests.CreateWakuMessage("test", proto.Int64(now+1)), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test"))
tests.CreateWakuMessage("test", proto.Int64(now+2)), relay.WithPubSubTopic("test"))
require.NoError(t, err)

_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test"))
tests.CreateWakuMessage("test", proto.Int64(now+3)), relay.WithPubSubTopic("test"))
require.NoError(t, err)

// Wait for the messages to be processed
Expand Down Expand Up @@ -130,7 +130,6 @@ func TestRelayGetV1Messages(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "test",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
Expand Down Expand Up @@ -168,7 +167,6 @@ func TestPostAutoV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "/toychat/1/huilong/proto",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
Expand Down Expand Up @@ -201,9 +199,9 @@ func TestRelayAutoSubUnsub(t *testing.T) {
require.Equal(t, "true", rr.Body.String())

// Test publishing messages after subscription
now := utils.GetUnixEpoch()
now := *utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage(cTopic1, now+1))
tests.CreateWakuMessage(cTopic1, proto.Int64(now+1)))
require.NoError(t, err)

// Wait for the messages to be processed
Expand Down Expand Up @@ -267,7 +265,6 @@ func TestRelayGetV1AutoMessages(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: cTopic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
Expand Down
12 changes: 7 additions & 5 deletions cmd/waku/server/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type HistoryCursor struct {
type StoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"content_topic"`
Version int32 `json:"version"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp"`
Meta []byte `json:"meta"`
}
Expand Down Expand Up @@ -83,18 +83,20 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store

startTimeStr := r.URL.Query().Get("startTime")
if startTimeStr != "" {
query.StartTime, err = strconv.ParseInt(startTimeStr, 10, 64)
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
query.StartTime = &startTime
}

endTimeStr := r.URL.Query().Get("endTime")
if endTimeStr != "" {
query.EndTime, err = strconv.ParseInt(endTimeStr, 10, 64)
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
query.EndTime = &endTime
}

var cursor *pb.Index
Expand Down Expand Up @@ -178,8 +180,8 @@ func toStoreResponse(result *store.Result) StoreResponse {
response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: int32(m.Version),
Timestamp: m.Timestamp,
Version: m.GetVersion(),
Timestamp: m.GetTimestamp(),
Meta: m.Meta,
})
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/waku/server/rest/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

func TestGetMessages(t *testing.T) {
Expand All @@ -32,14 +33,14 @@ func TestGetMessages(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"

now := utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, now+1)
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)
now := *utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, proto.Int64(now+1))
msg2 := tests.CreateWakuMessage(topic1, proto.Int64(now+2))
msg3 := tests.CreateWakuMessage(topic1, proto.Int64(now+3))

node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, *utils.GetUnixEpoch(), pubsubTopic1))

n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().Pretty()))
n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo)
Expand Down
6 changes: 4 additions & 2 deletions cmd/waku/server/rpc/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func TestV1Peers(t *testing.T) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))

host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcast := relay.NewBroadcaster(10)
relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion cmd/waku/server/rpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,13 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs,
}

for i := range f.messages[args.ContentTopic] {
*reply = append(*reply, ProtoToRPC(f.messages[args.ContentTopic][i]))
msg := f.messages[args.ContentTopic][i]
rpcMsg, err := ProtoToRPC(msg)
if err != nil {
richard-ramos marked this conversation as resolved.
Show resolved Hide resolved
f.log.Warn("could not include message in response", zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}
}

f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0)
Expand Down
25 changes: 18 additions & 7 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
Expand Down Expand Up @@ -69,7 +70,10 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
topic = args.Topic
}

msg := args.Message.toProto()
msg, err := args.Message.toProto()
if err != nil {
return err
}

if err = server.AppendRLNProof(r.node, msg); err != nil {
return err
Expand Down Expand Up @@ -117,10 +121,9 @@ func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsA

// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message
func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error {
var err error
msg := args.Message.toProto()
if msg == nil {
err := fmt.Errorf("invalid message format received")
msg, err := args.Message.toProto()
if err != nil {
err = fmt.Errorf("invalid message format received: %w", err)
r.log.Error("publishing message", zap.Error(err))
return err
}
Expand Down Expand Up @@ -148,7 +151,12 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
rpcMsg, err := ProtoToRPC(msg.Message())
if err != nil {
richard-ramos marked this conversation as resolved.
Show resolved Hide resolved
r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}
default:
break
}
Expand Down Expand Up @@ -200,7 +208,10 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
m, err := ProtoToRPC(msg.Message())
if err == nil {
*reply = append(*reply, m)
}
default:
break
}
Expand Down
32 changes: 24 additions & 8 deletions cmd/waku/server/rpc/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ func TestPostV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}

err := d.PostV1Message(
rpcWakuMsg, err := ProtoToRPC(msg)
require.NoError(t, err)

err = d.PostV1Message(
makeRequest(t),
&RelayMessageArgs{
Message: ProtoToRPC(msg),
Message: rpcWakuMsg,
},
&reply,
)
Expand Down Expand Up @@ -95,6 +97,17 @@ func TestRelayGetV1Messages(t *testing.T) {
time.Sleep(1 * time.Second)

args := &TopicsArgs{Topics: []string{"test"}}

// Subscribe A to topic
err = serviceA.PostV1Subscription(
makeRequest(t),
args,
&reply,
)
require.NoError(t, err)
require.True(t, reply)

// Subscribe B to topic
err = serviceB.PostV1Subscription(
makeRequest(t),
args,
Expand All @@ -106,14 +119,17 @@ func TestRelayGetV1Messages(t *testing.T) {
// Wait for the subscription to be started
time.Sleep(1 * time.Second)

rpcWakuMsg, err := ProtoToRPC(&pb.WakuMessage{
Payload: []byte("test"),
ContentTopic: "test",
})
require.NoError(t, err)

err = serviceA.PostV1Message(
makeRequest(t),
&RelayMessageArgs{
Topic: "test",
Message: ProtoToRPC(&pb.WakuMessage{
Payload: []byte("test"),
ContentTopic: "testContentTopic",
}),
Topic: "test",
Message: rpcWakuMsg,
},
&reply,
)
Expand Down
10 changes: 7 additions & 3 deletions cmd/waku/server/rpc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type StorePagingOptions struct {
type StoreMessagesArgs struct {
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []string `json:"contentFilters,omitempty"`
StartTime int64 `json:"startTime,omitempty"`
EndTime int64 `json:"endTime,omitempty"`
StartTime *int64 `json:"startTime,omitempty"`
EndTime *int64 `json:"endTime,omitempty"`
PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"`
}

Expand Down Expand Up @@ -63,7 +63,11 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,

reply.Messages = make([]*RPCWakuMessage, len(res.Messages))
for i := range res.Messages {
reply.Messages[i] = ProtoToRPC(res.Messages[i])
msg, err := ProtoToRPC(res.Messages[i])
if err != nil {
return err
}
reply.Messages[i] = msg
}

reply.PagingInfo = StorePagingOptions{
Expand Down
6 changes: 4 additions & 2 deletions cmd/waku/server/rpc/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ func makeRequest(t *testing.T) *http.Request {
func TestBase64Encoding(t *testing.T) {
input := "Hello World"

rpcMsg := ProtoToRPC(&pb.WakuMessage{
Payload: []byte(input),
rpcMsg, err := ProtoToRPC(&pb.WakuMessage{
Payload: []byte(input),
ContentTopic: "test",
})
require.NoError(t, err)

jsonBytes, err := json.Marshal(rpcMsg)
require.NoError(t, err)
Expand Down
Loading