Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

smp server: do not open/read journal message queues that are known to be empty #1406

Merged
merged 6 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 1.12

name: simplexmq
version: 6.2.0.3
version: 6.2.0.400
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@

-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
-- by having the same time of the response whether a queue exists or nor, for all valid key/signature sizes
dummySignKey :: C.SignatureAlgorithm a => C.SAlgorithm a -> C.PublicKey a

Check warning on line 1048 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a

Check warning on line 1048 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a
dummySignKey = \case
C.SEd25519 -> dummyKeyEd25519
C.SEd448 -> dummyKeyEd448
Expand Down Expand Up @@ -1821,8 +1821,8 @@
mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota)
where
-- if the first message in queue head is "quota", remove it.
mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ \mq ->
tryPeekMsg_ mq >>= \case
mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \mq ->
tryPeekMsg_ q mq >>= \case
Just MessageQuota {} -> tryDeleteMsg_ q mq False
_ -> pure ()
msgErr :: Show e => String -> e -> String
Expand Down
46 changes: 32 additions & 14 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}

module Simplex.Messaging.Server.MsgStore.Journal
Expand All @@ -20,7 +19,6 @@ module Simplex.Messaging.Server.MsgStore.Journal
JournalMsgQueue (queue, state),
JMQueue (queueDirectory, statePath),
JournalStoreConfig (..),
getQueueMessages,
closeMsgQueue,
closeMsgQueueHandles,
-- below are exported for tests
Expand Down Expand Up @@ -50,7 +48,7 @@ import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import Data.Maybe (catMaybes, fromMaybe)
import Data.Maybe (catMaybes, fromMaybe, isNothing)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
Expand Down Expand Up @@ -105,7 +103,9 @@ data JournalQueue = JournalQueue
queueRec :: TVar (Maybe QueueRec),
msgQueue_ :: TVar (Maybe JournalMsgQueue),
-- system time in seconds since epoch
activeAt :: TVar Int64
activeAt :: TVar Int64,
-- True - empty, False - non-empty or unknown
isEmpty :: TVar Bool
}

data JMQueue = JMQueue
Expand Down Expand Up @@ -224,10 +224,11 @@ instance STMQueueStore JournalMsgStore where
storeLog' = storeLog
mkQueue st qr = do
lock <- getMapLock (queueLocks st) $ recipientId qr
q <- newTVar $! Just qr
q <- newTVar $ Just qr
mq <- newTVar Nothing
activeAt <- newTVar 0
pure $ JournalQueue lock q mq activeAt
isEmpty <- newTVar False
pure $ JournalQueue lock q mq activeAt isEmpty
msgQueue_' = msgQueue_

instance MsgStoreClass JournalMsgStore where
Expand Down Expand Up @@ -322,7 +323,7 @@ instance MsgStoreClass JournalMsgStore where
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
queue = JMQueue {queueDirectory = dir, statePath}
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue)
atomically $ writeTVar msgQueue_ $! Just q
atomically $ writeTVar msgQueue_ $ Just q
pure q
where
createQ :: JMQueue -> IO JournalMsgQueue
Expand All @@ -332,14 +333,27 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing

getNonEmptyMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe JournalMsgQueue)
getNonEmptyMsgQueue ms rId q@JournalQueue {isEmpty} =
ifM
(StoreIO $ readTVarIO isEmpty)
(pure Nothing)
(Just <$> getMsgQueue ms rId q)

-- only runs action if queue is not empty
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do
r <- action mq
sz <- getQueueSize_ mq
pure (Just r, sz)
E.bracket
(unStoreIO $ getNonEmptyMsgQueue ms rId q)
(mapM_ $ \_ -> closeMsgQueue q)
(maybe (pure (Nothing, 0)) (unStoreIO . run))
where
run mq = do
r <- action mq
sz <- getQueueSize_ mq
pure (Just r, sz)
Just mq -> do
ts <- readTVarIO $ activeAt q
r <- if now - ts >= idleInterval config
Expand Down Expand Up @@ -378,6 +392,7 @@ instance MsgStoreClass JournalMsgStore where
let empty = size == 0
if canWrite || empty
then do
atomically $ writeTVar (isEmpty q') False
let canWrt' = quota > size
if canWrt'
then writeToJournal q st canWrt' msg $> Just (msg, empty)
Expand Down Expand Up @@ -426,16 +441,19 @@ instance MsgStoreClass JournalMsgStore where
getQueueSize_ :: JournalMsgQueue -> StoreIO Int
getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state

tryPeekMsg_ :: JournalMsgQueue -> StoreIO (Maybe Message)
tryPeekMsg_ q@JournalMsgQueue {tipMsg, handles} =
StoreIO $ readTVarIO handles $>>= chooseReadJournal q True $>>= peekMsg
tryPeekMsg_ :: JournalQueue -> JournalMsgQueue -> StoreIO (Maybe Message)
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
StoreIO $ (readTVarIO handles $>>= chooseReadJournal mq True $>>= peekMsg) >>= setEmpty
where
peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
where
readMsg = do
ml@(msg, _) <- hGetMsgAt h $ bytePos rs
atomically $ writeTVar tipMsg $ Just (Just ml)
pure $ Just msg
setEmpty msg = do
atomically $ writeTVar (isEmpty q) (isNothing msg)
pure msg

tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
Expand Down
14 changes: 8 additions & 6 deletions src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
Expand All @@ -8,7 +7,6 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}

