Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xftp: fix repeated replica creation if it was in uploaded status #1079

Merged
merged 11 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 11 in src/Simplex/FileTransfer/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 11 in src/Simplex/FileTransfer/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

module Simplex.FileTransfer.Agent
( startXFTPWorkers,
Expand Down Expand Up @@ -389,7 +389,7 @@
let numRecipients' = min numRecipients maxRecipients
-- concurrently?
-- separate worker to create chunks? record retries and delay on snd_file_chunks?
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
forM_ (filter (\SndFileChunk {replicas} -> null replicas) chunks) $ createChunk numRecipients'
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
where
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg
Expand All @@ -413,9 +413,6 @@
let chunkSpecs = prepareChunkSpecs fsEncPath chunkSizes
chunkDigests <- liftIO $ mapM getChunkDigest chunkSpecs
pure (FileDigest digest, zip chunkSpecs $ coerce chunkDigests)
chunkCreated :: SndFileChunk -> Bool
chunkCreated SndFileChunk {replicas} =
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSCreated) replicas
createChunk :: Int -> SndFileChunk -> AM ()
createChunk numRecipients' ch = do
atomically $ assertAgentForeground c
Expand Down
26 changes: 22 additions & 4 deletions tests/XFTPAgent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import Data.Int (Int64)
import Data.List (find, isSuffixOf)
import Data.Maybe (fromJust)
import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2, testDB3)
import Simplex.FileTransfer.Description (FileDescription (..), FileDescriptionURI (..), ValidFileDescription, fileDescriptionURI, mb, qrSizeLimit, pattern ValidFileDescription)
import Simplex.FileTransfer.Description (FileChunk (..), FileDescription (..), FileDescriptionURI (..), ValidFileDescription, fileDescriptionURI, mb, qrSizeLimit, pattern ValidFileDescription)
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Transport (XFTPErrorType (AUTH))
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
import Simplex.FileTransfer.Transport (XFTPErrorType (AUTH))
import Simplex.Messaging.Agent (AgentClient, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendDescription, xftpSendFile, xftpStartWorkers)
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv)
Expand Down Expand Up @@ -105,7 +105,6 @@ testXFTPAgentSendReceive = withXFTPServer $ do
(sfId, _, rfd1, rfd2) <- testSend sndr filePath
liftIO $ xftpDeleteSndFileInternal sndr sfId
pure (rfd1, rfd2)

-- receive file, delete rcv file
testReceiveDelete 2 rfd1 filePath
testReceiveDelete 3 rfd2 filePath
Expand Down Expand Up @@ -157,13 +156,19 @@ testXFTPAgentSendReceiveRedirect = withXFTPServer $ do
sfGet sndr >>= \case
(_, _, SFDONE _snd (vfd : _)) -> pure vfd
r -> error $ "Expected SFDONE, got " <> show r

testNoRedundancy vfdDirect

redirectFileId <- runRight $ xftpSendDescription sndr 1 vfdDirect 1
logInfo $ "File sent, sending redirect: " <> tshow redirectFileId
sfGet sndr `shouldReturn` ("", redirectFileId, SFPROG 65536 65536)
vfdRedirect@(ValidFileDescription fdRedirect) <-
sfGet sndr >>= \case
(_, _, SFDONE _snd (vfd : _)) -> pure vfd
r -> error $ "Expected SFDONE, got " <> show r

testNoRedundancy vfdRedirect

case fdRedirect of
FileDescription {redirect = Just _} -> pure ()
_ -> error "missing RedirectFileInfo"
Expand Down Expand Up @@ -208,6 +213,9 @@ testXFTPAgentSendReceiveNoRedirect = withXFTPServer $ do
sfGet sndr >>= \case
(_, _, SFDONE _snd (vfd : _)) -> pure vfd
r -> error $ "Expected SFDONE, got " <> show r

testNoRedundancy vfdDirect

let uri = strEncode $ fileDescriptionURI vfdDirect
B.length uri `shouldSatisfy` (< qrSizeLimit)
case strDecode uri of
Expand Down Expand Up @@ -255,9 +263,15 @@ testSendCF sndr file = do
sfId <- xftpSendFile sndr 1 file 2
sfProgress sndr $ mb 18
("", sfId', SFDONE sndDescr [rfd1, rfd2]) <- sfGet sndr
liftIO $ testNoRedundancy rfd1
liftIO $ testNoRedundancy rfd2
liftIO $ sfId' `shouldBe` sfId
pure (sfId, sndDescr, rfd1, rfd2)

testNoRedundancy :: HasCallStack => ValidFileDescription 'FRecipient -> IO ()
testNoRedundancy (ValidFileDescription FileDescription {chunks}) =
all (\FileChunk {replicas} -> length replicas == 1) chunks `shouldBe` True

testReceive :: HasCallStack => AgentClient -> ValidFileDescription 'FRecipient -> FilePath -> ExceptT AgentErrorType IO RcvFileId
testReceive rcp rfd = testReceiveCF rcp rfd Nothing

Expand Down Expand Up @@ -400,7 +414,9 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
sndr' <- getSMPAgentClient' 3 agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
sfProgress sndr' $ mb 18
("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr'
("", sfId', SFDONE _sndDescr [rfd1, rfd2]) <- sfGet sndr'
liftIO $ testNoRedundancy rfd1
liftIO $ testNoRedundancy rfd2
liftIO $ sfId' `shouldBe` sfId

-- prefix path should be removed after sending file
Expand Down Expand Up @@ -618,6 +634,8 @@ testXFTPAgentRequestAdditionalRecipientIDs = withXFTPServer $ do
length rfds `shouldBe` 500
pure rfds

forM_ rfds testNoRedundancy

-- receive file using different descriptions
-- ! revise number of recipients and indexes if xftpMaxRecipientsPerRequest is changed
rcp <- getSMPAgentClient' 2 agentCfg initAgentServers testDB2
Expand Down
Loading