Skip to content

Commit

Permalink
[env] Introduce interface of env_flink (ververica#7)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4a511b3)
  • Loading branch information
masteryhx authored and fredia committed Sep 24, 2024
1 parent b8cb45e commit 0a7f5f1
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,8 @@ else()
env/fs_posix.cc
env/io_posix.cc
env/flink/env_flink.cc
env/flink/jvm_util.cc)
env/flink/jvm_util.cc
env/flink/jni_helper.cc)
endif()

if(USE_FOLLY_LITE)
Expand Down
76 changes: 76 additions & 0 deletions env/flink/jni_helper.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "jni_helper.h"

namespace ROCKSDB_NAMESPACE {

JavaClassCache::JavaClassCache(JNIEnv *env) : jni_env_(env) {
// Set all class names
cached_java_classes_[JavaClassCache::JC_URI].className = "java/net/URI";
cached_java_classes_[JavaClassCache::JC_BYTE_BUFFER].className =
"java/nio/ByteBuffer";
cached_java_classes_[JavaClassCache::JC_THROWABLE].className =
"java/lang/Throwable";
cached_java_classes_[JavaClassCache::JC_FLINK_PATH].className =
"org/apache/flink/core/fs/Path";
cached_java_classes_[JavaClassCache::JC_FLINK_FILE_SYSTEM].className =
"org/apache/flink/state/forst/fs/ForStFlinkFileSystem";
cached_java_classes_[JavaClassCache::JC_FLINK_FILE_STATUS].className =
"org/apache/flink/core/fs/FileStatus";
cached_java_classes_[JavaClassCache::JC_FLINK_FS_INPUT_STREAM].className =
"org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream";
cached_java_classes_[JavaClassCache::JC_FLINK_FS_OUTPUT_STREAM].className =
"org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream";

// Try best to create and set the jclass objects based on the class names set
// above
int numCachedClasses =
sizeof(cached_java_classes_) / sizeof(javaClassAndName);
for (int i = 0; i < numCachedClasses; i++) {
initCachedClass(cached_java_classes_[i].className,
&cached_java_classes_[i].javaClass);
}
}

JavaClassCache::~JavaClassCache() {
// Release all global ref of cached jclasses
for (const auto &item : cached_java_classes_) {
if (item.javaClass) {
jni_env_->DeleteGlobalRef(item.javaClass);
}
}
}

Status JavaClassCache::initCachedClass(const char *className,
jclass *cachedJclass) {
jclass tempLocalClassRef = jni_env_->FindClass(className);
if (!tempLocalClassRef) {
return Status::IOError("Exception when FindClass, class name: " +
std::string(className));
}
*cachedJclass = (jclass)jni_env_->NewGlobalRef(tempLocalClassRef);
if (!*cachedJclass) {
return Status::IOError("Exception when NewGlobalRef, class name " +
std::string(className));
}

jni_env_->DeleteLocalRef(tempLocalClassRef);
return Status::OK();
}

Status JavaClassCache::GetJClass(CachedJavaClass cachedJavaClass,
jclass *javaClass) {
jclass targetClass = cached_java_classes_[cachedJavaClass].javaClass;
Status status = Status::OK();
if (!targetClass) {
status = initCachedClass(cached_java_classes_[cachedJavaClass].className,
&targetClass);
}
*javaClass = targetClass;
return status;
}

} // namespace ROCKSDB_NAMESPACE
45 changes: 45 additions & 0 deletions env/flink/jni_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "jni.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {

// A cache for java classes to avoid calling FindClass frequently
class JavaClassCache {
public:
// Frequently-used class type representing jclasses which will be cached.
typedef enum {
JC_URI,
JC_BYTE_BUFFER,
JC_THROWABLE,
JC_FLINK_PATH,
JC_FLINK_FILE_SYSTEM,
JC_FLINK_FILE_STATUS,
JC_FLINK_FS_INPUT_STREAM,
JC_FLINK_FS_OUTPUT_STREAM,
NUM_CACHED_CLASSES
} CachedJavaClass;

// Constructor and Destructor
explicit JavaClassCache(JNIEnv* env);
~JavaClassCache();

// Get jclass by specific CachedJavaClass
Status GetJClass(CachedJavaClass cachedJavaClass, jclass* javaClass);

private:
typedef struct {
jclass javaClass;
const char* className;
} javaClassAndName;

JNIEnv* jni_env_;
javaClassAndName cached_java_classes_[JavaClassCache::NUM_CACHED_CLASSES];

Status initCachedClass(const char* className, jclass* cachedClass);
};
} // namespace ROCKSDB_NAMESPACE
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ LIB_SOURCES = \
env/unique_id_gen.cc \
env/flink/env_flink.cc \
env/flink/jvm_util.cc \
env/flink/jni_helper.cc \
file/delete_scheduler.cc \
file/file_prefetch_buffer.cc \
file/file_util.cc \
Expand Down

0 comments on commit 0a7f5f1

Please sign in to comment.