Skip to content

Commit

Permalink
Extract RabbitMQOpts and the function to create channel in extended (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaymankar authored Jun 14, 2023
1 parent e6c32cb commit 2604b20
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 56 deletions.
22 changes: 22 additions & 0 deletions libs/extended/src/Network/AMQP/Extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Network.AMQP.Extended where
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Control.Retry
import Data.Aeson (FromJSON)
import qualified Data.Text as Text
import Imports
import qualified Network.AMQP as Q
Expand All @@ -21,6 +22,27 @@ data RabbitMqHooks m = RabbitMqHooks
onChannelException :: SomeException -> m ()
}

data RabbitMqOpts = RabbitMqOpts
{ host :: !String,
port :: !Int,
vHost :: !Text
}
deriving (Show, Generic)

instance FromJSON RabbitMqOpts

-- | Useful if the application only pushes into some queues.
mkRabbitMqChannelMVar :: Logger -> RabbitMqOpts -> IO (MVar Q.Channel)
mkRabbitMqChannelMVar l opts = do
chan <- newEmptyMVar
openConnectionWithRetries l opts.host opts.port opts.vHost $
RabbitMqHooks
{ onNewChannel = putMVar chan,
onChannelException = \_ -> void $ tryTakeMVar chan,
onConnectionClose = void $ tryTakeMVar chan
}
pure chan

-- | Connects with RabbitMQ and opens a channel. If the channel is closed for
-- some reasons, reopens the channel. If the connection is closed for some
-- reasons, keeps retrying to connect until it works.
Expand Down
10 changes: 1 addition & 9 deletions services/background-worker/src/Wire/BackgroundWorker/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Wire.BackgroundWorker.Options where
import Data.Aeson
import Data.Domain
import Imports
import Network.AMQP.Extended
import System.Logger.Extended
import Util.Options

Expand All @@ -16,12 +17,3 @@ data Opts = Opts
deriving (Show, Generic)

instance FromJSON Opts

data RabbitMqOpts = RabbitMqOpts
{ host :: !String,
port :: !Int,
vHost :: !Text
}
deriving (Show, Generic)