module Simplex.Messaging.Server.MsgStore.STM
Expand Down Expand Up @@ -63,7 +61,7 @@ instance STMQueueStore STMMsgStore where
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue _ qr = STMQueue <$> (newTVar $! Just qr) <*> newTVar Nothing
mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing
msgQueue_' = msgQueue_

instance MsgStoreClass STMMsgStore where
Expand Down Expand Up @@ -106,9 +104,13 @@ instance MsgStoreClass STMMsgStore where
canWrite <- newTVar True
size <- newTVar 0
let q = STMMsgQueue {msgQueue, canWrite, size}
writeTVar msgQueue_ $! Just q
writeTVar msgQueue_ (Just q)
pure q

getNonEmptyMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe STMMsgQueue)
getNonEmptyMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_
{-# INLINE getNonEmptyMsgQueue #-}

-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
Expand Down Expand Up @@ -159,8 +161,8 @@ instance MsgStoreClass STMMsgStore where
getQueueSize_ :: STMMsgQueue -> STM Int
getQueueSize_ STMMsgQueue {size} = readTVar size

tryPeekMsg_ :: STMMsgQueue -> STM (Maybe Message)
tryPeekMsg_ = tryPeekTQueue . msgQueue
tryPeekMsg_ :: STMQueue -> STMMsgQueue -> STM (Maybe Message)
tryPeekMsg_ _ = tryPeekTQueue . msgQueue
{-# INLINE tryPeekMsg_ #-}

tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()
Expand Down
27 changes: 14 additions & 13 deletions src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getNonEmptyMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s))
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
Expand All @@ -53,7 +54,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
writeMsg :: s -> RecipientId -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
getQueueSize_ :: MsgQueue s -> StoreMonad s Int
tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message)
tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message)
tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a

Expand All @@ -73,21 +74,20 @@ withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty
pure $! acc <> r

getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueMessages_ drainMsgs
getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ maybe (pure []) $ getQueueMessages_ drainMsgs
{-# INLINE getQueueMessages #-}

getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueSize_
getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ maybe (pure 0) getQueueSize_
{-# INLINE getQueueSize #-}

tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ tryPeekMsg_
tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ maybe (pure Nothing) (tryPeekMsg_ q)
{-# INLINE tryPeekMsg #-}

tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st rId q msgId' =
withMsgQueue st rId q "tryDelMsg" $ \mq ->
tryPeekMsg_ mq >>= \case
tryDelMsg st rId q msgId' = withMsgQueue st rId q "tryDelMsg" $ maybe (pure Nothing) $ \mq ->
tryPeekMsg_ q mq >>= \case
msg_@(Just msg)
| messageId msg == msgId' ->
tryDeleteMsg_ q mq True >> pure msg_
Expand All @@ -96,15 +96,16 @@ tryDelMsg st rId q msgId' =
-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st rId q msgId' =
withMsgQueue st rId q "tryDelPeekMsg" $ \mq ->
tryPeekMsg_ mq >>= \case
withMsgQueue st rId q "tryDelPeekMsg" $ maybe (pure (Nothing, Nothing)) $ \mq ->
tryPeekMsg_ q mq >>= \case
msg_@(Just msg)
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ mq)
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
| otherwise -> pure (Nothing, msg_)
_ -> pure (Nothing, Nothing)

withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (MsgQueue s -> StoreMonad s a) -> ExceptT ErrorType IO a
withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a
-- The action is called with Nothing when it is known that the queue is empty
withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s) -> StoreMonad s a) -> ExceptT ErrorType IO a
withMsgQueue st rId q op a = isolateQueue rId q op $ getNonEmptyMsgQueue st rId q >>= a
{-# INLINE withMsgQueue #-}

deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
Expand All @@ -126,7 +127,7 @@ deleteExpireMsgs_ old q mq = do
pure n
where
loop dc =
tryPeekMsg_ mq >>= \case
tryPeekMsg_ q mq >>= \case
Just Message {msgTs}
| systemSeconds msgTs < old ->
tryDeleteMsg_ q mq False >> loop (dc + 1)
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/QueueStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier

getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))

Check warning on line 66 in src/Simplex/Messaging/Server/QueueStore/STM.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: DirectParty p

Check warning on line 66 in src/Simplex/Messaging/Server/QueueStore/STM.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: DirectParty p
getQueue st party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId $ queues' st
Expand All @@ -84,7 +84,7 @@
secure q@QueueRec {recipientId = rId} = case senderKey q of
Just k -> pure $ if sKey == k then Right rId else Left AUTH
Nothing -> do
writeTVar qr $! Just q {senderKey = Just sKey}
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right rId

addQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
Expand All @@ -96,7 +96,7 @@
add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $! Just q'
writeTVar qr $ Just q'
TM.insert nId rId $ notifiers' st
pure $ Right (rId, nId_)

Expand Down
Loading