diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ad7a5cb0..e8866f2af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1019,7 +1019,8 @@ else() env/io_posix.cc env/flink/env_flink.cc env/flink/jvm_util.cc - env/flink/jni_helper.cc) + env/flink/jni_helper.cc + env/flink/env_flink_test_suite.cc) endif() if(USE_FOLLY_LITE) diff --git a/env/flink/env_flink.cc b/env/flink/env_flink.cc index 9ff8f5b6d..b963fe508 100644 --- a/env/flink/env_flink.cc +++ b/env/flink/env_flink.cc @@ -306,7 +306,7 @@ class FlinkDirectory : public FSDirectory { FlinkFileSystem::FlinkFileSystem(const std::shared_ptr& base_fs, const std::string& base_path) - : FileSystemWrapper(base_fs), base_path_(base_path) {} + : FileSystemWrapper(base_fs), base_path_(TrimTrailingSlash(base_path)) {} FlinkFileSystem::~FlinkFileSystem() { if (file_system_instance_ != nullptr) { diff --git a/env/flink/env_flink.h b/env/flink/env_flink.h index 2b937b050..04295815f 100644 --- a/env/flink/env_flink.h +++ b/env/flink/env_flink.h @@ -115,6 +115,14 @@ class FlinkFileSystem : public FileSystemWrapper { const IOOptions& /*options*/, IODebugContext* /*dbg*/, jobject* /*fileStatus*/); std::string ConstructPath(const std::string& /*file_name*/); + + static std::string TrimTrailingSlash(const std::string& base_path) { + if (!base_path.empty() && base_path.back() == '/') { + return base_path.substr(0, base_path.size() - 1); + } else { + return base_path; + } + } }; // Returns a `FlinkEnv` with base_path diff --git a/env/flink/env_flink_test_suite.cc b/env/flink/env_flink_test_suite.cc new file mode 100644 index 000000000..2b1a312ab --- /dev/null +++ b/env/flink/env_flink_test_suite.cc @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "env/flink/env_flink_test_suite.h" + +#include +#include + +#define ASSERT_TRUE(expression) \ + if (!(expression)) { \ + std::cerr << "Assertion failed: " << #expression << ", file " << __FILE__ \ + << ", line " << __LINE__ << "." << std::endl; \ + std::abort(); \ + } + +namespace ROCKSDB_NAMESPACE { + +EnvFlinkTestSuites::EnvFlinkTestSuites(const std::string& basePath) + : base_path_(basePath) {} + +void EnvFlinkTestSuites::runAllTestSuites() { + setUp(); + testFileExist(); +} + +void EnvFlinkTestSuites::setUp() { + auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_); + if (!status.ok()) { + throw std::runtime_error("New FlinkEnv failed"); + } +} + +void EnvFlinkTestSuites::testFileExist() { + std::string fileName("test-file"); + Status result = flink_env_->FileExists(fileName); + ASSERT_TRUE(result.IsNotFound()); + + // Generate a file manually + const std::string prefix = "file:"; + std::string writeFileName = base_path_ + fileName; + if (writeFileName.compare(0, prefix.size(), prefix) == 0) { + writeFileName = writeFileName.substr(prefix.size()); + } + std::ofstream writeFile(writeFileName); + writeFile << "testFileExist"; + writeFile.close(); + + result = flink_env_->FileExists(fileName); + ASSERT_TRUE(result.ok()); +} +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/env/flink/env_flink_test_suite.h b/env/flink/env_flink_test_suite.h new file mode 100644 index 000000000..3826060d5 --- /dev/null +++ b/env/flink/env_flink_test_suite.h @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "env_flink.h" + +namespace ROCKSDB_NAMESPACE { + +class EnvFlinkTestSuites { + public: + EnvFlinkTestSuites(const std::string& basePath); + void runAllTestSuites(); + + private: + std::unique_ptr flink_env_; + const std::string base_path_; + void setUp(); + void testFileExist(); +}; +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/env/flink/jni_helper.cc b/env/flink/jni_helper.cc index de82978e3..9be816c39 100644 --- a/env/flink/jni_helper.cc +++ b/env/flink/jni_helper.cc @@ -81,7 +81,7 @@ IOStatus JavaClassCache::Init() { cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].methodName = ""; cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].signature = - "(Lorg/apache/flink/core/fs/Path;)Z"; + "(Ljava/lang/String;)V"; cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING] .javaClassAndName = cached_java_classes_[JC_FLINK_PATH]; @@ -103,6 +103,8 @@ IOStatus JavaClassCache::Init() { "get"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].signature = "(Ljava/net/URI;)Lorg/apache/flink/core/fs/FileSystem;"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].isStatic = + true; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_EXISTS] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; @@ -251,9 +253,17 @@ IOStatus JavaClassCache::Init() { int numCachedMethods = sizeof(cached_java_methods_) / sizeof(JavaMethodContext); for (int i = 0; i < numCachedMethods; i++) { - cached_java_methods_[i].javaMethod = jni_env_->GetMethodID( - cached_java_methods_[i].javaClassAndName.javaClass, - cached_java_methods_[i].methodName, cached_java_methods_[i].signature); + if (cached_java_methods_[i].isStatic) { + cached_java_methods_[i].javaMethod = jni_env_->GetStaticMethodID( + cached_java_methods_[i].javaClassAndName.javaClass, + cached_java_methods_[i].methodName, + cached_java_methods_[i].signature); + } else { + cached_java_methods_[i].javaMethod = jni_env_->GetMethodID( + cached_java_methods_[i].javaClassAndName.javaClass, + cached_java_methods_[i].methodName, + cached_java_methods_[i].signature); + } if (!cached_java_methods_[i].javaMethod) { return IOStatus::IOError(std::string("Exception when GetMethodID, ") diff --git a/env/flink/jni_helper.h b/env/flink/jni_helper.h index 1927a2c07..54a6da85b 100644 --- a/env/flink/jni_helper.h +++ b/env/flink/jni_helper.h @@ -84,13 +84,16 @@ class JavaClassCache { jmethodID javaMethod; const char* methodName; const char* signature; + bool isStatic = false; std::string ToString() const { return javaClassAndName.ToString() .append(", methodName: ") .append(methodName) .append(", signature: ") - .append(signature); + .append(signature) + .append(", isStatic:") + .append(isStatic ? "true" : "false"); } }; diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 759f9967a..076800414 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -31,6 +31,7 @@ set(JNI_NATIVE_SOURCES rocksjni/config_options.cc rocksjni/env.cc rocksjni/env_flink.cc + rocksjni/env_flink_test_suite.cc rocksjni/env_options.cc rocksjni/event_listener.cc rocksjni/event_listener_jnicallback.cc @@ -150,6 +151,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/DirectSlice.java src/main/java/org/rocksdb/EncodingType.java src/main/java/org/rocksdb/Env.java + src/main/java/org/rocksdb/EnvFlinkTestSuite.java src/main/java/org/rocksdb/EnvOptions.java src/main/java/org/rocksdb/EventListener.java src/main/java/org/rocksdb/Experimental.java @@ -458,6 +460,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4") org.rocksdb.DBOptions org.rocksdb.DirectSlice org.rocksdb.Env + org.rocksdb.EnvFlinkTestSuite org.rocksdb.EnvOptions org.rocksdb.Filter org.rocksdb.FlinkCompactionFilter diff --git a/java/Makefile b/java/Makefile index ea8ca7eb6..0eb1cb36b 100644 --- a/java/Makefile +++ b/java/Makefile @@ -200,6 +200,9 @@ JAVA_TESTS = \ org.rocksdb.WriteOptionsTest\ org.rocksdb.WriteBatchWithIndexTest +FLINK_TESTS = \ + org.rocksdb.flink.FlinkEnvTest + MAIN_SRC = src/main/java TEST_SRC = src/test/java OUTPUT = target @@ -292,14 +295,15 @@ PLUGIN_SOURCES = $(foreach root, $(ROCKSDB_PLUGIN_JAVA_ROOTS), $(foreach pkg, or CORE_SOURCES = $(foreach pkg, org/rocksdb/util org/rocksdb, $(MAIN_SRC)/$(pkg)/*.java) SOURCES = $(wildcard $(CORE_SOURCES) $(PLUGIN_SOURCES)) PLUGIN_TEST_SOURCES = $(foreach root, $(ROCKSDB_PLUGIN_JAVA_ROOTS), $(foreach pkg, org/rocksdb/test org/rocksdb/util org/rocksdb, $(root)/$(TEST_SRC)/$(pkg)/*.java)) -CORE_TEST_SOURCES = $(foreach pkg, org/rocksdb/test org/rocksdb/util org/rocksdb, $(TEST_SRC)/$(pkg)/*.java) +CORE_TEST_SOURCES = $(foreach pkg, org/rocksdb/test org/rocksdb/util org/rocksdb/flink org/rocksdb, $(TEST_SRC)/$(pkg)/*.java) TEST_SOURCES = $(wildcard $(CORE_TEST_SOURCES) $(PLUGIN_TEST_SOURCES)) +MOCK_FLINK_TEST_SOURCES = $(foreach pkg, org/apache/flink/core/fs org/apache/flink/state/forst/fs, flinktestmock/src/main/java/$(pkg)/*.java) # Configure the plugin tests and java classes ROCKSDB_PLUGIN_NATIVE_JAVA_CLASSES = $(foreach plugin, $(ROCKSDB_PLUGINS), $(foreach class, $($(plugin)_NATIVE_JAVA_CLASSES), $(class))) NATIVE_JAVA_CLASSES = $(NATIVE_JAVA_CLASSES) $(ROCKSDB_PLUGIN_NATIVE_JAVA_CLASSES) ROCKSDB_PLUGIN_JAVA_TESTS = $(foreach plugin, $(ROCKSDB_PLUGINS), $(foreach testclass, $($(plugin)_JAVA_TESTS), $(testclass))) -ALL_JAVA_TESTS = $(JAVA_TESTS) $(ROCKSDB_PLUGIN_JAVA_TESTS) +ALL_JAVA_TESTS = $(FLINK_TESTS) $(JAVA_TESTS) $(ROCKSDB_PLUGIN_JAVA_TESTS) # When debugging add -Xcheck:jni to the java args ifneq ($(DEBUG_LEVEL),0) @@ -439,7 +443,7 @@ java_test: java resolve_test_deps $(AM_V_at) $(JAVAC_CMD) $(JAVAC_ARGS) -cp $(MAIN_CLASSES):$(JAVA_TESTCLASSPATH) -h $(NATIVE_INCLUDE) -d $(TEST_CLASSES)\ $(TEST_SOURCES) -test: java java_test +test: java mock_flink_fs java_test $(MAKE) run_test run_test: @@ -451,3 +455,13 @@ run_plugin_test: db_bench: java $(AM_V_GEN)mkdir -p $(BENCHMARK_MAIN_CLASSES) $(AM_V_at)$(JAVAC_CMD) $(JAVAC_ARGS) -cp $(MAIN_CLASSES) -d $(BENCHMARK_MAIN_CLASSES) $(BENCHMARK_MAIN_SRC)/org/rocksdb/benchmark/*.java + +mock_flink_fs: + $(AM_V_at) $(JAVAC_CMD) $(JAVAC_ARGS) -cp $(MAIN_CLASSES):$(JAVA_TESTCLASSPATH) -h $(NATIVE_INCLUDE) -d $(TEST_CLASSES) \ + $(MOCK_FLINK_TEST_SOURCES) + +flink_test: java java_test mock_flink_fs + $(MAKE) run_flink_test + +run_flink_test: + $(JAVA_CMD) $(JAVA_ARGS) -Djava.library.path=target -cp "$(MAIN_CLASSES):$(TEST_CLASSES):$(JAVA_TESTCLASSPATH):target/*" org.rocksdb.test.RocksJunitRunner $(FLINK_TESTS) diff --git a/java/flinktestmock/src/main/java/org/apache/flink/core/fs/FileStatus.java b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/FileStatus.java new file mode 100644 index 000000000..52d3360b7 --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/FileStatus.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed + * by the Apache Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. + */ + +package org.apache.flink.core.fs; + +/** + * Interface that represents the client side information for a file independent of the file system. + */ +public interface FileStatus { + /** + * Return the length of this file. + * + * @return the length of this file + */ + long getLen(); + + /** + * Get the block size of the file. + * + * @return the number of bytes + */ + long getBlockSize(); + + /** + * Get the replication factor of a file. + * + * @return the replication factor of a file. + */ + short getReplication(); + + /** + * Get the modification time of the file. + * + * @return the modification time of file in milliseconds since January 1, 1970 UTC. + */ + long getModificationTime(); + + /** + * Get the access time of the file. + * + * @return the access time of file in milliseconds since January 1, 1970 UTC. + */ + long getAccessTime(); + + /** + * Checks if this object represents a directory. + * + * @return true if this is a directory, false otherwise + */ + boolean isDir(); + + /** + * Returns the corresponding Path to the FileStatus. + * + * @return the corresponding Path to the FileStatus + */ + Path getPath(); +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/core/fs/FileSystem.java b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/FileSystem.java new file mode 100644 index 000000000..5fef72b42 --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed + * by the Apache Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. + */ + +package org.apache.flink.core.fs; + +import static org.apache.flink.core.fs.LocalFileSystem.LOCAL_URI; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Objects; + +/** + * Abstract base class of all file systems used by Flink. This class may be extended to implement + * distributed file systems, or local file systems. The abstraction by this file system is very + * simple, and the set of available operations quite limited, to support the common denominator of a + * wide range of file systems. For example, appending to or mutating existing files is not + * supported. + */ +public abstract class FileSystem { + /** + * The possible write modes. The write mode decides what happens if a file should be created, + * but already exists. + */ + public enum WriteMode { + + /** + * Creates the target file only if no file exists at that path already. Does not overwrite + * existing files and directories. + */ + NO_OVERWRITE, + + /** + * Creates a new target file regardless of any existing files or directories. Existing files + * and directories will be deleted (recursively) automatically before creating the new file. + */ + OVERWRITE + } + + /** + * Returns a reference to the {@link FileSystem} instance for accessing the local file system. + * + * @return a reference to the {@link FileSystem} instance for accessing the local file system. + */ + public static FileSystem getLocalFileSystem() { + return LocalFileSystem.getSharedInstance(); + } + + /** + * Returns a reference to the {@link FileSystem} instance for accessing the file system + * identified by the given {@link URI}. + * + * @param uri the {@link URI} identifying the file system + * @return a reference to the {@link FileSystem} instance for accessing the file system + * identified by the given {@link URI}. + * @throws IOException thrown if a reference to the file system instance could not be obtained + */ + public static FileSystem get(URI uri) throws IOException { + if (Objects.equals(LOCAL_URI.getScheme(), uri.getScheme()) + && Objects.equals(LOCAL_URI.getAuthority(), LOCAL_URI.getAuthority())) { + return getLocalFileSystem(); + } + throw new UnsupportedOperationException("Unsupported URI pattern:" + uri); + } + + // ------------------------------------------------------------------------ + // File System Methods + // ------------------------------------------------------------------------ + + /** + * Returns the path of the file system's current working directory. + * + * @return the path of the file system's current working directory + */ + public abstract Path getWorkingDirectory(); + + /** + * Returns the path of the user's home directory in this file system. + * + * @return the path of the user's home directory in this file system. + */ + public abstract Path getHomeDirectory(); + + /** + * Returns a URI whose scheme and authority identify this file system. + * + * @return a URI whose scheme and authority identify this file system + */ + public abstract URI getUri(); + + /** + * Return a file status object that represents the path. + * + * @param f The path we want information from + * @return a FileStatus object + * @throws FileNotFoundException when the path does not exist; IOException see specific + * implementation + */ + public abstract FileStatus getFileStatus(Path f) throws IOException; + + /** + * Opens an FSDataInputStream at the indicated Path. + * + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + */ + public abstract InputStream open(Path f, int bufferSize) throws IOException; + + /** + * Opens an FSDataInputStream at the indicated Path. + * + * @param f the file to open + */ + public abstract InputStream open(Path f) throws IOException; + + /** + * List the statuses of the files/directories in the given path if the path is a directory. + * + * @param f given path + * @return the statuses of the files/directories in the given path + * @throws IOException + */ + public abstract FileStatus[] listStatus(Path f) throws IOException; + + /** + * Check if exists. + * + * @param f source file + */ + public boolean exists(final Path f) throws IOException { + try { + return (getFileStatus(f) != null); + } catch (FileNotFoundException e) { + return false; + } + } + + /** + * Delete a file. + * + * @param f the path to delete + * @param recursive if path is a directory and set to true, the directory is + * deleted else throws an exception. In case of a file the recursive can be set to either + * true or false + * @return true if delete is successful, false otherwise + * @throws IOException + */ + public abstract boolean delete(Path f, boolean recursive) throws IOException; + + /** + * Make the given file and all non-existent parents into directories. Has the semantics of Unix + * 'mkdir -p'. Existence of the directory hierarchy is not an error. + * + * @param f the directory/directories to be created + * @return true if at least one new directory has been created, false + * otherwise + * @throws IOException thrown if an I/O error occurs while creating the directory + */ + public abstract boolean mkdirs(Path f) throws IOException; + + /** + * Opens an FSDataOutputStream at the indicated Path. + * + *

This method is deprecated, because most of its parameters are ignored by most file + * systems. To control for example the replication factor and block size in the Hadoop + * Distributed File system, make sure that the respective Hadoop configuration file is either + * linked from the Flink configuration, or in the classpath of either Flink or the user code. + * + * @param f the file name to open + * @param overwrite if a file with this name already exists, then if true, the file will be + * overwritten, and if false an error will be thrown. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize the size of the file blocks + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a + * file already exists at that path and the write mode indicates to not overwrite the file. + * @deprecated Deprecated because not well supported across types of file systems. Control the + * behavior of specific file systems via configurations instead. + */ + @Deprecated + public OutputStream create(Path f, boolean overwrite, int bufferSize, short replication, + long blockSize) throws IOException { + return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE); + } + + /** + * Opens an FSDataOutputStream at the indicated Path. + * + * @param f the file name to open + * @param overwrite if a file with this name already exists, then if true, the file will be + * overwritten, and if false an error will be thrown. + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a + * file already exists at that path and the write mode indicates to not overwrite the file. + * @deprecated Use {@link #create(Path, WriteMode)} instead. + */ + @Deprecated + public OutputStream create(Path f, boolean overwrite) throws IOException { + return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE); + } + + /** + * Opens an FSDataOutputStream to a new file at the given path. + * + *

If the file already exists, the behavior depends on the given {@code WriteMode}. If the + * mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an exception. + * + * @param f The file path to write to + * @param overwriteMode The action to take if a file or directory already exists at the given + * path. + * @return The stream to the new file at the target path. + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a + * file already exists at that path and the write mode indicates to not overwrite the file. + */ + public abstract OutputStream create(Path f, WriteMode overwriteMode) throws IOException; + + /** + * Renames the file/directory src to dst. + * + * @param src the file/directory to rename + * @param dst the new name of the file/directory + * @return true if the renaming was successful, false otherwise + * @throws IOException + */ + public abstract boolean rename(Path src, Path dst) throws IOException; + + /** + * Returns true if this is a distributed file system. A distributed file system here means that + * the file system is shared among all Flink processes that participate in a cluster or job and + * that all these processes can see the same files. + * + * @return True, if this is a distributed file system, false otherwise. + */ + public abstract boolean isDistributedFS(); +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalDataInputStream.java b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalDataInputStream.java new file mode 100644 index 000000000..64706ba8d --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalDataInputStream.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.FileChannel; + +/** + * The LocalDataInputStream class is a wrapper class for a data input stream to the + * local file system. + */ +public class LocalDataInputStream extends InputStream { + /** The file input stream used to read data from. */ + private final FileInputStream fis; + + private final FileChannel fileChannel; + + /** + * Constructs a new LocalDataInputStream object from a given {@link File} object. + * + * @param file The File the data stream is read from + * @throws IOException Thrown if the data input stream cannot be created. + */ + public LocalDataInputStream(File file) throws IOException { + this.fis = new FileInputStream(file); + this.fileChannel = fis.getChannel(); + } + + public void seek(long desired) throws IOException { + if (desired != getPos()) { + this.fileChannel.position(desired); + } + } + + public long getPos() throws IOException { + return this.fileChannel.position(); + } + + @Override + public int read() throws IOException { + return this.fis.read(); + } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + return this.fis.read(buffer, offset, length); + } + + @Override + public void close() throws IOException { + // According to javadoc, this also closes the channel + this.fis.close(); + } + + @Override + public int available() throws IOException { + return this.fis.available(); + } + + @Override + public long skip(final long n) throws IOException { + return this.fis.skip(n); + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalDataOutputStream.java b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalDataOutputStream.java new file mode 100644 index 000000000..aabfcaa98 --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalDataOutputStream.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.ClosedChannelException; + +/** + * The LocalDataOutputStream class is a wrapper class for a data output stream to the + * local file system. + */ +public class LocalDataOutputStream extends OutputStream { + /** The file output stream used to write data. */ + private final FileOutputStream fos; + + private boolean closed = false; + + /** + * Constructs a new LocalDataOutputStream object from a given {@link File} object. + * + * @param file the {@link File} object the data stream is read from + * @throws IOException thrown if the data output stream cannot be created + */ + public LocalDataOutputStream(final File file) throws IOException { + this.fos = new FileOutputStream(file); + } + + @Override + public void write(final int b) throws IOException { + checkOpen(); + fos.write(b); + } + + @Override + public void write(final byte[] b) throws IOException { + checkOpen(); + fos.write(b); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + checkOpen(); + fos.write(b, off, len); + } + + @Override + public void close() throws IOException { + closed = true; + fos.close(); + } + + @Override + public void flush() throws IOException { + checkOpen(); + fos.flush(); + } + + public void sync() throws IOException { + checkOpen(); + fos.getFD().sync(); + } + + public long getPos() throws IOException { + checkOpen(); + return fos.getChannel().position(); + } + + private void checkOpen() throws IOException { + if (closed) { + throw new ClosedChannelException(); + } + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalFileStatus.java b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalFileStatus.java new file mode 100644 index 000000000..b79f112ce --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalFileStatus.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.File; + +/** + * The class LocalFileStatus provides an implementation of the {@link FileStatus} + * interface for the local file system. + */ +public class LocalFileStatus implements FileStatus { + /** The file this file status belongs to. */ + private final File file; + + /** The path of this file this file status belongs to. */ + private final Path path; + + /** Cached length field, to avoid repeated native/syscalls. */ + private final long len; + + /** + * Creates a LocalFileStatus object from a given {@link File} object. + * + * @param f the {@link File} object this LocalFileStatus refers to + * @param fs the file system the corresponding file has been read from + */ + public LocalFileStatus(final File f, final FileSystem fs) { + this.file = f; + this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath()); + this.len = f.length(); + } + + @Override + public long getAccessTime() { + return 0; // We don't have access files for local files + } + + @Override + public long getBlockSize() { + return this.len; + } + + @Override + public long getLen() { + return this.len; + } + + @Override + public long getModificationTime() { + return this.file.lastModified(); + } + + @Override + public short getReplication() { + return 1; // For local files replication is always 1 + } + + @Override + public boolean isDir() { + return this.file.isDirectory(); + } + + @Override + public Path getPath() { + return this.path; + } + + public File getFile() { + return this.file; + } + + @Override + public String toString() { + return "LocalFileStatus{" + + "file=" + file + ", path=" + path + '}'; + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalFileSystem.java b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalFileSystem.java new file mode 100644 index 000000000..863d689f3 --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/LocalFileSystem.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Parts of earlier versions of this file were based on source code from the + * Hadoop Project (http://hadoop.apache.org/), licensed by the Apache Software Foundation (ASF) + * under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. + */ + +package org.apache.flink.core.fs; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.AccessDeniedException; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.StandardCopyOption; + +/** + * The class {@code LocalFileSystem} is an implementation of the {@link FileSystem} interface for + * the local file system of the machine where the JVM runs. + */ +public class LocalFileSystem extends FileSystem { + /** The URI representing the local file system. */ + public static final URI LOCAL_URI = URI.create("file:///"); + + /** The shared instance of the local file system. */ + private static final LocalFileSystem INSTANCE = new LocalFileSystem(); + + /** + * Path pointing to the current working directory. Because Paths are not immutable, we cannot + * cache the proper path here + */ + private final URI workingDir; + + /** + * Path pointing to the current user home directory. Because Paths are not immutable, we cannot + * cache the proper path here. + */ + private final URI homeDir; + + /** Constructs a new LocalFileSystem object. */ + public LocalFileSystem() { + this.workingDir = new File(System.getProperty("user.dir")).toURI(); + this.homeDir = new File(System.getProperty("user.home")).toURI(); + } + + // ------------------------------------------------------------------------ + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + final File path = pathToFile(f); + if (path.exists()) { + return new LocalFileStatus(path, this); + } else { + throw new FileNotFoundException("File " + f + " does not exist or the user running " + + "Flink ('" + System.getProperty("user.name") + + "') has insufficient permissions to access it."); + } + } + + @Override + public URI getUri() { + return LOCAL_URI; + } + + @Override + public Path getWorkingDirectory() { + return new Path(workingDir); + } + + @Override + public Path getHomeDirectory() { + return new Path(homeDir); + } + + @Override + public InputStream open(final Path f, final int bufferSize) throws IOException { + return open(f); + } + + @Override + public InputStream open(final Path f) throws IOException { + final File file = pathToFile(f); + return new LocalDataInputStream(file); + } + + @Override + public boolean exists(Path f) throws IOException { + final File path = pathToFile(f); + return path.exists(); + } + + @Override + public FileStatus[] listStatus(final Path f) throws IOException { + final File localf = pathToFile(f); + FileStatus[] results; + + if (!localf.exists()) { + return null; + } + if (localf.isFile()) { + return new FileStatus[] {new LocalFileStatus(localf, this)}; + } + + final String[] names = localf.list(); + if (names == null) { + return null; + } + results = new FileStatus[names.length]; + for (int i = 0; i < names.length; i++) { + results[i] = getFileStatus(new Path(f, names[i])); + } + + return results; + } + + @Override + public boolean delete(final Path f, final boolean recursive) throws IOException { + final File file = pathToFile(f); + if (file.isFile()) { + return file.delete(); + } else if ((!recursive) && file.isDirectory()) { + File[] containedFiles = file.listFiles(); + if (containedFiles == null) { + throw new IOException( + "Directory " + file.toString() + " does not exist or an I/O error occurred"); + } else if (containedFiles.length != 0) { + throw new IOException("Directory " + file.toString() + " is not empty"); + } + } + + return delete(file); + } + + /** + * Deletes the given file or directory. + * + * @param f the file to be deleted + * @return true if all files were deleted successfully, false + * otherwise + * @throws IOException thrown if an error occurred while deleting the files/directories + */ + private boolean delete(final File f) throws IOException { + if (f.isDirectory()) { + final File[] files = f.listFiles(); + if (files != null) { + for (File file : files) { + final boolean del = delete(file); + if (!del) { + return false; + } + } + } + } else { + return f.delete(); + } + + // Now directory is empty + return f.delete(); + } + + /** + * Recursively creates the directory specified by the provided path. + * + * @return trueif the directories either already existed or have been created + * successfully, false otherwise + * @throws IOException thrown if an error occurred while creating the directory/directories + */ + @Override + public boolean mkdirs(final Path f) throws IOException { + assert f != null; + return mkdirsInternal(pathToFile(f)); + } + + private boolean mkdirsInternal(File file) throws IOException { + if (file.isDirectory()) { + return true; + } else if (file.exists() && !file.isDirectory()) { + // Important: The 'exists()' check above must come before the 'isDirectory()' check to + // be safe when multiple parallel instances try to create the directory + + // exists and is not a directory -> is a regular file + throw new FileAlreadyExistsException(file.getAbsolutePath()); + } else { + File parent = file.getParentFile(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); + } + } + + @Override + public OutputStream create(final Path filePath, final WriteMode overwrite) throws IOException { + // checkNotNull(filePath, "filePath"); + + if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) { + throw new FileAlreadyExistsException("File already exists: " + filePath); + } + + final Path parent = filePath.getParent(); + if (parent != null && !mkdirs(parent)) { + throw new IOException("Mkdirs failed to create " + parent); + } + + final File file = pathToFile(filePath); + return new LocalDataOutputStream(file); + } + + @Override + public boolean rename(final Path src, final Path dst) throws IOException { + final File srcFile = pathToFile(src); + final File dstFile = pathToFile(dst); + + final File dstParent = dstFile.getParentFile(); + + // Files.move fails if the destination directory doesn't exist + // noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was + // created + dstParent.mkdirs(); + + try { + Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + return true; + } catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException + | SecurityException ex) { + // catch the errors that are regular "move failed" exceptions and return false + return false; + } + } + + @Override + public boolean isDistributedFS() { + return false; + } + + // ------------------------------------------------------------------------ + + /** + * Converts the given Path to a File for this file system. If the path is empty, we will return + * new File(".") instead of new File(""), since the latter returns + * false for isDirectory judgement (See issue + * https://issues.apache.org/jira/browse/FLINK-18612). + */ + public File pathToFile(Path path) { + String localPath = path.getPath(); + // checkState(localPath != null, "Cannot convert a null path to File"); + + if (localPath.length() == 0) { + return new File("."); + } + + return new File(localPath); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the URI that represents the local file system. That URI is {@code "file:/"} on Windows + * platforms and {@code "file:///"} on other UNIX family platforms. + * + * @return The URI that represents the local file system. + */ + public static URI getLocalFsURI() { + return LOCAL_URI; + } + + /** + * Gets the shared instance of this file system. + * + * @return The shared instance of this file system. + */ + public static LocalFileSystem getSharedInstance() { + return INSTANCE; + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/core/fs/Path.java b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/Path.java new file mode 100644 index 000000000..1d06ae4be --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/core/fs/Path.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed + * by the Apache Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. */ + +package org.apache.flink.core.fs; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.regex.Pattern; + +/** + * Names a file or directory in a {@link FileSystem}. Path strings use slash as the directory + * separator. A path string is absolute if it begins with a slash. + * + *

Tailing slashes are removed from the path. + * + *

Note: Path will no longer implement {@link IOReadableWritable} in future versions. Please use + * {@code serializeToDataOutputView} and {@code deserializeFromDataInputView} instead. + */ +public class Path implements Serializable { + private static final long serialVersionUID = 1L; + + /** The directory separator, a slash. */ + public static final String SEPARATOR = "/"; + + /** The directory separator, a slash (character). */ + public static final char SEPARATOR_CHAR = '/'; + + /** Character denoting the current directory. */ + public static final String CUR_DIR = "."; + + /** A pre-compiled regex/state-machine to match the windows drive pattern. */ + private static final Pattern WINDOWS_ROOT_DIR_REGEX = Pattern.compile("/\\p{Alpha}+:/"); + + /** The internal representation of the path, a hierarchical URI. */ + private URI uri; + + /** Constructs a new (empty) path object (used to reconstruct path object after RPC call). */ + public Path() {} + + /** + * Constructs a path object from a given URI. + * + * @param uri the URI to construct the path object from + */ + public Path(URI uri) { + this.uri = uri; + } + + /** + * Resolve a child path against a parent path. + * + * @param parent the parent path + * @param child the child path + */ + public Path(String parent, String child) { + this(new Path(parent), new Path(child)); + } + + /** + * Resolve a child path against a parent path. + * + * @param parent the parent path + * @param child the child path + */ + public Path(Path parent, String child) { + this(parent, new Path(child)); + } + + /** + * Resolve a child path against a parent path. + * + * @param parent the parent path + * @param child the child path + */ + public Path(String parent, Path child) { + this(new Path(parent), child); + } + + /** + * Resolve a child path against a parent path. + * + * @param parent the parent path + * @param child the child path + */ + public Path(Path parent, Path child) { + // Add a slash to parent's path so resolution is compatible with URI's + URI parentUri = parent.uri; + final String parentPath = parentUri.getPath(); + if (!(parentPath.equals("/") || parentPath.equals(""))) { + try { + parentUri = new URI( + parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null, null); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + if (child.uri.getPath().startsWith(Path.SEPARATOR)) { + child = new Path( + child.uri.getScheme(), child.uri.getAuthority(), child.uri.getPath().substring(1)); + } + + final URI resolved = parentUri.resolve(child.uri); + initialize(resolved.getScheme(), resolved.getAuthority(), resolved.getPath()); + } + + /** + * Checks if the provided path string is either null or has zero length and throws a {@link + * IllegalArgumentException} if any of the two conditions apply. + * + * @param path the path string to be checked + * @return The checked path. + */ + private String checkPathArg(String path) { + // disallow construction of a Path from an empty string + if (path == null) { + throw new IllegalArgumentException("Can not create a Path from a null string"); + } + if (path.length() == 0) { + throw new IllegalArgumentException("Can not create a Path from an empty string"); + } + return path; + } + + /** + * Construct a path from a String. Path strings are URIs, but with unescaped elements and some + * additional normalization. + * + * @param pathString the string to construct a path from + */ + public Path(String pathString) { + pathString = checkPathArg(pathString); + + // We can't use 'new URI(String)' directly, since it assumes things are + // escaped, which we don't require of Paths. + + // add a slash in front of paths with Windows drive letters + if (hasWindowsDrive(pathString, false)) { + pathString = "/" + pathString; + } + + // parse uri components + String scheme = null; + String authority = null; + + int start = 0; + + // parse uri scheme, if any + final int colon = pathString.indexOf(':'); + final int slash = pathString.indexOf('/'); + if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a + // scheme + scheme = pathString.substring(0, colon); + start = colon + 1; + } + + // parse uri authority, if any + if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has authority + final int nextSlash = pathString.indexOf('/', start + 2); + final int authEnd = nextSlash > 0 ? nextSlash : pathString.length(); + authority = pathString.substring(start + 2, authEnd); + start = authEnd; + } + + // uri path is the rest of the string -- query & fragment not supported + final String path = pathString.substring(start, pathString.length()); + + initialize(scheme, authority, path); + } + + /** + * Construct a Path from a scheme, an authority and a path string. + * + * @param scheme the scheme string + * @param authority the authority string + * @param path the path string + */ + public Path(String scheme, String authority, String path) { + path = checkPathArg(path); + initialize(scheme, authority, path); + } + + /** + * Initializes a path object given the scheme, authority and path string. + * + * @param scheme the scheme string. + * @param authority the authority string. + * @param path the path string. + */ + private void initialize(String scheme, String authority, String path) { + try { + this.uri = new URI(scheme, authority, normalizePath(path), null, null).normalize(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Normalizes a path string. + * + * @param path the path string to normalize + * @return the normalized path string + */ + private String normalizePath(String path) { + // remove consecutive slashes & backslashes + path = path.replace("\\", "/"); + path = path.replaceAll("/+", "/"); + + // remove tailing separator + if (path.endsWith(SEPARATOR) && !path.equals(SEPARATOR) && // UNIX root path + !WINDOWS_ROOT_DIR_REGEX.matcher(path).matches()) { // Windows root path) + + // remove tailing slash + path = path.substring(0, path.length() - SEPARATOR.length()); + } + + return path; + } + + /** + * Converts the path object to a {@link URI}. + * + * @return the {@link URI} object converted from the path object + */ + public URI toUri() { + return uri; + } + + /** + * Returns the FileSystem that owns this Path. + * + * @return the FileSystem that owns this Path + * @throws IOException thrown if the file system could not be retrieved + */ + public FileSystem getFileSystem() throws IOException { + return FileSystem.get(this.toUri()); + } + + /** + * Checks if the directory of this path is absolute. + * + * @return true if the directory of this path is absolute, false + * otherwise + */ + public boolean isAbsolute() { + final int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0; + return uri.getPath().startsWith(SEPARATOR, start); + } + + /** + * Returns the final component of this path, i.e., everything that follows the last separator. + * + * @return the final component of the path + */ + public String getName() { + final String path = uri.getPath(); + final int slash = path.lastIndexOf(SEPARATOR); + return path.substring(slash + 1); + } + + /** + * Return full path. + * + * @return full path + */ + public String getPath() { + return uri.getPath(); + } + + /** + * Returns the parent of a path, i.e., everything that precedes the last separator or null + * if at root. + * + * @return the parent of a path or null if at root. + */ + public Path getParent() { + final String path = uri.getPath(); + final int lastSlash = path.lastIndexOf('/'); + final int start = hasWindowsDrive(path, true) ? 3 : 0; + if ((path.length() == start) || // empty path + (lastSlash == start && path.length() == start + 1)) { // at root + return null; + } + String parent; + if (lastSlash == -1) { + parent = CUR_DIR; + } else { + final int end = hasWindowsDrive(path, true) ? 3 : 0; + parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash); + } + return new Path(uri.getScheme(), uri.getAuthority(), parent); + } + + /** + * Adds a suffix to the final name in the path. + * + * @param suffix The suffix to be added + * @return the new path including the suffix + */ + public Path suffix(String suffix) { + return new Path(getParent(), getName() + suffix); + } + + @Override + public String toString() { + // we can't use uri.toString(), which escapes everything, because we want + // illegal characters unescaped in the string, for glob processing, etc. + final StringBuilder buffer = new StringBuilder(); + if (uri.getScheme() != null) { + buffer.append(uri.getScheme()); + buffer.append(":"); + } + if (uri.getAuthority() != null) { + buffer.append("//"); + buffer.append(uri.getAuthority()); + } + if (uri.getPath() != null) { + String path = uri.getPath(); + if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has windows drive + uri.getScheme() == null && // but no scheme + uri.getAuthority() == null) { // or authority + path = path.substring(1); // remove slash before drive + } + buffer.append(path); + } + return buffer.toString(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Path)) { + return false; + } + Path that = (Path) o; + return this.uri.equals(that.uri); + } + + @Override + public int hashCode() { + return uri.hashCode(); + } + + public int compareTo(Object o) { + Path that = (Path) o; + return this.uri.compareTo(that.uri); + } + + /** + * Returns the number of elements in this path. + * + * @return the number of elements in this path + */ + public int depth() { + String path = uri.getPath(); + int depth = 0; + int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0; + while (slash != -1) { + depth++; + slash = path.indexOf(SEPARATOR, slash + 1); + } + return depth; + } + + /** + * Returns a qualified path object. + * + * @param fs the FileSystem that should be used to obtain the current working directory + * @return the qualified path object + */ + public Path makeQualified(FileSystem fs) { + Path path = this; + if (!isAbsolute()) { + path = new Path(fs.getWorkingDirectory(), this); + } + + final URI pathUri = path.toUri(); + final URI fsUri = fs.getUri(); + + String scheme = pathUri.getScheme(); + String authority = pathUri.getAuthority(); + + if (scheme != null && (authority != null || fsUri.getAuthority() == null)) { + return path; + } + + if (scheme == null) { + scheme = fsUri.getScheme(); + } + + if (authority == null) { + authority = fsUri.getAuthority(); + if (authority == null) { + authority = ""; + } + } + + return new Path(scheme + ":" + + "//" + authority + pathUri.getPath()); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Checks if the provided path string contains a windows drive letter. + * + * @return True, if the path string contains a windows drive letter, false otherwise. + */ + public boolean hasWindowsDrive() { + return hasWindowsDrive(uri.getPath(), true); + } + + /** + * Checks if the provided path string contains a windows drive letter. + * + * @param path the path to check + * @param slashed true to indicate the first character of the string is a slash, false otherwise + * @return true if the path string contains a windows drive letter, false otherwise + */ + private boolean hasWindowsDrive(String path, boolean slashed) { + final int start = slashed ? 1 : 0; + return path.length() >= start + 2 && (!slashed || path.charAt(0) == '/') + && path.charAt(start + 1) == ':' + && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') + || (path.charAt(start) >= 'a' && path.charAt(start) <= 'z')); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Creates a path for the given local file. + * + *

This method is useful to make sure the path creation for local files works seamlessly + * across different operating systems. Especially Windows has slightly different rules for + * slashes between schema and a local file path, making it sometimes tricky to produce + * cross-platform URIs for local files. + * + * @param file The file that the path should represent. + * @return A path representing the local file URI of the given file. + */ + public static Path fromLocalFile(File file) { + return new Path(file.toURI()); + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java new file mode 100644 index 000000000..b38a518bc --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import org.apache.flink.core.fs.LocalDataInputStream; +import org.apache.flink.core.fs.Path; + +/** + * ByteBufferReadableFSDataInputStream. + */ +public class ByteBufferReadableFSDataInputStream extends InputStream { + private final LocalDataInputStream localDataInputStream; + private final Path path; + private final long totalFileSize; + + public ByteBufferReadableFSDataInputStream( + Path path, InputStream inputStream, long totalFileSize) { + if (!(inputStream instanceof LocalDataInputStream)) { + throw new UnsupportedOperationException("Unsupported input stream type"); + } + this.localDataInputStream = (LocalDataInputStream) inputStream; + this.path = path; + this.totalFileSize = totalFileSize; + } + + public void seek(long desired) throws IOException { + localDataInputStream.seek(desired); + } + + public long getPos() throws IOException { + return localDataInputStream.getPos(); + } + + @Override + public int read() throws IOException { + return localDataInputStream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return localDataInputStream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return localDataInputStream.read(b, off, len); + } + + /** + * Return the total number of bytes read into the buffer. + * REQUIRES: External synchronization + */ + public int readFully(ByteBuffer bb) throws IOException { + return readFullyFromFSDataInputStream(localDataInputStream, bb); + } + + private int readFullyFromFSDataInputStream(LocalDataInputStream fsdis, ByteBuffer bb) + throws IOException { + byte[] tmp = new byte[bb.remaining()]; + int n = 0; + long pos = fsdis.getPos(); + while (n < tmp.length) { + int read = fsdis.read(tmp, n, tmp.length - n); + if (read == -1) { + break; + } + n += read; + } + if (n > 0) { + bb.put(tmp, 0, n); + } + return n; + } + + /** + * Return the total number of bytes read into the buffer. + * Safe for concurrent use by multiple threads. + */ + public int readFully(long position, ByteBuffer bb) throws IOException { + localDataInputStream.seek(position); + return readFullyFromFSDataInputStream(localDataInputStream, bb); + } + + @Override + public long skip(long n) throws IOException { + seek(getPos() + n); + return getPos(); + } + + @Override + public int available() throws IOException { + return localDataInputStream.available(); + } + + @Override + public void close() throws IOException { + localDataInputStream.close(); + } + + @Override + public synchronized void mark(int readlimit) { + localDataInputStream.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + localDataInputStream.reset(); + } + + @Override + public boolean markSupported() { + return localDataInputStream.markSupported(); + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java new file mode 100644 index 000000000..9c59fda3b --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.flink.core.fs.LocalDataOutputStream; +import org.apache.flink.core.fs.Path; + +/** + * ByteBufferWritableFSDataOutputStream. + */ +public class ByteBufferWritableFSDataOutputStream extends OutputStream { + private final Path path; + private final LocalDataOutputStream localDataOutputStream; + + public ByteBufferWritableFSDataOutputStream(Path path, OutputStream fsdos) { + if (!(fsdos instanceof LocalDataOutputStream)) { + throw new UnsupportedOperationException("Unsupported output stream type"); + } + this.path = path; + this.localDataOutputStream = (LocalDataOutputStream) fsdos; + } + + public long getPos() throws IOException { + return localDataOutputStream.getPos(); + } + + @Override + public void write(int b) throws IOException { + localDataOutputStream.write(b); + } + + public void write(byte[] b) throws IOException { + localDataOutputStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + localDataOutputStream.write(b, off, len); + } + + public void write(ByteBuffer bb) throws IOException { + if (bb.hasArray()) { + write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); + } else { + byte[] tmp = new byte[bb.remaining()]; + bb.get(tmp); + write(tmp, 0, tmp.length); + } + } + + @Override + public void flush() throws IOException { + localDataOutputStream.flush(); + } + + public void sync() throws IOException { + localDataOutputStream.sync(); + } + + @Override + public void close() throws IOException { + localDataOutputStream.close(); + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java new file mode 100644 index 000000000..afb32d754 --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +/** + * RemoteRocksdbFlinkFileSystem, used to expose flink fileSystem interface to frocksdb. + */ +public class ForStFlinkFileSystem extends FileSystem { + private final FileSystem flinkFS; + + public ForStFlinkFileSystem(FileSystem flinkFS) { + this.flinkFS = flinkFS; + } + + public static FileSystem get(URI uri) throws IOException { + return new ForStFlinkFileSystem(FileSystem.get(uri)); + } + + @Override + public Path getWorkingDirectory() { + return flinkFS.getWorkingDirectory(); + } + + @Override + public Path getHomeDirectory() { + return flinkFS.getHomeDirectory(); + } + + @Override + public URI getUri() { + return flinkFS.getUri(); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return flinkFS.getFileStatus(f); + } + + @Override + public ByteBufferReadableFSDataInputStream open(Path f, int bufferSize) throws IOException { + InputStream original = flinkFS.open(f, bufferSize); + long fileSize = flinkFS.getFileStatus(f).getLen(); + return new ByteBufferReadableFSDataInputStream(f, original, fileSize); + } + + @Override + public ByteBufferReadableFSDataInputStream open(Path f) throws IOException { + InputStream original = flinkFS.open(f); + long fileSize = flinkFS.getFileStatus(f).getLen(); + return new ByteBufferReadableFSDataInputStream(f, original, fileSize); + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + return flinkFS.listStatus(f); + } + + @Override + public boolean exists(final Path f) throws IOException { + return flinkFS.exists(f); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return flinkFS.delete(f, recursive); + } + + @Override + public boolean mkdirs(Path f) throws IOException { + return flinkFS.mkdirs(f); + } + + public ByteBufferWritableFSDataOutputStream create(Path f) throws IOException { + return create(f, WriteMode.OVERWRITE); + } + + @Override + public ByteBufferWritableFSDataOutputStream create(Path f, WriteMode overwriteMode) + throws IOException { + OutputStream original = flinkFS.create(f, overwriteMode); + return new ByteBufferWritableFSDataOutputStream(f, original); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + // The rename is not atomic for RocksDB. Some FileSystems e.g. HDFS, OSS does not allow a + // renaming if the target already exists. So, we delete the target before attempting the + // rename. + if (flinkFS.exists(dst)) { + boolean deleted = flinkFS.delete(dst, false); + if (!deleted) { + throw new IOException("Fail to delete dst path: " + dst); + } + } + return flinkFS.rename(src, dst); + } + + @Override + public boolean isDistributedFS() { + return flinkFS.isDistributedFS(); + } +} diff --git a/java/rocksjni/env_flink_test_suite.cc b/java/rocksjni/env_flink_test_suite.cc new file mode 100644 index 000000000..5e66ca746 --- /dev/null +++ b/java/rocksjni/env_flink_test_suite.cc @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "env/flink/env_flink_test_suite.h" + +#include + +#include "include/org_rocksdb_EnvFlinkTestSuite.h" +#include "java/rocksjni/portal.h" + +/* + * Class: org_rocksdb_EnvFlinkTestSuite + * Method: buildNativeObject + * Signature: (Ljava/lang/String;)J + */ +jlong Java_org_rocksdb_EnvFlinkTestSuite_buildNativeObject(JNIEnv* env, jobject, + jstring basePath) { + jboolean has_exception = JNI_FALSE; + auto path = + ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, basePath, &has_exception); + if (has_exception == JNI_TRUE) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, "Could not copy jstring to std::string"); + return 0; + } + auto env_flink_test_suites = new ROCKSDB_NAMESPACE::EnvFlinkTestSuites(path); + return reinterpret_cast(env_flink_test_suites); +} + +/* + * Class: org_rocksdb_EnvFlinkTestSuite + * Method: runAllTestSuites + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_rocksdb_EnvFlinkTestSuite_runAllTestSuites( + JNIEnv* jniEnv, jobject, jlong objectHandle) { + auto env_flink_test_suites = + reinterpret_cast(objectHandle); + env_flink_test_suites->runAllTestSuites(); + if (jniEnv->ExceptionCheck()) { + jthrowable throwable = jniEnv->ExceptionOccurred(); + jniEnv->ExceptionDescribe(); + jniEnv->ExceptionClear(); + jniEnv->Throw(throwable); + } +} + +/* + * Class: org_rocksdb_EnvFlinkTestSuite + * Method: disposeInternal + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_rocksdb_EnvFlinkTestSuite_disposeInternal( + JNIEnv*, jobject, jlong objectHandle) { + auto test_suites = + reinterpret_cast(objectHandle); + delete test_suites; +} \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/EnvFlinkTestSuite.java b/java/src/main/java/org/rocksdb/EnvFlinkTestSuite.java new file mode 100644 index 000000000..92e503509 --- /dev/null +++ b/java/src/main/java/org/rocksdb/EnvFlinkTestSuite.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.rocksdb; + +/** + * The test suite used for flink-env interfaces testing. You could define and implement test + * procedures in the "env/flink/env_flink_test_suite.h" and "env/flink/env_flink_test_suite.cc", and + * these tests will be executed by EnvFlinkTestSuite#runAllTestSuites. + */ +public class EnvFlinkTestSuite implements AutoCloseable { + private final String basePath; + + private final long nativeObjectHandle; + + public EnvFlinkTestSuite(String basePath) { + this.basePath = basePath; + this.nativeObjectHandle = buildNativeObject(basePath); + } + + private native long buildNativeObject(String basePath); + + private native void runAllTestSuites(long nativeObjectHandle); + + private native void disposeInternal(long nativeObjectHandle); + + public void runAllTestSuites() { + runAllTestSuites(nativeObjectHandle); + } + + @Override + public void close() throws Exception { + disposeInternal(nativeObjectHandle); + } +} \ No newline at end of file diff --git a/java/src/test/java/org/rocksdb/flink/FlinkEnvTest.java b/java/src/test/java/org/rocksdb/flink/FlinkEnvTest.java new file mode 100644 index 000000000..5c7166557 --- /dev/null +++ b/java/src/test/java/org/rocksdb/flink/FlinkEnvTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.rocksdb.flink; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.EnvFlinkTestSuite; +import org.rocksdb.RocksNativeLibraryResource; + +/** + * Unit test for env/flink/env_flink.cc. + */ +public class FlinkEnvTest { + @ClassRule + public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = + new RocksNativeLibraryResource(); + + @Rule public TemporaryFolder parentFolder = new TemporaryFolder(); + + @Test + public void runEnvFlinkTestSuites() throws Exception { + String basePath = parentFolder.newFolder().toURI().toString(); + try (EnvFlinkTestSuite testSuite = new EnvFlinkTestSuite(basePath)) { + testSuite.runAllTestSuites(); + } + } +} \ No newline at end of file diff --git a/src.mk b/src.mk index 41f4c0076..1aaa0a949 100644 --- a/src.mk +++ b/src.mk @@ -114,6 +114,7 @@ LIB_SOURCES = \ env/flink/env_flink.cc \ env/flink/jvm_util.cc \ env/flink/jni_helper.cc \ + env/flink/env_flink_test_suite.cc \ file/delete_scheduler.cc \ file/file_prefetch_buffer.cc \ file/file_util.cc \ @@ -664,6 +665,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/export_import_files_metadatajni.cc \ java/rocksjni/env.cc \ java/rocksjni/env_flink.cc \ + java/rocksjni/env_flink_test_suite.cc \ java/rocksjni/env_options.cc \ java/rocksjni/event_listener.cc \ java/rocksjni/event_listener_jnicallback.cc \