Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

smp server: update message counts during message expiration, increase idle interval #1404

Merged
merged 5 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 1.12

name: simplexmq
version: 6.2.0.2
version: 6.2.0.3
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down
41 changes: 23 additions & 18 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do
s <- asks server
pa <- asks proxyAgent
msgStats <- processServerMessages
msgStats_ <- processServerMessages
ntfStats <- restoreServerNtfs
liftIO $ printMessageStats "messages" msgStats
liftIO $ mapM_ (printMessageStats "messages") msgStats_
liftIO $ printMessageStats "notifications" ntfStats
restoreServerStats msgStats ntfStats
restoreServerStats msgStats_ ntfStats
raceAny_
( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ())
Expand Down Expand Up @@ -385,12 +385,15 @@
threadDelay' interval
old <- expireBeforeEpoch expCfg
now <- systemSeconds <$> getSystemTime
Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs now ms old
atomicModifyIORef'_ (msgExpired stats) (+ deleted)
logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages"
msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <-
withActiveMsgQueues ms $ expireQueueMsgs now ms old
atomicWriteIORef (msgCount stats) stored
atomicModifyIORef'_ (msgExpired stats) (+ expired)
printMessageStats "STORE: messages" msgStats
where
expireQueueMsgs now ms old rId q =
either (const 0) Sum <$> runExceptT (idleDeleteExpiredMsgs now ms rId q old)
expireQueueMsgs now ms old rId q = fmap (fromRight newMessageStats) . runExceptT $ do
(expired_, stored) <- idleDeleteExpiredMsgs now ms rId q old
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}

expireNtfsThread :: ServerConfig -> M ()
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
Expand Down Expand Up @@ -1042,7 +1045,7 @@

-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
-- by having the same time of the response whether a queue exists or nor, for all valid key/signature sizes
dummySignKey :: C.SignatureAlgorithm a => C.SAlgorithm a -> C.PublicKey a

Check warning on line 1048 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a

Check warning on line 1048 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a
dummySignKey = \case
C.SEd25519 -> dummyKeyEd25519
C.SEd448 -> dummyKeyEd448
Expand Down Expand Up @@ -1731,26 +1734,26 @@
exitFailure
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')

processServerMessages :: M MessageStats
processServerMessages :: M (Maybe MessageStats)
processServerMessages = do
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
expire <- asks $ expireMessagesOnStart . config
asks msgStore >>= liftIO . processMessages old_ expire
where
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO MessageStats
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO (Maybe MessageStats)
processMessages old_ expire = \case
AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
Just f -> ifM (doesFileExist f) (importMessages False ms f old_) (pure newMessageStats)
Nothing -> pure newMessageStats
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_) (pure Nothing)
Nothing -> pure Nothing
AMS SMSJournal ms
| expire -> case old_ of
| expire -> Just <$> case old_ of
Just old -> do
logInfo "expiring journal store messages..."
withAllMsgQueues False ms $ processExpireQueue old
Nothing -> do
logInfo "validating journal store messages..."
withAllMsgQueues False ms $ processValidateQueue
| otherwise -> logWarn "skipping message expiration" $> newMessageStats
| otherwise -> logWarn "skipping message expiration" $> Nothing
where
processExpireQueue old rId q =
runExceptT expireQueue >>= \case
Expand Down Expand Up @@ -1887,8 +1890,8 @@
B.writeFile f $ strEncode stats
logInfo "server stats saved"

restoreServerStats :: MessageStats -> MessageStats -> M ()
restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
restoreServerStats :: Maybe MessageStats -> MessageStats -> M ()
restoreServerStats msgStats_ ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
where
restoreStats f = whenM (doesFileExist f) $ do
logInfo $ "restoring server stats from file " <> T.pack f
Expand All @@ -1897,9 +1900,11 @@
s <- asks serverStats
AMS _ st <- asks msgStore
_qCount <- M.size <$> readTVarIO (activeMsgQueues st)
let _msgCount = storedMsgsCount msgStats
let _msgCount = maybe statsMsgCount storedMsgsCount msgStats_
_ntfCount = storedMsgsCount ntfStats
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgsCount msgStats, _msgNtfExpired = _msgNtfExpired d + expiredMsgsCount ntfStats}
_msgExpired' = _msgExpired d + maybe 0 expiredMsgsCount msgStats_
_msgNtfExpired' = _msgNtfExpired d + expiredMsgsCount ntfStats
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired', _msgNtfExpired = _msgNtfExpired'}
renameFile f $ f <> ".bak"
logInfo "server stats restored"
compareCounts "Queue" statsQCount _qCount
Expand Down
12 changes: 8 additions & 4 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ data ServerConfig = ServerConfig
-- | time after which the messages can be removed from the queues and check interval, seconds
messageExpiration :: Maybe ExpirationConfig,
expireMessagesOnStart :: Bool,
-- | interval of inactivity after which journal queue is closed
idleQueueInterval :: Int64,
-- | notification expiration interval (seconds)
notificationExpiration :: ExpirationConfig,
-- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING),
Expand Down Expand Up @@ -121,9 +123,12 @@ defaultMessageExpiration :: ExpirationConfig
defaultMessageExpiration =
ExpirationConfig
{ ttl = defMsgExpirationDays * 86400, -- seconds
checkInterval = 21600 -- seconds, 6 hours
checkInterval = 14400 -- seconds, 4 hours
}

