Skip to content

Commit

Permalink
xftp: report receive file error with redirected file ID, when redirec…
Browse files Browse the repository at this point in the history
…t is present (#1304)

* xftp: report receive file error with redirected file ID, when redirect is present

* fix test
  • Loading branch information
epoberezkin authored Sep 9, 2024
1 parent 344a295 commit dab1980
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
21 changes: 11 additions & 10 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import Data.List (foldl', partition, sortOn)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (mapMaybe)
import Data.Maybe (fromMaybe, mapMaybe)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
Expand Down Expand Up @@ -190,8 +190,9 @@ runXFTPRcvWorker c srv Worker {doWork} = do
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpConsecutiveRetries} =
withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case
(RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _) -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (INTERNAL "chunk has no replicas")
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do
(RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _, redirectEntityId_) ->
rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) (INTERNAL "chunk has no replicas")
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays, redirectEntityId_) -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
liftIO $ waitWhileSuspended c
Expand All @@ -202,7 +203,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
where
retryLoop loop e replicaDelay = do
flip catchAgentError (\_ -> pure ()) $ do
when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e
when (serverHostError e) $ notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFWARN e)
liftIO $ closeXFTPServerClient c userId server digest
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
liftIO $ assertAgentForeground c
Expand All @@ -211,7 +212,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
atomically . incXFTPServerStat c userId srv $ case e of
XFTP _ XFTP.AUTH -> downloadAuthErrs
_ -> downloadErrs
rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) e
rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) e
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica approvedRelays = do
unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ FILE NOT_APPROVED
Expand Down Expand Up @@ -262,11 +263,11 @@ retryOnError name loop done e = do
then loop
else done

rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> AgentErrorType -> AM ()
rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath err = do
rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe RcvFileId -> Maybe FilePath -> AgentErrorType -> AM ()
rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ tmpPath err = do
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
withStore' c $ \db -> updateRcvFileError db rcvFileId (show err)
notify c rcvFileEntityId $ RFERR err
notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFERR err)

runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM ()
runXFTPRcvLocalWorker c Worker {doWork} = do
Expand All @@ -279,8 +280,8 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation AgentConfig {rcvFilesTTL} =
withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath, redirect} ->
decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId (redirectEntityId <$> redirect) tmpPath
decryptFile :: RcvFile -> AM ()
decryptFile RcvFile {rcvFileId, rcvFileEntityId, size, digest, key, nonce, tmpPath, saveFile, status, chunks, redirect} = do
let CryptoFile savePath cfArgs = saveFile
Expand Down
13 changes: 7 additions & 6 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2525,7 +2525,7 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO ()
deleteRcvFile' db rcvFileId =
DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId)

getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool)))
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool, Maybe RcvFileId)))
getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do
getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd)
where
Expand All @@ -2549,7 +2549,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
LIMIT 1
|]
(host, port, keyHash, RFSReceiving, cutoffTs)
getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool))
getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool, Maybe RcvFileId))
getChunkData (rcvFileChunkReplicaId, _fileId) =
firstRow toChunk SEFileNotFound $
DB.query
Expand All @@ -2558,7 +2558,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
SELECT
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path,
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries,
f.approved_relays
f.approved_relays, f.redirect_entity_id
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
Expand All @@ -2567,8 +2567,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|]
(Only rcvFileChunkReplicaId)
where
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. Only Bool) -> (RcvFileChunk, Bool)
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (Only approvedRelays)) =
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. (Bool, Maybe RcvFileId)) -> (RcvFileChunk, Bool, Maybe RcvFileId)
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (approvedRelays, redirectEntityId_)) =
( RcvFileChunk
{ rcvFileId,
rcvFileEntityId,
Expand All @@ -2581,7 +2581,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
chunkTmpPath,
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}]
},
approvedRelays
approvedRelays,
redirectEntityId_
)

getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile))
Expand Down
2 changes: 1 addition & 1 deletion tests/AgentTests/SQLiteTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ testGetNextRcvChunkToDownload st = do
show e `shouldContain` "ConversionFailed"
DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]

Right (Just (RcvFileChunk {rcvFileEntityId}, _)) <- getNextRcvChunkToDownload db xftpServer1 86400
Right (Just (RcvFileChunk {rcvFileEntityId}, _, Nothing)) <- getNextRcvChunkToDownload db xftpServer1 86400
rcvFileEntityId `shouldBe` fId2

testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation
Expand Down

0 comments on commit dab1980

Please sign in to comment.