Skip to content

Commit

Permalink
[WPB-4928] Stop using Servant client to enqueue federation notificati…
Browse files Browse the repository at this point in the history
…ons (#3647)

* Move a Brig federation endpoint in the API
* Move a Brig fed API notif endpoint to a module
* Move Galley federation endpoints in the API
* Move Galley notification endpoints
* A type alias for notification endpoints
* Add a changelog
* Define Galley notification API via types
* Convert a federation notification endpoint to a BackendNotification
* Stop using Servant client for 'fedQueueClient'
  • Loading branch information
mdimjasevic authored Oct 20, 2023
1 parent 80637b8 commit 4b765e5
Show file tree
Hide file tree
Showing 18 changed files with 378 additions and 182 deletions.
1 change: 1 addition & 0 deletions changelog.d/6-federation/WPB-4928-notification-endpoints
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reorganise the federation API such that queueing notification endpoints are separate from synchronous endpoints. Also simplify queueing federation notification endpoints.
37 changes: 30 additions & 7 deletions libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ module Wire.API.Federation.API
)
where

import Data.Aeson
import Data.Domain
import Data.Kind
import Data.Proxy
import GHC.TypeLits
import Imports
import Network.AMQP
import Servant
import Servant.Client
import Servant.Client.Core
Expand All @@ -46,6 +49,8 @@ import Wire.API.Federation.API.Common
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.Component
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.MakesFederatedCall
import Wire.API.Routes.Named

Expand Down Expand Up @@ -94,14 +99,32 @@ fedClient ::
fedClient = clientIn (Proxy @api) (Proxy @m)

