Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WPB-3916: Filtering out duplicate members when sending defederation notifications #3515

Merged
merged 7 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3-bug-fixes/duplicate-member-notifications
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Defederation notifications (federation.delete and federation.connectionRemoved) now deduplicate the recipient list, so that users are not sent more notifications than required.
108 changes: 57 additions & 51 deletions integration/test/Test/Defederation.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module Test.Defederation where

import API.BrigInternal
-- import API.BrigInternal qualified as Internal
-- import API.Galley (defProteus, getConversation, postConversation, qualifiedUsers)
-- import Control.Applicative
-- import Data.Aeson qualified as Aeson
import API.BrigInternal qualified as Internal
import API.Galley (defProteus, getConversation, postConversation, qualifiedUsers)
import Control.Applicative
import Data.Aeson qualified as Aeson
import GHC.Stack
import SetupHelpers
import Testlib.Prelude
Expand All @@ -26,54 +26,60 @@ testDefederationRemoteNotifications = do
void $ deleteFedConn OwnDomain remoteDomain
void $ awaitNMatches 2 3 (\n -> nPayload n %. "type" `isEqual` "federation.connectionRemoved") ws

-- FUTUREWORK: temporarily disabled, enable when fixed on CI
-- testDefederationNonFullyConnectedGraph :: HasCallStack => App ()
-- testDefederationNonFullyConnectedGraph = do
-- let setFederationConfig =
-- setField "optSettings.setFederationStrategy" "allowDynamic"
-- >=> removeField "optSettings.setFederationDomainConfigs"
-- >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1)
-- startDynamicBackends
-- [ def {dbBrig = setFederationConfig},
-- def {dbBrig = setFederationConfig},
-- def {dbBrig = setFederationConfig}
-- ]
-- $ \dynDomains -> do
-- domains@[domainA, domainB, domainC] <- pure dynDomains
-- connectAllDomainsAndWaitToSync 1 domains
-- [uA, uB, uC] <- createAndConnectUsers [domainA, domainB, domainC]
-- -- create group conversation owned by domainA with users from domainB and domainC
-- convId <- bindResponse (postConversation uA (defProteus {qualifiedUsers = [uB, uC]})) $ \r -> do
-- r.status `shouldMatchInt` 201
-- r.json %. "qualified_id"
testDefederationNonFullyConnectedGraph :: HasCallStack => App ()
testDefederationNonFullyConnectedGraph = do
let setFederationConfig =
setField "optSettings.setFederationStrategy" "allowDynamic"
>=> removeField "optSettings.setFederationDomainConfigs"
>=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1)
startDynamicBackends
[ def {dbBrig = setFederationConfig},
def {dbBrig = setFederationConfig},
def {dbBrig = setFederationConfig}
]
$ \dynDomains -> do
domains@[domainA, domainB, domainC] <- pure dynDomains
connectAllDomainsAndWaitToSync 1 domains

-- -- check conversation exists on all backends
-- for [uB, uC] objQidObject >>= checkConv convId uA
-- create a few extra users and connections to make sure that does not lead to any extra `connectionRemoved` notifications
[uA, uA2, _, _, uB, uC] <- createAndConnectUsers [domainA, domainA, domainA, domainA, domainB, domainC] >>= traverse objQidObject

-- -- one of the 2 non-conversation-owning domains (domainB and domainC)
-- -- defederate from the other non-conversation-owning domain
-- void $ Internal.deleteFedConn domainB domainC
-- create group conversation owned by domainA with users from domainB and domainC
convId <- bindResponse (postConversation uA (defProteus {qualifiedUsers = [uA2, uB, uC]})) $ \r -> do
r.status `shouldMatchInt` 201
r.json %. "qualified_id"

