Skip to content

Commit

Permalink
close empty queues on first subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Nov 22, 2024
1 parent 679b2c8 commit 1174967
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 39 deletions.
2 changes: 1 addition & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 1.12

name: simplexmq
version: 6.2.0.400
version: 6.2.0.401
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down
6 changes: 3 additions & 3 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1821,9 +1821,9 @@ importMessages tty ms f old_ = do
mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota)
where
-- if the first message in queue head is "quota", remove it.
mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \mq ->
tryPeekMsg_ q mq >>= \case
Just MessageQuota {} -> tryDeleteMsg_ q mq False
mergeQuotaMsgs = withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \(mq, msg) ->
case msg of
MessageQuota {} -> tryDeleteMsg_ q mq False
_ -> pure ()
msgErr :: Show e => String -> e -> String
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)
Expand Down
39 changes: 26 additions & 13 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}

module Simplex.Messaging.Server.MsgStore.Journal
( JournalMsgStore (queues, senders, notifiers, random),
Expand Down Expand Up @@ -104,8 +105,8 @@ data JournalQueue = JournalQueue
msgQueue_ :: TVar (Maybe JournalMsgQueue),
-- system time in seconds since epoch
activeAt :: TVar Int64,
-- True - empty, False - non-empty or unknown
isEmpty :: TVar Bool
-- Just True - empty, Just False - non-empty, Nothing - unknown
isEmpty :: TVar (Maybe Bool)
}

data JMQueue = JMQueue
Expand Down Expand Up @@ -227,7 +228,7 @@ instance STMQueueStore JournalMsgStore where
q <- newTVar $ Just qr
mq <- newTVar Nothing
activeAt <- newTVar 0
isEmpty <- newTVar False
isEmpty <- newTVar Nothing
pure $ JournalQueue lock q mq activeAt isEmpty
msgQueue_' = msgQueue_

Expand Down Expand Up @@ -333,24 +334,36 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing

getNonEmptyMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe JournalMsgQueue)
getNonEmptyMsgQueue ms rId q@JournalQueue {isEmpty} =
ifM
(StoreIO $ readTVarIO isEmpty)
(pure Nothing)
(Just <$> getMsgQueue ms rId q)
getPeekMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message))
getPeekMsgQueue ms rId q@JournalQueue {isEmpty} =
StoreIO (readTVarIO isEmpty) >>= \case
Just True -> pure Nothing
Just False -> peek
Nothing -> do
-- We only close the queue if we just learnt it's empty.
-- This is needed to reduce file descriptors and memory usage
-- after the server just started and many clients subscribe.
-- In case the queue became non-empty on write and then again empty on read
-- we won't be closing it, to avoid frequent open/close on active queues.
r <- peek
when (isNothing r) $ StoreIO $ closeMsgQueue q
pure r
where
peek = do
mq <- getMsgQueue ms rId q
(mq,) <$$> tryPeekMsg_ q mq

-- only runs action if queue is not empty
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
E.bracket
(unStoreIO $ getNonEmptyMsgQueue ms rId q)
(unStoreIO $ getPeekMsgQueue ms rId q)
(mapM_ $ \_ -> closeMsgQueue q)
(maybe (pure (Nothing, 0)) (unStoreIO . run))
where
run mq = do
run (mq, _) = do
r <- action mq
sz <- getQueueSize_ mq
pure (Just r, sz)
Expand Down Expand Up @@ -392,7 +405,7 @@ instance MsgStoreClass JournalMsgStore where
let empty = size == 0
if canWrite || empty
then do
atomically $ writeTVar (isEmpty q') False
atomically $ writeTVar (isEmpty q') (Just False)
let canWrt' = quota > size
if canWrt'
then writeToJournal q st canWrt' msg $> Just (msg, empty)
Expand Down Expand Up @@ -452,7 +465,7 @@ instance MsgStoreClass JournalMsgStore where
atomically $ writeTVar tipMsg $ Just (Just ml)
pure $ Just msg
setEmpty msg = do
atomically $ writeTVar (isEmpty q) (isNothing msg)
atomically $ writeTVar (isEmpty q) (Just $ isNothing msg)
pure msg

tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}

module Simplex.Messaging.Server.MsgStore.STM
( STMMsgStore (..),
Expand All @@ -27,7 +28,7 @@ import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util ((<$$>))
import Simplex.Messaging.Util ((<$$>), ($>>=))
import System.IO (IOMode (..))

data STMMsgStore = STMMsgStore
Expand Down Expand Up @@ -107,9 +108,8 @@ instance MsgStoreClass STMMsgStore where
writeTVar msgQueue_ (Just q)
pure q

getNonEmptyMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe STMMsgQueue)
getNonEmptyMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_
{-# INLINE getNonEmptyMsgQueue #-}
getPeekMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe (STMMsgQueue, Message))
getPeekMsgQueue _ _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq

-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
Expand Down
42 changes: 24 additions & 18 deletions src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}

