Skip to content

Commit

Permalink
smp server: remove queue from map when closing, test (#1392)
Browse files Browse the repository at this point in the history
* smp server: remove queue from map when closing, test

* remove print

* refactor
  • Loading branch information
epoberezkin authored Oct 26, 2024
1 parent 9161507 commit 5940514
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 16 deletions.
8 changes: 8 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ packages: .
-- packages: . ../http2
-- packages: . ../network-transport

-- package *
-- coverage: True
-- library-coverage: True

-- package attoparsec
-- coverage: False
-- library-coverage: False

index-state: 2023-12-12T00:00:00Z

package cryptostore
Expand Down
1 change: 1 addition & 0 deletions package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ tests:
- -rtsopts
- -with-rtsopts=-A64M
- -with-rtsopts=-N1
# - -fhpc

ghc-options:
# - -haddock
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ import Simplex.Messaging.Server.Control
import Simplex.Messaging.Server.Env.STM as Env
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgQueue (..), JMQueue (..), closeMsgQueue)
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgQueue (..), JMQueue (..), closeMsgQueueHandles)
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
Expand Down Expand Up @@ -1788,7 +1788,7 @@ processServerMessages = do
expired'' <- deleteExpiredMsgs q False old
stored'' <- liftIO $ getQueueSize q
liftIO $ logQueueState q
liftIO $ closeMsgQueue q
liftIO $ closeMsgQueueHandles q
pure (stored'', expired'')
processValidateQueue q =
getQueueSize q >>= \storedMsgsCount -> pure mempty {storedMsgsCount, storedQueues = 1}
Expand Down
28 changes: 16 additions & 12 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ module Simplex.Messaging.Server.MsgStore.Journal
JournalStoreConfig (..),
getQueueMessages,
closeMsgQueue,
closeMsgQueueHandles,
-- below are exported for tests
MsgQueueState (..),
JournalState (..),
SJournalType (..),
msgQueueDirectory,
msgQueueStatePath,
readWriteQueueState,
newMsgQueueState,
newJournalId,
Expand All @@ -47,6 +49,7 @@ import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
Expand Down Expand Up @@ -207,7 +210,7 @@ instance MsgStoreClass JournalMsgStore where
msgQueues <- TM.emptyIO
pure JournalMsgStore {config, random, queueLocks, msgQueues}

closeMsgStore st = readTVarIO (msgQueues st) >>= mapM_ closeMsgQueue
closeMsgStore st = atomically (swapTVar (msgQueues st) M.empty) >>= mapM_ closeMsgQueueHandles

activeMsgQueues = msgQueues
{-# INLINE activeMsgQueues #-}
Expand Down Expand Up @@ -236,7 +239,7 @@ instance MsgStoreClass JournalMsgStore where
Left e -> do
putStrLn ("Error: message queue directory " <> dir <> " is invalid: " <> e)
exitFailure
closeMsgQueue q
closeMsgQueueHandles q
pure (i + 1, r <> r')
progress i = "Processed: " <> show i <> " queues"
foldQueues depth f acc (queueId, path) = do
Expand Down Expand Up @@ -283,15 +286,16 @@ instance MsgStoreClass JournalMsgStore where

delMsgQueue :: JournalMsgStore -> RecipientId -> IO ()
delMsgQueue ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do
void $ deleteMsgQueue_ ms rId
closeMsgQueue ms rId
removeQueueDirectory ms rId

delMsgQueueSize :: JournalMsgStore -> RecipientId -> IO Int
delMsgQueueSize ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do
state_ <- deleteMsgQueue_ ms rId
sz <- maybe (pure $ -1) (fmap size . readTVarIO) state_
st_ <-
atomically (TM.lookupDelete rId (msgQueues ms))
>>= mapM (\q -> closeMsgQueueHandles q >> readTVarIO (state q))
removeQueueDirectory ms rId
pure sz
pure $ maybe (-1) size st_

getQueueMessages :: Bool -> JournalMsgQueue -> IO [Message]
getQueueMessages drainMsgs q = run []
Expand Down Expand Up @@ -587,13 +591,13 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
&& msgPos ws == msgCount ws
&& bytePos ws == byteCount ws

deleteMsgQueue_ :: JournalMsgStore -> RecipientId -> IO (Maybe (TVar MsgQueueState))
deleteMsgQueue_ st rId =
atomically (TM.lookupDelete rId (msgQueues st))
>>= mapM (\q -> closeMsgQueue q $> state q)
closeMsgQueue :: JournalMsgStore -> RecipientId -> IO ()
closeMsgQueue ms rId =
atomically (TM.lookupDelete rId (msgQueues ms))
>>= mapM_ closeMsgQueueHandles

closeMsgQueue :: JournalMsgQueue -> IO ()
closeMsgQueue q = readTVarIO (handles q) >>= mapM_ closeHandles
closeMsgQueueHandles :: JournalMsgQueue -> IO ()
closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles
where
closeHandles (MsgQueueHandles sh rh wh_) = do
hClose sh
Expand Down
32 changes: 30 additions & 2 deletions tests/CoreTests/MsgStoreTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module CoreTests.MsgStoreTests where

import AgentTests.FunctionalAPITests (runRight_)
import AgentTests.FunctionalAPITests (runRight, runRight_)
import Control.Concurrent.STM
import Control.Exception (bracket)
import Control.Monad
import Control.Monad.IO.Class
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Base64.URL as B64
import Data.Time.Clock.System (getSystemTime)
import Simplex.Messaging.Crypto (pattern MaxLenBS)
import qualified Simplex.Messaging.Crypto as C
Expand All @@ -41,6 +43,7 @@ msgStoreTests = do
it "should export and import journal store" testExportImportStore
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState
where
someMsgStoreTests :: MsgStoreClass s => SpecWith s
someMsgStoreTests = do
Expand Down Expand Up @@ -189,7 +192,7 @@ testQueueState ms = do
g <- C.newRandom
rId <- EntityId <$> atomically (C.randomBytes 24 g)
let dir = msgQueueDirectory ms rId
statePath = dir </> (queueLogFileName <> logFileExt)
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
createDirectoryIfMissing True dir
state <- newMsgQueueState <$> newJournalId (random ms)
withFile statePath WriteMode (`appendState` state)
Expand Down Expand Up @@ -248,3 +251,28 @@ testQueueState ms = do
forM_ names $ \name ->
let f = dir </> name
in unless (f == keep) $ removeFile f

testMessageState :: JournalMsgStore -> IO ()
testMessageState ms = do
g <- C.newRandom
rId <- EntityId <$> atomically (C.randomBytes 24 g)
let dir = msgQueueDirectory ms rId
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
write q s = writeMsg ms q True =<< mkMessage s

mId1 <- runRight $ do
q <- getMsgQueue ms rId
Just (Message {msgId = mId1}, True) <- write q "message 1"
Just (Message {}, False) <- write q "message 2"
liftIO $ closeMsgQueue ms rId
pure mId1

ls <- B.lines <$> B.readFile statePath
B.writeFile statePath $ B.unlines $ take (length ls - 1) ls

runRight_ $ do
q <- getMsgQueue ms rId
Just (Message {msgId = mId3}, False) <- write q "message 3"
(Msg "message 1", Msg "message 3") <- tryDelPeekMsg q mId1
(Msg "message 3", Nothing) <- tryDelPeekMsg q mId3
liftIO $ closeMsgQueueHandles q

0 comments on commit 5940514

Please sign in to comment.