Skip to content

Commit

Permalink
[bug](udaf) fix memory leak in the java udaf (#32630)
Browse files Browse the repository at this point in the history
fix memory leak in the java udaf
  • Loading branch information
zhangstar333 authored Mar 22, 2024
1 parent 57f25c1 commit 70919f5
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,28 @@ struct AggregateJavaUdafData {
AggregateJavaUdafData() = default;
AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; }

~AggregateJavaUdafData() {
~AggregateJavaUdafData() = default;

Status close_and_delete_object() {
JNIEnv* env = nullptr;
if (!JniUtil::GetJNIEnv(&env).ok()) {
Defer defer {[&]() {
if (env != nullptr) {
env->DeleteGlobalRef(executor_cl);
env->DeleteGlobalRef(executor_obj);
}
}};
Status st = JniUtil::GetJNIEnv(&env);
if (!st.ok()) {
LOG(WARNING) << "Failed to get JNIEnv";
return st;
}
env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
Status st = JniUtil::GetJniExceptionMsg(env);
st = JniUtil::GetJniExceptionMsg(env);
if (!st.ok()) {
LOG(WARNING) << "Failed to close JAVA UDAF: " << st.to_string();
return st;
}
env->DeleteGlobalRef(executor_cl);
env->DeleteGlobalRef(executor_obj);
return Status::OK();
}

Status init_udaf(const TFunction& fn, const std::string& local_location) {
Expand Down Expand Up @@ -268,8 +278,8 @@ class AggregateJavaUdaf final
}

void create(AggregateDataPtr __restrict place) const override {
new (place) Data(argument_types.size());
if (_first_created) {
new (place) Data(argument_types.size());
Status status = Status::OK();
SAFE_CREATE(RETURN_IF_STATUS_ERROR(status,
this->data(place).init_udaf(_fn, _local_location)),
Expand All @@ -279,16 +289,24 @@ class AggregateJavaUdaf final
});
_first_created = false;
_exec_place = place;
if (UNLIKELY(!status.ok())) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string());
}
}
}

// To avoid multiple times JNI call, Here will destroy all data at once
void destroy(AggregateDataPtr __restrict place) const noexcept override {
if (place == _exec_place) {
static_cast<void>(this->data(_exec_place).destroy());
this->data(_exec_place).~Data();
Status status = Status::OK();
status = this->data(_exec_place).destroy();
status = this->data(_exec_place).close_and_delete_object();
_first_created = true;
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to destroy function: " << status.to_string();
}
}
this->data(place).~Data();
}

String get_name() const override { return _fn.name.function_name; }
Expand Down Expand Up @@ -372,7 +390,6 @@ class AggregateJavaUdaf final
// so it's can't call ~Data, only to change _destory_deserialize flag.
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
new (place) Data(argument_types.size());
this->data(place).read(buf);
}

Expand Down

0 comments on commit 70919f5

Please sign in to comment.