Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WPB-183] Version federation API queue notifications #3831

Merged
merged 52 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
502fb55
Define the version range type
mdimjasevic Jan 18, 2024
7f2a316
Pass the version range to a BackendNotification
mdimjasevic Jan 18, 2024
7211937
Utility: compute a version in common
mdimjasevic Jan 19, 2024
86c617a
Parameterise version negotiation
mdimjasevic Jan 24, 2024
39f8cac
Define the PayloadBundle type
mdimjasevic Jan 24, 2024
f2da0fe
Convert a call to use fedQueueClientBundle
mdimjasevic Jan 19, 2024
0b8a3b4
Move fedNotifToBackendNotif next to BackendNotification
mdimjasevic Jan 25, 2024
bfb3425
Parse a queue message as a PayloadBundle
mdimjasevic Jan 24, 2024
3b121f9
Remove targetDomain from PayloadBundle
mdimjasevic Jan 26, 2024
0ab7dc8
Remove originDomain from PayloadBundle
mdimjasevic Jan 26, 2024
1592554
instance Semigroup PayloadBundle
mdimjasevic Jan 26, 2024
b6d3812
TODO note: refactoring
mdimjasevic Jan 26, 2024
a29dea5
More focused signature of toBundle
mdimjasevic Jan 26, 2024
5934575
Make all fedQueue endpoints use the new client
mdimjasevic Jan 26, 2024
6862fca
Test: dequeueing a payload bundle
mdimjasevic Jan 26, 2024
df4e34c
Add a futurework note on dropping support for parsing backend
mdimjasevic Jan 29, 2024
f4dd8ce
Pull out mostRecentNotif into a standalone function
mdimjasevic Jan 29, 2024
400dc6f
Fix Nix dependencies
mdimjasevic Jan 29, 2024
0a3b9aa
A TODO and a FUTUREWORK note
mdimjasevic Jan 29, 2024
c7f5de4
Unit tests for mostRecentNotif.
mdimjasevic Jan 29, 2024
8ba0c2c
Refactor VersionRange
fisx Jan 29, 2024
3619b54
Slight refactoring of mostRecentNotif
mdimjasevic Jan 29, 2024
bd73ba6
Move mostRecentTuple
mdimjasevic Jan 29, 2024
16d520c
Disable a deprecation warning
mdimjasevic Jan 30, 2024
12889ae
Inline reqOrigin
pcapriotti Feb 2, 2024
8ce41a2
Remove redundant maxBound
pcapriotti Feb 2, 2024
e9f833a
Introduce change in notification API
pcapriotti Feb 2, 2024
6048ede
Clean up backend notification effect
pcapriotti Feb 5, 2024
e9ce727
Refactor fedQueueClient API
pcapriotti Feb 5, 2024
71958cf
Make ConversationUpdate request versioned
pcapriotti Feb 5, 2024
532d05b
Expose v0 notification endpoint
pcapriotti Feb 9, 2024
75bd0e9
Fatal logs for version mismatch in pusher
pcapriotti Feb 12, 2024
d629922
Add TODOs
pcapriotti Feb 13, 2024
6d9ac81
Add notification mods and generate version ranges
pcapriotti Feb 13, 2024
2cf8c86
Use existential to lift components to type level
pcapriotti Feb 13, 2024
126f18c
Skip negotiation when fetching remote versions
pcapriotti Feb 13, 2024
914f54e
Move notification change to V2
pcapriotti Feb 14, 2024
68a2060
Test conversation update on V0
pcapriotti Feb 14, 2024
1c0bfd0
Parse remote versions liberally
pcapriotti Feb 14, 2024
d3c28e4
Support broken fed API version negotiation
pcapriotti Feb 15, 2024
1235397
Turn on-conversation-updated uses into notifs
pcapriotti Feb 16, 2024
68c38c1
Refactor notification version negotiation
pcapriotti Feb 16, 2024
9ef54e8
Add CHANGELOG entries
fisx Feb 2, 2024
521e13f
Lint
pcapriotti Feb 16, 2024
4a7795b
Fix ConversationUpdate golden tests
pcapriotti Feb 16, 2024
4f464db
Remove federation version V2
pcapriotti Feb 19, 2024
d060253
Remove obsolete tests from galley integration
pcapriotti Feb 20, 2024
2aad799
Remove commented out code
pcapriotti Feb 29, 2024
af44567
Refactor mock federator arguments
pcapriotti Mar 1, 2024
0caa42d
Unit-test pusher version negotiation
pcapriotti Mar 4, 2024
83dcf79
Ignore version mismatches when pushing
pcapriotti Mar 4, 2024
f70d713
Rename test case
pcapriotti Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/6-federation/on-conversation-updated-async
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The on-conversation-updated notification is now queued instead of being sent directly. A new version of the notification has been introduced with a different JSON format for the body, mostly for testing purposes of the versioning system.

