Skip to content

Commit

Permalink
server: support server roles and operators
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Sep 30, 2024
1 parent a957693 commit e3f2fb3
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ joinConn c userId connId hasNewConn enableNtfs cReq cInfo pqSupport subMode = do
srv <- case cReq of
CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _ ->
getNextServer c userId [qServer q]
_ -> getSMPServer c userId
_ -> getSMPServer c userId -- TODO when connecting to contact address, also try gettings a different operator/server
joinConnSrv c userId connId hasNewConn enableNtfs cReq cInfo pqSupport subMode srv

startJoinInvitation :: UserId -> ConnId -> Maybe SndQueue -> Bool -> ConnectionRequestUri 'CMInvitation -> PQSupport -> AM (ConnData, NewSndQueue, C.PublicKeyX25519, CR.Ratchet 'C.X448, CR.SndE2ERatchetParams 'C.X448)
Expand Down
62 changes: 46 additions & 16 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ import qualified Data.ByteString.Base64 as B64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (isRight, partitionEithers)
import qualified Data.Foldable as Foldable
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (deleteFirstsBy, find, foldl', partition, (\\))
Expand Down Expand Up @@ -2033,34 +2034,63 @@ userServers c = case protocolTypeI @p of
SPXFTP -> xftpServers c
{-# INLINE userServers #-}

pickServer :: forall p. NonEmpty (ProtoServerWithAuth p) -> AM (ProtoServerWithAuth p)
pickServer :: forall p. NonEmpty (OperatorId, ProtoServerWithAuth p) -> AM (ProtoServerWithAuth p)
pickServer = \case
srv :| [] -> pure srv
(_, srv) :| [] -> pure srv
servers -> do
gen <- asks randomServer
atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1))
atomically $ snd . (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1))

getNextServer :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> UserId -> [ProtocolServer p] -> AM (ProtoServerWithAuth p)
getNextServer c userId usedSrvs = withUserServers c userId $ \srvs ->
case L.nonEmpty $ deleteFirstsBy sameSrvAddr' (L.toList srvs) (map noAuthSrv usedSrvs) of
Just srvs' -> pickServer srvs'
_ -> pickServer srvs
getNextServer ::
forall p. (ProtocolTypeI p, UserProtocol p) =>
AgentClient ->
UserId ->
(UserServers p -> NonEmpty (OperatorId, ProtoServerWithAuth p)) ->
Set TransportHost ->
AM (ProtoServerWithAuth p)
getNextServer c userId enabledSel usedHosts = withUserServers c userId enabledSel $ \srvs -> do
let (usedSrvs, unusedSrvs) = L.partition (isUsed . snd) srvs
usedOps :: Set OperatorId = foldl' (\s -> (`S.insert` s) . fst) S.empty usedSrvs
unusedOpSrvs = filter ((`S.notMember` usedOps) . fst) unusedSrvs
-- choose from: 1) servers of other operators, 2) other servers, 3) all servers
srvs' = fromMaybe srvs $ L.nonEmpty unusedOpSrvs <|> L.nonEmpty unusedSrvs
pickServer srvs'
where
isUsed (ProtoServerWithAuth ProtocolServer {host} _) = any (`S.member` usedHosts) host

withUserServers :: forall p a. (ProtocolTypeI p, UserProtocol p) => AgentClient -> UserId -> (NonEmpty (ProtoServerWithAuth p) -> AM a) -> AM a
withUserServers c userId action =
withUserServers ::
forall p a. (ProtocolTypeI p, UserProtocol p) =>
AgentClient ->
UserId ->
(UserServers p -> NonEmpty (OperatorId, ProtoServerWithAuth p)) ->
(NonEmpty (OperatorId, ProtoServerWithAuth p) -> AM a) ->
AM a
withUserServers c userId enabledSel action =
liftIO (TM.lookupIO userId $ userServers c) >>= \case
Just srvs -> action $ enabledSrvs srvs
Just srvs -> action $ enabledSel srvs
_ -> throwE $ INTERNAL "unknown userId - no user servers"

withNextSrv :: forall p a. (ProtocolTypeI p, UserProtocol p) => AgentClient -> UserId -> TVar [ProtocolServer p] -> [ProtocolServer p] -> (ProtoServerWithAuth p -> AM a) -> AM a
withNextSrv c userId usedSrvs initUsed action = do
withNextSrv ::
forall p a. (ProtocolTypeI p, UserProtocol p) =>
AgentClient ->
UserId ->
(UserServers p -> NonEmpty (OperatorId, ProtoServerWithAuth p)) ->
TVar (Set TransportHost) ->
Maybe (ProtocolServer p) ->
(ProtoServerWithAuth p -> AM a) ->
AM a
withNextSrv c userId enabledSel usedSrvs initUsed action = do
used <- readTVarIO usedSrvs
srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId used
srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId enabledSel used
srvs_ <- liftIO $ TM.lookupIO userId $ userServers c
let unused = maybe [] ((\\ used) . map protoServer . L.toList . enabledSrvs) srvs_
used' = if null unused then initUsed else srv : used
let unused = undefined -- maybe [] ((\\ used) . map (protoServer . snd) . L.toList . enabledSel) srvs_
used'
| null unused = maybe id addHosts initUsed S.empty
| otherwise = addHosts srv used
atomically $ writeTVar usedSrvs $! used'
action srvAuth
where
addHosts ProtocolServer {host} s = Foldable.foldl' (flip S.insert) s host