defaultIdleQueueInterval :: Int64
defaultIdleQueueInterval = 28800 -- seconds, 8 hours

defNtfExpirationHours :: Int64
defNtfExpirationHours = 24

Expand Down Expand Up @@ -283,15 +288,14 @@ newProhibitedSub = do
return Sub {subThread = ProhibitSub, delivered}

newEnv :: ServerConfig -> IO Env
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, idleQueueInterval, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
serverActive <- newTVarIO True
server <- newServer
msgStore@(AMS _ store) <- case msgStoreType of
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
AMSType SMSJournal -> case storeMsgsFile of
Just storePath ->
let idleInterval = maybe maxBound checkInterval messageExpiration
cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval}
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
in AMS SMSJournal <$> newMsgStore cfg
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
ntfStore <- NtfStore <$> TM.emptyIO
Expand Down
1 change: 1 addition & 0 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 12 in src/Simplex/Messaging/Server/Main.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

module Simplex.Messaging.Server.Main where

Expand Down Expand Up @@ -416,6 +416,7 @@
{ ttl = 86400 * readIniDefault defMsgExpirationDays "STORE_LOG" "expire_messages_days" ini
},
expireMessagesOnStart = fromMaybe True $ iniOnOff "STORE_LOG" "expire_messages_on_start" ini,
idleQueueInterval = defaultIdleQueueInterval,
notificationExpiration =
defaultNtfExpiration
{ ttl = 3600 * readIniDefault defNtfExpirationHours "STORE_LOG" "expire_ntfs_hours" ini
Expand Down
15 changes: 8 additions & 7 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -332,20 +332,21 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing

withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a)
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
Just <$>
E.bracket
(unStoreIO $ getMsgQueue ms rId q)
(\_ -> closeMsgQueue q)
(unStoreIO . action)
E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do
r <- action mq
sz <- getQueueSize_ mq
pure (Just r, sz)
Just mq -> do
ts <- readTVarIO $ activeAt q
if now - ts >= idleInterval config
r <- if now - ts >= idleInterval config
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
else pure Nothing
sz <- unStoreIO $ getQueueSize_ mq
pure (r, sz)

deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q =
Expand Down
10 changes: 7 additions & 3 deletions src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ instance MsgStoreClass STMMsgStore where
pure q

-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= mapM action
{-# INLINE withIdleMsgQueue #-}
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
Just q -> do
r <- action q
sz <- getQueueSize_ q
pure (Just r, sz)
Nothing -> pure (Nothing, 0)

deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
Expand Down
10 changes: 5 additions & 5 deletions src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import Control.Monad (foldM)
import Control.Monad.Trans.Except
import Data.Int (Int64)
import Data.Kind
import Data.Maybe (fromMaybe)
import qualified Data.Map.Strict as M
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol
Expand Down Expand Up @@ -47,7 +46,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a)
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
Expand Down Expand Up @@ -114,10 +113,11 @@ deleteExpiredMsgs st rId q old =
getMsgQueue st rId q >>= deleteExpireMsgs_ old q

-- closed and idle queues will be closed after expiration
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
-- returns (expired count, queue size after expiration)
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int)
idleDeleteExpiredMsgs now st rId q old =
isolateQueue rId q "idleDeleteExpiredMsgs" $
fromMaybe 0 <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
isolateQueue rId q "idleDeleteExpiredMsgs" $
withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)

deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
Expand Down
1 change: 1 addition & 0 deletions tests/SMPClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ cfgMS msType =
controlPortAdminAuth = Nothing,
messageExpiration = Just defaultMessageExpiration,
expireMessagesOnStart = True,
idleQueueInterval = defaultIdleQueueInterval,
notificationExpiration = defaultNtfExpiration,
inactiveClientExpiration = Just defaultInactiveClientExpiration,
logStatsInterval = Nothing,
Expand Down
2 changes: 1 addition & 1 deletion tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ testMsgExpireOnInterval =
xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c), msType) -> do
g <- C.newRandom
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}}
let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1}
withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ ->
testSMPClient @c $ \sh -> do
(sId, rId, rKey, _) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub
Expand Down
Loading