{-# HLINT ignore "Redundant multi-way if" #-}

module Simplex.Messaging.Server.MsgStore.Types where

Expand All @@ -21,6 +25,7 @@ import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.StoreLog.Types
import Simplex.Messaging.TMap (TMap)
import Simplex.Messaging.Util ((<$$>))
import System.IO (IOMode (..))

class MsgStoreClass s => STMQueueStore s where
Expand All @@ -44,8 +49,9 @@ class Monad (StoreMonad s) => MsgStoreClass s where
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getNonEmptyMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s))
getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)

-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
Expand Down Expand Up @@ -74,39 +80,39 @@ withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty
pure $! acc <> r

getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ maybe (pure []) $ getQueueMessages_ drainMsgs
getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst)
{-# INLINE getQueueMessages #-}

getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ maybe (pure 0) getQueueSize_
getQueueSize st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
{-# INLINE getQueueSize #-}

tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ maybe (pure Nothing) (tryPeekMsg_ q)
tryPeekMsg st rId q = snd <$$> withPeekMsgQueue st rId q "tryPeekMsg" pure
{-# INLINE tryPeekMsg #-}

tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st rId q msgId' = withMsgQueue st rId q "tryDelMsg" $ maybe (pure Nothing) $ \mq ->
tryPeekMsg_ q mq >>= \case
msg_@(Just msg)
tryDelMsg st rId q msgId' =
withPeekMsgQueue st rId q "tryDelMsg" $
maybe (pure Nothing) $ \(mq, msg) ->
if
| messageId msg == msgId' ->
tryDeleteMsg_ q mq True >> pure msg_
_ -> pure Nothing
tryDeleteMsg_ q mq True $> Just msg

Check failure on line 100 in src/Simplex/Messaging/Server/MsgStore/Types.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

• Variable not in scope:

Check failure on line 100 in src/Simplex/Messaging/Server/MsgStore/Types.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Variable not in scope:

Check failure on line 100 in src/Simplex/Messaging/Server/MsgStore/Types.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Variable not in scope:
| otherwise -> pure Nothing

-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st rId q msgId' =
withMsgQueue st rId q "tryDelPeekMsg" $ maybe (pure (Nothing, Nothing)) $ \mq ->
tryPeekMsg_ q mq >>= \case
msg_@(Just msg)
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
| otherwise -> pure (Nothing, msg_)
_ -> pure (Nothing, Nothing)
withPeekMsgQueue st rId q "tryDelPeekMsg" $
maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
if
| messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
| otherwise -> pure (Nothing, Just msg)

-- The action is called with Nothing when it is known that the queue is empty
withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s) -> StoreMonad s a) -> ExceptT ErrorType IO a
withMsgQueue st rId q op a = isolateQueue rId q op $ getNonEmptyMsgQueue st rId q >>= a
{-# INLINE withMsgQueue #-}
withPeekMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st rId q op a = isolateQueue rId q op $ getPeekMsgQueue st rId q >>= a
{-# INLINE withPeekMsgQueue #-}

deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st rId q old =
Expand Down

0 comments on commit 1174967

Please sign in to comment.