From 6b56b45e14db950ab1f02f49643d80faa404fd48 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 23 May 2024 16:05:12 +0800 Subject: [PATCH] feat_: hash based query for outgoing messages. --- protocol/messenger_peers.go | 2 +- wakuv2/common/message.go | 3 +- wakuv2/waku.go | 139 +++++++++++++++++++++++++++++++++--- 3 files changed, 134 insertions(+), 10 deletions(-) diff --git a/protocol/messenger_peers.go b/protocol/messenger_peers.go index 59933ff4a51..1f5b28ebafe 100644 --- a/protocol/messenger_peers.go +++ b/protocol/messenger_peers.go @@ -13,7 +13,7 @@ func (m *Messenger) AddStorePeer(address string) (peer.ID, error) { } func (m *Messenger) AddRelayPeer(address string) (peer.ID, error) { - return m.transport.AddStorePeer(address) + return m.transport.AddRelayPeer(address) } func (m *Messenger) DialPeer(address string) error { diff --git a/wakuv2/common/message.go b/wakuv2/common/message.go index d89cfc5a021..4c4703da0b8 100644 --- a/wakuv2/common/message.go +++ b/wakuv2/common/message.go @@ -22,6 +22,7 @@ type MessageType int const ( RelayedMessageType MessageType = iota StoreMessageType + SendMessageType ) // MessageParams specifies the exact way a message should be wrapped @@ -46,7 +47,7 @@ type ReceivedMessage struct { Padding []byte Signature []byte - Sent uint32 // Time when the message was posted into the network + Sent uint32 // Time when the message was posted into the network in seconds Src *ecdsa.PublicKey // Message recipient (identity used to decode the message) Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 7667e933b41..08a0bcd89f0 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -19,6 +19,7 @@ package wakuv2 import ( + "bytes" "context" "crypto/ecdsa" "crypto/sha256" @@ -66,6 +67,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" @@ -128,6 +130,9 @@ type Waku struct { storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids storeMsgIDsMu sync.RWMutex + sendMsgIDs map[string]map[gethcommon.Hash]uint32 + sendMsgIDsMu sync.RWMutex + topicHealthStatusChan chan peermanager.TopicHealthStatus connStatusSubscriptions map[string]*types.ConnStatusSubscription connStatusMu sync.Mutex @@ -208,6 +213,8 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge storeMsgIDs: make(map[gethcommon.Hash]bool), timesource: ts, storeMsgIDsMu: sync.RWMutex{}, + sendMsgIDs: make(map[string]map[gethcommon.Hash]uint32), + sendMsgIDsMu: sync.RWMutex{}, logger: logger, discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, @@ -1007,6 +1014,38 @@ func (w *Waku) broadcast() { } } +func (w *Waku) checkIfMessagesStored() { + ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() + + for { + select { + case <-w.ctx.Done(): + w.logger.Debug("Stop the look for message stored check") + return + case <-ticker.C: + w.logger.Debug("Running loop for messages stored check") + w.logger.Debug("Send Message IDs", zap.Any("sendMsgIDs", w.sendMsgIDs)) + w.sendMsgIDsMu.Lock() + for pubsubTopic, subMsgs := range w.sendMsgIDs { + var queryMsgIds []gethcommon.Hash + for msgID, sendTime := range subMsgs { + // message is sent 5 seconds ago, check if it's stored + if uint32(w.timesource.Now().Unix()) > sendTime+5 { + queryMsgIds = append(queryMsgIds, msgID) + } + } + w.logger.Debug("Store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic)) + if len(queryMsgIds) > 0 { + w.messageHashBasedQuery(w.ctx, queryMsgIds, pubsubTopic) + } + } + + w.sendMsgIDsMu.Unlock() + } + } +} + type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) { @@ -1016,14 +1055,11 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, if err := publishFn(envelope, logger); err != nil { logger.Error("could not send message", zap.Error(err)) event = common.EventEnvelopeExpired - } else { - event = common.EventEnvelopeSent + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), + Event: event, + }) } - - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), - Event: event, - }) } // Send injects a message into the waku send queue, to be distributed in the @@ -1052,7 +1088,7 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes())) w.poolMu.Unlock() if !alreadyCached { - recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType) + recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType) w.postEvent(recvMessage) // notify the local node about the new message w.addEnvelope(recvMessage) } @@ -1060,6 +1096,80 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { return envelope.Hash().Bytes(), nil } +// ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes +func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, pubsubTopic string) { + selectedPeers, err := w.node.PeerManager().SelectPeers( + peermanager.PeerSelectionCriteria{ + SelectionType: peermanager.Automatic, + Proto: store.StoreQueryID_v300, + PubsubTopics: []string{pubsubTopic}, + Ctx: ctx, + }, + ) + if err != nil { + w.logger.Warn("could not select peers", zap.Error(err)) + return + } + + var opts []store.RequestOption + requestID := protocol.GenerateRequestID() + opts = append(opts, store.WithRequestID(requestID)) + opts = append(opts, store.WithPeer(selectedPeers[0])) + + messageHashes := make([]pb.MessageHash, len(hashes)) + for i, hash := range hashes { + messageHashes[i] = pb.ToMessageHash(hash.Bytes()) + } + + result, err := w.node.Store().QueryByHash(ctx, messageHashes, opts...) + if err != nil { + w.logger.Warn("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Error(err)) + return + } + + var ackHashes []gethcommon.Hash + var missedHashes []gethcommon.Hash + for _, hash := range hashes { + found := false + for _, msg := range result.Messages() { + if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) { + found = true + break + } + } + if found { + ackHashes = append(ackHashes, hash) + } else { + missedHashes = append(missedHashes, hash) + } + + subMsgs := w.sendMsgIDs[pubsubTopic] + delete(subMsgs, hash) + if len(subMsgs) == 0 { + delete(w.sendMsgIDs, pubsubTopic) + } else { + w.sendMsgIDs[pubsubTopic] = subMsgs + } + } + + w.logger.Debug("Ack message hashes", zap.Any("ackHashes", ackHashes)) + w.logger.Debug("Missed message hashes", zap.Any("missedHashes", missedHashes)) + + for _, hash := range ackHashes { + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeSent, + }) + } + + for _, hash := range missedHashes { + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeExpired, + }) + } +} + func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, requestID []byte, opts []legacy_store.HistoryRequestOption) (*legacy_store.Result, error) { opts = append(opts, legacy_store.WithRequestID(requestID)) @@ -1263,6 +1373,8 @@ func (w *Waku) Start() error { go w.broadcast() + go w.checkIfMessagesStored() + // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` w.wg.Add(1) go w.seedBootnodesForDiscV5() @@ -1447,6 +1559,17 @@ func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } + if e.MsgType == common.SendMessageType && !(*e.Envelope.Message().Ephemeral) { + w.sendMsgIDsMu.Lock() + subMsgs, ok := w.sendMsgIDs[e.PubsubTopic] + if !ok { + subMsgs = make(map[gethcommon.Hash]uint32) + } + subMsgs[e.Hash()] = e.Sent + w.sendMsgIDs[e.PubsubTopic] = subMsgs + w.sendMsgIDsMu.Unlock() + } + matched := w.filters.NotifyWatchers(e) // If not matched we remove it