incSMPServerStat :: AgentClient -> UserId -> SMPServer -> (AgentSMPServerStats -> TVar Int) -> STM ()
incSMPServerStat c userId srv sel = incSMPServerStat' c userId srv sel 1
Expand Down
41 changes: 30 additions & 11 deletions src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module Simplex.Messaging.Agent.Env.SQLite
AgentConfig (..),
InitialAgentServers (..),
ServerCfg (..),
OperatorId,
UserServers (..),
NetworkConfig (..),
presetServerCfg,
Expand Down Expand Up @@ -54,6 +55,8 @@ import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import Data.Maybe (fromMaybe)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Time.Clock (NominalDiffTime, nominalDay)
import Data.Time.Clock.System (SystemTime (..))
import Data.Word (Word16)
Expand All @@ -71,10 +74,11 @@ import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig)
import Simplex.Messaging.Notifications.Transport (NTFVersion)
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (defaultJSON)
import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth, ProtocolServer, ProtocolType (..), ProtocolTypeI, VersionRangeSMPC, XFTPServer, supportedSMPClientVRange)
import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth (..), ProtocolServer (..), ProtocolType (..), ProtocolTypeI, VersionRangeSMPC, XFTPServer, supportedSMPClientVRange)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion)
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors')
import System.Mem.Weak (Weak)
import System.Random (StdGen, newStdGen)
Expand All @@ -94,29 +98,42 @@ data InitialAgentServers = InitialAgentServers

data ServerCfg p = ServerCfg
{ server :: ProtoServerWithAuth p,
operator :: OperatorId,
preset :: Bool,
tested :: Maybe Bool,
enabled :: Bool
enabled :: Bool,
roles :: ServerRoles
}
deriving (Show)

enabledServerCfg :: ProtoServerWithAuth p -> ServerCfg p
enabledServerCfg server = ServerCfg {server, preset = False, tested = Nothing, enabled = True}
data ServerRoles = ServerRoles
{ messaging :: Bool,
proxy :: Bool
}
deriving (Show)

enabledServerCfg :: ProtoServerWithAuth p -> OperatorId -> ServerCfg p
enabledServerCfg server operator =
ServerCfg {server, operator, preset = False, tested = Nothing, enabled = True, roles = ServerRoles True True}

presetServerCfg :: Bool -> ProtoServerWithAuth p -> ServerCfg p
presetServerCfg enabled server = ServerCfg {server, preset = True, tested = Nothing, enabled}
presetServerCfg :: Bool -> ServerRoles -> ProtoServerWithAuth p -> OperatorId -> ServerCfg p
presetServerCfg enabled roles server operator =
ServerCfg {server, operator, preset = True, tested = Nothing, enabled, roles}

data UserServers p = UserServers
{ enabledSrvs :: NonEmpty (ProtoServerWithAuth p),
knownSrvs :: NonEmpty (ProtocolServer p)
{ msgSrvs :: NonEmpty (OperatorId, ProtoServerWithAuth p),
pxySrvs :: NonEmpty (OperatorId, ProtoServerWithAuth p),
knownHosts :: Set TransportHost
}

type OperatorId = Int64

-- This function sets all servers as enabled in case all passed servers are disabled.
mkUserServers :: NonEmpty (ServerCfg p) -> UserServers p
mkUserServers srvs = UserServers {enabledSrvs, knownSrvs}
mkUserServers srvs = UserServers {msgSrvs = filterSrvs messaging, pxySrvs = filterSrvs proxy, knownHosts}
where
enabledSrvs = L.map (\ServerCfg {server} -> server) $ fromMaybe srvs $ L.nonEmpty $ L.filter (\ServerCfg {enabled} -> enabled) srvs
knownSrvs = L.map (\ServerCfg {server = ProtoServerWithAuth srv _} -> srv) srvs
filterSrvs role = L.map (\ServerCfg {operator, server} -> (operator, server)) $ fromMaybe srvs $ L.nonEmpty $ L.filter (\ServerCfg {enabled, roles} -> enabled && role roles) srvs
knownHosts = S.unions $ L.map (\ServerCfg {server = ProtoServerWithAuth ProtocolServer {host} _} -> S.fromList $ L.toList host) srvs

data AgentConfig = AgentConfig
{ tcpPort :: Maybe ServiceName,
Expand Down Expand Up @@ -333,6 +350,8 @@ updateRestartCount t (RestartCount minute count) = do

$(pure [])

$(JQ.deriveJSON defaultJSON ''ServerRoles)

instance ProtocolTypeI p => ToJSON (ServerCfg p) where
toEncoding = $(JQ.mkToEncoding defaultJSON ''ServerCfg)
toJSON = $(JQ.mkToJSON defaultJSON ''ServerCfg)
Expand Down

0 comments on commit e3f2fb3

Please sign in to comment.