fedQueueClient ::
forall (comp :: Component) (name :: Symbol) m api.
( HasEmptyResponse api,
HasFedEndpoint comp api name,
HasClient m api,
m ~ FedQueueClient comp
forall tag api.
( HasNotificationEndpoint tag,
-- FUTUREWORK: Include this API constraint and get it working
-- api ~ NotificationAPI tag (NotificationComponent tag),
HasEmptyResponse api,
KnownSymbol (NotificationPath tag),
KnownComponent (NotificationComponent tag),
ToJSON (Payload tag),
HasFedEndpoint (NotificationComponent tag) api (NotificationPath tag)
) =>
Client m api
fedQueueClient = clientIn (Proxy @api) (Proxy @m)
Payload tag ->
FedQueueClient (NotificationComponent tag) ()
fedQueueClient payload = do
env <- ask
let notif = fedNotifToBackendNotif @tag env.originDomain payload
msg =
newMsg
{ msgBody = encode notif,
msgDeliveryMode = Just (env.deliveryMode),
msgContentType = Just "application/json"
}
-- Empty string means default exchange
exchange = ""
liftIO $ do
ensureQueue env.channel env.targetDomain._domainText
void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg

fedClientIn ::
forall (comp :: Component) (name :: Symbol) m api.
Expand Down
25 changes: 9 additions & 16 deletions libs/wire-api-federation/src/Wire/API/Federation/API/Brig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.API.Federation.API.Brig where
module Wire.API.Federation.API.Brig
( module Notifications,
module Wire.API.Federation.API.Brig,
)
where

import Data.Aeson
import Data.Domain (Domain)
import Data.Handle (Handle)
import Data.Id
import Data.Range
import Imports
import Servant.API
import Test.QuickCheck (Arbitrary)
import Wire.API.Federation.API.Common
import Wire.API.Federation.API.Brig.Notifications as Notifications
import Wire.API.Federation.Endpoint
import Wire.API.Federation.Version
import Wire.API.MLS.CipherSuite
Expand Down Expand Up @@ -70,9 +73,11 @@ type BrigApi =
:<|> FedEndpoint "get-user-clients" GetUserClients (UserMap (Set PubClient))
:<|> FedEndpoint "get-mls-clients" MLSClientsRequest (Set ClientInfo)
:<|> FedEndpoint "send-connection-action" NewConnectionRequest NewConnectionResponse
:<|> FedEndpoint "on-user-deleted-connections" UserDeletedConnectionsNotification EmptyResponse
:<|> FedEndpoint "claim-key-packages" ClaimKeyPackageRequest (Maybe KeyPackageBundle)
:<|> FedEndpoint "get-not-fully-connected-backends" DomainSet NonConnectedBackends
-- All the notification endpoints that go through the queue-based
-- federation client ('fedQueueClient').
:<|> BrigNotificationAPI

newtype DomainSet = DomainSet
{ domains :: Set Domain
Expand Down Expand Up @@ -143,18 +148,6 @@ data NewConnectionResponse
deriving (Arbitrary) via (GenericUniform NewConnectionResponse)
deriving (FromJSON, ToJSON) via (CustomEncoded NewConnectionResponse)

type UserDeletedNotificationMaxConnections = 1000

data UserDeletedConnectionsNotification = UserDeletedConnectionsNotification
{ -- | This is qualified implicitly by the origin domain
user :: UserId,
-- | These are qualified implicitly by the target domain
connections :: Range 1 UserDeletedNotificationMaxConnections [UserId]
}
deriving stock (Eq, Show, Generic)
deriving (Arbitrary) via (GenericUniform UserDeletedConnectionsNotification)
deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConnectionsNotification)

data ClaimKeyPackageRequest = ClaimKeyPackageRequest
{ -- | The user making the request, implictly qualified by the origin domain.
claimant :: UserId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2023 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.API.Federation.API.Brig.Notifications where

import Data.Aeson
import Data.Id
import Data.Range
import Imports
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.Util.Aeson
import Wire.Arbitrary

type UserDeletedNotificationMaxConnections = 1000

data UserDeletedConnectionsNotification = UserDeletedConnectionsNotification
{ -- | This is qualified implicitly by the origin domain
user :: UserId,
-- | These are qualified implicitly by the target domain
connections :: Range 1 UserDeletedNotificationMaxConnections [UserId]
}
deriving stock (Eq, Show, Generic)
deriving (Arbitrary) via (GenericUniform UserDeletedConnectionsNotification)
deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConnectionsNotification)

data BrigNotificationTag = OnUserDeletedConnectionsTag
deriving (Show, Eq, Generic, Bounded, Enum)

instance HasNotificationEndpoint 'OnUserDeletedConnectionsTag where
type Payload 'OnUserDeletedConnectionsTag = UserDeletedConnectionsNotification
type NotificationPath 'OnUserDeletedConnectionsTag = "on-user-deleted-connections"
type NotificationComponent 'OnUserDeletedConnectionsTag = 'Brig
type
NotificationAPI 'OnUserDeletedConnectionsTag 'Brig =
NotificationFedEndpoint 'OnUserDeletedConnectionsTag

-- | All the notification endpoints return an 'EmptyResponse'.
type BrigNotificationAPI =
-- FUTUREWORK: Use NotificationAPI 'OnUserDeletedConnectionsTag 'Brig instead
NotificationFedEndpoint 'OnUserDeletedConnectionsTag
110 changes: 10 additions & 100 deletions libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.API.Federation.API.Galley where
module Wire.API.Federation.API.Galley
( module Wire.API.Federation.API.Galley,
module Notifications,
)
where

import Data.Aeson (FromJSON, ToJSON)
import Data.Domain
import Data.Id
import Data.Json.Util
import Data.List.NonEmpty (NonEmpty)
import Data.Misc (Milliseconds)
import Data.Qualified
import Data.Range
import Data.Time.Clock (UTCTime)
import Imports
import Network.Wai.Utilities.JSONResponse
Expand All @@ -36,12 +38,13 @@ import Wire.API.Conversation.Role (RoleName)
import Wire.API.Conversation.Typing
import Wire.API.Error.Galley
import Wire.API.Federation.API.Common
import Wire.API.Federation.API.Galley.Notifications as Notifications
import Wire.API.Federation.Endpoint
import Wire.API.MLS.SubConversation
import Wire.API.MakesFederatedCall
import Wire.API.Message
import Wire.API.Routes.Public.Galley.Messaging
import Wire.API.Util.Aeson (CustomEncoded (..), CustomEncodedLensable (..))
import Wire.API.Util.Aeson (CustomEncoded (..))
import Wire.Arbitrary (Arbitrary, GenericUniform (..))

-- FUTUREWORK: data types, json instances, more endpoints. See
Expand All @@ -58,9 +61,6 @@ type GalleyApi =
-- This endpoint is called the first time a user from this backend is
-- added to a remote conversation.
:<|> FedEndpoint "get-conversations" GetConversationsRequest GetConversationsResponse
-- used by the backend that owns a conversation to inform this backend of
-- changes to the conversation
:<|> FedEndpoint "on-conversation-updated" ConversationUpdate EmptyResponse
:<|> FedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Galley "on-mls-message-sent",
Expand All @@ -70,9 +70,6 @@ type GalleyApi =
"leave-conversation"
LeaveConversationRequest
LeaveConversationResponse
-- used to notify this backend that a new message has been posted to a
-- remote conversation
:<|> FedEndpoint "on-message-sent" (RemoteMessage ConvId) EmptyResponse
-- used by a remote backend to send a message to a conversation owned by
-- this backend
:<|> FedEndpointWithMods
Expand All @@ -82,14 +79,6 @@ type GalleyApi =
"send-message"
ProteusMessageSendRequest
MessageSendResponse
:<|> FedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent",
MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Brig "api-version"
]
"on-user-deleted-conversations"
UserDeletedConversationsNotification
EmptyResponse
:<|> FedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Galley "on-mls-message-sent",
Expand All @@ -100,7 +89,6 @@ type GalleyApi =
ConversationUpdateRequest
ConversationUpdateResponse
:<|> FedEndpoint "mls-welcome" MLSWelcomeRequest MLSWelcomeResponse
:<|> FedEndpoint "on-mls-message-sent" RemoteMLSMessage EmptyResponse
:<|> FedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Galley "on-mls-message-sent",
Expand All @@ -123,12 +111,6 @@ type GalleyApi =
MLSMessageSendRequest
MLSMessageResponse
:<|> FedEndpoint "query-group-info" GetGroupInfoRequest GetGroupInfoResponse
:<|> FedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent"
]
"on-client-removed"
ClientRemovedRequest
EmptyResponse
:<|> FedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-typing-indicator-updated"
]
Expand All @@ -153,6 +135,9 @@ type GalleyApi =
"get-one2one-conversation"
GetOne2OneConversationRequest
GetOne2OneConversationResponse
-- All the notification endpoints that go through the queue-based
-- federation client ('fedQueueClient').
:<|> GalleyNotificationAPI

