Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xftp agent: update chunk delay #671

Merged
merged 1 commit into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,15 @@ runXFTPWorker c srv doWork = do
nextChunk <- withStore' c (`getNextRcvChunkToDownload` srv)
case nextChunk of
Nothing -> noWorkToDo
Just fc@RcvFileChunk {nextDelay} -> do
Just fc@RcvFileChunk {rcvChunkId, delay} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d}) nextDelay
withRetryInterval ri' $ \loop ->
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
downloadFileChunk fc
`catchError` \e -> do
liftIO $ print e
`catchError` \_ -> do
withStore' c $ \db -> updateRcvFileChunkDelay db rcvChunkId delay'
-- TODO don't loop on permanent errors
-- TODO increase replica retries count
-- TODO update nextDelay (modify withRetryInterval to expose current delay)
loop
noWorkToDo = void . atomically $ tryTakeTMVar doWork
downloadFileChunk :: RcvFileChunk -> m ()
Expand Down Expand Up @@ -125,10 +124,9 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
Nothing -> noWorkToDo
Just fd -> do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
withRetryInterval ri $ \_ loop ->
decryptFile fd
`catchError` \e -> do
liftIO $ print e
`catchError` \_ -> do
-- TODO don't loop on permanent errors
-- TODO fixed number of retries instead of exponential backoff?
loop
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/FileTransfer/Client/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do
throwError e
tryConnectAsync :: ME ()
tryConnectAsync = void . async $ do
withRetryInterval (reconnectInterval config) $ void . tryConnectClient
withRetryInterval (reconnectInterval config) $ \_ loop -> void $ tryConnectClient loop

showServer :: XFTPServer -> Text
showServer ProtocolServer {host, port} =
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/FileTransfer/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ data RcvFileChunk = RcvFileChunk
replicas :: [RcvFileChunkReplica],
fileTmpPath :: FilePath,
chunkTmpPath :: Maybe FilePath,
nextDelay :: Maybe Int
delay :: Maybe Int
}
deriving (Eq, Show)

Expand Down
18 changes: 12 additions & 6 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ module Simplex.Messaging.Agent.Store.SQLite
-- File transfer
createRcvFile,
getRcvFile,
updateRcvFileChunkDelay,
updateRcvFileChunkReceived,
updateRcvFileStatus,
updateRcvFileComplete,
Expand Down Expand Up @@ -1789,7 +1790,7 @@ getRcvFile db rcvFileId = runExceptT $ do
<$> DB.query
db
[sql|
SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, next_delay
SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, delay
FROM rcv_file_chunks
WHERE rcv_file_id = ?
|]
Expand All @@ -1799,8 +1800,8 @@ getRcvFile db rcvFileId = runExceptT $ do
pure (chunk {replicas = replicas'} :: RcvFileChunk)
where
toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath, Maybe Int) -> RcvFileChunk
toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, nextDelay) =
RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay, replicas = []}
toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, delay) =
RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay, replicas = []}
getChunkReplicas :: Int64 -> IO [RcvFileChunkReplica]
getChunkReplicas chunkId = do
map toReplica
Expand All @@ -1821,6 +1822,11 @@ getRcvFile db rcvFileId = runExceptT $ do
let server = XFTPServer host port keyHash
in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}

updateRcvFileChunkDelay :: DB.Connection -> Int64 -> Int -> IO ()
updateRcvFileChunkDelay db chunkId delay = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_file_chunks SET delay = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (delay, updatedAt, chunkId)

updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> RcvFileId -> FilePath -> IO (Either StoreError RcvFile)
updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do
updatedAt <- getCurrentTime
Expand Down Expand Up @@ -1850,7 +1856,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
db
[sql|
SELECT
f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.next_delay,
f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay,
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
Expand All @@ -1864,7 +1870,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
(host, port, keyHash)
where
toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int)) -> RcvFileChunk
toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) =
toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) =
RcvFileChunk
{ userId,
rcvFileId,
Expand All @@ -1874,7 +1880,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
digest,
fileTmpPath,
chunkTmpPath,
nextDelay,
delay,
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ CREATE TABLE rcv_file_chunks (
chunk_size INTEGER NOT NULL,
digest BLOB NOT NULL,
tmp_path TEXT,
next_delay INTEGER,
delay INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ CREATE TABLE rcv_file_chunks(
chunk_size INTEGER NOT NULL,
digest BLOB NOT NULL,
tmp_path TEXT,
next_delay INTEGER,
delay INTEGER,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
Expand Down