Skip to content

Commit

Permalink
Refactor fedQueueClient API
Browse files Browse the repository at this point in the history
  • Loading branch information
pcapriotti committed Feb 9, 2024
1 parent 53d4d2b commit 26eaae8
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 193 deletions.
17 changes: 15 additions & 2 deletions libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.MakesFederatedCall
import Wire.API.Routes.Named

Expand Down Expand Up @@ -94,11 +95,11 @@ fedClientIn ::
Client m api
fedClientIn = clientIn (Proxy @api) (Proxy @m)

fedQueueClient ::
fedQueueClientFromBundle ::
KnownComponent c =>
PayloadBundle c ->
FedQueueClient c ()
fedQueueClient bundle = do
fedQueueClientFromBundle bundle = do
env <- ask
let msg =
newMsg
Expand All @@ -112,6 +113,18 @@ fedQueueClient bundle = do
ensureQueue env.channel env.targetDomain._domainText
void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg

fedQueueClient ::
forall {k} (tag :: k) c.
( HasNotificationEndpoint tag,
KnownSymbol (NotificationPath tag),
KnownComponent (NotificationComponent k),
ToJSON (Payload tag),
c ~ NotificationComponent k
) =>
Payload tag ->
FedQueueClient c ()
fedQueueClient payload = fedQueueClientFromBundle =<< makeBundle @tag payload

-- | Like 'fedClientIn', but doesn't propagate a 'CallsFed' constraint. Intended
-- to be used in test situations only.
unsafeFedClientIn ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,24 @@ toBundle ::
Domain ->
Payload tag ->
PayloadBundle (NotificationComponent k)
toBundle reqId originDomain payload = do
toBundle reqId originDomain payload =
let notif = fedNotifToBackendNotif @tag reqId originDomain payload
PayloadBundle . pure $ notif
in PayloadBundle . pure $ notif

makeBundle ::
forall {k} (tag :: k) c.
( HasNotificationEndpoint tag,
KnownSymbol (NotificationPath tag),
KnownComponent (NotificationComponent k),
A.ToJSON (Payload tag),
c ~ NotificationComponent k
) =>
Payload tag ->
FedQueueClient c (PayloadBundle c)
makeBundle payload = do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
pure $ toBundle @tag reqId origin payload

type BackendNotificationAPI = Capture "name" Text :> ReqBody '[JSON] RawJson :> Post '[JSON] EmptyResponse

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,27 @@ dee = Id (fromJust (UUID.fromString "00000fff-0000-aaaa-0000-000100005007"))
testObject_ConversationUpdate1 :: ConversationUpdate
testObject_ConversationUpdate1 =
ConversationUpdate
{ cuTime = read "1864-04-12 12:22:43.673 UTC",
cuOrigUserId =
{ time = read "1864-04-12 12:22:43.673 UTC",
origUserId =
Qualified
(Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000007")))
(Domain "golden.example.com"),
cuConvId =
convId =
Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000006")),
cuAlreadyPresentUsers = [],
cuAction = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (qAlice :| [qBob]) roleNameWireAdmin)
alreadyPresentUsers = [],
action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (qAlice :| [qBob]) roleNameWireAdmin)
}