Since the notification is now sent asynchronously, some error conditions in case of unreachable backends cannot be triggered anymore.
2 changes: 2 additions & 0 deletions changelog.d/6-federation/wpb-183-versioned-async-b2b
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Versioning of backend to backend notifications. Notifications are now stored in "bundles" containing a serialised payload for each supported version. The background worker then dynamically selects the best version to use and sends only the notification corresponding to that version.

2 changes: 2 additions & 0 deletions deploy/dockerephemeral/federation-v0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ services:
networks:
- demo_wire
- coredns
extra_hosts:
- "host.docker.internal.:host-gateway"
ports:
- '127.0.0.1:21097:8080'
- '127.0.0.1:21098:8081'
Expand Down
14 changes: 14 additions & 0 deletions integration/test/Test/Conversation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -850,3 +850,17 @@ testGuestLinksExpired = do
liftIO $ threadDelay (1_100_000)
bindResponse (getJoinCodeConv tm k v) $ \resp -> do
resp.status `shouldMatchInt` 404

testConversationWithFedV0 :: HasCallStack => App ()
testConversationWithFedV0 = do
alice <- randomUser OwnDomain def
bob <- randomUser FedV0Domain def
withAPIVersion 4 $ connectTwoUsers alice bob

conv <-
postConversation alice (defProteus {qualifiedUsers = [bob]})
>>= getJSON 201

withWebSocket bob $ \ws -> do
void $ changeConversationName alice conv "foobar" >>= getJSON 200
void $ awaitMatch isConvNameChangeNotif ws
4 changes: 4 additions & 0 deletions integration/test/Testlib/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Testlib.Env where

import Control.Monad.Codensity
import Control.Monad.IO.Class
import Control.Monad.Reader
import Data.Default
import Data.Function ((&))
import Data.Functor
Expand Down Expand Up @@ -184,3 +185,6 @@ mkMLSState = Codensity $ \k ->
ciphersuite = def,
protocol = MLSProtocolMLS
}

withAPIVersion :: Int -> App a -> App a
withAPIVersion v = local $ \e -> e {defaultAPIVersion = v}
46 changes: 28 additions & 18 deletions libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ module Wire.API.Federation.API
HasUnsafeFedEndpoint,
fedClient,
fedQueueClient,
sendBundle,
fedClientIn,
unsafeFedClientIn,
module Wire.API.MakesFederatedCall,

-- * Re-exports
Component (..),
makeConversationUpdateBundle,
)
where

Expand All @@ -45,6 +47,7 @@ import Servant.Client.Core
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Cargohold
import Wire.API.Federation.API.Galley
import Wire.API.Federation.API.Util
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.Component
Expand Down Expand Up @@ -88,21 +91,21 @@ fedClient ::
Client m api
fedClient = clientIn (Proxy @api) (Proxy @m)

fedQueueClient ::
forall {k} (tag :: k).
( HasNotificationEndpoint tag,
KnownSymbol (NotificationPath tag),
KnownComponent (NotificationComponent k),
ToJSON (Payload tag)
) =>
Payload tag ->
FedQueueClient (NotificationComponent k) ()
fedQueueClient payload = do
fedClientIn ::
forall (comp :: Component) (name :: Symbol) m api.
(HasFedEndpoint comp api name, HasClient m api) =>
Client m api
fedClientIn = clientIn (Proxy @api) (Proxy @m)

