Skip to content

Commit

Permalink
flip results
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Nov 15, 2024
1 parent 134387a commit f5580e0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
printMessageStats "STORE: messages" msgStats
where
expireQueueMsgs now ms old rId q = fmap (fromRight newMessageStats) . runExceptT $ do
(stored, expired) <- idleDeleteExpiredMsgs now ms rId q old
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired, storedQueues = 1}
(expired_, stored) <- idleDeleteExpiredMsgs now ms rId q old
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}

expireNtfsThread :: ServerConfig -> M ()
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
Expand Down
11 changes: 6 additions & 5 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -332,20 +332,21 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing

withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Int, Maybe a)
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do
sz <- getQueueSize_ mq
r <- action mq
pure (sz, Just r)
sz <- getQueueSize_ mq
pure (Just r, sz)
Just mq -> do
ts <- readTVarIO $ activeAt q
sz <- unStoreIO $ getQueueSize_ mq
(sz,) <$> if now - ts >= idleInterval config
r <- if now - ts >= idleInterval config
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
else pure Nothing
sz <- unStoreIO $ getQueueSize_ mq
pure (r, sz)

deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q =
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ instance MsgStoreClass STMMsgStore where
pure q

-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Int, Maybe a)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
Just q -> do
sz <- getQueueSize_ q
r <- action q
pure (sz, Just r)
Nothing -> pure (0, Nothing)
sz <- getQueueSize_ q
pure (Just r, sz)
Nothing -> pure (Nothing, 0)

deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
Expand Down
10 changes: 4 additions & 6 deletions src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ module Simplex.Messaging.Server.MsgStore.Types where
import Control.Concurrent.STM
import Control.Monad (foldM)
import Control.Monad.Trans.Except
import Data.Bifunctor (second)
import Data.Int (Int64)
import Data.Kind
import Data.Maybe (fromMaybe)
import qualified Data.Map.Strict as M
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol
Expand Down Expand Up @@ -48,7 +46,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Int, Maybe a)
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
Expand Down Expand Up @@ -115,11 +113,11 @@ deleteExpiredMsgs st rId q old =
getMsgQueue st rId q >>= deleteExpireMsgs_ old q

-- closed and idle queues will be closed after expiration
-- returns (queue size, expired count)
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Int, Int)
-- returns (expired count, queue size after expiration)
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int)
idleDeleteExpiredMsgs now st rId q old =
isolateQueue rId q "idleDeleteExpiredMsgs" $
second (fromMaybe 0) <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)

deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
Expand Down

0 comments on commit f5580e0

Please sign in to comment.