-- -- assert that clients from domainA receive federation.connectionRemoved events
-- -- Notifications being delivered at least n times is what we want to ensure here,
-- -- however they are often delivered more than once, so check that it doesn't happen
-- -- hundreds of times.
-- let isConnectionRemoved n = do
-- correctType <- nPayload n %. "type" `isEqual` "federation.connectionRemoved"
-- if correctType
-- then do
-- domsV <- nPayload n %. "domains" & asList
-- domsStr <- for domsV asString <&> sort
-- pure $ domsStr == sort [domainB, domainC]
-- else pure False
-- void $ awaitNToMMatches 2 10 20 isConnectionRemoved wsA
-- check conversation exists on all backends
checkConv convId uA [uB, uC, uA2]
checkConv convId uB [uA, uC, uA2]
checkConv convId uC [uA, uB, uA2]

-- retryT $ checkConv convId uA []
-- where
-- checkConv :: Value -> Value -> [Value] -> App ()
-- checkConv convId user expectedOtherMembers = do
-- bindResponse (getConversation user convId) $ \r -> do
-- r.status `shouldMatchInt` 200
-- members <- r.json %. "members.others" & asList
-- qIds <- for members (\m -> m %. "qualified_id")
-- qIds `shouldMatchSet` expectedOtherMembers
withWebSocket uA $ \wsA -> do
-- one of the 2 non-conversation-owning domains (domainB and domainC)
-- defederate from the other non-conversation-owning domain
void $ Internal.deleteFedConn domainB domainC

-- assert that clients from domainA receive federation.connectionRemoved events
-- Notifications being delivered exactly twice
void $ awaitNMatches 2 20 (isConnectionRemoved [domainB, domainC]) wsA

-- remote members should be removed from local conversation eventually
retryT $ checkConv convId uA [uA2]
where
-- | Predicate over a received notification: holds when its payload has type
-- @federation.connectionRemoved@ and its @domains@ list equals the expected
-- domains, compared irrespective of order.
isConnectionRemoved :: [String] -> Value -> App Bool
isConnectionRemoved domains n = do
  correctType <- nPayload n %. "type" `isEqual` "federation.connectionRemoved"
  if not correctType
    then pure False
    else do
      rawDomains <- nPayload n %. "domains" & asList
      -- Sort both sides so the comparison does not depend on element order.
      actualDomains <- sort <$> traverse asString rawDomains
      pure (actualDomains == sort domains)

-- | Fetch the conversation as the given user, expect a 200 response, and
-- check that the qualified ids under @members.others@ match the expected
-- other members as a set.
checkConv :: Value -> Value -> [Value] -> App ()
checkConv convId user expectedOtherMembers =
  bindResponse (getConversation user convId) $ \resp -> do
    resp.status `shouldMatchInt` 200
    others <- resp.json %. "members.others" & asList
    otherIds <- traverse (%. "qualified_id") others
    otherIds `shouldMatchSet` expectedOtherMembers
