From d78064f4c89964c345bf356ea3b7414132fa9414 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Mon, 3 Jun 2024 16:00:12 +0800 Subject: [PATCH] chore_: remove message ids from query queue after ack --- eth-node/bridge/geth/waku.go | 3 +++ eth-node/bridge/geth/wakuv2.go | 4 ++++ eth-node/types/waku.go | 3 +++ protocol/messenger_peersyncing.go | 2 ++ protocol/transport/transport.go | 12 ++++++++++++ wakuv2/waku.go | 15 +++++++++++++++ 6 files changed, 39 insertions(+) diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 42a222d885d..d13de226817 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -314,3 +314,6 @@ func GetWakuFilterFrom(f types.Filter) *wakucommon.Filter { func (w *wakuFilterWrapper) ID() string { return w.id } + +func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) { +} diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index e4227266fb6..568ef8c8aec 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -327,3 +327,7 @@ func GetWakuV2FilterFrom(f types.Filter) *wakucommon.Filter { func (w *wakuV2FilterWrapper) ID() string { return w.id } + +func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) { + w.waku.ConfirmMessageDelivered(hashes) +} diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index eba3cc0bf9d..693e1ad7ab6 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -178,4 +178,7 @@ type Waku interface { // ClearEnvelopesCache clears waku envelopes cache ClearEnvelopesCache() + + // ConfirmMessageDelivered updates a message has been delivered in waku + ConfirmMessageDelivered(hash []common.Hash) } diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index 2495f14b76e..718c34db2d4 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -43,6 +43,8 @@ func (m *Messenger) markDeliveredMessages(acks [][]byte) { m.logger.Debug("Can't set message status as delivered", zap.Error(err)) } + m.transport.ConfirmMessageDelivered(messageID) + //send signal to client that message status updated if m.config.messengerSignalsHandler != nil { message, err := m.persistence.MessageByID(messageID) diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index b3031101acf..0f62cb0110a 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -717,3 +717,15 @@ func (t *Transport) RemovePubsubTopicKey(topic string) error { } return nil } + +func (t *Transport) ConfirmMessageDelivered(messageID string) { + hashes, ok := t.envelopesMonitor.identifierHashes[messageID] + if !ok { + return + } + commHashes := make([]common.Hash, len(hashes)) + for i, h := range hashes { + commHashes[i] = common.BytesToHash(h[:]) + } + t.waku.ConfirmMessageDelivered(commHashes) +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index e39da7297c5..dd78dc6fa57 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1042,6 +1042,21 @@ func (w *Waku) checkIfMessagesStored() { } } +func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { + w.sendMsgIDsMu.Lock() + defer w.sendMsgIDsMu.Unlock() + for pubsubTopic, subMsgs := range w.sendMsgIDs { + for _, hash := range hashes { + delete(subMsgs, hash) + if len(subMsgs) == 0 { + delete(w.sendMsgIDs, pubsubTopic) + } else { + w.sendMsgIDs[pubsubTopic] = subMsgs + } + } + } +} + type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) {