Skip to content

Commit

Permalink
[fix](map) the implementation of ColumnMap::replicate was incorrect (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg authored Nov 13, 2023
1 parent 87862f9 commit 4d9a8f8
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 35 deletions.
51 changes: 22 additions & 29 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -833,42 +833,35 @@ ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) cons
return replicate_generic(replicate_offsets);
}

void ColumnArray::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const {
void ColumnArray::replicate(const uint32_t* indices, size_t target_size, IColumn& column) const {
if (target_size == 0) {
return;
}
auto total_size = get_offsets().size();
// |---------------------|-------------------------|-------------------------|
// [0, begin) [begin, begin + count_sz) [begin + count_sz, size())
// do not need to copy copy counts[n] times do not need to copy
IColumn::Offsets replicate_offsets(total_size, 0);
// copy original data at offset n counts[n] times
auto begin = 0, end = 0;
while (begin < target_size) {
while (end < target_size && indexs[begin] == indexs[end]) {
end++;
}
long index = indexs[begin];
replicate_offsets[index] = end - begin;
begin = end;
}

// ignored
for (size_t i = 1; i < total_size; ++i) {
replicate_offsets[i] += replicate_offsets[i - 1];
}
auto& dst_col = assert_cast<ColumnArray&>(column);
auto& dst_data_col = dst_col.get_data();
auto& dst_offsets = dst_col.get_offsets();
dst_offsets.reserve(target_size);

auto rep_res = replicate(replicate_offsets);
if (!rep_res) {
LOG(WARNING) << "ColumnArray replicate failed, replicate_offsets count="
<< replicate_offsets.size() << ", max=" << replicate_offsets.back();
return;
PODArray<uint32> data_indices_to_replicate;

for (size_t i = 0; i < target_size; ++i) {
const auto index = indices[i];
const auto start = offset_at(index);
const auto length = size_at(index);
dst_offsets.push_back(dst_offsets.back() + length);
if (UNLIKELY(length == 0)) {
continue;
}

data_indices_to_replicate.reserve(data_indices_to_replicate.size() + length);
for (size_t j = start; j != start + length; ++j) {
data_indices_to_replicate.push_back(j);
}
}
auto& rep_res_arr = typeid_cast<const ColumnArray&>(*rep_res);

ColumnArray& res_arr = typeid_cast<ColumnArray&>(column);
res_arr.data = rep_res_arr.get_data_ptr();
res_arr.offsets = rep_res_arr.get_offsets_ptr();
get_data().replicate(data_indices_to_replicate.data(), data_indices_to_replicate.size(),
dst_data_col);
}

template <typename T>
Expand Down
22 changes: 16 additions & 6 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,24 @@ ColumnPtr ColumnMap::replicate(const Offsets& offsets) const {
return res;
}

void ColumnMap::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const {
void ColumnMap::replicate(const uint32_t* indices, size_t target_size, IColumn& column) const {
auto& res = reinterpret_cast<ColumnMap&>(column);

// Make a temp column array for reusing its replicate function
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->replicate(indexs, target_size, res.keys_column->assume_mutable_ref());
ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable())
->replicate(indexs, target_size, res.values_column->assume_mutable_ref());
auto keys_array =
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable());

auto result_array = ColumnArray::create(res.keys_column->assume_mutable(),
res.offsets_column->assume_mutable());
keys_array->replicate(indices, target_size, result_array->assume_mutable_ref());

result_array = ColumnArray::create(res.values_column->assume_mutable(),
res.offsets_column->clone_empty());

auto values_array =
ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable());

/// FIXME: To reuse the replicate of ColumnArray, the offsets column was replicated twice
values_array->replicate(indices, target_size, result_array->assume_mutable_ref());
}

MutableColumnPtr ColumnMap::get_shrinked_column() {
Expand Down
16 changes: 16 additions & 0 deletions regression-test/data/datatype_p0/complex_types/test_map.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 {"key1":"value1"} 1 1
1 {"key1_1":"value1_1"} 1 1
1 {"key1":"value1"} 2 1
1 {"key1_1":"value1_1"} 2 1
2 {"key2":"value2", "key22":"value22"} 3 2
2 {"key2_1":"value2_1", "key22_1":"value22_1"} 3 2
2 {"key2_2":"value2_2", "key22_2":"value22_2"} 3 2
2 {"key2":"value2", "key22":"value22"} 4 2
2 {"key2_1":"value2_1", "key22_1":"value22_1"} 4 2
2 {"key2_2":"value2_2", "key22_2":"value22_2"} 4 2
3 {"key3":"value3", "key33":"value33", "key3333":"value333"} 5 3
3 {"key3":"value3", "key33":"value33", "key3333":"value333"} 6 3
4 {"key4":"value4", "key44":"value44", "key444":"value444", "key4444":"value4444"} \N \N

54 changes: 54 additions & 0 deletions regression-test/suites/datatype_p0/complex_types/test_map.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_map") {
sql 'set enable_nereids_planner=false'
sql "DROP TABLE IF EXISTS `test_map_table`"
sql """
create table `test_map_table` (
`k1` int,
`value` map<text, text>
) distributed by hash(`k1`) buckets 1 properties("replication_num" = "1");
"""

sql 'insert into `test_map_table` values (1, {"key1": "value1"});'
sql 'insert into `test_map_table` values (1, {"key1_1": "value1_1"});'
sql 'insert into `test_map_table` values (2, {"key2": "value2", "key22": "value22"});'
sql 'insert into `test_map_table` values (2, {"key2_1": "value2_1", "key22_1": "value22_1"});'
sql 'insert into `test_map_table` values (2, {"key2_2": "value2_2", "key22_2": "value22_2"});'
sql 'insert into `test_map_table` values (3, {"key3": "value3", "key33": "value33", "key3333": "value333"});'
sql 'insert into `test_map_table` values (4, {"key4": "value4", "key44": "value44", "key444": "value444", "key4444": "value4444"});'

sql "DROP TABLE IF EXISTS `test_map_table_right`"
sql """
create table `test_map_table_right` (
`id` int,
`value` int
) distributed by hash(`id`) buckets 1 properties("replication_num" = "1");
"""

sql 'insert into `test_map_table_right` values(1, 1);'
sql 'insert into `test_map_table_right` values(2, 1);'
sql 'insert into `test_map_table_right` values(3, 2);'
sql 'insert into `test_map_table_right` values(4, 2);'
sql 'insert into `test_map_table_right` values(5, 3);'
sql 'insert into `test_map_table_right` values(6, 3);'

qt_sql """
select * from test_map_table left join test_map_table_right on test_map_table.k1 = test_map_table_right.value order by 1,3;
"""
}

0 comments on commit 4d9a8f8

Please sign in to comment.