Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Hashmap build opt for semi/anti/exists join (#197)
Browse files Browse the repository at this point in the history
Signed-off-by: Chendi Xue <chendi.xue@intel.com>
  • Loading branch information
xuechendi authored Mar 29, 2021
1 parent 155afa4 commit fc16976
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ case class ColumnarShuffledHashJoinExec(
}
}

val builder_type = {
if (condition.isDefined) 1
else {
joinType match {
case LeftSemi =>
3
case LeftAnti =>
3
case j: ExistenceJoin =>
3
case other =>
1
}
}
}

def buildCheck(): Unit = {
// build check for condition
val conditionExpr: Expression = condition.orNull
Expand Down Expand Up @@ -180,7 +196,8 @@ case class ColumnarShuffledHashJoinExec(
ColumnarCodegenContext(
inputSchema,
null,
ColumnarConditionedProbeJoin.prepareHashBuildFunction(buildKeyExprs, buildPlan.output, 1))
ColumnarConditionedProbeJoin
.prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type))
}

override def supportColumnarCodegen: Boolean = true
Expand Down Expand Up @@ -256,7 +273,7 @@ case class ColumnarShuffledHashJoinExec(
val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()
val hash_relation_function =
ColumnarConditionedProbeJoin
.prepareHashBuildFunction(buildKeyExprs, buildPlan.output, 1)
.prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type)
val hash_relation_schema = ConverterUtils.toArrowSchema(buildPlan.output)
val hash_relation_expr =
TreeBuilder.makeExpression(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object ColumnarConditionedProbeJoin extends Logging {
def prepareHashBuildFunction(
buildKeys: Seq[Expression],
buildInputAttributes: Seq[Attribute],
builder_type: Int = 0,
builder_type: Int = 1,
is_broadcast: Boolean = false): TreeNode = {
val buildInputFieldList: List[Field] = buildInputAttributes.toList.map(attr => {
Field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class HashRelationKernel::Impl {
std::dynamic_pointer_cast<gandiva::LiteralNode>(parameter_nodes[0])->holder());
builder_type_ = std::stoi(builder_type_str);
}
if (builder_type_ == 3) {
// This is for using unsafeHashMap while with skipDuplication strategy
semi_ = true;
builder_type_ = 1;
}
if (builder_type_ == 0) {
// builder_type_ == 0 will be abandoned in near future, won't support
// decimal here.
Expand Down Expand Up @@ -227,11 +232,11 @@ class HashRelationKernel::Impl {
PROCESS(arrow::Decimal128Type)
if (project_outputs.size() == 1) {
switch (project_outputs[0]->type_id()) {
#define PROCESS(InType) \
case TypeTraits<InType>::type_id: { \
using ArrayType = precompile::TypeTraits<InType>::ArrayType; \
auto typed_key_arr = std::make_shared<ArrayType>(project_outputs[0]); \
RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, typed_key_arr)); \
#define PROCESS(InType) \
case TypeTraits<InType>::type_id: { \
using ArrayType = precompile::TypeTraits<InType>::ArrayType; \
auto typed_key_arr = std::make_shared<ArrayType>(project_outputs[0]); \
RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, typed_key_arr, semi_)); \
} break;
PROCESS_SUPPORTED_TYPES(PROCESS)
#undef PROCESS
Expand All @@ -252,7 +257,7 @@ class HashRelationKernel::Impl {
RETURN_NOT_OK(MakeUnsafeArray(arr->type(), i++, arr, &payload));
payloads.push_back(payload);
}
RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, payloads));
RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, payloads, semi_));
}
}
}
Expand Down Expand Up @@ -281,6 +286,7 @@ class HashRelationKernel::Impl {
std::vector<std::shared_ptr<arrow::Array>> key_hash_cached_;
uint64_t num_total_cached_ = 0;
int builder_type_ = 0;
bool semi_ = false;
int key_size_ = -1; // If key_size_ != 0, key will be stored directly in key_map

class HashRelationResultIterator : public ResultIterator<HashRelation> {
Expand Down
68 changes: 58 additions & 10 deletions native-sql-engine/cpp/src/codegen/common/hash_relation.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ class HashRelation {
return arrow::Status::Invalid("Error minimizing hash table");
}

arrow::Status AppendKeyColumn(
std::shared_ptr<arrow::Array> in,
const std::vector<std::shared_ptr<UnsafeArray>>& payloads) {
arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
const std::vector<std::shared_ptr<UnsafeArray>>& payloads,
bool semi = false) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
Expand All @@ -184,7 +184,11 @@ class HashRelation {
// chendi: Since spark won't join rows contain null, we will skip null
// row.
if (payload->isNullExists()) continue;
RETURN_NOT_OK(Insert(typed_array->GetView(i), payload, num_arrays_, i));
if (!semi) {
RETURN_NOT_OK(Insert(typed_array->GetView(i), payload, num_arrays_, i));
} else {
RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), payload, num_arrays_, i));
}
}

