Skip to content

Commit

Permalink
Revert "Remove dead code."
Browse files Browse the repository at this point in the history
This reverts commit 62d91e6.
  • Loading branch information
fisx committed Aug 22, 2023
1 parent fdc380a commit 25b368c
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 0 deletions.
33 changes: 33 additions & 0 deletions services/background-worker/src/Wire/BackendNotificationPusher.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import Data.Aeson qualified as A
import Data.Domain
import Data.Map.Strict qualified as Map
import Data.Set qualified as Set
import Data.Text qualified as Text
import Imports
import Network.AMQP (cancelConsumer)
import Network.AMQP qualified as Q
import Network.AMQP.Extended
import Network.AMQP.Lifted qualified as QL
import Network.RabbitMqAdmin
import Prometheus
import System.Logger.Class qualified as Log
import UnliftIO
Expand Down Expand Up @@ -184,6 +186,37 @@ ensureConsumer consumers chan domain = do
-- let us come down this path if there is an old consumer.
liftIO $ forM_ oldTag $ Q.cancelConsumer chan . fst

getRemoteDomains :: AppT IO [Domain]
getRemoteDomains = do
-- Jittered exponential backoff with 10ms as starting delay and 60s as max
-- cumulative delay. When this is reached, the operation fails.
--
-- FUTUREWORK: Pull these numbers into config
let policy = limitRetriesByCumulativeDelay 60_000_000 $ fullJitterBackoff 10000
logErrr willRetry (SomeException e) rs =
Log.err $
Log.msg (Log.val "Exception occurred while refreshig domains")
. Log.field "error" (displayException e)
. Log.field "willRetry" willRetry
. Log.field "retryCount" rs.rsIterNumber
handlers =
skipAsyncExceptions
<> [logRetries (const $ pure True) logErrr]
recovering policy handlers $ const go
where
go :: AppT IO [Domain]
go = do
client <- asks rabbitmqAdminClient
vhost <- asks rabbitmqVHost
queues <- liftIO $ listQueuesByVHost client vhost
let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queues
catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes
logInvalidDomain d e =
Log.warn $
Log.msg (Log.val "Found invalid domain in a backend notifications queue name")
. Log.field "queue" ("backend-notifications." <> d)
. Log.field "error" e

startWorker :: RabbitMqAdminOpts -> AppT IO (IORef (Maybe Q.Channel), IORef (Map Domain (Q.ConsumerTag, MVar ())))
startWorker rabbitmqOpts = do
env <- ask
Expand Down
7 changes: 7 additions & 0 deletions services/background-worker/src/Wire/Defederation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import Control.Monad.Catch
import Control.Retry
import Data.Aeson qualified as A
import Data.ByteString.Conversion
import Data.Domain
import Data.Text (unpack)
import Data.Text.Encoding
import Imports
Expand All @@ -19,6 +20,7 @@ import Servant.Client (BaseUrl (..), ClientEnv, Scheme (Http), mkClientEnv)
import System.Logger.Class qualified as Log
import Util.Options
import Wire.API.Federation.BackendNotifications
import Wire.API.Routes.FederationDomainConfig qualified as Fed
import Wire.BackgroundWorker.Env
import Wire.BackgroundWorker.Util

Expand Down Expand Up @@ -56,6 +58,11 @@ mkBrigEnv = do
<$> asks httpManager
<*> pure (BaseUrl Http (unpack brigHost) (fromIntegral brigPort) "")

getRemoteDomains :: AppT IO [Domain]
getRemoteDomains = do
ref <- asks remoteDomains
fmap Fed.domain . Fed.remotes <$> readIORef ref

