Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WPB-3867] Use queueing federation client for a federation API endpoint #3629

Merged
merged 4 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6-federation/wpb-3867-queue-endpoints
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Constrain which federation endpoints can be used via the queueing federation client
1 change: 1 addition & 0 deletions changelog.d/6-federation/wpb-3867-unreachable-users
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
There is a breaking change in the "on-mls-message-sent" federation endpoint due to queueing. Now that there is retrying because of queueing, the endpoint can no longer respond with a list of unreachable users.
22 changes: 21 additions & 1 deletion libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import Data.Kind
import Data.Proxy
import GHC.TypeLits
import Imports
import Servant
import Servant.Client
import Servant.Client.Core
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Cargohold
import Wire.API.Federation.API.Common
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
Expand All @@ -64,6 +66,20 @@ type HasFedEndpoint comp api name = (HasUnsafeFedEndpoint comp api name)
-- you to forget about some federated calls.
type HasUnsafeFedEndpoint comp api name = 'Just api ~ LookupEndpoint (FedApi comp) name

-- | Constrains which endpoints can be used with FedQueueClient.
--
-- Since the servant client implementation underlying FedQueueClient is
-- returning a "fake" response consisting of an empty object, we need to make
-- sure that an API type is compatible with an empty response if we want to
-- invoke it using `fedQueueClient`
class HasEmptyResponse api

instance HasEmptyResponse (Post '[JSON] EmptyResponse)

instance HasEmptyResponse api => HasEmptyResponse (x :> api)

instance HasEmptyResponse api => HasEmptyResponse (Named name api)

-- | Return a client for a named endpoint.
--
-- This function introduces an 'AddAnnotation' constraint, which is
Expand All @@ -79,7 +95,11 @@ fedClient = clientIn (Proxy @api) (Proxy @m)

fedQueueClient ::
forall (comp :: Component) (name :: Symbol) m api.
(HasFedEndpoint comp api name, HasClient m api, m ~ FedQueueClient comp) =>
( HasEmptyResponse api,
HasFedEndpoint comp api name,
HasClient m api,
m ~ FedQueueClient comp
) =>
Client m api
fedQueueClient = clientIn (Proxy @api) (Proxy @m)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import Wire.API.MLS.SubConversation
import Wire.API.MakesFederatedCall
import Wire.API.Message
import Wire.API.Routes.Public.Galley.Messaging
import Wire.API.Unreachable
import Wire.API.Util.Aeson (CustomEncoded (..), CustomEncodedLensable (..))
import Wire.Arbitrary (Arbitrary, GenericUniform (..))

Expand Down Expand Up @@ -97,7 +96,7 @@ type GalleyApi =
ConversationUpdateRequest
ConversationUpdateResponse
:<|> FedEndpoint "mls-welcome" MLSWelcomeRequest MLSWelcomeResponse
:<|> FedEndpoint "on-mls-message-sent" RemoteMLSMessage RemoteMLSMessageResponse
:<|> FedEndpoint "on-mls-message-sent" RemoteMLSMessage EmptyResponse
:<|> FedEndpointWithMods
'[ MakesFederatedCall 'Galley "on-conversation-updated",
MakesFederatedCall 'Galley "on-mls-message-sent",
Expand Down Expand Up @@ -419,7 +418,7 @@ data MLSMessageResponse
MLSMessageResponseUnreachableBackends (Set Domain)
| -- | If the list of unreachable users is non-empty, it corresponds to users
-- that an application message could not be sent to.
MLSMessageResponseUpdates [ConversationUpdate] (Maybe UnreachableUsers)
MLSMessageResponseUpdates [ConversationUpdate]
| MLSMessageResponseNonFederatingBackends NonFederatingBackends
deriving stock (Eq, Show, Generic)
deriving (ToJSON, FromJSON) via (CustomEncoded MLSMessageResponse)
Expand Down
31 changes: 10 additions & 21 deletions services/galley/src/Galley/API/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -786,47 +786,36 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do
now <- input
let lcnv = fmap (.convId) lconv
e = conversationActionToEvent tag now quid (tUntagged lcnv) Nothing action

let mkUpdate uids =
mkUpdate uids =
ConversationUpdate
now
quid
(tUnqualified lcnv)
uids
(SomeConversationAction tag action)

update <- do
let remoteTargets = toList (bmRemotes targets)
updates <-
enqueueNotificationsConcurrently Q.Persistent remoteTargets $ \ruids -> do
handleError :: FederationError -> Sem r (Maybe ConversationUpdate)
handleError fedErr =
logRemoteNotificationError @"on-conversation-updated" fedErr $> Nothing

update <-
fmap (fromMaybe (mkUpdate []))
. (either handleError (pure . asum . map tUnqualified))
<=< enqueueNotificationsConcurrently Q.Persistent (toList (bmRemotes targets))
$ \ruids -> do
let update = mkUpdate (tUnqualified ruids)
-- if notifyOrigDomain is false, filter out user from quid's domain,
-- because quid's backend will update local state and notify its users
-- itself using the ConversationUpdate returned by this function
if notifyOrigDomain || tDomain ruids /= qDomain quid
then fedQueueClient @'Galley @"on-conversation-updated" update $> Nothing
else pure (Just update)
case partitionEithers updates of
(ls :: [Remote ([UserId], FederationError)], rs) -> do
for_ ls $
logError
"on-conversation-updated"
"An error occurred while communicating with federated server: "
pure $ fromMaybe (mkUpdate []) . asum . map tUnqualified $ rs

-- notify local participants and bots
pushConversationEvent con e (qualifyAs lcnv (bmLocals targets)) (bmBots targets)

-- return both the event and the 'ConversationUpdate' structure corresponding
-- to the originating domain (if it is remote)
pure $ LocalConversationUpdate e update
where
logError :: String -> String -> Remote (a, FederationError) -> Sem r ()
logError field msg e =
P.warn $
Log.field "federation call" field
. Log.field "domain" (_domainText (tDomain e))
. Log.msg (msg <> displayException (snd (tUnqualified e)))

-- | Update the local database with information on conversation members joining
-- or leaving. Finally, push out notifications to local users.
Expand Down
Loading