diff --git a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs index 3560e2c5e44..3fa1aba2871 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs @@ -77,7 +77,7 @@ sendNotification env component path body = runFederatorClient env . void $ clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body -enqueue :: Q.Channel -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c () -> IO () +enqueue :: Q.Channel -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c a -> IO a enqueue channel originDomain targetDomain deliveryMode (FedQueueClient action) = runReaderT action FedQueueEnv {..} diff --git a/services/galley/src/Galley/API/Action.hs b/services/galley/src/Galley/API/Action.hs index e21ea249684..2efa6ca4d75 100644 --- a/services/galley/src/Galley/API/Action.hs +++ b/services/galley/src/Galley/API/Action.hs @@ -804,16 +804,15 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do -- because quid's backend will update local state and notify its users -- itself using the ConversationUpdate returned by this function if notifyOrigDomain || tDomain ruids /= qDomain quid - then (void $ fedQueueClient @'Galley @"on-conversation-updated" update, Nothing) - else (pure (), Just update) - let f = fromMaybe (mkUpdate []) . asum . map tUnqualified . rights - update = f updates - failedUpdates = lefts updates - for_ failedUpdates $ - logError - "on-conversation-updated" - "An error occurred while communicating with federated server: " - pure update + then fedQueueClient @'Galley @"on-conversation-updated" update $> Nothing + else pure (Just update) + case partitionEithers updates of + (ls :: [Remote ([UserId], FederationError)], rs) -> do + for_ ls $ + logError + "on-conversation-updated" + "An error occurred while communicating with federated server: " + pure $ fromMaybe (mkUpdate []) . asum . map tUnqualified $ rs -- notify local participants and bots pushConversationEvent con e (qualifyAs lcnv (bmLocals targets)) (bmBots targets) @@ -822,10 +821,12 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do -- to the originating domain (if it is remote) pure $ LocalConversationUpdate e update where - logError :: (Show a) => String -> String -> (a, FederationError) -> Sem r () + logError :: String -> String -> Remote (a, FederationError) -> Sem r () logError field msg e = P.warn $ - Log.field "federation call" field . Log.msg (msg <> show e) + Log.field "federation call" field + . Log.field "domain" (_domainText (tDomain e)) + . Log.msg (msg <> displayException (snd (tUnqualified e))) -- | Update the local database with information on conversation members joining -- or leaving. Finally, push out notifications to local users. diff --git a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs index cee114a4e19..ac006ded4c5 100644 --- a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs +++ b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs @@ -15,13 +15,13 @@ data BackendNotificationQueueAccess m a where KnownComponent c => Remote x -> Q.DeliveryMode -> - FedQueueClient c () -> - BackendNotificationQueueAccess m (Either FederationError ()) + FedQueueClient c a -> + BackendNotificationQueueAccess m (Either FederationError a) EnqueueNotificationsConcurrently :: (KnownComponent c, Foldable f, Functor f) => Q.DeliveryMode -> f (Remote x) -> - (Remote [x] -> (FedQueueClient c (), b)) -> - BackendNotificationQueueAccess m [Either (Remote [x], FederationError) (Remote b)] + (Remote [x] -> FedQueueClient c a) -> + BackendNotificationQueueAccess m [Either (Remote ([x], FederationError)) (Remote a)] makeSem ''BackendNotificationQueueAccess diff --git a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs index 4c005df735c..b9affe48585 100644 --- a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs +++ b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs @@ -5,6 +5,7 @@ module Galley.Intra.BackendNotificationQueue (interpretBackendNotificationQueueA import Control.Lens (view) import Control.Monad.Catch import Control.Retry +import Data.Bifunctor import Data.Domain import Data.Qualified import Galley.Effects.BackendNotificationQueueAccess (BackendNotificationQueueAccess (..)) @@ -32,7 +33,7 @@ interpretBackendNotificationQueueAccess = interpret $ \case EnqueueNotificationsConcurrently m xs rpc -> do embedApp $ enqueueNotificationsConcurrently m xs rpc -enqueueNotification :: Domain -> Q.DeliveryMode -> FedQueueClient c () -> App (Either FederationError ()) +enqueueNotification :: Domain -> Q.DeliveryMode -> FedQueueClient c a -> App (Either FederationError a) enqueueNotification remoteDomain deliveryMode action = do mChanVar <- view rabbitmqChannel ownDomain <- view (options . settings . federationDomain) @@ -61,14 +62,15 @@ enqueueNotification remoteDomain deliveryMode action = do enqueueNotificationsConcurrently :: (Foldable f, Functor f) => Q.DeliveryMode -> - f (Remote a) -> - (Remote [a] -> (FedQueueClient c (), b)) -> - App [Either (Remote [a], FederationError) (Remote b)] + f (Remote x) -> + (Remote [x] -> FedQueueClient c a) -> + App [(Either (Remote ([x], FederationError)) (Remote a))] enqueueNotificationsConcurrently m xs f = pooledForConcurrentlyN 8 (bucketRemote xs) $ \r -> - let o = f r - in bimap (r,) (qualifyAs r . const (snd o)) - <$> enqueueNotification (tDomain r) m (fst o) + bimap + (qualifyAs r . (tUnqualified r,)) + (qualifyAs r) + <$> enqueueNotification (tDomain r) m (f r) data NoRabbitMqChannel = NoRabbitMqChannel deriving (Show)