Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ntf server: better batching and logging #780

Merged
merged 8 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/Simplex/FileTransfer/Client/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Char (toLower)
import Data.Function (on)
import Data.Int (Int64)
import Data.List (foldl', groupBy, sortOn)
import Data.List (foldl', sortOn)
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
Expand All @@ -66,7 +65,7 @@ import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, XFTPServer, XFTPServerWithAuth)
import Simplex.Messaging.Server.CLI (getCliCommand')
import Simplex.Messaging.Util (ifM, tshow, whenM)
import Simplex.Messaging.Util (groupAllOn, ifM, tshow, whenM)
import System.Exit (exitFailure)
import System.FilePath (splitFileName, (</>))
import System.IO.Temp (getCanonicalTemporaryDirectory)
Expand Down Expand Up @@ -316,7 +315,7 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
let xftpSrvs = fromMaybe defaultXFTPServers (nonEmpty xftpServers)
srvs <- liftIO $ replicateM (length chunks) $ getXFTPServer gen xftpSrvs
let thd3 (_, _, x) = x
chunks' = groupBy ((==) `on` thd3) $ sortOn thd3 $ zip3 [1 ..] chunks srvs
chunks' = groupAllOn thd3 $ zip3 [1 ..] chunks srvs
-- TODO shuffle/unshuffle chunks
-- the reason we don't do pooled downloads here within one server is that http2 library doesn't handle cleint concurrency, even though
-- upload doesn't allow other requests within the same client until complete (but download does allow).
Expand Down Expand Up @@ -428,7 +427,7 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
liftIO $ printNoNewLine "Downloading file..."
downloadedChunks <- newTVarIO []
let srv FileChunk {replicas} = server (head replicas :: FileChunkReplica)
srvChunks = groupBy ((==) `on` srv) $ sortOn srv chunks
srvChunks = groupAllOn srv chunks
chunkPaths <- map snd . sortOn fst . concat <$> pooledForConcurrentlyN 16 srvChunks (mapM $ downloadFileChunk a encPath size downloadedChunks)
encDigest <- liftIO $ LC.sha512Hash <$> readChunks chunkPaths
when (encDigest /= unFileDigest digest) $ throwError $ CLIError "File digest mismatch"
Expand Down
11 changes: 4 additions & 7 deletions src/Simplex/FileTransfer/Description.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bifunctor (first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Function (on)
import Data.Int (Int64)
import Data.List (foldl', groupBy, sortOn)
import Data.List (foldl', sortOn)
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
Expand All @@ -59,7 +58,7 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (XFTPServer)
import Simplex.Messaging.Util (bshow, (<$?>))
import Simplex.Messaging.Util (bshow, groupAllOn, (<$?>))

data FileDescription (p :: FileParty) = FileDescription
{ party :: SFileParty p,
Expand Down Expand Up @@ -258,17 +257,15 @@ instance (ToField a) => ToField (FileSize a) where toField (FileSize s) = toFiel

groupReplicasByServer :: FileSize Word32 -> [FileChunk] -> [[FileServerReplica]]
groupReplicasByServer defChunkSize =
groupBy ((==) `on` replicaServer)
. sortOn replicaServer
. unfoldChunksToReplicas defChunkSize
groupAllOn replicaServer . unfoldChunksToReplicas defChunkSize

encodeFileReplicas :: FileSize Word32 -> [FileChunk] -> [YAMLServerReplicas]
encodeFileReplicas defChunkSize =
map encodeServerReplicas . groupReplicasByServer defChunkSize
where
encodeServerReplicas fs =
YAMLServerReplicas
{ server = replicaServer $ head fs, -- groupBy guarantees that fs is not empty
{ server = replicaServer $ head fs, -- groupAllOn guarantees that fs is not empty
chunks = map (B.unpack . encodeServerReplica) fs
}

Expand Down
7 changes: 3 additions & 4 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,10 @@ import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Base64.URL as U
import Data.Char (toLower)
import Data.Function (on)
import Data.Functor (($>))
import Data.IORef
import Data.Int (Int64)
import Data.List (foldl', groupBy, intercalate, sortBy)
import Data.List (foldl', intercalate, sortBy)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
Expand Down Expand Up @@ -250,7 +249,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, dropPrefix, fromTextField_, s
import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (bshow, diffToMilliseconds, eitherToMaybe, ($>>=), (<$$>))
import Simplex.Messaging.Util (bshow, diffToMilliseconds, eitherToMaybe, groupOn, ($>>=), (<$$>))
import Simplex.Messaging.Version
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
import System.Exit (exitFailure)
Expand Down Expand Up @@ -1092,7 +1091,7 @@ insertedRowId db = fromOnly . head <$> DB.query_ db "SELECT last_insert_rowid()"

getPendingCommands :: DB.Connection -> ConnId -> IO [(Maybe SMPServer, [AsyncCmdId])]
getPendingCommands db connId = do
map (\ids -> (fst $ head ids, map snd ids)) . groupBy ((==) `on` fst) . map srvCmdId
map (\ids -> (fst $ head ids, map snd ids)) . groupOn fst . map srvCmdId
<$> DB.query
db
[sql|
Expand Down
15 changes: 9 additions & 6 deletions src/Simplex/Messaging/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,10 @@ data ProtocolClientConfig = ProtocolClientConfig
defaultTransport :: (ServiceName, ATransport),
-- | network configuration
networkConfig :: NetworkConfig,
-- | SMP client-server protocol version range
smpServerVRange :: VersionRange
-- | client-server protocol version range
serverVRange :: VersionRange,
-- | delay between sending batches of commands (microseconds)
batchDelay :: Maybe Int
}

-- | Default protocol client configuration.
Expand All @@ -230,7 +232,8 @@ defaultClientConfig =
{ qSize = 64,
defaultTransport = ("443", transport @TLS),
networkConfig = defaultNetworkConfig,
smpServerVRange = supportedSMPServerVRange
serverVRange = supportedSMPServerVRange,
batchDelay = Nothing
}

data Request err msg = Request
Expand Down Expand Up @@ -276,7 +279,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId)
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'SMPServerTransmission' includes server information.
getProtocolClient :: forall err msg. Protocol err msg => TransportSession msg -> ProtocolClientConfig -> Maybe (TBQueue (ServerTransmission msg)) -> (ProtocolClient err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient err msg))
getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, smpServerVRange} msgQ disconnected = do
getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, serverVRange, batchDelay} msgQ disconnected = do
case chooseTransportHost networkConfig (host srv) of
Right useHost ->
(atomically (mkProtocolClient useHost) >>= runClient useTransport useHost)
Expand Down Expand Up @@ -329,7 +332,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize,

client :: forall c. Transport c => TProxy c -> PClient err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient err msg)) -> c -> IO ()
client _ c cVar h =
runExceptT (protocolClientHandshake @err @msg h (keyHash srv) smpServerVRange) >>= \case
runExceptT (protocolClientHandshake @err @msg h (keyHash srv) serverVRange) >>= \case
Left e -> atomically . putTMVar cVar . Left $ PCETransportError e
Right th@THandle {sessionId, thVersion} -> do
sessionTs <- getCurrentTime
Expand All @@ -341,7 +344,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize,
`finally` disconnected c'

send :: Transport c => ProtocolClient err msg -> THandle c -> IO ()
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPut h
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPut h batchDelay

receive :: Transport c => ProtocolClient err msg -> THandle c -> IO ()
receive ProtocolClient {client_ = PClient {rcvQ}} h = forever $ tGet h >>= atomically . writeTBQueue rcvQ
Expand Down
56 changes: 24 additions & 32 deletions src/Simplex/Messaging/Client/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import Control.Logger.Simple
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Trans.Except
import Data.Bifunctor (first)
import Data.Bifunctor (first, bimap)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.List (find, partition)
import Data.Either (partitionEithers)
import Data.List (partition)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (listToMaybe)
import Data.Set (Set)
import Data.Text.Encoding
import Data.Tuple (swap)
Expand All @@ -43,16 +45,15 @@ import UnliftIO (async)
import UnliftIO.Exception (Exception)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Data.Either (isLeft)

type SMPClientVar = TMVar (Either SMPClientError SMPClient)

data SMPClientAgentEvent
= CAConnected SMPServer
| CADisconnected SMPServer (Set SMPSub)
| CAReconnected SMPServer
| CAResubscribed SMPServer SMPSub
| CASubError SMPServer SMPSub SMPClientError
| CAResubscribed SMPServer (NonEmpty SMPSub)
| CASubError SMPServer (NonEmpty (SMPSub, SMPClientError))

data SMPSubParty = SPRecipient | SPNotifier
deriving (Eq, Ord, Show)
Expand Down Expand Up @@ -208,45 +209,36 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv =
reconnectClient :: ExceptT SMPClientError IO ()
reconnectClient = do
withSMP ca srv $ \smp -> do
liftIO . notify $ CAReconnected srv
liftIO $ notify $ CAReconnected srv
cs_ <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSrvSubs ca)
forM_ cs_ $ \cs -> do
subs' <- filterM (fmap not . atomically . hasSub (srvSubs ca) srv . fst) $ M.assocs cs
let (nSubs, rSubs) = partition (isNotifier . fst . fst) subs'
nRs <- liftIO $ subscribe_ smp SPNotifier nSubs
rRs <- liftIO $ subscribe_ smp SPRecipient rSubs
case find isLeft $ nRs <> rRs of
Just (Left e) -> throwE e
_ -> pure ()
subscribe_ smp SPNotifier nSubs
subscribe_ smp SPRecipient rSubs
where
isNotifier = \case
SPNotifier -> True
SPRecipient -> False

subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateSignKey)] -> IO [Either SMPClientError ()]
subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateSignKey)] -> ExceptT SMPClientError IO ()
subscribe_ smp party subs =
case L.nonEmpty subs of
Just subs' -> do
let subs'' = L.map (first snd) subs'
rs <- L.zip subs'' <$> smpSubscribeQueues party ca smp srv subs''
rs' <- forM rs $ \(sub, r) -> do
let sub' = first (party,) sub
s = fst sub'
case snd r of
Right () -> do
atomically $ addSubscription ca srv sub'
notify $ CAResubscribed srv s
pure $ Right ()
Left e -> do
case e of
PCEResponseTimeout -> pure $ Left e
PCENetworkError -> pure $ Left e
_ -> do
notify $ CASubError srv s e
atomically $ removePendingSubscription ca srv s
pure $ Right ()
pure $ L.toList rs'
Nothing -> pure []
let subs'' :: (NonEmpty (QueueId, C.APrivateSignKey)) = L.map (first snd) subs'
rs <- liftIO $ smpSubscribeQueues party ca smp srv subs''
let rs' :: (NonEmpty ((SMPSub, C.APrivateSignKey), Either SMPClientError ())) =
L.zipWith (first . const) subs' rs
rs'' :: [Either (SMPSub, SMPClientError) (SMPSub, C.APrivateSignKey)] =
map (\(sub, r) -> bimap (fst sub,) (const sub) r) $ L.toList rs'
(errs, oks) = partitionEithers rs''
(tempErrs, finalErrs) = partition (temporaryClientError . snd) errs
mapM_ (atomically . addSubscription ca srv) oks
mapM_ (liftIO . notify . CAResubscribed srv) $ L.nonEmpty $ map fst oks
mapM_ (atomically . removePendingSubscription ca srv . fst) finalErrs
mapM_ (liftIO . notify . CASubError srv) $ L.nonEmpty finalErrs
mapM_ (throwE . snd) $ listToMaybe tempErrs
Nothing -> pure ()

notify :: SMPClientAgentEvent -> IO ()
notify evt = atomically $ writeTBQueue (agentQ ca) evt
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Notifications/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ data NtfSubStatus
NSAuth
| -- | SMP error other than AUTH
NSErr ByteString
deriving (Eq, Show)
deriving (Eq, Ord, Show)

ntfShouldSubscribe :: NtfSubStatus -> Bool
ntfShouldSubscribe = \case
Expand Down
Loading