Skip to content

Commit

Permalink
WIP: Refactoring fedQueueClient
Browse files Browse the repository at this point in the history
This does not run for now as it has an undefined body
  • Loading branch information
mdimjasevic committed Oct 19, 2023
1 parent 0c53ff5 commit 808050d
Show file tree
Hide file tree
Showing 15 changed files with 167 additions and 113 deletions.
16 changes: 9 additions & 7 deletions libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import Wire.API.Federation.API.Common
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.MakesFederatedCall
import Wire.API.Routes.Named

Expand Down Expand Up @@ -94,14 +95,15 @@ fedClient ::
fedClient = clientIn (Proxy @api) (Proxy @m)

fedQueueClient ::
forall (comp :: Component) (name :: Symbol) m api.
( HasEmptyResponse api,
HasFedEndpoint comp api name,
HasClient m api,
m ~ FedQueueClient comp
forall tag api.
( HasNotificationEndpoint tag,
-- api ~ NotificationAPI tag (NotificationComponent tag),
HasEmptyResponse api,
HasFedEndpoint (NotificationComponent tag) api (NotificationPath tag)
) =>
Client m api
fedQueueClient = clientIn (Proxy @api) (Proxy @m)
Payload tag ->
FedQueueClient (NotificationComponent tag) ()
fedQueueClient = undefined

fedClientIn ::
forall (comp :: Component) (name :: Symbol) m api.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type BrigApi =
:<|> FedEndpoint "get-not-fully-connected-backends" DomainSet NonConnectedBackends
-- All the notification endpoints that go through the queue-based
-- federation client ('fedQueueClient').
:<|> NotificationAPI
:<|> BrigNotificationAPI

newtype DomainSet = DomainSet
{ domains :: Set Domain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import Data.Aeson
import Data.Id
import Data.Range
import Imports
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.Util.Aeson
import Wire.Arbitrary

Expand All @@ -37,6 +39,17 @@ data UserDeletedConnectionsNotification = UserDeletedConnectionsNotification
deriving (Arbitrary) via (GenericUniform UserDeletedConnectionsNotification)
deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConnectionsNotification)

data BrigNotificationTag = OnUserDeletedConnectionsTag
deriving (Show, Eq, Generic, Bounded, Enum)

instance HasNotificationEndpoint 'OnUserDeletedConnectionsTag where
type Payload 'OnUserDeletedConnectionsTag = UserDeletedConnectionsNotification
type NotificationPath 'OnUserDeletedConnectionsTag = "on-user-deleted-connections"
type NotificationComponent 'OnUserDeletedConnectionsTag = 'Brig
type
NotificationAPI 'OnUserDeletedConnectionsTag 'Brig =
NotificationFedEndpoint 'OnUserDeletedConnectionsTag

-- | All the notification endpoints return an 'EmptyResponse'.
type NotificationAPI =
NotificationFedEndpoint "on-user-deleted-connections" UserDeletedConnectionsNotification
type BrigNotificationAPI =
NotificationFedEndpoint 'OnUserDeletedConnectionsTag
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type GalleyApi =
GetOne2OneConversationResponse
-- All the notification endpoints that go through the queue-based
-- federation client ('fedQueueClient').
:<|> NotificationAPI
:<|> GalleyNotificationAPI

data TypingDataUpdateRequest = TypingDataUpdateRequest
{ typingStatus :: TypingStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import Data.Aeson
import Data.Domain
import Data.Id
import Data.Json.Util
import Data.Kind
import Data.List.NonEmpty
import Data.Proxy
import Data.Qualified
Expand All @@ -37,6 +36,7 @@ import Servant.API
import Wire.API.Conversation.Action
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.MLS.SubConversation
import Wire.API.MakesFederatedCall
import Wire.API.Message
Expand All @@ -52,65 +52,73 @@ data GalleyNotificationTag
| OnUserDeletedConversationsTag
deriving (Show, Eq, Generic, Bounded, Enum)

type family GalleyNotification (tag :: GalleyNotificationTag) :: Type where
GalleyNotification 'OnClientRemovedTag = ClientRemovedRequest
GalleyNotification 'OnMessageSentTag = RemoteMessage ConvId
GalleyNotification 'OnMLSMessageSentTag = RemoteMLSMessage
GalleyNotification 'OnConversationUpdatedTag = ConversationUpdate
GalleyNotification 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification

-- | The central path component of a Galley notification endpoint
type family GNPath (tag :: GalleyNotificationTag) :: Symbol where
GNPath 'OnClientRemovedTag = "on-client-removed"
GNPath 'OnMessageSentTag = "on-message-sent"
GNPath 'OnMLSMessageSentTag = "on-mls-message-sent"
GNPath 'OnConversationUpdatedTag = "on-conversation-updated"
GNPath 'OnUserDeletedConversationsTag = "on-user-deleted-conversations"

type GalleyNotifEndpoint (tag :: GalleyNotificationTag) =
NotificationFedEndpoint (GNPath tag) (GalleyNotification tag)

type family GalleyNotificationToServantAPI (gn :: GalleyNotificationTag) :: Type where
GalleyNotificationToServantAPI 'OnClientRemovedTag =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent"
]
(GNPath 'OnClientRemovedTag)
(GalleyNotification 'OnClientRemovedTag)
instance HasNotificationEndpoint 'OnClientRemovedTag where
type Payload 'OnClientRemovedTag = ClientRemovedRequest
type NotificationPath 'OnClientRemovedTag = "on-client-removed"
type NotificationComponent 'OnClientRemovedTag = 'Galley
type
NotificationAPI 'OnClientRemovedTag 'Galley =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent"
]
(NotificationPath 'OnClientRemovedTag)
(Payload 'OnClientRemovedTag)

instance HasNotificationEndpoint 'OnMessageSentTag where
type Payload 'OnMessageSentTag = RemoteMessage ConvId
type NotificationPath 'OnMessageSentTag = "on-message-sent"
type NotificationComponent 'OnMessageSentTag = 'Galley

-- used to notify this backend that a new message has been posted to a
-- remote conversation
GalleyNotificationToServantAPI 'OnMessageSentTag = GalleyNotifEndpoint 'OnMessageSentTag
GalleyNotificationToServantAPI 'OnMLSMessageSentTag = GalleyNotifEndpoint 'OnMLSMessageSentTag
type NotificationAPI 'OnMessageSentTag 'Galley = NotificationFedEndpoint 'OnMessageSentTag

instance HasNotificationEndpoint 'OnMLSMessageSentTag where
type Payload 'OnMLSMessageSentTag = RemoteMLSMessage
type NotificationPath 'OnMLSMessageSentTag = "on-mls-message-sent"
type NotificationComponent 'OnMLSMessageSentTag = 'Galley
type NotificationAPI 'OnMLSMessageSentTag 'Galley = NotificationFedEndpoint 'OnMLSMessageSentTag

instance HasNotificationEndpoint 'OnConversationUpdatedTag where
type Payload 'OnConversationUpdatedTag = ConversationUpdate
type NotificationPath 'OnConversationUpdatedTag = "on-conversation-updated"
type NotificationComponent 'OnConversationUpdatedTag = 'Galley

-- used by the backend that owns a conversation to inform this backend of
-- changes to the conversation
GalleyNotificationToServantAPI 'OnConversationUpdatedTag =
GalleyNotifEndpoint 'OnConversationUpdatedTag
GalleyNotificationToServantAPI 'OnUserDeletedConversationsTag =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent",
MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Brig "api-version"
]
(GNPath 'OnUserDeletedConversationsTag)
(GalleyNotification 'OnUserDeletedConversationsTag)
type NotificationAPI 'OnConversationUpdatedTag 'Galley = NotificationFedEndpoint 'OnConversationUpdatedTag

instance HasNotificationEndpoint 'OnUserDeletedConversationsTag where
type Payload 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification
type NotificationPath 'OnUserDeletedConversationsTag = "on-user-deleted-conversations"
type NotificationComponent 'OnUserDeletedConversationsTag = 'Galley
type
NotificationAPI 'OnUserDeletedConversationsTag 'Galley =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent",
MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Brig "api-version"
]
(NotificationPath 'OnUserDeletedConversationsTag)
(Payload 'OnUserDeletedConversationsTag)

-- | All the notification endpoints return an 'EmptyResponse'.
type NotificationAPI =
GalleyNotificationToServantAPI 'OnClientRemovedTag
:<|> GalleyNotificationToServantAPI 'OnMessageSentTag
:<|> GalleyNotificationToServantAPI 'OnMLSMessageSentTag
:<|> GalleyNotificationToServantAPI 'OnConversationUpdatedTag
:<|> GalleyNotificationToServantAPI 'OnUserDeletedConversationsTag
type GalleyNotificationAPI =
NotificationAPI 'OnClientRemovedTag 'Galley
:<|> NotificationAPI 'OnMessageSentTag 'Galley
:<|> NotificationAPI 'OnMLSMessageSentTag 'Galley
:<|> NotificationAPI 'OnConversationUpdatedTag 'Galley
:<|> NotificationAPI 'OnUserDeletedConversationsTag 'Galley

galleyToBackendNotification ::
forall tag.
KnownSymbol (GNPath tag) =>
ToJSON (GalleyNotification tag) =>
forall (tag :: GalleyNotificationTag).
KnownSymbol (NotificationPath tag) =>
ToJSON (Payload tag) =>
Domain ->
GalleyNotification tag ->
Payload tag ->
BackendNotification
galleyToBackendNotification ownDomain gn =
let p = symbolVal (Proxy @(GNPath tag))
let p = symbolVal (Proxy @(NotificationPath tag))
b = RawJson . encode $ gn
in toNotif (T.pack . show $ p) b
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,15 @@ module Wire.API.Federation.BackendNotifications where
import Control.Exception
import Control.Monad.Except
import Data.Aeson
import Data.ByteString.Builder qualified as Builder
import Data.ByteString.Lazy qualified as LBS
import Data.Domain
import Data.Map qualified as Map
import Data.Sequence qualified as Seq
import Data.Text qualified as Text
import Data.Text.Encoding
import Data.Text.Lazy.Encoding qualified as TL
import Imports
import Network.AMQP qualified as Q
import Network.AMQP.Types qualified as Q
import Network.HTTP.Types
import Servant
import Servant.Client
import Servant.Client.Core
import Servant.Types.SourceT
import Wire.API.Federation.API.Common
import Wire.API.Federation.Client
import Wire.API.Federation.Component
Expand Down Expand Up @@ -125,7 +118,7 @@ ensureQueue chan queue = do
-- queue. Perhaps none of this should be servant code anymore. But it is here to
-- allow smooth transition to RabbitMQ based notification pushing.
--
-- Use 'Wire.API.Federation.API.fedQueueClient' to create and action and pass it
-- Use 'Wire.API.Federation.API.fedQueueClient' to create an action and pass it
-- to 'enqueue'
newtype FedQueueClient c a = FedQueueClient (ReaderT FedQueueEnv IO a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader FedQueueEnv)
Expand All @@ -142,41 +135,41 @@ data EnqueueError = EnqueueError String

instance Exception EnqueueError

instance (KnownComponent c) => RunClient (FedQueueClient c) where
runRequestAcceptStatus :: Maybe [Status] -> Request -> FedQueueClient c Response
runRequestAcceptStatus _ req = do
env <- ask
bodyLBS <- case requestBody req of
Just (RequestBodyLBS lbs, _) -> pure lbs
Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs)
Just (RequestBodySource src, _) -> liftIO $ do
errOrRes <- runExceptT $ runSourceT src
either (throwIO . EnqueueError) (pure . mconcat) errOrRes
Nothing -> pure mempty
let notif =
BackendNotification
{ ownDomain = env.originDomain,
targetComponent = componentVal @c,
path = decodeUtf8 $ LBS.toStrict $ Builder.toLazyByteString req.requestPath,
body = RawJson bodyLBS
}
let msg =
Q.newMsg
{ Q.msgBody = encode notif,
Q.msgDeliveryMode = Just (env.deliveryMode),
Q.msgContentType = Just "application/json"
}
-- Empty string means default exchange
exchange = ""
liftIO $ do
ensureQueue env.channel env.targetDomain._domainText
void $ Q.publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg
pure $
Response
{ responseHttpVersion = http20,
responseStatusCode = status200,
responseHeaders = Seq.singleton (hContentType, "application/json"),
responseBody = "{}"
}
throwClientError :: ClientError -> FedQueueClient c a
throwClientError = liftIO . throwIO
-- instance (KnownComponent c) => RunClient (FedQueueClient c) where
-- runRequestAcceptStatus :: Maybe [Status] -> Request -> FedQueueClient c Response
-- runRequestAcceptStatus _ req = do
-- env <- ask
-- bodyLBS <- case requestBody req of
-- Just (RequestBodyLBS lbs, _) -> pure lbs
-- Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs)
-- Just (RequestBodySource src, _) -> liftIO $ do
-- errOrRes <- runExceptT $ runSourceT src
-- either (throwIO . EnqueueError) (pure . mconcat) errOrRes
-- Nothing -> pure mempty
-- let notif =
-- BackendNotification
-- { ownDomain = env.originDomain,
-- targetComponent = componentVal @c,
-- path = decodeUtf8 $ LBS.toStrict $ Builder.toLazyByteString req.requestPath,
-- body = RawJson bodyLBS
-- }
-- let msg =
-- Q.newMsg
-- { Q.msgBody = encode notif,
-- Q.msgDeliveryMode = Just (env.deliveryMode),
-- Q.msgContentType = Just "application/json"
-- }
-- -- Empty string means default exchange
-- exchange = ""
-- liftIO $ do
-- ensureQueue env.channel env.targetDomain._domainText
-- void $ Q.publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg
-- pure $
-- Response
-- { responseHttpVersion = http20,
-- responseStatusCode = status200,
-- responseHeaders = Seq.singleton (hContentType, "application/json"),
-- responseBody = "{}"
-- }
-- throwClientError :: ClientError -> FedQueueClient c a
-- throwClientError = liftIO . throwIO
4 changes: 3 additions & 1 deletion libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import Servant.API
import Wire.API.ApplyMods
import Wire.API.Federation.API.Common
import Wire.API.Federation.Domain
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.Routes.Named

type FedEndpointWithMods (mods :: [Type]) name input output =
Expand All @@ -41,7 +42,8 @@ type NotificationFedEndpointWithMods (mods :: [Type]) name input =

type FedEndpoint name input output = FedEndpointWithMods '[] name input output

type NotificationFedEndpoint name input = FedEndpoint name input EmptyResponse
type NotificationFedEndpoint tag =
FedEndpoint (NotificationPath tag) (Payload tag) EmptyResponse

type StreamingFedEndpoint name input output =
Named
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2022 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.API.Federation.HasNotificationEndpoint where

import Data.Kind
import GHC.TypeLits
import Wire.API.Federation.Component

class HasNotificationEndpoint t where
-- | The type of the payload for this endpoint
type Payload t :: Type

-- | The central path component of a notification endpoint
type NotificationPath t :: Symbol

-- | The server component this endpoint is associated with
type NotificationComponent t :: Component

-- | The Servant API endpoint type
type NotificationAPI t (c :: Component) :: Type
1 change: 1 addition & 0 deletions libs/wire-api-federation/wire-api-federation.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ library
Wire.API.Federation.Domain
Wire.API.Federation.Endpoint
Wire.API.Federation.Error
Wire.API.Federation.HasNotificationEndpoint
Wire.API.Federation.Version

other-modules: Paths_wire_api_federation
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ notifyUserDeleted self remotes = do
Just chanVar -> do
enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $
void $
fedQueueClient @'Brig @"on-user-deleted-connections" notif
fedQueueClient @'OnUserDeletedConnectionsTag notif
Nothing ->
Log.err $
Log.msg ("Federation error while notifying remote backends of a user deletion." :: ByteString)
Expand Down
Loading

0 comments on commit 808050d

Please sign in to comment.