Skip to content

Commit

Permalink
smp server: update message counts during message expiration, increase…
Browse files Browse the repository at this point in the history
… idle interval (#1404)

* smp server: update message counts during message expiration, increase idle interval

* version

* fix

* flip results

* version
  • Loading branch information
epoberezkin authored Nov 17, 2024
1 parent 17a0be1 commit 21fbbf9
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 39 deletions.
2 changes: 1 addition & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 1.12

name: simplexmq
version: 6.2.0.2
version: 6.2.0.3
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down
41 changes: 23 additions & 18 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M ()
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do
s <- asks server
pa <- asks proxyAgent
msgStats <- processServerMessages
msgStats_ <- processServerMessages
ntfStats <- restoreServerNtfs
liftIO $ printMessageStats "messages" msgStats
liftIO $ mapM_ (printMessageStats "messages") msgStats_
liftIO $ printMessageStats "notifications" ntfStats
restoreServerStats msgStats ntfStats
restoreServerStats msgStats_ ntfStats
raceAny_
( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ())
Expand Down Expand Up @@ -385,12 +385,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
threadDelay' interval
old <- expireBeforeEpoch expCfg
now <- systemSeconds <$> getSystemTime
Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs now ms old
atomicModifyIORef'_ (msgExpired stats) (+ deleted)
logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages"
msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <-
withActiveMsgQueues ms $ expireQueueMsgs now ms old
atomicWriteIORef (msgCount stats) stored
atomicModifyIORef'_ (msgExpired stats) (+ expired)
printMessageStats "STORE: messages" msgStats
where
expireQueueMsgs now ms old rId q =
either (const 0) Sum <$> runExceptT (idleDeleteExpiredMsgs now ms rId q old)
expireQueueMsgs now ms old rId q = fmap (fromRight newMessageStats) . runExceptT $ do
(expired_, stored) <- idleDeleteExpiredMsgs now ms rId q old
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}

expireNtfsThread :: ServerConfig -> M ()
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
Expand Down Expand Up @@ -1731,26 +1734,26 @@ exportMessages tty ms f drainMsgs = do
exitFailure
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')

processServerMessages :: M MessageStats
processServerMessages :: M (Maybe MessageStats)
processServerMessages = do
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
expire <- asks $ expireMessagesOnStart . config
asks msgStore >>= liftIO . processMessages old_ expire
where
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO MessageStats
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO (Maybe MessageStats)
processMessages old_ expire = \case
AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
Just f -> ifM (doesFileExist f) (importMessages False ms f old_) (pure newMessageStats)
Nothing -> pure newMessageStats
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_) (pure Nothing)
Nothing -> pure Nothing
AMS SMSJournal ms
| expire -> case old_ of
| expire -> Just <$> case old_ of
Just old -> do
logInfo "expiring journal store messages..."
withAllMsgQueues False ms $ processExpireQueue old
Nothing -> do
logInfo "validating journal store messages..."
withAllMsgQueues False ms $ processValidateQueue
| otherwise -> logWarn "skipping message expiration" $> newMessageStats
| otherwise -> logWarn "skipping message expiration" $> Nothing
where
processExpireQueue old rId q =
runExceptT expireQueue >>= \case
Expand Down Expand Up @@ -1887,8 +1890,8 @@ saveServerStats =
B.writeFile f $ strEncode stats
logInfo "server stats saved"

restoreServerStats :: MessageStats -> MessageStats -> M ()
restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
restoreServerStats :: Maybe MessageStats -> MessageStats -> M ()
restoreServerStats msgStats_ ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
where
restoreStats f = whenM (doesFileExist f) $ do
logInfo $ "restoring server stats from file " <> T.pack f
Expand All @@ -1897,9 +1900,11 @@ restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>=
s <- asks serverStats
AMS _ st <- asks msgStore
_qCount <- M.size <$> readTVarIO (activeMsgQueues st)
let _msgCount = storedMsgsCount msgStats
let _msgCount = maybe statsMsgCount storedMsgsCount msgStats_
_ntfCount = storedMsgsCount ntfStats
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgsCount msgStats, _msgNtfExpired = _msgNtfExpired d + expiredMsgsCount ntfStats}
_msgExpired' = _msgExpired d + maybe 0 expiredMsgsCount msgStats_
_msgNtfExpired' = _msgNtfExpired d + expiredMsgsCount ntfStats
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired', _msgNtfExpired = _msgNtfExpired'}
renameFile f $ f <> ".bak"
logInfo "server stats restored"
compareCounts "Queue" statsQCount _qCount
Expand Down
12 changes: 8 additions & 4 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ data ServerConfig = ServerConfig
-- | time after which the messages can be removed from the queues and check interval, seconds
messageExpiration :: Maybe ExpirationConfig,
expireMessagesOnStart :: Bool,
-- | interval of inactivity after which journal queue is closed
idleQueueInterval :: Int64,
-- | notification expiration interval (seconds)
notificationExpiration :: ExpirationConfig,
-- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING),
Expand Down Expand Up @@ -121,9 +123,12 @@ defaultMessageExpiration :: ExpirationConfig
defaultMessageExpiration =
ExpirationConfig
{ ttl = defMsgExpirationDays * 86400, -- seconds
checkInterval = 21600 -- seconds, 6 hours
checkInterval = 14400 -- seconds, 4 hours
}