instance FromJSON RabbitMqOpts
15 changes: 1 addition & 14 deletions services/brig/src/Brig/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ import qualified Database.Bloodhound as ES
import HTTP2.Client.Manager (Http2Manager, http2ManagerWithSSLCtx)
import Imports
import qualified Network.AMQP as Q
import Network.AMQP.Extended (RabbitMqHooks (RabbitMqHooks))
import qualified Network.AMQP.Extended as Q
import Network.HTTP.Client (responseTimeoutMicro)
import Network.HTTP.Client.OpenSSL
Expand Down Expand Up @@ -259,7 +258,7 @@ newEnv o = do
Log.info lgr $ Log.msg (Log.val "randomPrekeys: not active; using dynamoDB instead.")
pure Nothing
kpLock <- newMVar ()
rabbitChan <- mkRabbitMqChannel lgr o
rabbitChan <- traverse (Q.mkRabbitMqChannelMVar lgr) o.rabbitmq
pure $!
Env
{ _cargohold = mkEndpoint $ Opt.cargohold o,
Expand Down Expand Up @@ -312,18 +311,6 @@ newEnv o = do
pure (Nothing, Just smtp)
mkEndpoint service = RPC.host (encodeUtf8 (service ^. epHost)) . RPC.port (service ^. epPort) $ RPC.empty

mkRabbitMqChannel :: Logger -> Opts -> IO (Maybe (MVar Q.Channel))
mkRabbitMqChannel l (Opt.rabbitmq -> Just Opt.RabbitMqOpts {..}) = do
chan <- newEmptyMVar
Q.openConnectionWithRetries l host port vHost $
RabbitMqHooks
{ onNewChannel = putMVar chan,
onChannelException = \_ -> void $ tryTakeMVar chan,
onConnectionClose = void $ tryTakeMVar chan
}
pure $ Just chan
mkRabbitMqChannel _ _ = pure Nothing

mkIndexEnv :: Opts -> Logger -> Manager -> Metrics -> Endpoint -> IndexEnv
mkIndexEnv o lgr mgr mtr galleyEndpoint =
let bhe = ES.mkBHEnv (ES.Server (Opt.url (Opt.elasticsearch o))) mgr
Expand Down
10 changes: 1 addition & 9 deletions services/brig/src/Brig/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import Data.Yaml (FromJSON (..), ToJSON (..), (.:), (.:?))
import qualified Data.Yaml as Y
import Galley.Types.Teams (unImplicitLockStatus)
import Imports
import Network.AMQP.Extended
import qualified Network.DNS as DNS
import System.Logger.Extended (Level, LogFormat)
import Util.Options
Expand Down Expand Up @@ -95,15 +96,6 @@ data ElasticSearchOpts = ElasticSearchOpts

instance FromJSON ElasticSearchOpts

data RabbitMqOpts = RabbitMqOpts
{ host :: !String,
port :: !Int,
vHost :: !Text
}
deriving (Show, Generic)

instance FromJSON RabbitMqOpts

data AWSOpts = AWSOpts
{ -- | Event journal queue for user events
-- (e.g. user deletion)
Expand Down
3 changes: 2 additions & 1 deletion services/galley/src/Galley/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import qualified Galley.Queue as Q
import qualified Galley.Types.Teams as Teams
import HTTP2.Client.Manager (Http2Manager, http2ManagerWithSSLCtx)
import Imports hiding (forkIO)
import Network.AMQP.Extended
import Network.HTTP.Client (responseTimeoutMicro)
import Network.HTTP.Client.OpenSSL
import qualified Network.Wai.Utilities.Error as Wai
Expand Down Expand Up @@ -166,7 +167,7 @@ createEnv m o = do
<*> initExtEnv
<*> maybe (pure Nothing) (fmap Just . Aws.mkEnv l mgr) (o ^. optJournal)
<*> loadAllMLSKeys (fold (o ^. optSettings . setMlsPrivateKeyPaths))
<*> mkRabbitMqChannel l o
<*> traverse (mkRabbitMqChannelMVar l) (o ^. optRabbitmq)

initCassandra :: Opts -> Logger -> IO ClientState
initCassandra o l = do
Expand Down
13 changes: 0 additions & 13 deletions services/galley/src/Galley/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import qualified Galley.Queue as Q
import HTTP2.Client.Manager (Http2Manager)
import Imports
import qualified Network.AMQP as Q
import qualified Network.AMQP.Extended as Q
import Network.HTTP.Client
import Network.HTTP.Client.OpenSSL
import OpenSSL.EVP.Digest
Expand Down Expand Up @@ -108,15 +107,3 @@ currentFanoutLimit o = do
let optFanoutLimit = fromIntegral . fromRange $ fromMaybe defFanoutLimit (o ^. (optSettings . setMaxFanoutSize))
let maxTeamSize = fromIntegral (o ^. (optSettings . setMaxTeamSize))
unsafeRange (min maxTeamSize optFanoutLimit)

mkRabbitMqChannel :: Logger -> Opts -> IO (Maybe (MVar Q.Channel))
mkRabbitMqChannel l (view optRabbitmq -> Just RabbitMqOpts {..}) = do
chan <- newEmptyMVar
Q.openConnectionWithRetries l _rabbitmqHost _rabbitmqPort _rabbitmqVHost $
Q.RabbitMqHooks
{ onNewChannel = putMVar chan,
onChannelException = \_ -> void $ tryTakeMVar chan,
onConnectionClose = void $ tryTakeMVar chan
}
pure $ Just chan
mkRabbitMqChannel _ _ = pure Nothing
11 changes: 1 addition & 10 deletions services/galley/src/Galley/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ module Galley.Options
defDeleteConvThrottleMillis,
defFanoutLimit,
JournalOpts (JournalOpts),
RabbitMqOpts (..),
awsQueueName,
awsEndpoint,
Opts,
Expand Down Expand Up @@ -66,6 +65,7 @@ import Data.Range
import Galley.Keys
import Galley.Types.Teams
import Imports
import Network.AMQP.Extended
import System.Logger.Extended (Level, LogFormat)
import Util.Options
import Util.Options.Common
Expand Down Expand Up @@ -147,15 +147,6 @@ deriveFromJSON toOptionFieldName ''JournalOpts

makeLenses ''JournalOpts

data RabbitMqOpts = RabbitMqOpts
{ _rabbitmqHost :: !String,
_rabbitmqPort :: !Int,
_rabbitmqVHost :: !Text
}
deriving (Show, Generic)

deriveFromJSON toOptionFieldName ''RabbitMqOpts

data Opts = Opts
{ -- | Host and port to bind to
_optGalley :: !Endpoint,
Expand Down

0 comments on commit 2604b20

Please sign in to comment.