Skip to content

Commit

Permalink
[WPB-183] Version federation API queue notifications (#3831)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Fischmann <mf@zerobuzz.net>
Co-authored-by: Paolo Capriotti <paolo@capriotti.io>
Co-authored-by: Akshay Mankar <akshay@wire.com>
  • Loading branch information
4 people committed Mar 7, 2024
1 parent 930db54 commit e6ee09f
Show file tree
Hide file tree
Showing 61 changed files with 1,224 additions and 733 deletions.
3 changes: 3 additions & 0 deletions changelog.d/6-federation/on-conversation-updated-async
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The on-conversation-updated notification is now queued instead of being sent directly. A new version of the notification has been introduced with a different JSON format for the body, mostly for testing purposes of the versioning system.

Since the notification is now sent asynchronously, some error conditions in case of unreachable backends cannot be triggered anymore.
2 changes: 2 additions & 0 deletions changelog.d/6-federation/wpb-183-versioned-async-b2b
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Versioning of backend to backend notifications. Notifications are now stored in "bundles" containing a serialised payload for each supported version. The background worker then dynamically selects the best version to use and sends only the notification corresponding to that version.

2 changes: 2 additions & 0 deletions deploy/dockerephemeral/federation-v0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ services:
networks:
- demo_wire
- coredns
extra_hosts:
- "host.docker.internal.:host-gateway"
ports:
- '127.0.0.1:21097:8080'
- '127.0.0.1:21098:8081'
Expand Down
14 changes: 14 additions & 0 deletions integration/test/Test/Conversation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -850,3 +850,17 @@ testGuestLinksExpired = do
liftIO $ threadDelay (1_100_000)
bindResponse (getJoinCodeConv tm k v) $ \resp -> do
resp.status `shouldMatchInt` 404

testConversationWithFedV0 :: HasCallStack => App ()
testConversationWithFedV0 = do
alice <- randomUser OwnDomain def
bob <- randomUser FedV0Domain def
withAPIVersion 4 $ connectTwoUsers alice bob

conv <-
postConversation alice (defProteus {qualifiedUsers = [bob]})
>>= getJSON 201

withWebSocket bob $ \ws -> do
void $ changeConversationName alice conv "foobar" >>= getJSON 200
void $ awaitMatch isConvNameChangeNotif ws
4 changes: 4 additions & 0 deletions integration/test/Testlib/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Testlib.Env where

import Control.Monad.Codensity
import Control.Monad.IO.Class
import Control.Monad.Reader
import Data.Default
import Data.Function ((&))
import Data.Functor
Expand Down Expand Up @@ -184,3 +185,6 @@ mkMLSState = Codensity $ \k ->
ciphersuite = def,
protocol = MLSProtocolMLS
}

withAPIVersion :: Int -> App a -> App a
withAPIVersion v = local $ \e -> e {defaultAPIVersion = v}
46 changes: 28 additions & 18 deletions libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ module Wire.API.Federation.API
HasUnsafeFedEndpoint,
fedClient,
fedQueueClient,
sendBundle,
fedClientIn,
unsafeFedClientIn,
module Wire.API.MakesFederatedCall,

-- * Re-exports
Component (..),
makeConversationUpdateBundle,
)
where

Expand All @@ -45,6 +47,7 @@ import Servant.Client.Core
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Cargohold
import Wire.API.Federation.API.Galley
import Wire.API.Federation.API.Util
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.Component
Expand Down Expand Up @@ -88,21 +91,21 @@ fedClient ::
Client m api
fedClient = clientIn (Proxy @api) (Proxy @m)

fedQueueClient ::
forall {k} (tag :: k).
( HasNotificationEndpoint tag,
KnownSymbol (NotificationPath tag),
KnownComponent (NotificationComponent k),
ToJSON (Payload tag)
) =>
Payload tag ->
FedQueueClient (NotificationComponent k) ()
fedQueueClient payload = do
fedClientIn ::
forall (comp :: Component) (name :: Symbol) m api.
(HasFedEndpoint comp api name, HasClient m api) =>
Client m api
fedClientIn = clientIn (Proxy @api) (Proxy @m)