testObject_ConversationUpdate2 :: ConversationUpdate
testObject_ConversationUpdate2 =
ConversationUpdate
{ cuTime = read "1864-04-12 12:22:43.673 UTC",
cuOrigUserId =
{ time = read "1864-04-12 12:22:43.673 UTC",
origUserId =
Qualified
(Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000007")))
(Domain "golden.example.com"),
cuConvId =
convId =
Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000006")),
cuAlreadyPresentUsers = [chad, dee],
cuAction = SomeConversationAction (sing @'ConversationLeaveTag) ()
alreadyPresentUsers = [chad, dee],
action = SomeConversationAction (sing @'ConversationLeaveTag) ()
}
5 changes: 1 addition & 4 deletions services/brig/src/Brig/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,7 @@ notifyUserDeleted self remotes = do
view rabbitmqChannel >>= \case
Just chanVar -> do
enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $
void $ do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
fedQueueClient $ toBundle @'OnUserDeletedConnectionsTag reqId origin notif
fedQueueClient @'OnUserDeletedConnectionsTag notif
Nothing ->
Log.err $
Log.msg ("Federation error while notifying remote backends of a user deletion." :: ByteString)
Expand Down
6 changes: 1 addition & 5 deletions services/galley/src/Galley/API/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ import Wire.API.Federation.API
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Galley
import Wire.API.Federation.API.Galley qualified as F
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Error
import Wire.API.FederationStatus
import Wire.API.MLS.CipherSuite
Expand Down Expand Up @@ -899,10 +898,7 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do
-- because quid's backend will update local state and notify its users
-- itself using the ConversationUpdate returned by this function
if notifyOrigDomain || tDomain ruids /= qDomain quid
then do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
fedQueueClient (toBundle @'OnConversationUpdatedTag reqId origin update) $> Nothing
then fedQueueClient @'OnConversationUpdatedTag update $> Nothing
else pure (Just update)

-- notify local participants and bots
Expand Down
12 changes: 3 additions & 9 deletions services/galley/src/Galley/API/Clients.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import Polysemy.TinyLog qualified as P
import Wire.API.Conversation hiding (Member)
import Wire.API.Federation.API
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Routes.MultiTablePaging
import Wire.NotificationSubsystem
import Wire.Sem.Paging.Cassandra (CassandraPaging)
Expand Down Expand Up @@ -139,13 +138,8 @@ rmClientH (usr ::: cid) = do
removeRemoteMLSClients :: Range 1 1000 [Remote ConvId] -> Sem r ()
removeRemoteMLSClients convIds = do
for_ (bucketRemote (fromRange convIds)) $ \remoteConvs ->
let rpc = void $ do
req <- asks (.requestId)
origin <- asks (.originDomain)
let rpc =
fedQueueClient
( toBundle @'OnClientRemovedTag
req
origin
(ClientRemovedRequest usr cid (tUnqualified remoteConvs))
)
@'OnClientRemovedTag
(ClientRemovedRequest usr cid (tUnqualified remoteConvs))
in enqueueNotification Q.Persistent remoteConvs rpc
7 changes: 1 addition & 6 deletions services/galley/src/Galley/API/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ import Wire.API.Event.Conversation
import Wire.API.Event.LeaveReason
import Wire.API.Federation.API
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Error
import Wire.API.Provider.Service hiding (Service)
import Wire.API.Routes.API
Expand Down Expand Up @@ -424,11 +423,7 @@ rmUser lusr conn = do
leaveRemoteConversations cids =
for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do
let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs))
let rpc = void $ do
req <- asks (.requestId)
origin <- asks (.originDomain)
fedQueueClient $
toBundle @'OnUserDeletedConversationsTag req origin userDelete
let rpc = fedQueueClient @'OnUserDeletedConversationsTag userDelete
enqueueNotification Q.Persistent remoteConvs rpc

-- FUTUREWORK: Add a retry mechanism if there are federation errrors.
Expand Down
36 changes: 15 additions & 21 deletions services/galley/src/Galley/API/MLS/Propagate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import Polysemy.TinyLog hiding (trace)
import Wire.API.Event.Conversation
import Wire.API.Federation.API
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.MLS.Credential
import Wire.API.MLS.Message
import Wire.API.MLS.Serialisation
Expand Down Expand Up @@ -89,26 +88,21 @@ propagateMessage qusr mSenderClient lConvOrSub con msg cm = do

-- send to remotes
(either (logRemoteNotificationError @"on-mls-message-sent") (const (pure ())) <=< enqueueNotificationsConcurrently Q.Persistent (map remoteMemberQualify rmems)) $
\rs -> do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
fedQueueClient $
toBundle @'OnMLSMessageSentTag
reqId
origin
( RemoteMLSMessage
{ time = now,
sender = qusr,
metadata = mm,
conversation = qUnqualified qcnv,
subConversation = sconv,
recipients =
Map.fromList $
tUnqualified rs
>>= toList . remoteMemberMLSClients,
message = Base64ByteString msg.raw
}
)
\rs ->
fedQueueClient
@'OnMLSMessageSentTag
RemoteMLSMessage
{ time = now,
sender = qusr,
metadata = mm,
conversation = qUnqualified qcnv,
subConversation = sconv,
recipients =
Map.fromList $
tUnqualified rs
>>= toList . remoteMemberMLSClients,
message = Base64ByteString msg.raw
}
where
cmWithoutSender = maybe cm (flip cmRemoveClient cm . mkClientIdentity qusr) mSenderClient

Expand Down
7 changes: 1 addition & 6 deletions services/galley/src/Galley/API/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ import Wire.API.Event.Conversation
import Wire.API.Federation.API
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client (FederatorClient)
import Wire.API.Federation.Error
import Wire.API.Message
Expand Down Expand Up @@ -699,11 +698,7 @@ sendRemoteMessages domain now sender senderClient lcnv metadata messages = (hand
transient = mmTransient metadata,
recipients = UserClientMap rcpts
}
let rpc = void $ do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
fedQueueClient $ toBundle @'OnMessageSentTag reqId origin rm
enqueueNotification Q.Persistent domain rpc
enqueueNotification Q.Persistent domain (fedQueueClient @'OnMessageSentTag rm)
where
handle :: Either FederationError a -> Sem r (Set (UserId, ClientId))
handle (Right _) = pure mempty
Expand Down
7 changes: 2 additions & 5 deletions services/galley/src/Galley/API/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ import Wire.API.Error.Galley
import Wire.API.Event.Conversation
import Wire.API.Federation.API
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Error
import Wire.API.Password
import Wire.API.Routes.Public.Galley.Conversation
Expand Down Expand Up @@ -877,10 +876,8 @@ registerRemoteConversationMemberships now lusr lc = deleteOnUnreachable $ do
)
joined

r <- enqueueNotificationsConcurrentlyBuckets Q.Persistent joinedCoupled $ \z -> do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
fedQueueClient $ toBundle @'OnConversationUpdatedTag reqId origin (convUpdateJoin z)
r <- enqueueNotificationsConcurrentlyBuckets Q.Persistent joinedCoupled $ \z ->
fedQueueClient @'OnConversationUpdatedTag (convUpdateJoin z)
either throw (void . pure) r
where
creator :: Maybe UserId
Expand Down
Loading

0 comments on commit 26eaae8

Please sign in to comment.