Skip to content

Commit 97c98ce

Browse files
authored
HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)
Contributed by Mikhail Pryakhin
1 parent 9c290c0 commit 97c98ce

File tree

7 files changed

+313
-45
lines changed

7 files changed

+313
-45
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.FileNotFoundException;
2121
import java.io.IOException;
22-
import java.io.InputStream;
2322
import java.io.OutputStream;
2423
import java.net.URI;
2524
import java.net.URLDecoder;
@@ -516,20 +515,21 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
516515
disconnect(channel);
517516
throw new IOException(String.format(E_PATH_DIR, f));
518517
}
519-
InputStream is;
520518
try {
521519
// the path could be a symbolic link, so get the real path
522520
absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
523-
524-
is = channel.get(absolute.toUri().getPath());
525521
} catch (SftpException e) {
526522
throw new IOException(e);
527523
}
528-
return new FSDataInputStream(new SFTPInputStream(is, statistics)){
524+
return new FSDataInputStream(
525+
new SFTPInputStream(channel, absolute, statistics)){
529526
@Override
530527
public void close() throws IOException {
531-
super.close();
532-
disconnect(channel);
528+
try {
529+
super.close();
530+
} finally {
531+
disconnect(channel);
532+
}
533533
}
534534
};
535535
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java

Lines changed: 73 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,86 +15,114 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
1819
package org.apache.hadoop.fs.sftp;
1920

21+
import java.io.EOFException;
2022
import java.io.IOException;
2123
import java.io.InputStream;
2224

25+
import com.jcraft.jsch.ChannelSftp;
26+
import com.jcraft.jsch.SftpATTRS;
27+
import com.jcraft.jsch.SftpException;
28+
29+
import org.apache.hadoop.fs.FSExceptionMessages;
2330
import org.apache.hadoop.fs.FSInputStream;
2431
import org.apache.hadoop.fs.FileSystem;
32+
import org.apache.hadoop.fs.Path;
2533

2634
/** SFTP FileSystem input stream. */
2735
class SFTPInputStream extends FSInputStream {
2836

29-
public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
30-
public static final String E_NULL_INPUTSTREAM = "Null InputStream";
31-
public static final String E_STREAM_CLOSED = "Stream closed";
32-
37+
private final ChannelSftp channel;
38+
private final Path path;
3339
private InputStream wrappedStream;
3440
private FileSystem.Statistics stats;
3541
private boolean closed;
3642
private long pos;
43+
private long nextPos;
44+
private long contentLength;
3745

38-
SFTPInputStream(InputStream stream, FileSystem.Statistics stats) {
39-
40-
if (stream == null) {
41-
throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
46+
SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
47+
throws IOException {
48+
try {
49+
this.channel = channel;
50+
this.path = path;
51+
this.stats = stats;
52+
this.wrappedStream = channel.get(path.toUri().getPath());
53+
SftpATTRS stat = channel.lstat(path.toString());
54+
this.contentLength = stat.getSize();
55+
} catch (SftpException e) {
56+
throw new IOException(e);
4257
}
43-
this.wrappedStream = stream;
44-
this.stats = stats;
58+
}
4559

46-
this.pos = 0;
47-
this.closed = false;
60+
@Override
61+
public synchronized void seek(long position) throws IOException {
62+
checkNotClosed();
63+
if (position < 0) {
64+
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
65+
}
66+
nextPos = position;
4867
}
4968

5069
@Override
51-
public void seek(long position) throws IOException {
52-
throw new IOException(E_SEEK_NOTSUPPORTED);
70+
public synchronized int available() throws IOException {
71+
checkNotClosed();
72+
long remaining = contentLength - nextPos;
73+
if (remaining > Integer.MAX_VALUE) {
74+
return Integer.MAX_VALUE;
75+
}
76+
return (int) remaining;
77+
}
78+
79+
private void seekInternal() throws IOException {
80+
if (pos == nextPos) {
81+
return;
82+
}
83+
if (nextPos > pos) {
84+
long skipped = wrappedStream.skip(nextPos - pos);
85+
pos = pos + skipped;
86+
}
87+
if (nextPos < pos) {
88+
wrappedStream.close();
89+
try {
90+
wrappedStream = channel.get(path.toUri().getPath());
91+
pos = wrappedStream.skip(nextPos);
92+
} catch (SftpException e) {
93+
throw new IOException(e);
94+
}
95+
}
5396
}
5497

5598
@Override
5699
public boolean seekToNewSource(long targetPos) throws IOException {
57-
throw new IOException(E_SEEK_NOTSUPPORTED);
100+
return false;
58101
}
59102

60103
@Override
61-
public long getPos() throws IOException {
62-
return pos;
104+
public synchronized long getPos() throws IOException {
105+
return nextPos;
63106
}
64107

65108
@Override
66109
public synchronized int read() throws IOException {
67-
if (closed) {
68-
throw new IOException(E_STREAM_CLOSED);
110+
checkNotClosed();
111+
if (this.contentLength == 0 || (nextPos >= contentLength)) {
112+
return -1;
69113
}
70-
114+
seekInternal();
71115
int byteRead = wrappedStream.read();
72116
if (byteRead >= 0) {
73117
pos++;
118+
nextPos++;
74119
}
75120
if (stats != null & byteRead >= 0) {
76121
stats.incrementBytesRead(1);
77122
}
78123
return byteRead;
79124
}
80125

81-
public synchronized int read(byte[] buf, int off, int len)
82-
throws IOException {
83-
if (closed) {
84-
throw new IOException(E_STREAM_CLOSED);
85-
}
86-
87-
int result = wrappedStream.read(buf, off, len);
88-
if (result > 0) {
89-
pos += result;
90-
}
91-
if (stats != null & result > 0) {
92-
stats.incrementBytesRead(result);
93-
}
94-
95-
return result;
96-
}
97-
98126
public synchronized void close() throws IOException {
99127
if (closed) {
100128
return;
@@ -103,4 +131,12 @@ public synchronized void close() throws IOException {
103131
wrappedStream.close();
104132
closed = true;
105133
}
134+
135+
private void checkNotClosed() throws IOException {
136+
if (closed) {
137+
throw new IOException(
138+
path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED
139+
);
140+
}
141+
}
106142
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,14 @@ public void init() throws IOException {
6969

7070
}
7171

72+
/**
73+
* Any teardown logic can go here.
74+
* @throws IOException IO problems
75+
*/
76+
public void teardown() throws IOException {
77+
78+
}
79+
7280
/**
7381
* Add a configuration resource to this instance's configuration
7482
* @param resource resource reference
@@ -113,7 +121,7 @@ public FileSystem getFileSystem(URI uri) throws IOException {
113121
public abstract FileSystem getTestFileSystem() throws IOException;
114122

115123
/**
116-
* Get the scheme of this FS
124+
* Get the scheme of this FS.
117125
* @return the scheme this FS supports
118126
*/
119127
public abstract String getScheme();

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ public void teardown() throws Exception {
213213
Thread.currentThread().setName("teardown");
214214
LOG.debug("== Teardown ==");
215215
deleteTestDirInTeardown();
216+
if (contract != null) {
217+
contract.teardown();
218+
}
216219
LOG.debug("== Teardown complete ==");
217220
}
218221

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.contract.sftp;
20+
21+
import java.io.IOException;
22+
import java.io.UncheckedIOException;
23+
import java.net.URI;
24+
import java.util.ArrayList;
25+
import java.util.Collections;
26+
import java.util.List;
27+
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.fs.FileSystem;
30+
import org.apache.hadoop.fs.FileSystemTestHelper;
31+
import org.apache.hadoop.fs.Path;
32+
import org.apache.hadoop.fs.contract.AbstractFSContract;
33+
import org.apache.hadoop.fs.sftp.SFTPFileSystem;
34+
import org.apache.sshd.common.NamedFactory;
35+
import org.apache.sshd.server.SshServer;
36+
import org.apache.sshd.server.auth.UserAuth;
37+
import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
38+
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
39+
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
40+
41+
public class SFTPContract extends AbstractFSContract {
42+
43+
private static final String CONTRACT_XML = "contract/sftp.xml";
44+
private static final URI TEST_URI =
45+
URI.create("sftp://user:password@localhost");
46+
private final String testDataDir =
47+
new FileSystemTestHelper().getTestRootDir();
48+
private final Configuration conf;
49+
private SshServer sshd;
50+
51+
public SFTPContract(Configuration conf) {
52+
super(conf);
53+
addConfResource(CONTRACT_XML);
54+
this.conf = conf;
55+
}
56+
57+
@Override
58+
public void init() throws IOException {
59+
sshd = SshServer.setUpDefaultServer();
60+
// ask OS to assign a port
61+
sshd.setPort(0);
62+
sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
63+
64+
List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
65+
userAuthFactories.add(new UserAuthPasswordFactory());
66+
67+
sshd.setUserAuthFactories(userAuthFactories);
68+
sshd.setPasswordAuthenticator((username, password, session) ->
69+
username.equals("user") && password.equals("password")
70+
);
71+
72+
sshd.setSubsystemFactories(
73+
Collections.singletonList(new SftpSubsystemFactory()));
74+
75+
sshd.start();
76+
int port = sshd.getPort();
77+
78+
conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
79+
conf.setInt("fs.sftp.host.port", port);
80+
conf.setBoolean("fs.sftp.impl.disable.cache", true);
81+
}
82+
83+
@Override
84+
public void teardown() throws IOException {
85+
if (sshd != null) {
86+
sshd.stop();
87+
}
88+
}
89+
90+
@Override
91+
public FileSystem getTestFileSystem() throws IOException {
92+
return FileSystem.get(TEST_URI, conf);
93+
}
94+
95+
@Override
96+
public String getScheme() {
97+
return "sftp";
98+
}
99+
100+
@Override
101+
public Path getTestPath() {
102+
try {
103+
FileSystem fs = FileSystem.get(
104+
URI.create("sftp://user:password@localhost"), conf
105+
);
106+
return fs.makeQualified(new Path(testDataDir));
107+
} catch (IOException e) {
108+
throw new UncheckedIOException(e);
109+
}
110+
}
111+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.contract.sftp;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
23+
import org.apache.hadoop.fs.contract.AbstractFSContract;
24+
25+
public class TestSFTPContractSeek extends AbstractContractSeekTest {
26+
27+
@Override
28+
protected AbstractFSContract createContract(Configuration conf) {
29+
return new SFTPContract(conf);
30+
}
31+
}

0 commit comments

Comments
 (0)