40 changes: 31 additions & 9 deletions services/galley/src/Galley/Intra/Effects.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import Cassandra (ClientState, Consistency (LocalQuorum), Page (hasMore, nextPag
import Control.Lens ((.~))
import Data.Id (ProviderId, ServiceId, UserId)
import Data.Range (Range (fromRange))
import Data.Set qualified as Set
import Galley.API.Error
import Galley.API.Util (localBotsAndUsers)
import Galley.Cassandra.Conversation.Members (toMember)
Expand Down Expand Up @@ -141,6 +142,13 @@ interpretGundeckAccess = interpret $ \case
Push ps -> embedApp $ G.push ps
PushSlowly ps -> embedApp $ G.pushSlowly ps

-- FUTUREWORK:
-- This function uses an in-memory set for tracking UserIds that we have already
-- sent notifications to. This set will only grow throughout the lifetime of this
-- function, and may cause memory & performance problems with millions of users.
-- How we are tracking which users have already been sent 0, 1, or 2 defederation
-- messages should be rethought to be more fault tolerant, e.g. this method doesn't
-- handle the server crashing and restarting.
interpretDefederationNotifications ::
forall r a.
( Member (Embed IO) r,
Expand All @@ -154,34 +162,48 @@ interpretDefederationNotifications ::
interpretDefederationNotifications = interpret $ \case
SendDefederationNotifications domain ->
getPage
>>= void . sendNotificationPage (Federation.FederationDelete domain)
>>= void . sendNotificationPage mempty (Federation.FederationDelete domain)
SendOnConnectionRemovedNotifications domainA domainB ->
getPage
>>= void . sendNotificationPage (Federation.FederationConnectionRemoved (domainA, domainB))
>>= void . sendNotificationPage mempty (Federation.FederationConnectionRemoved (domainA, domainB))
where
getPage :: Sem r (Page PageType)
getPage = do
maxPage <- inputs (fromRange . currentFanoutLimit . _options) -- This is based on the limits in removeIfLargeFanout
-- selectAllMembers will return duplicate members when they are in more than one chat.
-- However, we need the full row to build out the bot members to send notifications
-- to them, so we have to do the duplicate filtering here.
embedClient $ paginate selectAllMembers (paramsP LocalQuorum () maxPage)
pushEvents :: Federation.Event -> [LocalMember] -> Sem r ()
pushEvents eventData results = do
pushEvents :: Set UserId -> Federation.Event -> [LocalMember] -> Sem r (Set UserId)
pushEvents seenRecipients eventData results = do
let (bots, mems) = localBotsAndUsers results
recipients = Intra.recipient <$> mems
event = Intra.FederationEvent eventData
for_ (Intra.newPush ListComplete Nothing event recipients) $ \p -> do
filteredRecipients =
  -- The recipients of this page that have not been notified yet, with at
  -- most one entry per UserId.
  --
  -- Deduplicate by UserId the page of recipients that we are working on.
  -- nubBy keeps the first entry of each UserId; it is quadratic, but only
  -- in the size of a single page.
  nubBy (\a b -> a._recipientUserId == b._recipientUserId)
    -- Sort the remaining recipients by their IDs
    $ sortBy (\a b -> a._recipientUserId `compare` b._recipientUserId)
    -- Filter out any recipient that we have already seen in a previous page.
    -- Use Set.notMember (logarithmic lookup) rather than the Foldable
    -- `notElem`, which walks the entire, ever-growing set for every
    -- recipient.
    $ filter (\r -> r._recipientUserId `Set.notMember` seenRecipients) recipients
for_ (Intra.newPush ListComplete Nothing event filteredRecipients) $ \p -> do
-- Futurework: Transient or not?
-- RouteAny is used as it will wake up mobile clients
-- and notify them of the changes to federation state.
push1 $ p & Intra.pushRoute .~ Intra.RouteAny
deliverAsync (bots `zip` repeat (G.pushEventJson event))
sendNotificationPage :: Federation.Event -> Page PageType -> Sem r ()
sendNotificationPage eventData page = do
-- Add the users to the set of users we've sent messages to.
pure $ seenRecipients <> Set.fromList ((._recipientUserId) <$> filteredRecipients)
sendNotificationPage :: Set UserId -> Federation.Event -> Page PageType -> Sem r ()
sendNotificationPage seenRecipients eventData page = do
let res = result page
mems = mapMaybe toMember res
pushEvents eventData mems
seenRecipients' <- pushEvents seenRecipients eventData mems
when (hasMore page) $ do
page' <- embedClient $ nextPage page
sendNotificationPage eventData page'
sendNotificationPage seenRecipients' eventData page'

type PageType =
( UserId,
Expand Down
2 changes: 1 addition & 1 deletion services/galley/src/Galley/Intra/Push/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ data RecipientBy user = Recipient
{ _recipientUserId :: user,
_recipientClients :: RecipientClients
}
deriving stock (Functor, Foldable, Traversable, Show)
deriving stock (Functor, Foldable, Traversable, Show, Ord, Eq)

makeLenses ''RecipientBy

Expand Down