defaultIdleQueueInterval :: Int64
defaultIdleQueueInterval = 28800 -- seconds, 8 hours

defNtfExpirationHours :: Int64
defNtfExpirationHours = 24

Expand Down Expand Up @@ -283,15 +288,14 @@ newProhibitedSub = do
return Sub {subThread = ProhibitSub, delivered}

newEnv :: ServerConfig -> IO Env
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, idleQueueInterval, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
serverActive <- newTVarIO True
server <- newServer
msgStore@(AMS _ store) <- case msgStoreType of
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
AMSType SMSJournal -> case storeMsgsFile of
Just storePath ->
let idleInterval = maybe maxBound checkInterval messageExpiration
cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval}
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
in AMS SMSJournal <$> newMsgStore cfg
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
ntfStore <- NtfStore <$> TM.emptyIO
Expand Down
1 change: 1 addition & 0 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
{ ttl = 86400 * readIniDefault defMsgExpirationDays "STORE_LOG" "expire_messages_days" ini
},
expireMessagesOnStart = fromMaybe True $ iniOnOff "STORE_LOG" "expire_messages_on_start" ini,
idleQueueInterval = defaultIdleQueueInterval,
notificationExpiration =
defaultNtfExpiration
{ ttl = 3600 * readIniDefault defNtfExpirationHours "STORE_LOG" "expire_ntfs_hours" ini
Expand Down
15 changes: 8 additions & 7 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -332,20 +332,21 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing

withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a)
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
Just <$>
E.bracket
(unStoreIO $ getMsgQueue ms rId q)
(\_ -> closeMsgQueue q)
(unStoreIO . action)
E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do
r <- action mq
sz <- getQueueSize_ mq
pure (Just r, sz)
Just mq -> do
ts <- readTVarIO $ activeAt q
if now - ts >= idleInterval config
r <- if now - ts >= idleInterval config
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
else pure Nothing
sz <- unStoreIO $ getQueueSize_ mq
pure (r, sz)

deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q =
Expand Down
10 changes: 7 additions & 3 deletions src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ instance MsgStoreClass STMMsgStore where
pure q

-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= mapM action
{-# INLINE withIdleMsgQueue #-}
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
Just q -> do
r <- action q
sz <- getQueueSize_ q
pure (Just r, sz)
Nothing -> pure (Nothing, 0)

deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
Expand Down
10 changes: 5 additions & 5 deletions src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import Control.Monad (foldM)
import Control.Monad.Trans.Except
import Data.Int (Int64)
import Data.Kind
import Data.Maybe (fromMaybe)
import qualified Data.Map.Strict as M
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol
Expand Down Expand Up @@ -47,7 +46,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a)
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
Expand Down Expand Up @@ -114,10 +113,11 @@ deleteExpiredMsgs st rId q old =
getMsgQueue st rId q >>= deleteExpireMsgs_ old q

-- closed and idle queues will be closed after expiration
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
-- returns (expired count, queue size after expiration)
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int)
idleDeleteExpiredMsgs now st rId q old =
isolateQueue rId q "idleDeleteExpiredMsgs" $
fromMaybe 0 <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
isolateQueue rId q "idleDeleteExpiredMsgs" $
withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)

deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
Expand Down
1 change: 1 addition & 0 deletions tests/SMPClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ cfgMS msType =
controlPortAdminAuth = Nothing,
messageExpiration = Just defaultMessageExpiration,
expireMessagesOnStart = True,
idleQueueInterval = defaultIdleQueueInterval,
notificationExpiration = defaultNtfExpiration,
inactiveClientExpiration = Just defaultInactiveClientExpiration,
logStatsInterval = Nothing,
Expand Down
2 changes: 1 addition & 1 deletion tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ testMsgExpireOnInterval =
xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c), msType) -> do
g <- C.newRandom
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}}
let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1}
withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ ->
testSMPClient @c $ \sh -> do
(sId, rId, rKey, _) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub
Expand Down

0 comments on commit 21fbbf9

Please sign in to comment.