sendBundle ::
KnownComponent c =>
PayloadBundle c ->
FedQueueClient c ()
sendBundle bundle = do
env <- ask
let notif = fedNotifToBackendNotif @tag env.requestId env.originDomain payload
msg =
let msg =
newMsg
{ msgBody = encode notif,
{ msgBody = encode bundle,
msgDeliveryMode = Just (env.deliveryMode),
msgContentType = Just "application/json"
}
Expand All @@ -112,11 +115,18 @@ fedQueueClient payload = do
ensureQueue env.channel env.targetDomain._domainText
void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg

fedClientIn ::
forall (comp :: Component) (name :: Symbol) m api.
(HasFedEndpoint comp api name, HasClient m api) =>
Client m api
fedClientIn = clientIn (Proxy @api) (Proxy @m)
fedQueueClient ::
forall {k} (tag :: k) c.
( HasNotificationEndpoint tag,
HasVersionRange tag,
HasFedPath tag,
KnownComponent (NotificationComponent k),
ToJSON (Payload tag),
c ~ NotificationComponent k
) =>
Payload tag ->
FedQueueClient c ()
fedQueueClient payload = sendBundle =<< makeBundle @tag payload

-- | Like 'fedClientIn', but doesn't propagate a 'CallsFed' constraint. Intended
-- to be used in test situations only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ import Wire.API.Conversation.Action
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.Federation.Version
import Wire.API.MLS.SubConversation
import Wire.API.Message
import Wire.API.Routes.Version (From, Until)
import Wire.API.Util.Aeson
import Wire.Arbitrary

data GalleyNotificationTag
= OnClientRemovedTag
| OnMessageSentTag
| OnMLSMessageSentTag
| OnConversationUpdatedTagV0
| OnConversationUpdatedTag
| OnUserDeletedConversationsTag
deriving (Show, Eq, Generic, Bounded, Enum)
Expand All @@ -66,9 +69,16 @@ instance HasNotificationEndpoint 'OnMLSMessageSentTag where

-- used by the backend that owns a conversation to inform this backend of
-- changes to the conversation
instance HasNotificationEndpoint 'OnConversationUpdatedTagV0 where
type Payload 'OnConversationUpdatedTagV0 = ConversationUpdateV0
type NotificationPath 'OnConversationUpdatedTagV0 = "on-conversation-updated"
type NotificationVersionTag 'OnConversationUpdatedTagV0 = 'Just 'V0
type NotificationMods 'OnConversationUpdatedTagV0 = '[Until 'V1]

instance HasNotificationEndpoint 'OnConversationUpdatedTag where
type Payload 'OnConversationUpdatedTag = ConversationUpdate
type NotificationPath 'OnConversationUpdatedTag = "on-conversation-updated"
type NotificationMods 'OnConversationUpdatedTag = '[From 'V1]

instance HasNotificationEndpoint 'OnUserDeletedConversationsTag where
type Payload 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification
Expand All @@ -79,6 +89,7 @@ type GalleyNotificationAPI =
NotificationFedEndpoint 'OnClientRemovedTag
:<|> NotificationFedEndpoint 'OnMessageSentTag
:<|> NotificationFedEndpoint 'OnMLSMessageSentTag
:<|> NotificationFedEndpoint 'OnConversationUpdatedTagV0
:<|> NotificationFedEndpoint 'OnConversationUpdatedTag
:<|> NotificationFedEndpoint 'OnUserDeletedConversationsTag

Expand Down Expand Up @@ -129,7 +140,7 @@ data RemoteMLSMessage = RemoteMLSMessage

instance ToSchema RemoteMLSMessage

data ConversationUpdate = ConversationUpdate
data ConversationUpdateV0 = ConversationUpdateV0
{ cuTime :: UTCTime,
cuOrigUserId :: Qualified UserId,
-- | The unqualified ID of the conversation where the update is happening.
Expand All @@ -147,12 +158,56 @@ data ConversationUpdate = ConversationUpdate
}
deriving (Eq, Show, Generic)

instance ToJSON ConversationUpdateV0

instance FromJSON ConversationUpdateV0

instance ToSchema ConversationUpdateV0

data ConversationUpdate = ConversationUpdate
{ time :: UTCTime,
origUserId :: Qualified UserId,
-- | The unqualified ID of the conversation where the update is happening.
-- The ID is local to the sender to prevent putting arbitrary domain that
-- is different than that of the backend making a conversation membership
-- update request.
convId :: ConvId,
-- | A list of users from the receiving backend that need to be sent
-- notifications about this change. This is required as we do not expect a
-- non-conversation owning backend to have an indexed mapping of
-- conversation to users.
alreadyPresentUsers :: [UserId],
-- | Information on the specific action that caused the update.
action :: SomeConversationAction
}
deriving (Eq, Show, Generic)

instance ToJSON ConversationUpdate

instance FromJSON ConversationUpdate

instance ToSchema ConversationUpdate

conversationUpdateToV0 :: ConversationUpdate -> ConversationUpdateV0
conversationUpdateToV0 cu =
ConversationUpdateV0
{ cuTime = cu.time,
cuOrigUserId = cu.origUserId,
cuConvId = cu.convId,
cuAlreadyPresentUsers = cu.alreadyPresentUsers,
cuAction = cu.action
}

conversationUpdateFromV0 :: ConversationUpdateV0 -> ConversationUpdate
conversationUpdateFromV0 cu =
ConversationUpdate
{ time = cu.cuTime,
origUserId = cu.cuOrigUserId,
convId = cu.cuConvId,
alreadyPresentUsers = cu.cuAlreadyPresentUsers,
action = cu.cuAction
}

type UserDeletedNotificationMaxConvs = 1000

data UserDeletedConversationsNotification = UserDeletedConversationsNotification
Expand Down
29 changes: 29 additions & 0 deletions libs/wire-api-federation/src/Wire/API/Federation/API/Util.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2023 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.API.Federation.API.Util where

import Imports
import Wire.API.Federation.API.Galley.Notifications
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Component

makeConversationUpdateBundle ::
ConversationUpdate ->
FedQueueClient 'Galley (PayloadBundle 'Galley)
makeConversationUpdateBundle update =
(<>) <$> makeBundle update <*> makeBundle (conversationUpdateToV0 update)
Loading
Loading