Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 30, 2023
1 parent a7c020d commit e3fcc97
Show file tree
Hide file tree
Showing 60 changed files with 693 additions and 606 deletions.
7 changes: 6 additions & 1 deletion cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
Expand Down Expand Up @@ -198,7 +199,11 @@ func (s *FilterService) unsubscribeGetMessage(ch <-chan filter.WakuFilterPushRes
var peerIds string
ind := 0
for entry := range ch {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if entry.Err == nil {
continue
}

s.log.Error("can't unsubscribe", logging.HostID("peer", entry.PeerID), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/server/rest/lightpush_rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func TestLightpushMessagev1(t *testing.T) {
Message: &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
},
}
Expand Down
13 changes: 6 additions & 7 deletions cmd/waku/server/rest/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService {
Expand All @@ -38,7 +39,6 @@ func TestPostV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
Expand Down Expand Up @@ -73,17 +73,17 @@ func TestRelaySubscription(t *testing.T) {
require.Equal(t, "true", rr.Body.String())

// Test max messages in subscription
now := utils.GetUnixEpoch()
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test"))
now := *utils.GetUnixEpoch()
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", proto.Int64(now+1)), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", proto.Int64(now+2)), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", proto.Int64(now+3)), now, "test"))

// Wait for the messages to be processed
time.Sleep(500 * time.Millisecond)

require.Len(t, d.messages["test"], 3)

d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", proto.Int64(now+4)), now+4, "test"))

time.Sleep(500 * time.Millisecond)

Expand Down Expand Up @@ -139,7 +139,6 @@ func TestRelayGetV1Messages(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "test",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
Expand Down
12 changes: 7 additions & 5 deletions cmd/waku/server/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type HistoryCursor struct {
type StoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"content_topic"`
Version int32 `json:"version"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp"`
Meta []byte `json:"meta"`
}
Expand Down Expand Up @@ -83,18 +83,20 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store

startTimeStr := r.URL.Query().Get("startTime")
if startTimeStr != "" {
query.StartTime, err = strconv.ParseInt(startTimeStr, 10, 64)
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
query.StartTime = &startTime
}

endTimeStr := r.URL.Query().Get("endTime")
if endTimeStr != "" {
query.EndTime, err = strconv.ParseInt(endTimeStr, 10, 64)
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
query.EndTime = &endTime
}

var cursor *pb.Index
Expand Down Expand Up @@ -178,8 +180,8 @@ func toStoreResponse(result *store.Result) StoreResponse {
response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: int32(m.Version),
Timestamp: m.Timestamp,
Version: m.GetVersion(),
Timestamp: m.GetTimestamp(),
Meta: m.Meta,
})
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/waku/server/rest/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

func TestGetMessages(t *testing.T) {
Expand All @@ -32,14 +33,14 @@ func TestGetMessages(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"

now := utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, now+1)
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)
now := *utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, proto.Int64(now+1))
msg2 := tests.CreateWakuMessage(topic1, proto.Int64(now+2))
msg3 := tests.CreateWakuMessage(topic1, proto.Int64(now+3))

node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, *utils.GetUnixEpoch(), pubsubTopic1))

n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().Pretty()))
n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo)
Expand Down
5 changes: 4 additions & 1 deletion cmd/waku/server/rpc/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ func TestV1Peers(t *testing.T) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))

host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
Expand Down
8 changes: 5 additions & 3 deletions cmd/waku/server/rpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,13 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs,
}

for i := range f.messages[args.ContentTopic] {
rpcMsg, err := ProtoToRPC(f.messages[args.ContentTopic][i])
msg := f.messages[args.ContentTopic][i]
rpcMsg, err := ProtoToRPC(msg)
if err != nil {
return err
f.log.Warn("could not include message in response", zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}
*reply = append(*reply, rpcMsg)
}

f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0)
Expand Down
6 changes: 4 additions & 2 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
Expand Down Expand Up @@ -194,9 +195,10 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep
case msg := <-sub.Ch:
rpcMsg, err := ProtoToRPC(msg.Message())
if err != nil {
return err
r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}
*reply = append(*reply, rpcMsg)
default:
break
}
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/server/rpc/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func TestPostV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/waku/server/rpc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type StorePagingOptions struct {
}

type StoreMessagesArgs struct {
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []string `json:"contentFilters,omitempty"`
StartTime int64 `json:"startTime,omitempty"`
EndTime int64 `json:"endTime,omitempty"`
PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"`
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []string `json:"contentFilters,omitempty"`
StartTime *int64 `json:"startTime,omitempty"`
EndTime *int64 `json:"endTime,omitempty"`
PagingOptions *StorePagingOptions `json:"pagingOptions,omitempty"`
}

