Skip to content

Commit

Permalink
apacheGH-384: SFTP: improve FileChannel locking
Browse files Browse the repository at this point in the history
SSH_FXP_BLOCK is available only in SFTPv6. It was introduced in draft
08. Throw an UnsupportedOperationException if the negotiated SFTP
version is lower.

Respect the contract of FileChannel.lock(): a shared lock can be
requested only if the channel was opened for reading, and an exclusive
lock only if the channel is writable. Check this, and throw appropriate
exceptions if the condition is violated.

Pass on the correct lock flags to the server. At the server, repeat the
version check, and respect the flags passed. Since Java does not have
delete locks we map them to write locks. The draft RFC[1] is silent on
what to do if no flags are passed at all: we use an exclusive  write
lock if the handle was opened for writing, and a shared read lock
otherwise. We cannot throw an exception since we must be able to support
old Apache MINA sshd clients that sent zero as lock flags.

[1] https://www.ietf.org/archive/id/draft-ietf-secsh-filexfer-13.txt

Bug: apache#384
  • Loading branch information
tomaswolf committed Jun 3, 2023
1 parent b1d5cfe commit 2a9c46c
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [GH-370](https://github.com/apache/mina-sshd/issues/370) Also compare file keys in `ModifiableFileWatcher`.
* [GH-371](https://github.com/apache/mina-sshd/issues/371) Fix channel pool in `SftpFileSystem`.
* [GH-383](https://github.com/apache/mina-sshd/issues/383) Use correct default `OpenOption`s in `SftpFileSystemProvider.newFileChannel()`.
* [GH-384](https://github.com/apache/mina-sshd/issues/384) Use correct lock modes for SFTP `FileChannel.lock()`.

* [SSHD-1259](https://issues.apache.org/jira/browse/SSHD-1259) Consider all applicable host keys from the known_hosts files.
* [SSHD-1310](https://issues.apache.org/jira/browse/SSHD-1310) `SftpFileSystem`: do not close user session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ public void link(String linkPath, String targetPath, boolean symbolic) throws IO
Buffer buffer = new ByteArrayBuffer(linkPath.length() + targetPath.length() + Long.SIZE /* some extra fields */, false);
if (version < SftpConstants.SFTP_V6) {
if (!symbolic) {
throw new UnsupportedOperationException("Hard links are not supported in sftp v" + version);
throw new UnsupportedOperationException("Hard links are not supported in sftp v" + version + ", need SFTPv6");
}
buffer = putReferencedName(cmd, buffer, targetPath, 0);
buffer = putReferencedName(cmd, buffer, linkPath, 1);
Expand All @@ -1110,7 +1110,10 @@ public void lock(Handle handle, long offset, long length, int mask) throws IOExc
"lock(" + handle + ")[offset=" + offset + ", length=" + length + ", mask=0x" + Integer.toHexString(mask)
+ "] client is closed");
}

int version = getVersion();
if (version < SftpConstants.SFTP_V6) {
throw new UnsupportedOperationException("File locks are not supported in sftp v" + version + ", need SFTPv6");
}
byte[] id = Objects.requireNonNull(handle, "No handle").getIdentifier();
Buffer buffer = new ByteArrayBuffer(id.length + Long.SIZE /* a bit extra */, false);
buffer.putBytes(id);
Expand All @@ -1129,6 +1132,10 @@ public void unlock(Handle handle, long offset, long length) throws IOException {
if (!isOpen()) {
throw new IOException("unlock" + handle + ")[offset=" + offset + ", length=" + length + "] client is closed");
}
int version = getVersion();
if (version < SftpConstants.SFTP_V6) {
throw new UnsupportedOperationException("File locks are not supported in sftp v" + version + ", need SFTPv6");
}

byte[] id = Objects.requireNonNull(handle, "No handle").getIdentifier();
Buffer buffer = new ByteArrayBuffer(id.length + Long.SIZE /* a bit extra */, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.NonReadableChannelException;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.OverlappingFileLockException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
Expand All @@ -41,7 +43,6 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.sftp.SftpModuleProperties;
import org.apache.sshd.sftp.client.SftpClient;
Expand Down Expand Up @@ -78,10 +79,13 @@ public SftpRemotePathChannel(String path, SftpClient sftp, boolean closeOnExit,
throws IOException {
this.log = LoggerFactory.getLogger(getClass());
this.path = ValidateUtils.checkNotNullAndNotEmpty(path, "No remote file path specified");
this.modes = Objects.requireNonNull(modes, "No channel modes specified");
this.modes = Collections.unmodifiableSet(EnumSet.copyOf(modes));
if (this.modes.isEmpty()) {
throw new IllegalArgumentException("At least one OpenMode is required for a SftpRemotePathChannel");
}
this.sftp = Objects.requireNonNull(sftp, "No SFTP client instance");
this.closeOnExit = closeOnExit;
this.handle = sftp.open(path, modes);
this.handle = sftp.open(path, this.modes);
}

public String getRemotePath() {
Expand Down Expand Up @@ -119,7 +123,10 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
}

protected long doRead(Collection<? extends ByteBuffer> buffers, long position) throws IOException {
ensureOpen(READ_MODES);
if (!isOpen()) {
throw new ClosedChannelException();
}
ensureMode(false);

ClientSession clientSession = sftp.getClientSession();
int copySize = SftpModuleProperties.COPY_BUF_SIZE.getRequired(clientSession);
Expand Down Expand Up @@ -223,7 +230,10 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
}

protected long doWrite(Collection<? extends ByteBuffer> buffers, long position) throws IOException {
ensureOpen(WRITE_MODES);
if (!isOpen()) {
throw new ClosedChannelException();
}
ensureMode(true);

ClientSession clientSession = sftp.getClientSession();
int copySize = SftpModuleProperties.COPY_BUF_SIZE.getRequired(clientSession);
Expand Down Expand Up @@ -282,7 +292,9 @@ protected long doWrite(Collection<? extends ByteBuffer> buffers, long position)

@Override
public long position() throws IOException {
ensureOpen(Collections.emptySet());
if (!isOpen()) {
throw new ClosedChannelException();
}
return posTracker.get();
}

Expand All @@ -293,28 +305,37 @@ public FileChannel position(long newPosition) throws IOException {
+ " illegal file channel position: " + newPosition);
}

ensureOpen(Collections.emptySet());
if (!isOpen()) {
throw new ClosedChannelException();
}
posTracker.set(newPosition);
return this;
}

@Override
public long size() throws IOException {
ensureOpen(Collections.emptySet());
if (!isOpen()) {
throw new ClosedChannelException();
}
Attributes stat = sftp.stat(handle);
return stat.getSize();
}

@Override
public FileChannel truncate(long size) throws IOException {
ensureOpen(Collections.emptySet());
if (!isOpen()) {
throw new ClosedChannelException();
}
ensureMode(true);
sftp.setStat(handle, new SftpClient.Attributes().size(size));
return this;
}

@Override
public void force(boolean metaData) throws IOException {
ensureOpen(Collections.emptySet());
if (!isOpen()) {
throw new ClosedChannelException();
}
}

@Override
Expand All @@ -323,7 +344,10 @@ public long transferTo(long position, long count, WritableByteChannel target) th
throw new IllegalArgumentException("transferTo(" + getRemotePath() + ")"
+ " illegal position (" + position + ") or count (" + count + ")");
}
ensureOpen(READ_MODES);
if (!isOpen() || !target.isOpen()) {
throw new ClosedChannelException();
}
ensureMode(false);

ClientSession clientSession = sftp.getClientSession();
int copySize = SftpModuleProperties.COPY_BUF_SIZE.getRequired(clientSession);
Expand Down Expand Up @@ -359,7 +383,10 @@ public long transferTo(long position, long count, WritableByteChannel target) th
this, position, count, copySize, totalRead, eof, target);
}

return (totalRead > 0L) ? totalRead : eof ? -1L : 0L;
if (totalRead > 0) {
return totalRead;
}
return eof ? -1 : 0;
}

@Override
Expand All @@ -368,7 +395,10 @@ public long transferFrom(ReadableByteChannel src, long position, long count) thr
throw new IllegalArgumentException("transferFrom(" + getRemotePath() + ")"
+ " illegal position (" + position + ") or count (" + count + ")");
}
ensureOpen(WRITE_MODES);
if (!isOpen() || !src.isOpen()) {
throw new ClosedChannelException();
}
ensureMode(true);

ClientSession clientSession = sftp.getClientSession();
int copySize = SftpModuleProperties.COPY_BUF_SIZE.getRequired(clientSession);
Expand Down Expand Up @@ -431,10 +461,14 @@ public FileLock lock(long position, long size, boolean shared) throws IOExceptio

@Override
public FileLock tryLock(long position, long size, boolean shared) throws IOException {
ensureOpen(Collections.emptySet());
if (!isOpen()) {
throw new ClosedChannelException();
}
ensureMode(!shared);

int lockFlags = shared ? SftpConstants.SSH_FXF_READ_LOCK : SftpConstants.SSH_FXF_WRITE_LOCK;
try {
sftp.lock(handle, position, size, 0);
sftp.lock(handle, position, size, lockFlags);
} catch (SftpException e) {
if (e.getStatus() == SftpConstants.SSH_FX_LOCK_CONFLICT) {
throw new OverlappingFileLockException();
Expand All @@ -452,7 +486,7 @@ public boolean isValid() {

@Override
public void release() throws IOException {
if (valid.compareAndSet(true, false)) {
if (valid.getAndSet(false)) {
sftp.unlock(handle, position, size);
}
}
Expand Down Expand Up @@ -500,27 +534,15 @@ protected void endBlocking(Object actionHint, boolean completed)
end(completed);
}

/**
* Checks that the channel is open and that its current mode contains at least one of the required ones
*
* @param reqModes The required modes - ignored if {@code null}/empty
* @throws IOException If channel not open or the required modes are not satisfied
*/
private void ensureOpen(Collection<OpenMode> reqModes) throws IOException {
if (!isOpen()) {
throw new ClosedChannelException();
}

if (GenericUtils.size(reqModes) > 0) {
for (OpenMode m : reqModes) {
if (this.modes.contains(m)) {
return;
}
private void ensureMode(boolean forWriting) {
if (!forWriting && !modes.contains(OpenMode.Read)) {
throw new NonReadableChannelException();
} else if (forWriting) {
EnumSet<OpenMode> myModes = EnumSet.copyOf(modes);
myModes.retainAll(WRITE_MODES);
if (myModes.isEmpty()) {
throw new NonWritableChannelException();
}

throw new IOException("ensureOpen(" + getRemotePath() + ")"
+ " current channel modes (" + this.modes + ")"
+ " do contain any of the required ones: " + reqModes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.channels.NonReadableChannelException;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
Expand Down Expand Up @@ -57,6 +59,11 @@ public FileHandle(SftpSubsystem subsystem, Path file, String handle, int flags,
super(subsystem, file, handle);

Set<StandardOpenOption> options = getOpenOptions(flags, access);
// We are going to open a channel. If neither APPEND nor WRITE are explicitly specified, the default will be
// READ.
if (!options.contains(StandardOpenOption.WRITE) && !options.contains(StandardOpenOption.APPEND)) {
options.add(StandardOpenOption.READ);
}
// Java cannot do READ | WRITE | APPEND; it throws an IllegalArgumentException "READ+APPEND not allowed". So
// just open READ | WRITE, and use the ACE4_APPEND_DATA access flag to indicate that we need to handle "append"
// mode ourselves. ACE4_APPEND_DATA should only have an effect if the file is indeed opened for APPEND mode.
Expand Down Expand Up @@ -164,22 +171,57 @@ public void close() throws IOException {
}

public void lock(long offset, long length, int mask) throws IOException {
// We map delete locks to write locks, and we ignore the advisory bit.
boolean writeLock = (mask & (SftpConstants.SSH_FXF_WRITE_LOCK | SftpConstants.SSH_FXF_DELETE_LOCK)) != 0;
if (!writeLock) {
// If read and write are requested, it's a write lock.
boolean readLock = (mask & SftpConstants.SSH_FXF_READ_LOCK) != 0;
// Draft RFC is silent on what to do if no flags at all are set:
// https://www.ietf.org/archive/id/draft-ietf-secsh-filexfer-13.txt
// If the handle was opened for reading, use a read lock, otherwise a write lock.
if (!readLock && canWrite()) {
writeLock = true;
}
}
if (writeLock && !canWrite()) {
throw new SftpException(SftpConstants.SSH_FX_BYTE_RANGE_LOCK_REFUSED,
"Write lock requested, but handle opened for reading only");
} else if (!writeLock && !canRead()) {
throw new SftpException(SftpConstants.SSH_FX_BYTE_RANGE_LOCK_REFUSED,
"Read lock requested, but handle opened for writing only");
}
SeekableByteChannel channel = getFileChannel();
long size = (length == 0L) ? channel.size() - offset : length;
SftpSubsystem subsystem = getSubsystem();
SftpFileSystemAccessor accessor = subsystem.getFileSystemAccessor();
FileLock lock = accessor.tryLock(
subsystem, this, getFile(), getFileHandle(), channel, offset, size, false);
if (lock == null) {
throw new SftpException(SftpConstants.SSH_FX_BYTE_RANGE_LOCK_REFUSED,
"Overlapping lock held by another program on range [" + offset + "-" + (offset + length));
FileLock lock = null;
try {
lock = accessor.tryLock(subsystem, this, getFile(), getFileHandle(), channel, offset, size, !writeLock);
} catch (NonReadableChannelException | NonWritableChannelException e) {
SftpException error = new SftpException(SftpConstants.SSH_FX_BYTE_RANGE_LOCK_REFUSED,
"Could not acquire channel lock; write=" + writeLock + ": " + e.toString());
error.initCause(e);
throw error;
}

if (lock == null) {
throw new SftpException(SftpConstants.SSH_FX_BYTE_RANGE_LOCK_CONFLICT,
"Overlapping lock held by another program on range [" + offset + "-" + (offset + length) + "]");
}
synchronized (locks) {
locks.add(lock);
}
}

private boolean canRead() {
return getOpenOptions().contains(StandardOpenOption.READ);
}

private boolean canWrite() {
Set<StandardOpenOption> options = getOpenOptions();
return options.contains(StandardOpenOption.WRITE) || options.contains(StandardOpenOption.APPEND);
}

public void unlock(long offset, long length) throws IOException {
SeekableByteChannel channel = getFileChannel();
long size = (length == 0L) ? channel.size() - offset : length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ protected void doVersionSelect(Buffer buffer, int id, String proposed) throws IO

@Override
protected void doBlock(int id, String handle, long offset, long length, int mask) throws IOException {
int vers = getVersion();
if (vers < SftpConstants.SFTP_V6) {
throw new UnsupportedOperationException("File locks are not supported in sftp v" + vers + ", need SFTPv6");
}
Handle p = handles.get(handle);
ServerSession session = getServerSession();
if (log.isDebugEnabled()) {
Expand All @@ -555,6 +559,10 @@ protected void doBlock(int id, String handle, long offset, long length, int mask

@Override
protected void doUnblock(int id, String handle, long offset, long length) throws IOException {
int vers = getVersion();
if (vers < SftpConstants.SFTP_V6) {
throw new UnsupportedOperationException("File locks are not supported in sftp v" + vers + ", need SFTPv6");
}
Handle p = handles.get(handle);
ServerSession session = getServerSession();
if (log.isDebugEnabled()) {
Expand Down
Loading

0 comments on commit 2a9c46c

Please sign in to comment.