From 6931b532cafd0df0e15b0620c3001018a9e24c7c Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 5 Jun 2024 11:15:25 +0800 Subject: [PATCH] chore_: add test for waku confirm message sent. --- api/messenger_raw_message_resend_test.go | 2 +- wakuv2/waku.go | 3 +- wakuv2/waku_test.go | 59 ++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/api/messenger_raw_message_resend_test.go b/api/messenger_raw_message_resend_test.go index 750de88bae5..a9db1eab3ec 100644 --- a/api/messenger_raw_message_resend_test.go +++ b/api/messenger_raw_message_resend_test.go @@ -211,7 +211,7 @@ func (s *MessengerRawMessageResendTest) TestMessageSent() { rawMessage, err := s.bobMessenger.RawMessageByID(ids[0]) s.Require().NoError(err) s.Require().NotNil(rawMessage) - if rawMessage.Sent { + if rawMessage.SendCount > 0 { return nil } return errors.New("raw message should be sent finally") diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 02b324a8911..bb648b97c18 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1541,7 +1541,8 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } - if e.MsgType == common.SendMessageType && !(*e.Envelope.Message().Ephemeral) { + ephemeral := e.Envelope.Message().Ephemeral + if e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) { w.sendMsgIDsMu.Lock() subMsgs, ok := w.sendMsgIDs[e.PubsubTopic] if !ok { diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 88d53b0433a..8393a0618d4 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -13,6 +13,7 @@ import ( "github.com/cenkalti/backoff/v3" + ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" ethdnsdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc" @@ -517,3 +518,61 @@ func waitForEnvelope(t *testing.T, contentTopic string, envCh chan common.Envelo } } } + +func TestConfirmMessageDelivered(t *testing.T) { + aliceConfig := &Config{} + aliceNode, err := New(nil, "", aliceConfig, nil, nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, aliceNode.Start()) + + bobConfig := &Config{} + bobNode, err := New(nil, "", bobConfig, nil, nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, bobNode.Start()) + + addrs := aliceNode.ListenAddresses() + require.Greater(t, len(addrs), 0) + _, err = bobNode.AddRelayPeer(addrs[0]) + require.NoError(t, err) + err = bobNode.DialPeer(addrs[0]) + require.NoError(t, err) + + filter := &common.Filter{ + Messages: common.NewMemoryMessageStore(), + ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}), + } + + _, err = aliceNode.Subscribe(filter) + require.NoError(t, err) + + msgTimestamp := aliceNode.timestamp() + contentTopic := maps.Keys(filter.ContentTopics)[0] + + _, err = aliceNode.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: contentTopic.ContentTopic(), + Version: proto.Uint32(0), + Timestamp: &msgTimestamp, + Ephemeral: proto.Bool(false), + }) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + messages := filter.Retrieve() + require.Len(t, messages, 1) + + require.Len(t, aliceNode.sendMsgIDs, 1) + for _, msgs := range aliceNode.sendMsgIDs { + require.Len(t, msgs, 1) + for hash := range msgs { + require.Equal(t, hash, messages[0].Hash()) + } + } + + aliceNode.ConfirmMessageDelivered([]ethcommon.Hash{messages[0].Hash()}) + require.Len(t, aliceNode.sendMsgIDs, 0) + + require.NoError(t, aliceNode.Stop()) + require.NoError(t, bobNode.Stop()) +}