data TypingDataUpdateRequest = TypingDataUpdateRequest
{ typingStatus :: TypingStatus,
Expand Down Expand Up @@ -180,15 +165,6 @@ data TypingDataUpdated = TypingDataUpdated
deriving stock (Eq, Show, Generic)
deriving (FromJSON, ToJSON) via (CustomEncoded TypingDataUpdated)

data ClientRemovedRequest = ClientRemovedRequest
{ user :: UserId,
client :: ClientId,
convs :: [ConvId]
}
deriving stock (Eq, Show, Generic)
deriving (Arbitrary) via (GenericUniform ClientRemovedRequest)
deriving (FromJSON, ToJSON) via (CustomEncoded ClientRemovedRequest)

data GetConversationsRequest = GetConversationsRequest
{ userId :: UserId,
convIds :: [ConvId]
Expand Down Expand Up @@ -281,28 +257,6 @@ data ConversationCreated conv = ConversationCreated
ccRemoteOrigUserId :: ConversationCreated (Remote ConvId) -> Remote UserId
ccRemoteOrigUserId cc = qualifyAs cc.cnvId cc.origUserId

data ConversationUpdate = ConversationUpdate
{ cuTime :: UTCTime,
cuOrigUserId :: Qualified UserId,
-- | The unqualified ID of the conversation where the update is happening.
-- The ID is local to the sender to prevent putting arbitrary domain that
-- is different than that of the backend making a conversation membership
-- update request.
cuConvId :: ConvId,
-- | A list of users from the receiving backend that need to be sent
-- notifications about this change. This is required as we do not expect a
-- non-conversation owning backend to have an indexed mapping of
-- conversation to users.
cuAlreadyPresentUsers :: [UserId],
-- | Information on the specific action that caused the update.
cuAction :: SomeConversationAction
}
deriving (Eq, Show, Generic)

instance ToJSON ConversationUpdate

instance FromJSON ConversationUpdate

data LeaveConversationRequest = LeaveConversationRequest
{ -- | The conversation is assumed to be owned by the target domain, which
-- allows us to protect against relay attacks
Expand All @@ -324,38 +278,6 @@ data RemoveFromConversationError
(ToJSON, FromJSON)
via (CustomEncoded RemoveFromConversationError)

-- Note: this is parametric in the conversation type to allow it to be used
-- both for conversations with a fixed known domain (e.g. as the argument of the
-- federation RPC), and for conversations with an arbitrary Qualified or Remote id
-- (e.g. as the argument of the corresponding handler).
data RemoteMessage conv = RemoteMessage
{ time :: UTCTime,
_data :: Maybe Text,
sender :: Qualified UserId,
senderClient :: ClientId,
conversation :: conv,
priority :: Maybe Priority,
push :: Bool,
transient :: Bool,
recipients :: UserClientMap Text
}
deriving stock (Eq, Show, Generic, Functor)
deriving (Arbitrary) via (GenericUniform (RemoteMessage conv))
deriving (ToJSON, FromJSON) via (CustomEncodedLensable (RemoteMessage conv))

data RemoteMLSMessage = RemoteMLSMessage
{ time :: UTCTime,
metadata :: MessageMetadata,
sender :: Qualified UserId,
conversation :: ConvId,
subConversation :: Maybe SubConvId,
recipients :: Map UserId (NonEmpty ClientId),
message :: Base64ByteString
}
deriving stock (Eq, Show, Generic)
deriving (Arbitrary) via (GenericUniform RemoteMLSMessage)
deriving (ToJSON, FromJSON) via (CustomEncoded RemoteMLSMessage)

data RemoteMLSMessageResponse
= RemoteMLSMessageOk
| RemoteMLSMessageMLSNotEnabled
Expand Down Expand Up @@ -406,18 +328,6 @@ newtype LeaveConversationResponse = LeaveConversationResponse
(ToJSON, FromJSON)
via (Either (CustomEncoded RemoveFromConversationError) ())

type UserDeletedNotificationMaxConvs = 1000

data UserDeletedConversationsNotification = UserDeletedConversationsNotification
{ -- | This is qualified implicitly by the origin domain
user :: UserId,
-- | These are qualified implicitly by the target domain
conversations :: Range 1 UserDeletedNotificationMaxConvs [ConvId]
}
deriving stock (Eq, Show, Generic)
deriving (Arbitrary) via (GenericUniform UserDeletedConversationsNotification)
deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConversationsNotification)

data ConversationUpdateRequest = ConversationUpdateRequest
{ -- | The user that is attempting to perform the action. This is qualified
-- implicitly by the origin domain
Expand Down
Loading

0 comments on commit 4b765e5

Please sign in to comment.