Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use size of filtered column to reserve memory for filter when result_size_hint < 0 (#6746) #6754

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include <Columns/ColumnAggregateFunction.h>
<<<<<<< HEAD
#include <AggregateFunctions/AggregateFunctionState.h>
=======
#include <Columns/ColumnsCommon.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
>>>>>>> dba70f9da8 (Use size of filtered column to reserve memory for filter when result_size_hint < 0 (#6746))
#include <DataStreams/ColumnGathererStream.h>
#include <IO/WriteBufferFromArena.h>
#include <Common/SipHash.h>
Expand Down Expand Up @@ -126,7 +133,11 @@ ColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_
auto & res_data = res->getData();

if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filter);
res_data.reserve(result_size_hint);
}

for (size_t i = 0; i < size; ++i)
if (filter[i])
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
void insertFrom(const IColumn & src_, size_t n) override;
void insertDefault() override;
void popBack(size_t n) override;
/// TODO: If result_size_hint < 0, makes reserve() using size of filtered column, not source column to avoid some OOM issues.
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ ColumnPtr ColumnDecimal<T>::filter(const IColumn::Filter & filt, ssize_t result_
Container & res_data = res->getData();

if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filt);
res_data.reserve(result_size_hint);
}

const UInt8 * filt_pos = filt.data();
const UInt8 * filt_end = filt_pos + size;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Columns/ColumnFixedString.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#include <Columns/ColumnFixedString.h>
<<<<<<< HEAD

=======
#include <Columns/ColumnsCommon.h>
>>>>>>> dba70f9da8 (Use size of filtered column to reserve memory for filter when result_size_hint < 0 (#6746))
#include <Common/Arena.h>
#include <Common/SipHash.h>
#include <Common/memcpySmall.h>
Expand Down Expand Up @@ -170,7 +174,11 @@ ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result
auto res = ColumnFixedString::create(n);

if (result_size_hint)
res->chars.reserve(result_size_hint > 0 ? result_size_hint * n : chars.size());
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filt);
res->chars.reserve(result_size_hint * n);
}

const UInt8 * filt_pos = &filt[0];
const UInt8 * filt_end = filt_pos + col_size;
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/Columns/ColumnVector.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
#include <cstring>
#include <cmath>

<<<<<<< HEAD
=======
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsCommon.h>
#include <Common/Arena.h>
>>>>>>> dba70f9da8 (Use size of filtered column to reserve memory for filter when result_size_hint < 0 (#6746))
#include <Common/Exception.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
Expand Down Expand Up @@ -177,7 +183,11 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
Container & res_data = res->getData();

if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filt);
res_data.reserve(result_size_hint);
}

const UInt8 * filt_pos = &filt[0];
const UInt8 * filt_end = filt_pos + size;
Expand Down
54 changes: 54 additions & 0 deletions dbms/src/Columns/ColumnsCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,26 @@ namespace
/// Implementation details of filterArraysImpl function, used as template parameter.
/// Allow to build or not to build offsets array.

<<<<<<< HEAD
struct ResultOffsetsBuilder
{
IColumn::Offsets & res_offsets;
IColumn::Offset current_src_offset = 0;
=======
struct ResultOffsetsBuilder
{
IColumn::Offsets & res_offsets;
IColumn::Offset current_src_offset = 0;

explicit ResultOffsetsBuilder(IColumn::Offsets * res_offsets_)
: res_offsets(*res_offsets_)
{}

void reserve(size_t result_size_hint)
{
res_offsets.reserve(result_size_hint);
}
>>>>>>> dba70f9da8 (Use size of filtered column to reserve memory for filter when result_size_hint < 0 (#6746))

explicit ResultOffsetsBuilder(IColumn::Offsets * res_offsets_) : res_offsets(*res_offsets_) {}

Expand All @@ -118,11 +134,19 @@ namespace
res_offsets.reserve(result_size_hint > 0 ? result_size_hint : src_size);
}

<<<<<<< HEAD
void insertOne(size_t array_size)
{
current_src_offset += array_size;
res_offsets.push_back(current_src_offset);
}
=======
struct NoResultOffsetsBuilder
{
explicit NoResultOffsetsBuilder(IColumn::Offsets *) {}
void reserve(size_t) {}
void insertOne(size_t) {}
>>>>>>> dba70f9da8 (Use size of filtered column to reserve memory for filter when result_size_hint < 0 (#6746))

template <size_t SIMD_BYTES>
void insertChunk(
Expand All @@ -144,13 +168,43 @@ namespace
{
const auto res_offsets_pos = &res_offsets[offsets_size_old];

<<<<<<< HEAD
/// adjust offsets
for (size_t i = 0; i < SIMD_BYTES; ++i)
res_offsets_pos[i] -= diff_offset;
}
}
current_src_offset += chunk_size;
}
=======
if (result_size_hint)
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filt);

result_offsets_builder.reserve(result_size_hint);

if (result_size_hint < 1000000000 && src_elems.size() < 1000000000) /// Avoid overflow.
res_elems.reserve((result_size_hint * src_elems.size() + size - 1) / size);
}

const UInt8 * filt_pos = filt.data();
const auto * filt_end = filt_pos + size;

const auto * offsets_pos = src_offsets.data();
const auto * offsets_begin = offsets_pos;

/// copy array ending at *end_offset_ptr
const auto copy_array = [&](const IColumn::Offset * offset_ptr) {
const auto arr_offset = offset_ptr == offsets_begin ? 0 : offset_ptr[-1];
const auto arr_size = *offset_ptr - arr_offset;

result_offsets_builder.insertOne(arr_size);

const auto elems_size_old = res_elems.size();
res_elems.resize(elems_size_old + arr_size);
inline_memcpy(&res_elems[elems_size_old], &src_elems[arr_offset], arr_size * sizeof(T));
>>>>>>> dba70f9da8 (Use size of filtered column to reserve memory for filter when result_size_hint < 0 (#6746))
};

struct NoResultOffsetsBuilder
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/IColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class IColumn : public COWPtr<IColumn>
* Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column;
* if 0, then don't makes reserve(),
* otherwise (i.e. < 0), makes reserve() using size of source column.
* otherwise (i.e. < 0), makes reserve() using size of filtered column.
*/
using Filter = PaddedPODArray<UInt8>;
virtual Ptr filter(const Filter & filt, ssize_t result_size_hint) const = 0;
Expand Down