diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index 7a367623eaf197..59711d513dd510 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -59,18 +59,28 @@ struct AggregateJavaUdafData { AggregateJavaUdafData() = default; AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; } - ~AggregateJavaUdafData() { + ~AggregateJavaUdafData() = default; + + Status close_and_delete_object() { JNIEnv* env = nullptr; - if (!JniUtil::GetJNIEnv(&env).ok()) { + Defer defer {[&]() { + if (env != nullptr) { + env->DeleteGlobalRef(executor_cl); + env->DeleteGlobalRef(executor_obj); + } + }}; + Status st = JniUtil::GetJNIEnv(&env); + if (!st.ok()) { LOG(WARNING) << "Failed to get JNIEnv"; + return st; } env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id); - Status st = JniUtil::GetJniExceptionMsg(env); + st = JniUtil::GetJniExceptionMsg(env); if (!st.ok()) { LOG(WARNING) << "Failed to close JAVA UDAF: " << st.to_string(); + return st; } - env->DeleteGlobalRef(executor_cl); - env->DeleteGlobalRef(executor_obj); + return Status::OK(); } Status init_udaf(const TFunction& fn, const std::string& local_location) { @@ -268,8 +278,8 @@ class AggregateJavaUdaf final } void create(AggregateDataPtr __restrict place) const override { + new (place) Data(argument_types.size()); if (_first_created) { - new (place) Data(argument_types.size()); Status status = Status::OK(); SAFE_CREATE(RETURN_IF_STATUS_ERROR(status, this->data(place).init_udaf(_fn, _local_location)), @@ -279,16 +289,24 @@ class AggregateJavaUdaf final }); _first_created = false; _exec_place = place; + if (UNLIKELY(!status.ok())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string()); + } } } // To avoid multiple times JNI call, Here will destroy all data at once void destroy(AggregateDataPtr __restrict place) const noexcept override { if (place == _exec_place) { - static_cast(this->data(_exec_place).destroy()); - this->data(_exec_place).~Data(); + Status status = Status::OK(); + status = this->data(_exec_place).destroy(); + status = this->data(_exec_place).close_and_delete_object(); _first_created = true; + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to destroy function: " << status.to_string(); + } } + this->data(place).~Data(); } String get_name() const override { return _fn.name.function_name; } @@ -372,7 +390,6 @@ class AggregateJavaUdaf final // so it's can't call ~Data, only to change _destory_deserialize flag. void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena*) const override { - new (place) Data(argument_types.size()); this->data(place).read(buf); }