From d9ab648ab5fac99fdfc7dd44235d759cdf354117 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 4 Dec 2024 17:14:26 +0700 Subject: [PATCH 1/4] feat_: phase-1 of use single content-topic for all community chats --- eth-node/types/rpc.go | 27 ++++++++------- protocol/common/message_sender.go | 12 ++++--- protocol/common/raw_message.go | 1 + protocol/communities/community.go | 7 ++++ protocol/communities/manager.go | 14 ++++++++ protocol/communities/manager_archive.go | 4 +++ ...nities_messenger_token_permissions_test.go | 3 +- protocol/messenger.go | 12 ++++++- protocol/messenger_communities.go | 11 +++++- protocol/transport/filters_manager.go | 34 +++++++++++++------ protocol/transport/transport.go | 13 ++++--- 11 files changed, 101 insertions(+), 37 deletions(-) diff --git a/eth-node/types/rpc.go b/eth-node/types/rpc.go index 6105f3b0160..4af4b967a18 100644 --- a/eth-node/types/rpc.go +++ b/eth-node/types/rpc.go @@ -6,19 +6,20 @@ import ( // NewMessage represents a new whisper message that is posted through the RPC. type NewMessage struct { - SymKeyID string `json:"symKeyID"` - PublicKey []byte `json:"pubKey"` - SigID string `json:"sig"` - TTL uint32 `json:"ttl"` - PubsubTopic string `json:"pubsubTopic"` - Topic TopicType `json:"topic"` - Payload []byte `json:"payload"` - Padding []byte `json:"padding"` - PowTime uint32 `json:"powTime"` - PowTarget float64 `json:"powTarget"` - TargetPeer string `json:"targetPeer"` - Ephemeral bool `json:"ephemeral"` - Priority *int `json:"priority"` + SymKeyID string `json:"symKeyID"` + PublicKey []byte `json:"pubKey"` + SigID string `json:"sig"` + TTL uint32 `json:"ttl"` + PubsubTopic string `json:"pubsubTopic"` + Topic TopicType `json:"topic"` + Payload []byte `json:"payload"` + Padding []byte `json:"padding"` + PowTime uint32 `json:"powTime"` + PowTarget float64 `json:"powTarget"` + TargetPeer string `json:"targetPeer"` + Ephemeral bool `json:"ephemeral"` + Priority *int `json:"priority"` + ContentTopicOverride string `json:"contentTopicOverride"` } // Message is the RPC representation of a whisper message. diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 20e9ab54461..9662258e7e4 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -660,11 +660,12 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes } newMessage := &types.NewMessage{ - TTL: whisperTTL, - Payload: payload, - PowTarget: calculatePoW(payload), - PowTime: whisperPoWTime, - PubsubTopic: rawMessage.PubsubTopic, + TTL: whisperTTL, + Payload: payload, + PowTarget: calculatePoW(payload), + PowTime: whisperPoWTime, + PubsubTopic: rawMessage.PubsubTopic, + ContentTopicOverride: rawMessage.ContentTopicOverride, } if rawMessage.BeforeDispatch != nil { @@ -765,6 +766,7 @@ func (s *MessageSender) SendPublic( newMessage.Ephemeral = rawMessage.Ephemeral newMessage.PubsubTopic = rawMessage.PubsubTopic newMessage.Priority = rawMessage.Priority + newMessage.ContentTopicOverride = rawMessage.ContentTopicOverride messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) diff --git a/protocol/common/raw_message.go b/protocol/common/raw_message.go index 687d3b78a9b..9c69801d4fe 100644 --- a/protocol/common/raw_message.go +++ b/protocol/common/raw_message.go @@ -84,4 +84,5 @@ type RawMessage struct { ResendType ResendType ResendMethod ResendMethod Priority *MessagePriority + ContentTopicOverride string } diff --git a/protocol/communities/community.go b/protocol/communities/community.go index 4e6a865f7ec..c7c82916dde 100644 --- a/protocol/communities/community.go +++ b/protocol/communities/community.go @@ -1568,6 +1568,13 @@ func (o *Community) setPrivateKey(pk *ecdsa.PrivateKey) { } } +func (o *Community) UniversalChatID() string { + // Using Member updates channelID as chatID to act as a universal content-topic for all chats in the community as explained here https://forum.vac.dev/t/status-communities-review-and-proposed-usage-of-waku-content-topics/335 + // This is to match filter criteria of community with the content-topic usage. + // This specific topic is chosen as existing users before the change are already subscribed to this and will not get affected by it. + return o.MemberUpdateChannelID() +} + func (o *Community) SetResendAccountsClock(clock uint64) { o.config.CommunityDescription.ResendAccountsClock = clock } diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index bce788afb68..7449b6fa81c 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -4277,6 +4277,20 @@ func (m *Manager) GetOwnedCommunitiesChatIDs() (map[string]bool, error) { return chatIDs, nil } +func (m *Manager) GetOwnedCommunitiesUniversalChatIDs() (map[string]bool, error) { + ownedCommunities, err := m.Controlled() + if err != nil { + return nil, err + } + chatIDs := make(map[string]bool) + for _, c := range ownedCommunities { + if c.Joined() { + chatIDs[c.UniversalChatID()] = true + } + } + return chatIDs, nil +} + func (m *Manager) StoreWakuMessage(message *types.Message) error { return m.persistence.SaveWakuMessage(message) } diff --git a/protocol/communities/manager_archive.go b/protocol/communities/manager_archive.go index 4b62c99edc2..311aca2cb39 100644 --- a/protocol/communities/manager_archive.go +++ b/protocol/communities/manager_archive.go @@ -354,6 +354,10 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community, m.logger.Error("failed to get community chat topics ", zap.Error(err)) continue } + // adding the content-topic used for member updates. + // since member updates would not be too frequent i.e only addition/deletion would add a new message, + // this shouldn't cause too much increase in size of archive generated. + topics = append(topics, m.transport.FilterByChatID(community.MemberUpdateChannelID()).ContentTopic) ts := time.Now().Unix() to := time.Unix(ts, 0) diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index bbf05984c67..2e15dd6eae3 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -2152,7 +2152,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe startDate := messageDate.Add(-time.Minute) endDate := messageDate.Add(time.Minute) topic := types.BytesToTopic(transport.ToTopic(chat.ID)) - topics := []types.TopicType{topic} + communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.MemberUpdateChannelID())) + topics := []types.TopicType{topic, communityCommonTopic} torrentConfig := params.TorrentConfig{ Enabled: true, diff --git a/protocol/messenger.go b/protocol/messenger.go index 77f5f2c025d..7eb6e79e2e2 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -2016,7 +2016,8 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe ) return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID) } - + //setting content-topic over-ride for community messages to use memberUpdatesChannelID + rawMessage.ContentTopicOverride = community.MemberUpdateChannelID() logger.Debug("sending community chat message", zap.String("chatName", chat.Name)) isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID) if err != nil { @@ -3338,6 +3339,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Info("failed to retrieve admin communities", zap.Error(err)) } + //fetch universal chatIDs as well. + controlledCommunitiesUniversalChatIDs, err := m.communitiesManager.GetOwnedCommunitiesUniversalChatIDs() + if err != nil { + logger.Info("failed to retrieve admin communities", zap.Error(err)) + } + for chatID, flag := range controlledCommunitiesUniversalChatIDs { + controlledCommunitiesChatIDs[chatID] = flag + } + iterator := m.retrievedMessagesIteratorFactory(chatWithMessages) for iterator.HasNext() { filter, messages := iterator.Next() diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index b6e53ccc719..52ee06c8486 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -2715,7 +2715,12 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats())) publicFiltersToInit = append(publicFiltersToInit, defaultFilters...) - + for _, filter := range defaultFilters { + _, err := m.transport.RemoveFilterByChatID(filter.ChatID) + if err != nil { + return err + } + } for chatID := range community.Chats() { communityChatID := community.IDString() + chatID _, err := m.transport.RemoveFilterByChatID(communityChatID) @@ -3951,6 +3956,10 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community for _, filter := range filters { topics = append(topics, filter.ContentTopic) } + // adding the content-topic used for member updates. + // since member updates would not be too frequent i.e only addition/deletion would add a new message, + // this shouldn't cause too much increase in size of archive generated. + filters = append(filters, m.transport.FilterByChatID(c.MemberUpdateChannelID())) // First we need to know the timestamp of the latest waku message // we've received for this community, so we can request messages we've diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 5393d63bf78..3ac990508b0 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -99,7 +99,7 @@ func (f *FiltersManager) Init( // Add public, one-to-one and negotiated filters. for _, fi := range filtersToInit { - _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic) + _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID) if err != nil { return nil, err } @@ -123,15 +123,16 @@ func (f *FiltersManager) Init( } type FiltersToInitialize struct { - ChatID string - PubsubTopic string + ChatID string + PubsubTopic string + ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager. } func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) { var filters []*Filter // Add public, one-to-one and negotiated filters. for _, pf := range publicFiltersToInit { - f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic) + f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID) if err != nil { return nil, err } @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, } keyString := hex.EncodeToString(secret.Key) - filter, err := f.addSymmetric(keyString, "") + filter, err := f.addSymmetric(keyString, "", "") if err != nil { f.logger.Debug("could not register negotiated topic", zap.Error(err)) return nil, err @@ -534,11 +535,16 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter { } // LoadPublic adds a filter for a public chat. -func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) { +func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) { f.mutex.Lock() defer f.mutex.Unlock() - if chat, ok := f.filters[chatID]; ok { + chatIDToLoad := chatID + if contentTopicID != "" { + chatIDToLoad = contentTopicID + } + + if chat, ok := f.filters[chatIDToLoad]; ok { if chat.PubsubTopic != pubsubTopic { f.logger.Debug("updating pubsub topic for filter", zap.String("chatID", chatID), @@ -547,13 +553,13 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, zap.String("newTopic", pubsubTopic), ) chat.PubsubTopic = pubsubTopic - f.filters[chatID] = chat + f.filters[chatIDToLoad] = chat //TODO: Do we need to update watchers as well on modification? } return chat, nil } - filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic) + filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID) if err != nil { f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -592,7 +598,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro return f.filters[chatID], nil } - contactCodeFilter, err := f.addSymmetric(chatID, "") + contactCodeFilter, err := f.addSymmetric(chatID, "", "") if err != nil { f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -615,7 +621,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro } // addSymmetric adds a symmetric key filter -func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) { +func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) { var symKeyID string var err error @@ -644,6 +650,12 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi } } + if contentTopicID != "" { + //add receive filter for the single default contentTopic for all community chats + topic = ToTopic(contentTopicID) + topics = append(topics, topic) + } + id, err := f.service.Subscribe(&types.SubscriptionOptions{ SymKeyID: symKeyID, PoW: minPow, diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 0f8df437c39..cd59cced193 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -191,7 +191,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil } func (t *Transport) JoinPublic(chatID string) (*Filter, error) { - return t.filters.LoadPublic(chatID, "") + return t.filters.LoadPublic(chatID, "", "") } func (t *Transport) LeavePublic(chatID string) error { @@ -223,6 +223,8 @@ func (t *Transport) GetStats() types.StatsSummary { return t.waku.GetStats() } +// With change in filter used for communities, messages are indexed here with common filter and not their own chatID filter. +// The caller should not use chatID from the filter to determine chatID of the message, rather should dervice it from messaage itself. func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { result := make(map[Filter][]*types.Message) logger := t.logger.With(zap.String("site", "retrieveRawAll")) @@ -276,16 +278,16 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { // SendPublic sends a new message using the Whisper service. // For public filters, chat name is used as an ID as well as // a topic. +// In case of communities a single topic is used to send all messages. func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) { if err := t.addSig(newMessage); err != nil { return nil, err } - - filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic) + //passing content-topic override, it will be used if set. otherwise chatName will be used to load filter. + filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride) if err != nil { return nil, err } - newMessage.SymKeyID = filter.SymKeyID newMessage.Topic = filter.ContentTopic newMessage.PubsubTopic = filter.PubsubTopic @@ -362,7 +364,8 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types. } // We load the filter to make sure we can post on it - filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic) + //passing content-topic override, it will be used if set. otherwise chatName will be used to load filter. + filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride) if err != nil { return nil, err } From afd106a8ec99418fc916f5a90758f94ad7af264a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 9 Dec 2024 14:54:08 +0530 Subject: [PATCH 2/4] chore_: apply suggestions from code review Co-authored-by: osmaczko <33099791+osmaczko@users.noreply.github.com> --- protocol/communities/manager_archive.go | 2 +- protocol/communities_messenger_token_permissions_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/communities/manager_archive.go b/protocol/communities/manager_archive.go index 311aca2cb39..57daa2aa92e 100644 --- a/protocol/communities/manager_archive.go +++ b/protocol/communities/manager_archive.go @@ -357,7 +357,7 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community, // adding the content-topic used for member updates. // since member updates would not be too frequent i.e only addition/deletion would add a new message, // this shouldn't cause too much increase in size of archive generated. - topics = append(topics, m.transport.FilterByChatID(community.MemberUpdateChannelID()).ContentTopic) + topics = append(topics, m.transport.FilterByChatID(community.UniversalChatID()).ContentTopic) ts := time.Now().Unix() to := time.Unix(ts, 0) diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index 2e15dd6eae3..9a226603bfc 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -2152,7 +2152,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe startDate := messageDate.Add(-time.Minute) endDate := messageDate.Add(time.Minute) topic := types.BytesToTopic(transport.ToTopic(chat.ID)) - communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.MemberUpdateChannelID())) + communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.UniversalChatID())) topics := []types.TopicType{topic, communityCommonTopic} torrentConfig := params.TorrentConfig{ From 398c28c9c728886fd8ea18895ab2b20f59c222de Mon Sep 17 00:00:00 2001 From: osmaczko <33099791+osmaczko@users.noreply.github.com> Date: Wed, 11 Dec 2024 07:44:37 +0100 Subject: [PATCH 3/4] chore_: use single content topic for community chats (#6179) --- eth-node/types/rpc.go | 27 ++++++++++----------- protocol/common/message_sender.go | 14 +++++------ protocol/common/raw_message.go | 2 +- protocol/messenger.go | 14 +++++++---- protocol/transport/filters_manager.go | 34 +++++++++------------------ protocol/transport/transport.go | 13 ++++------ 6 files changed, 46 insertions(+), 58 deletions(-) diff --git a/eth-node/types/rpc.go b/eth-node/types/rpc.go index 4af4b967a18..6105f3b0160 100644 --- a/eth-node/types/rpc.go +++ b/eth-node/types/rpc.go @@ -6,20 +6,19 @@ import ( // NewMessage represents a new whisper message that is posted through the RPC. type NewMessage struct { - SymKeyID string `json:"symKeyID"` - PublicKey []byte `json:"pubKey"` - SigID string `json:"sig"` - TTL uint32 `json:"ttl"` - PubsubTopic string `json:"pubsubTopic"` - Topic TopicType `json:"topic"` - Payload []byte `json:"payload"` - Padding []byte `json:"padding"` - PowTime uint32 `json:"powTime"` - PowTarget float64 `json:"powTarget"` - TargetPeer string `json:"targetPeer"` - Ephemeral bool `json:"ephemeral"` - Priority *int `json:"priority"` - ContentTopicOverride string `json:"contentTopicOverride"` + SymKeyID string `json:"symKeyID"` + PublicKey []byte `json:"pubKey"` + SigID string `json:"sig"` + TTL uint32 `json:"ttl"` + PubsubTopic string `json:"pubsubTopic"` + Topic TopicType `json:"topic"` + Payload []byte `json:"payload"` + Padding []byte `json:"padding"` + PowTime uint32 `json:"powTime"` + PowTarget float64 `json:"powTarget"` + TargetPeer string `json:"targetPeer"` + Ephemeral bool `json:"ephemeral"` + Priority *int `json:"priority"` } // Message is the RPC representation of a whisper message. diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 9662258e7e4..3ee39f2b838 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -660,12 +660,11 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes } newMessage := &types.NewMessage{ - TTL: whisperTTL, - Payload: payload, - PowTarget: calculatePoW(payload), - PowTime: whisperPoWTime, - PubsubTopic: rawMessage.PubsubTopic, - ContentTopicOverride: rawMessage.ContentTopicOverride, + TTL: whisperTTL, + Payload: payload, + PowTarget: calculatePoW(payload), + PowTime: whisperPoWTime, + PubsubTopic: rawMessage.PubsubTopic, } if rawMessage.BeforeDispatch != nil { @@ -684,7 +683,7 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes hashes := make([][]byte, 0, len(newMessages)) for _, newMessage := range newMessages { - hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID) + hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.ContentTopic) if err != nil { return nil, nil, err } @@ -766,7 +765,6 @@ func (s *MessageSender) SendPublic( newMessage.Ephemeral = rawMessage.Ephemeral newMessage.PubsubTopic = rawMessage.PubsubTopic newMessage.Priority = rawMessage.Priority - newMessage.ContentTopicOverride = rawMessage.ContentTopicOverride messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) diff --git a/protocol/common/raw_message.go b/protocol/common/raw_message.go index 9c69801d4fe..8a3bcce4b70 100644 --- a/protocol/common/raw_message.go +++ b/protocol/common/raw_message.go @@ -80,9 +80,9 @@ type RawMessage struct { Ephemeral bool BeforeDispatch func(*RawMessage) error HashRatchetGroupID []byte + ContentTopic string PubsubTopic string ResendType ResendType ResendMethod ResendMethod Priority *MessagePriority - ContentTopicOverride string } diff --git a/protocol/messenger.go b/protocol/messenger.go index 7eb6e79e2e2..dae4a2df77b 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -1957,6 +1957,10 @@ func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec co } func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMessage) (common.RawMessage, error) { + if rawMessage.ContentTopic == "" { + rawMessage.ContentTopic = rawMessage.LocalChatID + } + var err error var id []byte logger := m.logger.With(zap.String("site", "dispatchMessage"), zap.String("chatID", rawMessage.LocalChatID)) @@ -1991,7 +1995,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe case ChatTypePublic, ChatTypeProfile: logger.Debug("sending public message", zap.String("chatName", chat.Name)) - id, err = m.sender.SendPublic(ctx, chat.ID, rawMessage) + id, err = m.sender.SendPublic(ctx, rawMessage.ContentTopic, rawMessage) if err != nil { return rawMessage, err } @@ -2001,6 +2005,9 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe if err != nil { return rawMessage, err } + // Use a single content-topic for all community chats. + // Reasoning: https://github.com/status-im/status-go/pull/5864 + rawMessage.ContentTopic = community.UniversalChatID() rawMessage.PubsubTopic = community.PubsubTopic() canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), rawMessage.MessageType) @@ -2016,8 +2023,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe ) return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID) } - //setting content-topic over-ride for community messages to use memberUpdatesChannelID - rawMessage.ContentTopicOverride = community.MemberUpdateChannelID() + logger.Debug("sending community chat message", zap.String("chatName", chat.Name)) isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID) if err != nil { @@ -2029,7 +2035,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe } isEncrypted := isCommunityEncrypted || isChannelEncrypted if !isEncrypted { - id, err = m.sender.SendPublic(ctx, chat.ID, rawMessage) + id, err = m.sender.SendPublic(ctx, rawMessage.ContentTopic, rawMessage) if err != nil { return rawMessage, err } diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 3ac990508b0..5393d63bf78 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -99,7 +99,7 @@ func (f *FiltersManager) Init( // Add public, one-to-one and negotiated filters. for _, fi := range filtersToInit { - _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID) + _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic) if err != nil { return nil, err } @@ -123,16 +123,15 @@ func (f *FiltersManager) Init( } type FiltersToInitialize struct { - ChatID string - PubsubTopic string - ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager. + ChatID string + PubsubTopic string } func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) { var filters []*Filter // Add public, one-to-one and negotiated filters. for _, pf := range publicFiltersToInit { - f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID) + f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic) if err != nil { return nil, err } @@ -456,7 +455,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, } keyString := hex.EncodeToString(secret.Key) - filter, err := f.addSymmetric(keyString, "", "") + filter, err := f.addSymmetric(keyString, "") if err != nil { f.logger.Debug("could not register negotiated topic", zap.Error(err)) return nil, err @@ -535,16 +534,11 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter { } // LoadPublic adds a filter for a public chat. -func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) { +func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) { f.mutex.Lock() defer f.mutex.Unlock() - chatIDToLoad := chatID - if contentTopicID != "" { - chatIDToLoad = contentTopicID - } - - if chat, ok := f.filters[chatIDToLoad]; ok { + if chat, ok := f.filters[chatID]; ok { if chat.PubsubTopic != pubsubTopic { f.logger.Debug("updating pubsub topic for filter", zap.String("chatID", chatID), @@ -553,13 +547,13 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTo zap.String("newTopic", pubsubTopic), ) chat.PubsubTopic = pubsubTopic - f.filters[chatIDToLoad] = chat //TODO: Do we need to update watchers as well on modification? + f.filters[chatID] = chat } return chat, nil } - filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID) + filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic) if err != nil { f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -598,7 +592,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro return f.filters[chatID], nil } - contactCodeFilter, err := f.addSymmetric(chatID, "", "") + contactCodeFilter, err := f.addSymmetric(chatID, "") if err != nil { f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -621,7 +615,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro } // addSymmetric adds a symmetric key filter -func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) { +func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) { var symKeyID string var err error @@ -650,12 +644,6 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, content } } - if contentTopicID != "" { - //add receive filter for the single default contentTopic for all community chats - topic = ToTopic(contentTopicID) - topics = append(topics, topic) - } - id, err := f.service.Subscribe(&types.SubscriptionOptions{ SymKeyID: symKeyID, PoW: minPow, diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index cd59cced193..0f8df437c39 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -191,7 +191,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil } func (t *Transport) JoinPublic(chatID string) (*Filter, error) { - return t.filters.LoadPublic(chatID, "", "") + return t.filters.LoadPublic(chatID, "") } func (t *Transport) LeavePublic(chatID string) error { @@ -223,8 +223,6 @@ func (t *Transport) GetStats() types.StatsSummary { return t.waku.GetStats() } -// With change in filter used for communities, messages are indexed here with common filter and not their own chatID filter. -// The caller should not use chatID from the filter to determine chatID of the message, rather should dervice it from messaage itself. func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { result := make(map[Filter][]*types.Message) logger := t.logger.With(zap.String("site", "retrieveRawAll")) @@ -278,16 +276,16 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { // SendPublic sends a new message using the Whisper service. // For public filters, chat name is used as an ID as well as // a topic. -// In case of communities a single topic is used to send all messages. func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) { if err := t.addSig(newMessage); err != nil { return nil, err } - //passing content-topic override, it will be used if set. otherwise chatName will be used to load filter. - filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride) + + filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic) if err != nil { return nil, err } + newMessage.SymKeyID = filter.SymKeyID newMessage.Topic = filter.ContentTopic newMessage.PubsubTopic = filter.PubsubTopic @@ -364,8 +362,7 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types. } // We load the filter to make sure we can post on it - //passing content-topic override, it will be used if set. otherwise chatName will be used to load filter. - filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride) + filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic) if err != nil { return nil, err } From a8951fd5e737aa782c5f003b5a067ee02a69562c Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 11 Dec 2024 14:04:16 +0530 Subject: [PATCH 4/4] chore_: apply review comments --- protocol/messenger.go | 4 ++-- protocol/messenger_communities.go | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/protocol/messenger.go b/protocol/messenger.go index dae4a2df77b..6b5f5376a00 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -3345,10 +3345,10 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Info("failed to retrieve admin communities", zap.Error(err)) } - //fetch universal chatIDs as well. + // fetch universal chatIDs as well. controlledCommunitiesUniversalChatIDs, err := m.communitiesManager.GetOwnedCommunitiesUniversalChatIDs() if err != nil { - logger.Info("failed to retrieve admin communities", zap.Error(err)) + logger.Info("failed to retrieve controlled communities", zap.Error(err)) } for chatID, flag := range controlledCommunitiesUniversalChatIDs { controlledCommunitiesChatIDs[chatID] = flag diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 52ee06c8486..f5bed916f91 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -3956,10 +3956,8 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community for _, filter := range filters { topics = append(topics, filter.ContentTopic) } - // adding the content-topic used for member updates. - // since member updates would not be too frequent i.e only addition/deletion would add a new message, - // this shouldn't cause too much increase in size of archive generated. - filters = append(filters, m.transport.FilterByChatID(c.MemberUpdateChannelID())) + + filters = append(filters, m.transport.FilterByChatID(c.UniversalChatID())) // First we need to know the timestamp of the latest waku message // we've received for this community, so we can request messages we've