Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Apr 25, 2024
1 parent 1e0deed commit 44df347
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 150 deletions.
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

result, err := d.node.Store().Query(ctx, *query, options...)
result, err := d.node.LegacyStore().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rpc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,
store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize),
store.WithCursor(args.PagingOptions.Cursor),
}
res, err := s.node.Store().Query(
res, err := s.node.LegacyStore().Query(
req.Context(),
store.Query{
PubsubTopic: args.Topic,
Expand Down
2 changes: 1 addition & 1 deletion library/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type storeMessagesReply struct {
}

func queryResponse(ctx context.Context, instance *WakuInstance, args storeMessagesArgs, options []store.HistoryRequestOption) (string, error) {
res, err := instance.node.Store().Query(
res, err := instance.node.LegacyStore().Query(
ctx,
store.Query{
PubsubTopic: args.Topic,
Expand Down
11 changes: 9 additions & 2 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.log.Info("Subscribing store to broadcaster")
}

w.storeV3.SetHost(host)

w.lightPush.SetHost(host)
if w.opts.enableLightPush {
if err := w.lightPush.Start(ctx); err != nil {
Expand Down Expand Up @@ -617,11 +619,16 @@ func (w *WakuNode) Relay() *relay.WakuRelay {
return nil
}

// Store is used to access any operation related to Waku Store protocol
func (w *WakuNode) Store() store.Store {
// LegacyStore is used to access any operation related to Waku Store protocol
func (w *WakuNode) LegacyStore() store.Store {
return w.store.(store.Store)
}

// Store is used to access any operation related to Waku Store protocol
func (w *WakuNode) Store() *storev3.WakuStoreV3 {
return w.storeV3
}

// LegacyFilter is used to access any operation related to Waku LegacyFilter protocol
func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter {
if result, ok := w.legacyFilter.(*legacy_filter.WakuFilter); ok {
Expand Down
65 changes: 64 additions & 1 deletion waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"bytes"
"context"
"fmt"
"math/big"
"net"
"sync"
Expand All @@ -11,6 +12,7 @@ import (

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
Expand All @@ -23,6 +25,8 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/storev3"

"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -114,6 +118,65 @@ func TestUpAndDown(t *testing.T) {
}
}

func TestStoreV3WIP(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
)
require.NoError(t, err)
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()

subs, err := wakuNode1.Relay().Subscribe(ctx, protocol.ContentFilter{PubsubTopic: relay.DefaultWakuTopic})
require.NoError(t, err)
go func() {
for {
select {
case <-ctx.Done():
return
case msg := <-subs[0].Ch:
if msg == nil {
return
}
fmt.Println(string(msg.Message().Payload))
}
}
}()

err = wakuNode1.DialPeer(ctx, "/ip4/148.103.183.231/tcp/60000/p2p/16Uiu2HAm6eGoQraK1Bd5rmTrd5xQDBhXehgYjCNydfrKqjzbACFx")
require.NoError(t, err)

time.Sleep(2 * time.Second)

msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: "test",
Version: proto.Uint32(0),
Timestamp: utils.GetUnixEpoch(wakuNode1.timesource),
}

msgHash, err := wakuNode1.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
require.NoError(t, err)

time.Sleep(1 * time.Second)

peerID, _ := peer.Decode("16Uiu2HAm6eGoQraK1Bd5rmTrd5xQDBhXehgYjCNydfrKqjzbACFx")

res, cursor, err := wakuNode1.storeV3.Exists(ctx, []pb.MessageHash{msgHash, [32]byte{1, 2}}, storev3.WithPeer(peerID))
require.NoError(t, err)

fmt.Println(res, cursor)

time.Sleep(5 * time.Second)
fmt.Println("SUCCESS")

}

func Test500(t *testing.T) {
maxMsgs := 500
maxMsgBytes := int2Bytes(maxMsgs)
Expand Down Expand Up @@ -315,7 +378,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
time.Sleep(2 * time.Second)
// NODE2 should have returned the message received via filter
result, err := wakuNode3.Store().Query(ctx, store.Query{})
result, err := wakuNode3.LegacyStore().Query(ctx, store.Query{})
require.NoError(t, err)
require.Len(t, result.Messages, 1)
require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp)
Expand Down
4 changes: 4 additions & 0 deletions waku/v2/protocol/pb/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func (h MessageHash) String() string {
return hexutil.Encode(h[:])
}

func (h MessageHash) Bytes() []byte {
return h[:]
}

// ToMessageHash converts a byte slice into a MessageHash
func ToMessageHash(b []byte) MessageHash {
var result MessageHash
Expand Down
25 changes: 15 additions & 10 deletions waku/v2/protocol/storev3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package storev3
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"

"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -21,8 +23,8 @@ import (
"google.golang.org/protobuf/proto"
)

// StoreID_v300 is the Store protocol v3 identifier
const StoreID_v300 = libp2pProtocol.ID("/vac/waku/store/3.0.0")
// StoreQueryID_v300 is the Store protocol v3 identifier
const StoreQueryID_v300 = libp2pProtocol.ID("/vac/waku/store-query/3.0.0")

// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
Expand All @@ -35,7 +37,7 @@ var (
// ErrNoPeersAvailable is returned when there are no store peers in the peer store
// that could be used to retrieve message history
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrMustSelectPeer = errors.New("a peer ID or multiaddress is required")
ErrMustSelectPeer = errors.New("a peer ID or multiaddress is required when checking for message hashes")
)

type WakuStoreV3 struct {
Expand Down Expand Up @@ -79,7 +81,7 @@ func (s *WakuStoreV3) Request(ctx context.Context, criteria Criteria, opts ...Re

//Add Peer to peerstore.
if s.pm != nil && params.peerAddr != nil {
pData, err := s.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v300)
pData, err := s.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreQueryID_v300)
if err != nil {
return nil, err
}
Expand All @@ -92,7 +94,7 @@ func (s *WakuStoreV3) Request(ctx context.Context, criteria Criteria, opts ...Re
selectedPeers, err := s.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: StoreID_v300,
Proto: StoreQueryID_v300,
PubsubTopics: []string{filterCriteria.PubsubTopic},
SpecificPeers: params.preferredPeers,
Ctx: ctx,
Expand All @@ -118,7 +120,7 @@ func (s *WakuStoreV3) Request(ctx context.Context, criteria Criteria, opts ...Re
pageLimit = MaxPageSize
}

storeRequest := &pb.StoreRequest{
storeRequest := &pb.StoreQueryRequest{
RequestId: hex.EncodeToString(params.requestID),
ReturnValues: params.returnValues,
PaginationForward: params.forward,
Expand Down Expand Up @@ -190,7 +192,7 @@ func (s *WakuStoreV3) Next(ctx context.Context, r *Result) (*Result, error) {
}, nil
}

storeRequest := proto.Clone(r.storeRequest).(*pb.StoreRequest)
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
storeRequest.PaginationCursor = r.Cursor()

Expand All @@ -212,11 +214,11 @@ func (s *WakuStoreV3) Next(ctx context.Context, r *Result) (*Result, error) {

}

func (s *WakuStoreV3) queryFrom(ctx context.Context, storeRequest *pb.StoreRequest, selectedPeer peer.ID) (*pb.StoreResponse, error) {
func (s *WakuStoreV3) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", selectedPeer))
logger.Info("sending store request")

stream, err := s.h.NewStream(ctx, selectedPeer, StoreID_v300)
stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
return nil, err
Expand All @@ -225,6 +227,9 @@ func (s *WakuStoreV3) queryFrom(ctx context.Context, storeRequest *pb.StoreReque
writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

request, _ := json.Marshal(storeRequest)
fmt.Println(string(request))

err = writer.WriteMsg(storeRequest)
if err != nil {
logger.Error("writing request", zap.Error(err))
Expand All @@ -234,7 +239,7 @@ func (s *WakuStoreV3) queryFrom(ctx context.Context, storeRequest *pb.StoreReque
return nil, err
}

storeResponse := &pb.StoreResponse{RequestId: storeRequest.RequestId}
storeResponse := &pb.StoreQueryResponse{RequestId: storeRequest.RequestId}
err = reader.ReadMsg(storeResponse)
if err != nil {
logger.Error("reading response", zap.Error(err))
Expand Down
6 changes: 3 additions & 3 deletions waku/v2/protocol/storev3/criteria.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type Criteria interface {
PopulateStoreRequest(request *pb.StoreRequest)
PopulateStoreRequest(request *pb.StoreQueryRequest)
}

type FilterCriteria struct {
Expand All @@ -17,7 +17,7 @@ type FilterCriteria struct {
TimeEnd *int64
}

func (f FilterCriteria) PopulateStoreRequest(request *pb.StoreRequest) {
func (f FilterCriteria) PopulateStoreRequest(request *pb.StoreQueryRequest) {
request.ContentTopics = f.ContentTopicsList()
request.PubsubTopic = proto.String(f.PubsubTopic)
request.TimeStart = f.TimeStart
Expand All @@ -28,7 +28,7 @@ type MessageHashCriteria struct {
MessageHashes []wpb.MessageHash
}

func (m MessageHashCriteria) PopulateStoreRequest(request *pb.StoreRequest) {
func (m MessageHashCriteria) PopulateStoreRequest(request *pb.StoreQueryRequest) {
request.MessageHashes = make([][]byte, len(m.MessageHashes))
for i := range m.MessageHashes {
request.MessageHashes[i] = m.MessageHashes[i][:]
Expand Down
Loading

0 comments on commit 44df347

Please sign in to comment.