type StoreMessagesReply struct {
Expand Down
28 changes: 15 additions & 13 deletions cmd/waku/server/rpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package rpc

import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb"

"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -32,18 +34,18 @@ func ProtoToRPC(input *pb.WakuMessage) (*RPCWakuMessage, error) {
rpcWakuMsg := &RPCWakuMessage{
Payload: input.Payload,
ContentTopic: input.ContentTopic,
Version: input.Version,
Timestamp: input.Timestamp,
Ephemeral: input.Ephemeral,
}

rateLimitProof := &pb.RateLimitProof{}
err := proto.Unmarshal(input.RateLimitProof, rateLimitProof)
if err != nil {
return nil, err
Version: input.GetVersion(),
Timestamp: input.GetTimestamp(),
Ephemeral: input.GetEphemeral(),
}

if input.RateLimitProof != nil {
rateLimitProof := &rlnpb.RateLimitProof{}
err := proto.Unmarshal(input.RateLimitProof, rateLimitProof)
if err != nil {
return nil, err
}

rpcWakuMsg.RateLimitProof = &RateLimitProof{
Proof: rateLimitProof.Proof,
MerkleRoot: rateLimitProof.MerkleRoot,
Expand All @@ -66,13 +68,13 @@ func (r *RPCWakuMessage) toProto() (*pb.WakuMessage, error) {
msg := &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Version: r.Version,
Timestamp: r.Timestamp,
Ephemeral: r.Ephemeral,
Version: proto.Uint32(r.Version),
Timestamp: proto.Int64(r.Timestamp),
Ephemeral: proto.Bool(r.Ephemeral),
}

if r.RateLimitProof != nil {
rateLimitProof := &pb.RateLimitProof{
rateLimitProof := &rlnpb.RateLimitProof{
Proof: r.RateLimitProof.Proof,
MerkleRoot: r.RateLimitProof.MerkleRoot,
Epoch: r.RateLimitProof.Epoch,
Expand Down
6 changes: 3 additions & 3 deletions examples/basic2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

var log = utils.Logger().Named("basic2")
Expand Down Expand Up @@ -84,7 +85,6 @@ func randomHex(n int) (string, error) {

func write(ctx context.Context, wakuNode *node.WakuNode, contentTopic string, msgContent string) {
var version uint32 = 0
var timestamp int64 = utils.GetUnixEpoch(wakuNode.Timesource())

p := new(payload.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)
Expand All @@ -98,9 +98,9 @@ func write(ctx context.Context, wakuNode *node.WakuNode, contentTopic string, ms

msg := &pb.WakuMessage{
Payload: payload,
Version: version,
Version: proto.Uint32(version),
ContentTopic: contentTopic,
Timestamp: timestamp,
Timestamp: utils.GetUnixEpoch(wakuNode.Timesource()),
}

_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
Expand Down
4 changes: 2 additions & 2 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (c *Chat) publish(ctx context.Context, message string) error {

wakuMsg := &wpb.WakuMessage{
Payload: payload,
Version: version,
Version: proto.Uint32(version),
ContentTopic: options.ContentTopic,
Timestamp: timestamp,
}
Expand Down Expand Up @@ -407,7 +407,7 @@ func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
c.ui.InfoMessage("0 historic messages available")
} else {
for _, msg := range response.Messages {
c.C <- protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic)
c.C <- protocol.NewEnvelope(msg, msg.GetTimestamp(), relay.DefaultWakuTopic)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

var log = logging.Logger("filter2")
Expand Down Expand Up @@ -144,7 +145,6 @@ func randomHex(n int) (string, error) {

func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
var version uint32 = 0
var timestamp int64 = utils.GetUnixEpoch(wakuNode.Timesource())

p := new(payload.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)
Expand All @@ -154,9 +154,9 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {

msg := &pb.WakuMessage{
Payload: payload,
Version: version,
Version: proto.Uint32(version),
ContentTopic: contentTopic,
Timestamp: timestamp,
Timestamp: utils.GetUnixEpoch(wakuNode.Timesource()),
}

_, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String()))
Expand Down
2 changes: 1 addition & 1 deletion examples/noise/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P
continue
}

msg.Timestamp = wakuNode.Timesource().Now().UnixNano()
msg.Timestamp = utils.GetUnixEpoch(wakuNode.Timesource())

_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions examples/rln/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

var log = utils.Logger().Named("rln")
Expand Down Expand Up @@ -105,7 +106,6 @@ func randomHex(n int) (string, error) {

func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
var version uint32 = 0
var timestamp int64 = utils.GetUnixEpoch(wakuNode.Timesource())

p := new(payload.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)
Expand All @@ -119,9 +119,9 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {

msg := &pb.WakuMessage{
Payload: payload,
Version: version,
Version: proto.Uint32(version),
ContentTopic: contentTopic.String(),
Timestamp: timestamp,
Timestamp: utils.GetUnixEpoch(wakuNode.Timesource()),
}

err = wakuNode.RLNRelay().AppendRLNProof(msg, wakuNode.Timesource().Now())
Expand Down
6 changes: 0 additions & 6 deletions library/Create multiple nodes1

This file was deleted.

Loading

0 comments on commit e3fcc97

Please sign in to comment.