callGalleyDelete ::
( MonadReader Env m,
MonadMask m,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

module Test.Wire.BackendNotificationPusherSpec where

import Control.Concurrent.Chan
import Control.Exception
import Control.Monad.Trans.Except
import Data.Aeson qualified as Aeson
Expand All @@ -17,6 +18,7 @@ import Data.Text.Encoding qualified as Text
import Federator.MockServer
import Imports
import Network.AMQP qualified as Q
import Network.HTTP.Client (defaultManagerSettings, newManager, responseTimeoutNone)
import Network.HTTP.Media
import Network.HTTP.Types
import Network.RabbitMqAdmin
Expand All @@ -29,15 +31,18 @@ import Servant.Client.Core
import Servant.Client.Internal.HttpClient (mkFailureResponse)
import Servant.Server.Generic
import Servant.Types.SourceT
import System.Logger.Class qualified as Logger
import Test.Hspec
import Test.QuickCheck
import Test.Wire.Util
import UnliftIO.Async
import Util.Options
import Wire.API.Federation.API
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Common
import Wire.API.Federation.BackendNotifications
import Wire.API.RawJson
import Wire.API.Routes.FederationDomainConfig
import Wire.BackendNotificationPusher
import Wire.BackgroundWorker.Env
import Wire.BackgroundWorker.Options
Expand Down Expand Up @@ -202,6 +207,75 @@ spec = do
getVectorWith env.backendNotificationMetrics.stuckQueuesGauge getGauge
`shouldReturn` [(domainText targetDomain, 0)]

describe "getRemoteDomains" $ do
it "should parse remoteDomains from queues with name starting with 'backend-notifications.' and ignore the other queues" $ do
mockAdmin <-
newMockRabbitMqAdmin
False
[ "backend-notifications.foo.example",
"backend-notifications.bar.example",
"some-other-queue",
"backend-notifications.baz.example",
"backend-notifications.not-a-domain"
]
logger <- Logger.new Logger.defSettings
httpManager <- newManager defaultManagerSettings
remoteDomains <- newIORef defFederationDomainConfigs
remoteDomainsChan <- newChan
notificationChannel <- newEmptyMVar
let federatorInternal = Endpoint "localhost" 8097
http2Manager = undefined
statuses = undefined
metrics = undefined
rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin
rabbitmqVHost = "test-vhost"
defederationTimeout = responseTimeoutNone
galley = Endpoint "localhost" 8085
brig = Endpoint "localhost" 8082
backendNotificationsConfig = BackendNotificationsConfig 1000 500000

backendNotificationMetrics <- mkBackendNotificationMetrics
domains <- runAppT Env {..} getRemoteDomains
domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"]
readTVarIO mockAdmin.listQueuesVHostCalls `shouldReturn` ["test-vhost"]

it "should retry fetching domains if a request fails" $ do
mockAdmin <- newMockRabbitMqAdmin True ["backend-notifications.foo.example"]
logger <- Logger.new Logger.defSettings
httpManager <- newManager defaultManagerSettings
remoteDomains <- newIORef defFederationDomainConfigs
remoteDomainsChan <- newChan
notificationChannel <- newEmptyMVar
let federatorInternal = Endpoint "localhost" 8097
http2Manager = undefined
statuses = undefined
metrics = undefined
rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin
rabbitmqVHost = "test-vhost"
defederationTimeout = responseTimeoutNone
galley = Endpoint "localhost" 8085
brig = Endpoint "localhost" 8082
backendNotificationsConfig = BackendNotificationsConfig 1000 500000

backendNotificationMetrics <- mkBackendNotificationMetrics
domainsThread <- async $ runAppT Env {..} getRemoteDomains

-- Wait for first call
untilM (readTVarIO mockAdmin.listQueuesVHostCalls >>= \calls -> pure $ not $ null calls)

-- Unbreak the API
atomically $ writeTVar mockAdmin.broken False

-- Now the thread can finish
wait domainsThread `shouldReturn` [Domain "foo.example"]

calls <- readTVarIO mockAdmin.listQueuesVHostCalls

-- This test cannot guarantee how many times retries happened due to the
-- concurrency, so just assert that there were at least 2 calls.
calls `shouldSatisfy` (\c -> length c >= 2)
mapM_ (\vhost -> vhost `shouldBe` rabbitmqVHost) calls

untilM :: (Monad m) => m Bool -> m ()
untilM action = do
b <- action
Expand Down

0 comments on commit 25b368c

Please sign in to comment.