sendBundle ::
KnownComponent c =>
PayloadBundle c ->
FedQueueClient c ()
sendBundle bundle = do
env <- ask
let notif = fedNotifToBackendNotif @tag env.requestId env.originDomain payload
msg =
let msg =
newMsg
{ msgBody = encode notif,
{ msgBody = encode bundle,
msgDeliveryMode = Just (env.deliveryMode),
msgContentType = Just "application/json"
}
Expand All @@ -112,11 +115,18 @@ fedQueueClient payload = do
ensureQueue env.channel env.targetDomain._domainText
void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg

fedClientIn ::
forall (comp :: Component) (name :: Symbol) m api.
(HasFedEndpoint comp api name, HasClient m api) =>
Client m api
fedClientIn = clientIn (Proxy @api) (Proxy @m)
fedQueueClient ::
forall {k} (tag :: k) c.
( HasNotificationEndpoint tag,
HasVersionRange tag,
HasFedPath tag,
KnownComponent (NotificationComponent k),
ToJSON (Payload tag),
c ~ NotificationComponent k
) =>
Payload tag ->
FedQueueClient c ()
fedQueueClient payload = sendBundle =<< makeBundle @tag payload

-- | Like 'fedClientIn', but doesn't propagate a 'CallsFed' constraint. Intended
-- to be used in test situations only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ import Wire.API.Conversation.Action
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.Federation.Version
import Wire.API.MLS.SubConversation
import Wire.API.Message
import Wire.API.Routes.Version (From, Until)
import Wire.API.Util.Aeson
import Wire.Arbitrary

data GalleyNotificationTag
= OnClientRemovedTag
| OnMessageSentTag
| OnMLSMessageSentTag
| OnConversationUpdatedTagV0
| OnConversationUpdatedTag
| OnUserDeletedConversationsTag
deriving (Show, Eq, Generic, Bounded, Enum)
Expand All @@ -66,9 +69,16 @@ instance HasNotificationEndpoint 'OnMLSMessageSentTag where

-- used by the backend that owns a conversation to inform this backend of
-- changes to the conversation
instance HasNotificationEndpoint 'OnConversationUpdatedTagV0 where
type Payload 'OnConversationUpdatedTagV0 = ConversationUpdateV0
type NotificationPath 'OnConversationUpdatedTagV0 = "on-conversation-updated"
type NotificationVersionTag 'OnConversationUpdatedTagV0 = 'Just 'V0
type NotificationMods 'OnConversationUpdatedTagV0 = '[Until 'V1]

instance HasNotificationEndpoint 'OnConversationUpdatedTag where
type Payload 'OnConversationUpdatedTag = ConversationUpdate
type NotificationPath 'OnConversationUpdatedTag = "on-conversation-updated"
type NotificationMods 'OnConversationUpdatedTag = '[From 'V1]

instance HasNotificationEndpoint 'OnUserDeletedConversationsTag where
type Payload 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification
Expand All @@ -79,6 +89,7 @@ type GalleyNotificationAPI =
NotificationFedEndpoint 'OnClientRemovedTag
:<|> NotificationFedEndpoint 'OnMessageSentTag
:<|> NotificationFedEndpoint 'OnMLSMessageSentTag
:<|> NotificationFedEndpoint 'OnConversationUpdatedTagV0
:<|> NotificationFedEndpoint 'OnConversationUpdatedTag
:<|> NotificationFedEndpoint 'OnUserDeletedConversationsTag

Expand Down Expand Up @@ -129,7 +140,7 @@ data RemoteMLSMessage = RemoteMLSMessage

instance ToSchema RemoteMLSMessage

data ConversationUpdate = ConversationUpdate
data ConversationUpdateV0 = ConversationUpdateV0
{ cuTime :: UTCTime,
cuOrigUserId :: Qualified UserId,
-- | The unqualified ID of the conversation where the update is happening.
Expand All @@ -147,12 +158,56 @@ data ConversationUpdate = ConversationUpdate
}
deriving (Eq, Show, Generic)

instance ToJSON ConversationUpdateV0

instance FromJSON ConversationUpdateV0

instance ToSchema ConversationUpdateV0

data ConversationUpdate = ConversationUpdate
{ time :: UTCTime,
origUserId :: Qualified UserId,
-- | The unqualified ID of the conversation where the update is happening.
-- The ID is local to the sender to prevent putting arbitrary domain that
-- is different than that of the backend making a conversation membership
-- update request.
convId :: ConvId,
-- | A list of users from the receiving backend that need to be sent
-- notifications about this change. This is required as we do not expect a
-- non-conversation owning backend to have an indexed mapping of
-- conversation to users.
alreadyPresentUsers :: [UserId],
-- | Information on the specific action that caused the update.
action :: SomeConversationAction
}
deriving (Eq, Show, Generic)

instance ToJSON ConversationUpdate

instance FromJSON ConversationUpdate

instance ToSchema ConversationUpdate

conversationUpdateToV0 :: ConversationUpdate -> ConversationUpdateV0
conversationUpdateToV0 cu =
ConversationUpdateV0
{ cuTime = cu.time,
cuOrigUserId = cu.origUserId,
cuConvId = cu.convId,
cuAlreadyPresentUsers = cu.alreadyPresentUsers,
cuAction = cu.action
}

conversationUpdateFromV0 :: ConversationUpdateV0 -> ConversationUpdate
conversationUpdateFromV0 cu =
ConversationUpdate
{ time = cu.cuTime,
origUserId = cu.cuOrigUserId,
convId = cu.cuConvId,
alreadyPresentUsers = cu.cuAlreadyPresentUsers,
action = cu.cuAction
}

type UserDeletedNotificationMaxConvs = 1000

data UserDeletedConversationsNotification = UserDeletedConversationsNotification
Expand Down
29 changes: 29 additions & 0 deletions libs/wire-api-federation/src/Wire/API/Federation/API/Util.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2023 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.API.Federation.API.Util where

import Imports
import Wire.API.Federation.API.Galley.Notifications
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Component

makeConversationUpdateBundle ::
ConversationUpdate ->
FedQueueClient 'Galley (PayloadBundle 'Galley)
makeConversationUpdateBundle update =
(<>) <$> makeBundle update <*> makeBundle (conversationUpdateToV0 update)
Loading

0 comments on commit e6ee09f

Please sign in to comment.