Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

smp server: remove queue from map when closing, test #1392

Merged
merged 3 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ packages: .
-- packages: . ../http2
-- packages: . ../network-transport

-- package *
-- coverage: True
-- library-coverage: True

-- package attoparsec
-- coverage: False
-- library-coverage: False

index-state: 2023-12-12T00:00:00Z

package cryptostore
Expand Down
1 change: 1 addition & 0 deletions package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ tests:
- -rtsopts
- -with-rtsopts=-A64M
- -with-rtsopts=-N1
# - -fhpc

ghc-options:
# - -haddock
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import Simplex.Messaging.Server.Env.STM as Env
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgQueue (..), JMQueue (..), closeMsgQueue)
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgQueue (..), JMQueue (..), closeMsgQueueHandles)
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
Expand Down Expand Up @@ -1044,7 +1044,7 @@

-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
-- by having the same time of the response whether a queue exists or nor, for all valid key/signature sizes
dummySignKey :: C.SignatureAlgorithm a => C.SAlgorithm a -> C.PublicKey a

Check warning on line 1047 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a

Check warning on line 1047 in src/Simplex/Messaging/Server.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: C.SignatureAlgorithm a
dummySignKey = \case
C.SEd25519 -> dummyKeyEd25519
C.SEd448 -> dummyKeyEd448
Expand Down Expand Up @@ -1788,7 +1788,7 @@
expired'' <- deleteExpiredMsgs q False old
stored'' <- liftIO $ getQueueSize q
liftIO $ logQueueState q
liftIO $ closeMsgQueue q
liftIO $ closeMsgQueueHandles q
pure (stored'', expired'')
processValidateQueue q =
getQueueSize q >>= \storedMsgsCount -> pure mempty {storedMsgsCount, storedQueues = 1}
Expand Down
28 changes: 16 additions & 12 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ module Simplex.Messaging.Server.MsgStore.Journal
JournalStoreConfig (..),
getQueueMessages,
closeMsgQueue,
closeMsgQueueHandles,
-- below are exported for tests
MsgQueueState (..),
JournalState (..),
SJournalType (..),
msgQueueDirectory,
msgQueueStatePath,
readWriteQueueState,
newMsgQueueState,
newJournalId,
Expand All @@ -47,6 +49,7 @@ import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
Expand Down Expand Up @@ -207,7 +210,7 @@ instance MsgStoreClass JournalMsgStore where
msgQueues <- TM.emptyIO
pure JournalMsgStore {config, random, queueLocks, msgQueues}

closeMsgStore st = readTVarIO (msgQueues st) >>= mapM_ closeMsgQueue
closeMsgStore st = atomically (swapTVar (msgQueues st) M.empty) >>= mapM_ closeMsgQueueHandles

activeMsgQueues = msgQueues
{-# INLINE activeMsgQueues #-}
Expand Down Expand Up @@ -236,7 +239,7 @@ instance MsgStoreClass JournalMsgStore where
Left e -> do
putStrLn ("Error: message queue directory " <> dir <> " is invalid: " <> e)
exitFailure
closeMsgQueue q
closeMsgQueueHandles q
pure (i + 1, r <> r')
progress i = "Processed: " <> show i <> " queues"
foldQueues depth f acc (queueId, path) = do
Expand Down Expand Up @@ -283,15 +286,16 @@ instance MsgStoreClass JournalMsgStore where

delMsgQueue :: JournalMsgStore -> RecipientId -> IO ()
delMsgQueue ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do
void $ deleteMsgQueue_ ms rId
closeMsgQueue ms rId
removeQueueDirectory ms rId

delMsgQueueSize :: JournalMsgStore -> RecipientId -> IO Int
delMsgQueueSize ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do
state_ <- deleteMsgQueue_ ms rId
sz <- maybe (pure $ -1) (fmap size . readTVarIO) state_
st_ <-
atomically (TM.lookupDelete rId (msgQueues ms))
>>= mapM (\q -> closeMsgQueueHandles q >> readTVarIO (state q))
removeQueueDirectory ms rId
pure sz
pure $ maybe (-1) size st_

getQueueMessages :: Bool -> JournalMsgQueue -> IO [Message]
getQueueMessages drainMsgs q = run []
Expand Down Expand Up @@ -587,13 +591,13 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
&& msgPos ws == msgCount ws
&& bytePos ws == byteCount ws

deleteMsgQueue_ :: JournalMsgStore -> RecipientId -> IO (Maybe (TVar MsgQueueState))
deleteMsgQueue_ st rId =
atomically (TM.lookupDelete rId (msgQueues st))
>>= mapM (\q -> closeMsgQueue q $> state q)
closeMsgQueue :: JournalMsgStore -> RecipientId -> IO ()
closeMsgQueue ms rId =
atomically (TM.lookupDelete rId (msgQueues ms))
>>= mapM_ closeMsgQueueHandles

closeMsgQueue :: JournalMsgQueue -> IO ()
closeMsgQueue q = readTVarIO (handles q) >>= mapM_ closeHandles
closeMsgQueueHandles :: JournalMsgQueue -> IO ()
closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles
where
closeHandles (MsgQueueHandles sh rh wh_) = do
hClose sh
Expand Down
32 changes: 30 additions & 2 deletions tests/CoreTests/MsgStoreTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module CoreTests.MsgStoreTests where

import AgentTests.FunctionalAPITests (runRight_)
import AgentTests.FunctionalAPITests (runRight, runRight_)
import Control.Concurrent.STM
import Control.Exception (bracket)
import Control.Monad
import Control.Monad.IO.Class
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Base64.URL as B64
import Data.Time.Clock.System (getSystemTime)
import Simplex.Messaging.Crypto (pattern MaxLenBS)
import qualified Simplex.Messaging.Crypto as C
Expand All @@ -41,6 +43,7 @@ msgStoreTests = do
it "should export and import journal store" testExportImportStore
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState
where
someMsgStoreTests :: MsgStoreClass s => SpecWith s
someMsgStoreTests = do
Expand Down Expand Up @@ -189,7 +192,7 @@ testQueueState ms = do
g <- C.newRandom
rId <- EntityId <$> atomically (C.randomBytes 24 g)
let dir = msgQueueDirectory ms rId
statePath = dir </> (queueLogFileName <> logFileExt)
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
createDirectoryIfMissing True dir
state <- newMsgQueueState <$> newJournalId (random ms)
withFile statePath WriteMode (`appendState` state)
Expand Down Expand Up @@ -248,3 +251,28 @@ testQueueState ms = do
forM_ names $ \name ->
let f = dir </> name
in unless (f == keep) $ removeFile f

testMessageState :: JournalMsgStore -> IO ()
testMessageState ms = do
g <- C.newRandom
rId <- EntityId <$> atomically (C.randomBytes 24 g)
let dir = msgQueueDirectory ms rId
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
write q s = writeMsg ms q True =<< mkMessage s

mId1 <- runRight $ do
q <- getMsgQueue ms rId
Just (Message {msgId = mId1}, True) <- write q "message 1"
Just (Message {}, False) <- write q "message 2"
liftIO $ closeMsgQueue ms rId
pure mId1

ls <- B.lines <$> B.readFile statePath
B.writeFile statePath $ B.unlines $ take (length ls - 1) ls

runRight_ $ do
q <- getMsgQueue ms rId
Just (Message {msgId = mId3}, False) <- write q "message 3"
(Msg "message 1", Msg "message 3") <- tryDelPeekMsg q mId1
(Msg "message 3", Nothing) <- tryDelPeekMsg q mId3
liftIO $ closeMsgQueueHandles q
Loading