num_arrays_++;
Expand All @@ -196,7 +200,8 @@ class HashRelation {
typename std::enable_if_t<!std::is_same<KeyArrayType, StringArray>::value>* =
nullptr>
arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
std::shared_ptr<KeyArrayType> original_key) {
std::shared_ptr<KeyArrayType> original_key,
bool semi = false) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
Expand All @@ -212,8 +217,13 @@ class HashRelation {
if (original_key->IsNull(i)) {
RETURN_NOT_OK(InsertNull(num_arrays_, i));
} else {
RETURN_NOT_OK(
Insert(typed_array->GetView(i), original_key->GetView(i), num_arrays_, i));
if (!semi) {
RETURN_NOT_OK(Insert(typed_array->GetView(i), original_key->GetView(i),
num_arrays_, i));
} else {
RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), original_key->GetView(i),
num_arrays_, i));
}
}
}
}
Expand All @@ -224,7 +234,8 @@ class HashRelation {
}

arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
std::shared_ptr<StringArray> original_key) {
std::shared_ptr<StringArray> original_key,
bool semi = false) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
Expand All @@ -242,8 +253,13 @@ class HashRelation {
RETURN_NOT_OK(InsertNull(num_arrays_, i));
} else {
auto str = original_key->GetString(i);
RETURN_NOT_OK(
Insert(typed_array->GetView(i), str.data(), str.size(), num_arrays_, i));
if (!semi) {
RETURN_NOT_OK(
Insert(typed_array->GetView(i), str.data(), str.size(), num_arrays_, i));
} else {
RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), str.data(), str.size(),
num_arrays_, i));
}
}
}
}
Expand Down Expand Up @@ -462,6 +478,38 @@ class HashRelation {
return arrow::Status::OK();
}

arrow::Status InsertSkipDup(int32_t v, std::shared_ptr<UnsafeRow> payload,
uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!appendNewKey(hash_table_, payload.get(), v, (char*)&index,
sizeof(ArrayItemIndex))) {
return arrow::Status::CapacityError("Insert to HashMap failed.");
}
return arrow::Status::OK();
}

template <typename CType>
arrow::Status InsertSkipDup(int32_t v, CType payload, uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!appendNewKey(hash_table_, payload, v, (char*)&index, sizeof(ArrayItemIndex))) {
return arrow::Status::CapacityError("Insert to HashMap failed.");
}
return arrow::Status::OK();
}

arrow::Status InsertSkipDup(int32_t v, const char* payload, size_t payload_len,
uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!appendNewKey(hash_table_, payload, payload_len, v, (char*)&index,
sizeof(ArrayItemIndex))) {
return arrow::Status::CapacityError("Insert to HashMap failed.");
}
return arrow::Status::OK();
}

arrow::Status InsertNull(uint32_t array_id, uint32_t id) {
// since vanilla spark doesn't support match null in join
// we can directly retun to optimize
Expand Down
Loading

0 comments on commit fc16976

Please sign in to comment.