Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for resuming SFTP file upload/download #864

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Renci.SshNet/IServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ internal partial interface IServiceFactory
/// <param name="fileName">The file to read.</param>
/// <param name="sftpSession">The SFTP session to use.</param>
/// <param name="bufferSize">The size of buffer.</param>
/// <param name="offset">The offset to resume from.</param>
/// <returns>
/// An <see cref="ISftpFileReader"/>.
/// </returns>
ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSession, uint bufferSize);
ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSession, uint bufferSize, ulong offset = 0);

/// <summary>
/// Creates a new <see cref="ISftpResponseFactory"/> instance.
Expand Down
9 changes: 6 additions & 3 deletions src/Renci.SshNet/ServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ public INetConfSession CreateNetConfSession(ISession session, int operationTimeo
/// <param name="fileName">The file to read.</param>
/// <param name="sftpSession">The SFTP session to use.</param>
/// <param name="bufferSize">The size of buffer.</param>
/// <param name="offset">The offset to resume from.</param>
/// <returns>
/// An <see cref="ISftpFileReader"/>.
/// </returns>
public ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSession, uint bufferSize)
public ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSession, uint bufferSize, ulong offset = 0)
{
const int defaultMaxPendingReads = 10;

Expand All @@ -152,7 +153,9 @@ public ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSe
{
var fileAttributes = sftpSession.EndLStat(statAsyncResult);
fileSize = fileAttributes.Size;
maxPendingReads = Math.Min(100, (int)Math.Ceiling((double)fileAttributes.Size / chunkSize) + 1);

// calculate maxPendingReads based on remaining size, not total filesize (needed for resume support)
maxPendingReads = Math.Min(100, (int) Math.Ceiling((double)(fileSize - (long)offset) / chunkSize) + 1);
}
catch (SshException ex)
{
Expand All @@ -162,7 +165,7 @@ public ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSe
DiagnosticAbstraction.Log(string.Format("Failed to obtain size of file. Allowing maximum {0} pending reads: {1}", maxPendingReads, ex));
}

return sftpSession.CreateFileReader(handle, sftpSession, chunkSize, maxPendingReads, fileSize);
return sftpSession.CreateFileReader(handle, sftpSession, chunkSize, maxPendingReads, fileSize, offset);
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion src/Renci.SshNet/Sftp/ISftpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,11 @@ void RequestWrite(byte[] handle,
/// <param name="chunkSize">The maximum number of bytes to read with each chunk.</param>
/// <param name="maxPendingReads">The maximum number of pending reads.</param>
/// <param name="fileSize">The size of the file or <see langword="null"/> when the size could not be determined.</param>
/// <param name="offset">The offset to resume from.</param>
/// <returns>
/// An <see cref="ISftpFileReader"/> for reading the content of the file represented by the
/// specified <paramref name="handle"/>.
/// </returns>
ISftpFileReader CreateFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize);
ISftpFileReader CreateFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize, ulong offset = 0);
}
}
8 changes: 7 additions & 1 deletion src/Renci.SshNet/Sftp/SftpFileReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ internal sealed class SftpFileReader : ISftpFileReader
/// <param name="chunkSize">The size of a individual read-ahead chunk.</param>
/// <param name="maxPendingReads">The maximum number of pending reads.</param>
/// <param name="fileSize">The size of the file, if known; otherwise, <see langword="null"/>.</param>
public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
/// <param name="offset">The offset to resume from.</param>
public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize, ulong offset = 0)
{
_handle = handle;
_sftpSession = sftpSession;
Expand All @@ -69,6 +70,11 @@ public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, i
_disposingWaitHandle = new ManualResetEvent(initialState: false);
_waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);

// When resuming a download (offset > 0), set the initial offset of the remote file to
// the same offset as the local output file. Read-ahead also starts at the same offset.
_offset = offset;
_readAheadOffset = offset;

StartReadAhead();
}

Expand Down
5 changes: 3 additions & 2 deletions src/Renci.SshNet/Sftp/SftpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,14 @@ public async Task<string> GetCanonicalPathAsync(string path, CancellationToken c
/// <param name="chunkSize">The maximum number of bytes to read with each chunk.</param>
/// <param name="maxPendingReads">The maximum number of pending reads.</param>
/// <param name="fileSize">The size of the file or <see langword="null"/> when the size could not be determined.</param>
/// <param name="offset">The offset to resume from.</param>
/// <returns>
/// An <see cref="ISftpFileReader"/> for reading the content of the file represented by the
/// specified <paramref name="handle"/>.
/// </returns>
public ISftpFileReader CreateFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
public ISftpFileReader CreateFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize, ulong offset = 0)
{
return new SftpFileReader(handle, sftpSession, chunkSize, maxPendingReads, fileSize);
return new SftpFileReader(handle, sftpSession, chunkSize, maxPendingReads, fileSize, offset);
}

internal string GetFullRemotePath(string path)
Expand Down
17 changes: 14 additions & 3 deletions src/Renci.SshNet/SftpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,12 @@ public void UploadFile(Stream input, string path, bool canOverride, Action<ulong

var flags = Flags.Write | Flags.Truncate;

if (canOverride)
if (input.Position > 0)
{
// if the local stream position is not zero, open the remote file in APPEND mode to resume upload
flags = Flags.Write | Flags.Append;
zybexXL marked this conversation as resolved.
Show resolved Hide resolved
}
else if (canOverride)
{
flags |= Flags.CreateNewOrOpen;
}
Expand Down Expand Up @@ -1117,7 +1122,12 @@ public IAsyncResult BeginUploadFile(Stream input, string path, bool canOverride,

var flags = Flags.Write | Flags.Truncate;

if (canOverride)
if (input.Position > 0)
{
// if the local stream position is not zero, open the remote file in APPEND mode to resume upload
flags = Flags.Write | Flags.Append;
zybexXL marked this conversation as resolved.
Show resolved Hide resolved
}
else if (canOverride)
{
flags |= Flags.CreateNewOrOpen;
}
Expand Down Expand Up @@ -2432,7 +2442,8 @@ private void InternalUploadFile(Stream input, string path, Flags flags, SftpUplo

var handle = _sftpSession.RequestOpen(fullPath, flags);

ulong offset = 0;
// Set the initial offset of the remote file to the same as the local file to allow resuming
var offset = (ulong)input.Position;

// create buffer of optimal length
var buffer = new byte[_sftpSession.CalculateOptimalWriteLength(_bufferSize, handle)];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Throws(new SshException());
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 10, null))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 10, null, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Returns(_fileAttributes);
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 7, _fileSize))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 7, _fileSize, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Returns(_fileAttributes);
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 2, _fileSize))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 2, _fileSize, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Returns(_fileAttributes);
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 6, _fileSize))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 6, _fileSize, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Returns(_fileAttributes);
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 2, _fileSize))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 2, _fileSize, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Returns(_fileAttributes);
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 7, _fileSize))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 7, _fileSize, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Returns(_fileAttributes);
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, _maxPendingReads, _fileSize))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, _maxPendingReads, _fileSize, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void SetupMocks()
.Setup(p => p.EndLStat(_statAsyncResult))
.Returns(_fileAttributes);
_sftpSessionMock.InSequence(seq)
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 1, _fileSize))
.Setup(p => p.CreateFileReader(_handle, _sftpSessionMock.Object, _chunkSize, 1, _fileSize, 0))
.Returns(_sftpFileReaderMock.Object);
}

Expand Down