Skip to content

Commit

Permalink
feat(nwaku)_: storenode requestor for missing message retrieval and r…
Browse files Browse the repository at this point in the history
…esult iterator impl
  • Loading branch information
richard-ramos committed Oct 23, 2024
1 parent d6079c7 commit d79e622
Showing 1 changed file with 180 additions and 6 deletions.
186 changes: 180 additions & 6 deletions wakuv2/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ import (

"github.com/libp2p/go-libp2p/core/metrics"

commonapi "github.com/waku-org/go-waku/waku/v2/api/common"

filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/api/missing"
Expand All @@ -322,6 +324,7 @@ import (
gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/timesource"
"github.com/status-im/status-go/wakuv2/common"
"github.com/status-im/status-go/wakuv2/persistence"
Expand Down Expand Up @@ -1427,13 +1430,13 @@ func (w *Waku) Start() error {
w.wg.Add(1)
go w.runPeerExchangeLoop()
*/

if w.cfg.EnableMissingMessageVerification {
w.missingMsgVerifier = missing.NewMissingMessageVerifier(
missing.NewDefaultStorenodeRequestor(w.node.Store()),
newStorenodeRequestor(w.wakuCtx, w.logger),
w,
w.node.Timesource(),
w.timesource,
w.logger)

w.missingMsgVerifier.Start(w.ctx)
Expand All @@ -1456,6 +1459,7 @@ func (w *Waku) Start() error {
}()
}

/* TODO: nwaku
if w.cfg.LightClient {
// Create FilterManager that will main peer connectivity
// for installed filters
Expand Down Expand Up @@ -1715,7 +1719,6 @@ func (w *Waku) Stop() error {
return nil
}

/* TODO-nwaku
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
if envelope == nil {
return nil
Expand Down Expand Up @@ -1751,7 +1754,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag

return nil
}
*/

// addEnvelope adds an envelope to the envelope map, used for sending
func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) {
Expand Down Expand Up @@ -2977,7 +2979,7 @@ type defaultStorenodeMessageVerifier struct {
}

func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
requestIDStr := hexutil.Encode(requestID)
requestIDStr := hex.EncodeToString(requestID)
storeRequest := &storepb.StoreQueryRequest{
RequestId: requestIDStr,
MessageHashes: make([][]byte, len(messageHashes)),
Expand Down Expand Up @@ -3019,3 +3021,175 @@ func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context

return result, nil
}

func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) missing.StorenodeRequestor {
return &storenodeRequestor{
wakuCtx: wakuCtx,
logger: logger.Named("storenodeRequestor"),
}
}

type storenodeRequestor struct {
wakuCtx unsafe.Pointer
logger *zap.Logger
}

func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) {
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())

logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr))

logger.Debug("sending store request")

storeRequest := &storepb.StoreQueryRequest{
RequestId: requestIDStr,
MessageHashes: make([][]byte, len(messageHashes)),
IncludeData: true,
PaginationCursor: nil,
PaginationForward: false,
PaginationLimit: proto.Uint64(pageSize),
}

for i, mhash := range messageHashes {
storeRequest.MessageHashes[i] = mhash.Bytes()
}

jsonQuery, err := json.Marshal(storeRequest)
if err != nil {
return nil, err
}

// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds()))
if err != nil {
return nil, err
}

storeResponse := &storepb.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponse), storeResponse)
if err != nil {
return nil, err
}

if storeResponse.GetStatusCode() != http.StatusOK {
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
}

return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
}

func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (commonapi.StoreRequestResult, error) {
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())

logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr))

logger.Debug("sending store request")

storeRequest := &storepb.StoreQueryRequest{
RequestId: requestIDStr,
PubsubTopic: proto.String(pubsubTopic),
ContentTopics: contentTopics,
TimeStart: from,
TimeEnd: to,
IncludeData: false,
PaginationCursor: nil,
PaginationForward: false,
PaginationLimit: proto.Uint64(pageSize),
}

jsonQuery, err := json.Marshal(storeRequest)
if err != nil {
return nil, err
}

// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds()))
if err != nil {
return nil, err
}

storeResponse := &storepb.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponse), storeResponse)
if err != nil {
return nil, err
}

if storeResponse.GetStatusCode() != http.StatusOK {
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
}

return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
}

type storeResultImpl struct {
done bool

wakuCtx unsafe.Pointer
storeRequest *storepb.StoreQueryRequest
storeResponse *storepb.StoreQueryResponse
peerID peer.ID
}

func newStoreResultImpl(wakuCtx unsafe.Pointer, peerID peer.ID, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl {
return &storeResultImpl{
wakuCtx: wakuCtx,
storeRequest: storeRequest,
storeResponse: storeResponse,
peerID: peerID,
}
}

func (r *storeResultImpl) Cursor() []byte {
return r.storeResponse.GetPaginationCursor()
}

func (r *storeResultImpl) IsComplete() bool {
return r.done
}

func (r *storeResultImpl) PeerID() peer.ID {
return r.peerID
}

func (r *storeResultImpl) Query() *storepb.StoreQueryRequest {
return r.storeRequest
}

func (r *storeResultImpl) Response() *storepb.StoreQueryResponse {
return r.storeResponse
}

func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error {
// TODO: opts is being ignored. Will require some changes in go-waku. For now using this
// is not necessary

if r.storeResponse.GetPaginationCursor() == nil {
r.done = true
return nil
}

r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor

jsonQuery, err := json.Marshal(r.storeRequest)
if err != nil {
return err
}

// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
jsonResponse, err := wakuStoreQuery(r.wakuCtx, string(jsonQuery), r.peerID.String(), int(time.Minute.Milliseconds()))
if err != nil {
return err
}

err = json.Unmarshal([]byte(jsonResponse), r.storeResponse)
if err != nil {
return err
}

return nil
}

func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue {
return r.storeResponse.GetMessages()
}

0 comments on commit d79e622

Please sign in to comment.