Skip to content

Commit

Permalink
Stop using Servant client for 'fedQueueClient'
Browse files Browse the repository at this point in the history
  • Loading branch information
mdimjasevic committed Oct 20, 2023
1 parent 892f4d8 commit fbc0ea1
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 136 deletions.
36 changes: 29 additions & 7 deletions libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ module Wire.API.Federation.API
)
where

import Data.Aeson
import Data.Domain
import Data.Kind
import Data.Proxy
import GHC.TypeLits
import Imports
import Network.AMQP
import Servant
import Servant.Client
import Servant.Client.Core
Expand All @@ -46,6 +49,8 @@ import Wire.API.Federation.API.Common
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.Component
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.MakesFederatedCall
import Wire.API.Routes.Named

Expand Down Expand Up @@ -94,14 +99,31 @@ fedClient ::
fedClient = clientIn (Proxy @api) (Proxy @m)

fedQueueClient ::
forall (comp :: Component) (name :: Symbol) m api.
( HasEmptyResponse api,
HasFedEndpoint comp api name,
HasClient m api,
m ~ FedQueueClient comp
forall tag api.
( HasNotificationEndpoint tag,
-- api ~ NotificationAPI tag (NotificationComponent tag),
HasEmptyResponse api,
KnownSymbol (NotificationPath tag),
KnownComponent (NotificationComponent tag),
ToJSON (Payload tag),
HasFedEndpoint (NotificationComponent tag) api (NotificationPath tag)
) =>
Client m api
fedQueueClient = clientIn (Proxy @api) (Proxy @m)
Payload tag ->
FedQueueClient (NotificationComponent tag) ()
fedQueueClient payload = do
env <- ask
let notif = fedNotifToBackendNotif @tag env.originDomain payload
msg =
newMsg
{ msgBody = encode notif,
msgDeliveryMode = Just (env.deliveryMode),
msgContentType = Just "application/json"
}
-- Empty string means default exchange
exchange = ""
liftIO $ do
ensureQueue env.channel env.targetDomain._domainText
void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg

fedClientIn ::
forall (comp :: Component) (name :: Symbol) m api.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type BrigApi =
:<|> FedEndpoint "get-not-fully-connected-backends" DomainSet NonConnectedBackends
-- All the notification endpoints that go through the queue-based
-- federation client ('fedQueueClient').
:<|> NotificationAPI
:<|> BrigNotificationAPI

newtype DomainSet = DomainSet
{ domains :: Set Domain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import Data.Aeson
import Data.Id
import Data.Range
import Imports
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.Util.Aeson
import Wire.Arbitrary

Expand All @@ -37,6 +39,18 @@ data UserDeletedConnectionsNotification = UserDeletedConnectionsNotification
deriving (Arbitrary) via (GenericUniform UserDeletedConnectionsNotification)
deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConnectionsNotification)

data BrigNotificationTag = OnUserDeletedConnectionsTag
deriving (Show, Eq, Generic, Bounded, Enum)

instance HasNotificationEndpoint 'OnUserDeletedConnectionsTag where
type Payload 'OnUserDeletedConnectionsTag = UserDeletedConnectionsNotification
type NotificationPath 'OnUserDeletedConnectionsTag = "on-user-deleted-connections"
type NotificationComponent 'OnUserDeletedConnectionsTag = 'Brig
type
NotificationAPI 'OnUserDeletedConnectionsTag 'Brig =
NotificationFedEndpoint 'OnUserDeletedConnectionsTag

-- | All the notification endpoints return an 'EmptyResponse'.
type NotificationAPI =
NotificationFedEndpoint "on-user-deleted-connections" UserDeletedConnectionsNotification
type BrigNotificationAPI =
-- FUTUREWORK: Use NotificationAPI 'OnUserDeletedConnectionsTag 'Brig instead
NotificationFedEndpoint 'OnUserDeletedConnectionsTag
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type GalleyApi =
GetOne2OneConversationResponse
-- All the notification endpoints that go through the queue-based
-- federation client ('fedQueueClient').
:<|> NotificationAPI
:<|> GalleyNotificationAPI

data TypingDataUpdateRequest = TypingDataUpdateRequest
{ typingStatus :: TypingStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,21 @@
module Wire.API.Federation.API.Galley.Notifications where

import Data.Aeson
import Data.Domain
import Data.Id
import Data.Json.Util
import Data.Kind
import Data.List.NonEmpty
import Data.Proxy
import Data.Qualified
import Data.Range
import Data.Text qualified as T
import Data.Time.Clock
import GHC.TypeLits
import Imports
import Servant.API
import Wire.API.Conversation.Action
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.MLS.SubConversation
import Wire.API.MakesFederatedCall
import Wire.API.Message
import Wire.API.RawJson
import Wire.API.Util.Aeson
import Wire.Arbitrary

Expand All @@ -52,76 +47,63 @@ data GalleyNotificationTag
| OnUserDeletedConversationsTag
deriving (Show, Eq, Generic, Bounded, Enum)

type family GalleyNotification (tag :: GalleyNotificationTag) :: Type where
GalleyNotification 'OnClientRemovedTag = ClientRemovedRequest
GalleyNotification 'OnMessageSentTag = RemoteMessage ConvId
GalleyNotification 'OnMLSMessageSentTag = RemoteMLSMessage
GalleyNotification 'OnConversationUpdatedTag = ConversationUpdate
GalleyNotification 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification

-- | The central path component of a Galley notification endpoint
type family GNPath (tag :: GalleyNotificationTag) :: Symbol where
GNPath 'OnClientRemovedTag = "on-client-removed"
GNPath 'OnMessageSentTag = "on-message-sent"
GNPath 'OnMLSMessageSentTag = "on-mls-message-sent"
GNPath 'OnConversationUpdatedTag = "on-conversation-updated"
GNPath 'OnUserDeletedConversationsTag = "on-user-deleted-conversations"

type GalleyNotifEndpoint (tag :: GalleyNotificationTag) =
NotificationFedEndpoint (GNPath tag) (GalleyNotification tag)

type family GalleyNotificationToServantAPI (gn :: GalleyNotificationTag) :: Type where
GalleyNotificationToServantAPI 'OnClientRemovedTag =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent"
]
(GNPath 'OnClientRemovedTag)
(GalleyNotification 'OnClientRemovedTag)
instance HasNotificationEndpoint 'OnClientRemovedTag where
type Payload 'OnClientRemovedTag = ClientRemovedRequest
type NotificationPath 'OnClientRemovedTag = "on-client-removed"
type NotificationComponent 'OnClientRemovedTag = 'Galley
type
NotificationAPI 'OnClientRemovedTag 'Galley =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent"
]
(NotificationPath 'OnClientRemovedTag)
(Payload 'OnClientRemovedTag)

instance HasNotificationEndpoint 'OnMessageSentTag where
type Payload 'OnMessageSentTag = RemoteMessage ConvId
type NotificationPath 'OnMessageSentTag = "on-message-sent"
type NotificationComponent 'OnMessageSentTag = 'Galley

-- used to notify this backend that a new message has been posted to a
-- remote conversation
GalleyNotificationToServantAPI 'OnMessageSentTag = GalleyNotifEndpoint 'OnMessageSentTag
GalleyNotificationToServantAPI 'OnMLSMessageSentTag = GalleyNotifEndpoint 'OnMLSMessageSentTag
type NotificationAPI 'OnMessageSentTag 'Galley = NotificationFedEndpoint 'OnMessageSentTag

instance HasNotificationEndpoint 'OnMLSMessageSentTag where
type Payload 'OnMLSMessageSentTag = RemoteMLSMessage
type NotificationPath 'OnMLSMessageSentTag = "on-mls-message-sent"
type NotificationComponent 'OnMLSMessageSentTag = 'Galley
type NotificationAPI 'OnMLSMessageSentTag 'Galley = NotificationFedEndpoint 'OnMLSMessageSentTag

instance HasNotificationEndpoint 'OnConversationUpdatedTag where
type Payload 'OnConversationUpdatedTag = ConversationUpdate
type NotificationPath 'OnConversationUpdatedTag = "on-conversation-updated"
type NotificationComponent 'OnConversationUpdatedTag = 'Galley

-- used by the backend that owns a conversation to inform this backend of
-- changes to the conversation
GalleyNotificationToServantAPI 'OnConversationUpdatedTag =
GalleyNotifEndpoint 'OnConversationUpdatedTag
GalleyNotificationToServantAPI 'OnUserDeletedConversationsTag =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent",
MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Brig "api-version"
]
(GNPath 'OnUserDeletedConversationsTag)
(GalleyNotification 'OnUserDeletedConversationsTag)
type NotificationAPI 'OnConversationUpdatedTag 'Galley = NotificationFedEndpoint 'OnConversationUpdatedTag

instance HasNotificationEndpoint 'OnUserDeletedConversationsTag where
type Payload 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification
type NotificationPath 'OnUserDeletedConversationsTag = "on-user-deleted-conversations"
type NotificationComponent 'OnUserDeletedConversationsTag = 'Galley
type
NotificationAPI 'OnUserDeletedConversationsTag 'Galley =
NotificationFedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-mls-message-sent",
MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Brig "api-version"
]
(NotificationPath 'OnUserDeletedConversationsTag)
(Payload 'OnUserDeletedConversationsTag)

-- | All the notification endpoints return an 'EmptyResponse'.
type NotificationAPI =
GalleyNotificationToServantAPI 'OnClientRemovedTag
:<|> GalleyNotificationToServantAPI 'OnMessageSentTag
:<|> GalleyNotificationToServantAPI 'OnMLSMessageSentTag
:<|> GalleyNotificationToServantAPI 'OnConversationUpdatedTag
:<|> GalleyNotificationToServantAPI 'OnUserDeletedConversationsTag

galleyToBackendNotification ::
forall tag.
KnownSymbol (GNPath tag) =>
ToJSON (GalleyNotification tag) =>
Domain ->
GalleyNotification tag ->
BackendNotification
galleyToBackendNotification ownDomain gn =
let p = symbolVal (Proxy @(GNPath tag))
b = RawJson . encode $ gn
in toNotif (T.pack . show $ p) b
where
toNotif :: Text -> RawJson -> BackendNotification
toNotif path payload =
BackendNotification
{ ownDomain = ownDomain,
targetComponent = Galley,
path = path,
body = payload
}
type GalleyNotificationAPI =
NotificationAPI 'OnClientRemovedTag 'Galley
:<|> NotificationAPI 'OnMessageSentTag 'Galley
:<|> NotificationAPI 'OnMLSMessageSentTag 'Galley
:<|> NotificationAPI 'OnConversationUpdatedTag 'Galley
:<|> NotificationAPI 'OnUserDeletedConversationsTag 'Galley

data ClientRemovedRequest = ClientRemovedRequest
{ user :: UserId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,15 @@ module Wire.API.Federation.BackendNotifications where
import Control.Exception
import Control.Monad.Except
import Data.Aeson
import Data.ByteString.Builder qualified as Builder
import Data.ByteString.Lazy qualified as LBS
import Data.Domain
import Data.Map qualified as Map
import Data.Sequence qualified as Seq
import Data.Text qualified as Text
import Data.Text.Encoding
import Data.Text.Lazy.Encoding qualified as TL
import Imports
import Network.AMQP qualified as Q
import Network.AMQP.Types qualified as Q
import Network.HTTP.Types
import Servant
import Servant.Client
import Servant.Client.Core
import Servant.Types.SourceT
import Wire.API.Federation.API.Common
import Wire.API.Federation.Client
import Wire.API.Federation.Component
Expand Down Expand Up @@ -125,7 +118,7 @@ ensureQueue chan queue = do
-- queue. Perhaps none of this should be servant code anymore. But it is here to
-- allow smooth transition to RabbitMQ based notification pushing.
--
-- Use 'Wire.API.Federation.API.fedQueueClient' to create and action and pass it
-- Use 'Wire.API.Federation.API.fedQueueClient' to create an action and pass it
-- to 'enqueue'
newtype FedQueueClient c a = FedQueueClient (ReaderT FedQueueEnv IO a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader FedQueueEnv)
Expand All @@ -141,42 +134,3 @@ data EnqueueError = EnqueueError String
deriving (Show)

instance Exception EnqueueError

instance (KnownComponent c) => RunClient (FedQueueClient c) where
runRequestAcceptStatus :: Maybe [Status] -> Request -> FedQueueClient c Response
runRequestAcceptStatus _ req = do
env <- ask
bodyLBS <- case requestBody req of
Just (RequestBodyLBS lbs, _) -> pure lbs
Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs)
Just (RequestBodySource src, _) -> liftIO $ do
errOrRes <- runExceptT $ runSourceT src
either (throwIO . EnqueueError) (pure . mconcat) errOrRes
Nothing -> pure mempty
let notif =
BackendNotification
{ ownDomain = env.originDomain,
targetComponent = componentVal @c,
path = decodeUtf8 $ LBS.toStrict $ Builder.toLazyByteString req.requestPath,
body = RawJson bodyLBS
}
let msg =
Q.newMsg
{ Q.msgBody = encode notif,
Q.msgDeliveryMode = Just (env.deliveryMode),
Q.msgContentType = Just "application/json"
}
-- Empty string means default exchange
exchange = ""
liftIO $ do
ensureQueue env.channel env.targetDomain._domainText
void $ Q.publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg
pure $
Response
{ responseHttpVersion = http20,
responseStatusCode = status200,
responseHeaders = Seq.singleton (hContentType, "application/json"),
responseBody = "{}"
}
throwClientError :: ClientError -> FedQueueClient c a
throwClientError = liftIO . throwIO
4 changes: 3 additions & 1 deletion libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import Servant.API
import Wire.API.ApplyMods
import Wire.API.Federation.API.Common
import Wire.API.Federation.Domain
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.Routes.Named

type FedEndpointWithMods (mods :: [Type]) name input output =
Expand All @@ -41,7 +42,8 @@ type NotificationFedEndpointWithMods (mods :: [Type]) name input =

type FedEndpoint name input output = FedEndpointWithMods '[] name input output

type NotificationFedEndpoint name input = FedEndpoint name input EmptyResponse
type NotificationFedEndpoint tag =
FedEndpoint (NotificationPath tag) (Payload tag) EmptyResponse

type StreamingFedEndpoint name input output =
Named
Expand Down
Loading

0 comments on commit fbc0ea1

Please sign in to comment.