Skip to content

Commit

Permalink
[env]Introduce flink-env test suite (#17)
Browse files Browse the repository at this point in the history
* [env]Introduce flink-env test suite
  • Loading branch information
ljz2051 authored Mar 29, 2024
1 parent ec88681 commit de9582b
Show file tree
Hide file tree
Showing 23 changed files with 2,030 additions and 10 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,8 @@ else()
env/io_posix.cc
env/flink/env_flink.cc
env/flink/jvm_util.cc
env/flink/jni_helper.cc)
env/flink/jni_helper.cc
env/flink/env_flink_test_suite.cc)
endif()

if(USE_FOLLY_LITE)
Expand Down
2 changes: 1 addition & 1 deletion env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ class FlinkDirectory : public FSDirectory {

FlinkFileSystem::FlinkFileSystem(const std::shared_ptr<FileSystem>& base_fs,
const std::string& base_path)
: FileSystemWrapper(base_fs), base_path_(base_path) {}
: FileSystemWrapper(base_fs), base_path_(TrimTrailingSlash(base_path)) {}

FlinkFileSystem::~FlinkFileSystem() {
if (file_system_instance_ != nullptr) {
Expand Down
8 changes: 8 additions & 0 deletions env/flink/env_flink.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ class FlinkFileSystem : public FileSystemWrapper {
const IOOptions& /*options*/, IODebugContext* /*dbg*/,
jobject* /*fileStatus*/);
std::string ConstructPath(const std::string& /*file_name*/);

static std::string TrimTrailingSlash(const std::string& base_path) {
if (!base_path.empty() && base_path.back() == '/') {
return base_path.substr(0, base_path.size() - 1);
} else {
return base_path;
}
}
};

// Returns a `FlinkEnv` with base_path
Expand Down
66 changes: 66 additions & 0 deletions env/flink/env_flink_test_suite.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "env/flink/env_flink_test_suite.h"

#include <fstream>
#include <iostream>

#define ASSERT_TRUE(expression) \
if (!(expression)) { \
std::cerr << "Assertion failed: " << #expression << ", file " << __FILE__ \
<< ", line " << __LINE__ << "." << std::endl; \
std::abort(); \
}

namespace ROCKSDB_NAMESPACE {

EnvFlinkTestSuites::EnvFlinkTestSuites(const std::string& basePath)
: base_path_(basePath) {}

void EnvFlinkTestSuites::runAllTestSuites() {
setUp();
testFileExist();
}

void EnvFlinkTestSuites::setUp() {
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_);
if (!status.ok()) {
throw std::runtime_error("New FlinkEnv failed");
}
}

void EnvFlinkTestSuites::testFileExist() {
std::string fileName("test-file");
Status result = flink_env_->FileExists(fileName);
ASSERT_TRUE(result.IsNotFound());

// Generate a file manually
const std::string prefix = "file:";
std::string writeFileName = base_path_ + fileName;
if (writeFileName.compare(0, prefix.size(), prefix) == 0) {
writeFileName = writeFileName.substr(prefix.size());
}
std::ofstream writeFile(writeFileName);
writeFile << "testFileExist";
writeFile.close();

result = flink_env_->FileExists(fileName);
ASSERT_TRUE(result.ok());
}
} // namespace ROCKSDB_NAMESPACE
34 changes: 34 additions & 0 deletions env/flink/env_flink_test_suite.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "env_flink.h"

namespace ROCKSDB_NAMESPACE {

class EnvFlinkTestSuites {
public:
EnvFlinkTestSuites(const std::string& basePath);
void runAllTestSuites();

private:
std::unique_ptr<ROCKSDB_NAMESPACE::Env> flink_env_;
const std::string base_path_;
void setUp();
void testFileExist();
};
} // namespace ROCKSDB_NAMESPACE
18 changes: 14 additions & 4 deletions env/flink/jni_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ IOStatus JavaClassCache::Init() {
cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].methodName =
"<init>";
cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].signature =
"(Lorg/apache/flink/core/fs/Path;)Z";
"(Ljava/lang/String;)V";

cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING]
.javaClassAndName = cached_java_classes_[JC_FLINK_PATH];
Expand All @@ -103,6 +103,8 @@ IOStatus JavaClassCache::Init() {
"get";
cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].signature =
"(Ljava/net/URI;)Lorg/apache/flink/core/fs/FileSystem;";
cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].isStatic =
true;

cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_EXISTS]
.javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM];
Expand Down Expand Up @@ -251,9 +253,17 @@ IOStatus JavaClassCache::Init() {
int numCachedMethods =
sizeof(cached_java_methods_) / sizeof(JavaMethodContext);
for (int i = 0; i < numCachedMethods; i++) {
cached_java_methods_[i].javaMethod = jni_env_->GetMethodID(
cached_java_methods_[i].javaClassAndName.javaClass,
cached_java_methods_[i].methodName, cached_java_methods_[i].signature);
if (cached_java_methods_[i].isStatic) {
cached_java_methods_[i].javaMethod = jni_env_->GetStaticMethodID(
cached_java_methods_[i].javaClassAndName.javaClass,
cached_java_methods_[i].methodName,
cached_java_methods_[i].signature);
} else {
cached_java_methods_[i].javaMethod = jni_env_->GetMethodID(
cached_java_methods_[i].javaClassAndName.javaClass,
cached_java_methods_[i].methodName,
cached_java_methods_[i].signature);
}

if (!cached_java_methods_[i].javaMethod) {
return IOStatus::IOError(std::string("Exception when GetMethodID, ")
Expand Down
5 changes: 4 additions & 1 deletion env/flink/jni_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,16 @@ class JavaClassCache {
jmethodID javaMethod;
const char* methodName;
const char* signature;
bool isStatic = false;

std::string ToString() const {
return javaClassAndName.ToString()
.append(", methodName: ")
.append(methodName)
.append(", signature: ")
.append(signature);
.append(signature)
.append(", isStatic:")
.append(isStatic ? "true" : "false");
}
};

Expand Down
3 changes: 3 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/config_options.cc
rocksjni/env.cc
rocksjni/env_flink.cc
rocksjni/env_flink_test_suite.cc
rocksjni/env_options.cc
rocksjni/event_listener.cc
rocksjni/event_listener_jnicallback.cc
Expand Down Expand Up @@ -150,6 +151,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/DirectSlice.java
src/main/java/org/rocksdb/EncodingType.java
src/main/java/org/rocksdb/Env.java
src/main/java/org/rocksdb/EnvFlinkTestSuite.java
src/main/java/org/rocksdb/EnvOptions.java
src/main/java/org/rocksdb/EventListener.java
src/main/java/org/rocksdb/Experimental.java
Expand Down Expand Up @@ -458,6 +460,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4")
org.rocksdb.DBOptions
org.rocksdb.DirectSlice
org.rocksdb.Env
org.rocksdb.EnvFlinkTestSuite
org.rocksdb.EnvOptions
org.rocksdb.Filter
org.rocksdb.FlinkCompactionFilter
Expand Down
20 changes: 17 additions & 3 deletions java/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ JAVA_TESTS = \
org.rocksdb.WriteOptionsTest\
org.rocksdb.WriteBatchWithIndexTest

FLINK_TESTS = \
org.rocksdb.flink.FlinkEnvTest

MAIN_SRC = src/main/java
TEST_SRC = src/test/java
OUTPUT = target
Expand Down Expand Up @@ -292,14 +295,15 @@ PLUGIN_SOURCES = $(foreach root, $(ROCKSDB_PLUGIN_JAVA_ROOTS), $(foreach pkg, or
CORE_SOURCES = $(foreach pkg, org/rocksdb/util org/rocksdb, $(MAIN_SRC)/$(pkg)/*.java)
SOURCES = $(wildcard $(CORE_SOURCES) $(PLUGIN_SOURCES))
PLUGIN_TEST_SOURCES = $(foreach root, $(ROCKSDB_PLUGIN_JAVA_ROOTS), $(foreach pkg, org/rocksdb/test org/rocksdb/util org/rocksdb, $(root)/$(TEST_SRC)/$(pkg)/*.java))
CORE_TEST_SOURCES = $(foreach pkg, org/rocksdb/test org/rocksdb/util org/rocksdb, $(TEST_SRC)/$(pkg)/*.java)
CORE_TEST_SOURCES = $(foreach pkg, org/rocksdb/test org/rocksdb/util org/rocksdb/flink org/rocksdb, $(TEST_SRC)/$(pkg)/*.java)
TEST_SOURCES = $(wildcard $(CORE_TEST_SOURCES) $(PLUGIN_TEST_SOURCES))
MOCK_FLINK_TEST_SOURCES = $(foreach pkg, org/apache/flink/core/fs org/apache/flink/state/forst/fs, flinktestmock/src/main/java/$(pkg)/*.java)

# Configure the plugin tests and java classes
ROCKSDB_PLUGIN_NATIVE_JAVA_CLASSES = $(foreach plugin, $(ROCKSDB_PLUGINS), $(foreach class, $($(plugin)_NATIVE_JAVA_CLASSES), $(class)))
NATIVE_JAVA_CLASSES = $(NATIVE_JAVA_CLASSES) $(ROCKSDB_PLUGIN_NATIVE_JAVA_CLASSES)
ROCKSDB_PLUGIN_JAVA_TESTS = $(foreach plugin, $(ROCKSDB_PLUGINS), $(foreach testclass, $($(plugin)_JAVA_TESTS), $(testclass)))
ALL_JAVA_TESTS = $(JAVA_TESTS) $(ROCKSDB_PLUGIN_JAVA_TESTS)
ALL_JAVA_TESTS = $(FLINK_TESTS) $(JAVA_TESTS) $(ROCKSDB_PLUGIN_JAVA_TESTS)

# When debugging add -Xcheck:jni to the java args
ifneq ($(DEBUG_LEVEL),0)
Expand Down Expand Up @@ -439,7 +443,7 @@ java_test: java resolve_test_deps
$(AM_V_at) $(JAVAC_CMD) $(JAVAC_ARGS) -cp $(MAIN_CLASSES):$(JAVA_TESTCLASSPATH) -h $(NATIVE_INCLUDE) -d $(TEST_CLASSES)\
$(TEST_SOURCES)

test: java java_test
test: java mock_flink_fs java_test
$(MAKE) run_test

run_test:
Expand All @@ -451,3 +455,13 @@ run_plugin_test:
db_bench: java
$(AM_V_GEN)mkdir -p $(BENCHMARK_MAIN_CLASSES)
$(AM_V_at)$(JAVAC_CMD) $(JAVAC_ARGS) -cp $(MAIN_CLASSES) -d $(BENCHMARK_MAIN_CLASSES) $(BENCHMARK_MAIN_SRC)/org/rocksdb/benchmark/*.java

mock_flink_fs:
$(AM_V_at) $(JAVAC_CMD) $(JAVAC_ARGS) -cp $(MAIN_CLASSES):$(JAVA_TESTCLASSPATH) -h $(NATIVE_INCLUDE) -d $(TEST_CLASSES) \
$(MOCK_FLINK_TEST_SOURCES)

flink_test: java java_test mock_flink_fs
$(MAKE) run_flink_test

run_flink_test:
$(JAVA_CMD) $(JAVA_ARGS) -Djava.library.path=target -cp "$(MAIN_CLASSES):$(TEST_CLASSES):$(JAVA_TESTCLASSPATH):target/*" org.rocksdb.test.RocksJunitRunner $(FLINK_TESTS)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed
* by the Apache Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership.
*/

package org.apache.flink.core.fs;

/**
* Interface that represents the client side information for a file independent of the file system.
*/
public interface FileStatus {
/**
* Return the length of this file.
*
* @return the length of this file
*/
long getLen();

/**
* Get the block size of the file.
*
* @return the number of bytes
*/
long getBlockSize();

/**
* Get the replication factor of a file.
*
* @return the replication factor of a file.
*/
short getReplication();

/**
* Get the modification time of the file.
*
* @return the modification time of file in milliseconds since January 1, 1970 UTC.
*/
long getModificationTime();

/**
* Get the access time of the file.
*
* @return the access time of file in milliseconds since January 1, 1970 UTC.
*/
long getAccessTime();

/**
* Checks if this object represents a directory.
*
* @return <code>true</code> if this is a directory, <code>false</code> otherwise
*/
boolean isDir();

/**
* Returns the corresponding Path to the FileStatus.
*
* @return the corresponding Path to the FileStatus
*/
Path getPath();
}
Loading

0 comments on commit de9582b

Please sign in to comment.