Skip to content

Commit

Permalink
Fine grained partition writer optimization (#6173)
Browse files Browse the repository at this point in the history
close #6157
  • Loading branch information
yibin87 authored Oct 31, 2022
1 parent e57a606 commit c73daf6
Show file tree
Hide file tree
Showing 35 changed files with 591 additions and 73 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ MutableColumns ColumnAggregateFunction::scatter(IColumn::ColumnIndex num_columns
return columns;
}

void ColumnAggregateFunction::scatterTo(ScatterColumns & columns [[maybe_unused]], const Selector & selector [[maybe_unused]]) const
{
throw TiFlashException("ColumnAggregateFunction does not support scatterTo", Errors::Coprocessor::Unimplemented);
}

void ColumnAggregateFunction::getPermutation(bool /*reverse*/, size_t /*limit*/, int /*nan_direction_hint*/, IColumn::Permutation & res) const
{
size_t s = getData().size();
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Columns/ColumnAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega

MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;

void scatterTo(ScatterColumns & columns, const Selector & selector) const override;

void gather(ColumnGathererStream & gatherer_stream) override;

int compareAt(size_t, size_t, const IColumn &, int) const override
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
{
return scatterImpl<ColumnArray>(num_columns, selector);
}

/// Appends the rows of this array column to the already existing `columns`,
/// routing each row by `selector`. All of the work is delegated to the generic
/// scatterToImpl helper, instantiated for ColumnArray.
void scatterTo(ScatterColumns & columns, const Selector & selector) const override
{
    this->scatterToImpl<ColumnArray>(columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;

void forEachSubcolumn(ColumnCallback callback) override
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Columns/ColumnConst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector & se
return res;
}

/// Appends this constant column's rows to the existing `columns` according to `selector`.
///
/// Rows are not copied one by one: the selector is reduced to a per-destination
/// row count via countColumnsSizeInSelector, and each destination then receives
/// that many rows through a single insertRangeFrom call starting at offset 0.
///
/// Throws Exception(SIZES_OF_COLUMNS_DOESNT_MATCH) when the selector length
/// differs from the column size `s`.
void ColumnConst::scatterTo(ScatterColumns & columns, const Selector & selector) const
{
    const auto selector_size = selector.size();
    if (selector_size != s)
    {
        throw Exception(
            fmt::format("Size of selector ({}) doesn't match size of column ({})", selector_size, s),
            ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
    }

    const ColumnIndex destination_count = columns.size();
    const std::vector<size_t> rows_per_destination = countColumnsSizeInSelector(destination_count, selector);

    size_t destination_idx = 0;
    for (auto & destination : columns)
    {
        destination->insertRangeFrom(*this, 0, rows_per_destination[destination_idx]);
        ++destination_idx;
    }
}

void ColumnConst::getPermutation(bool /*reverse*/, size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const
{
res.resize(s);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Columns/ColumnConst.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class ColumnConst final : public COWPtrHelper<IColumn, ColumnConst>

MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;

void scatterTo(ScatterColumns & columns, const Selector & selector) const override;
void gather(ColumnGathererStream &) override
{
throw Exception("Cannot gather into constant column " + getName(), ErrorCodes::NOT_IMPLEMENTED);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Columns/ColumnDecimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ class ColumnDecimal final : public COWPtrHelper<ColumnVectorHelper, ColumnDecima
return this->template scatterImpl<Self>(num_columns, selector);
}

/// Appends the rows of this decimal column to the already existing `columns`,
/// routing each row by `selector`. The work is delegated to the generic
/// scatterToImpl helper instantiated for this column type.
///
/// The function returns void, so the helper is simply invoked (no `return` of a
/// void expression), matching the other scatterTo overrides.
void scatterTo(IColumn::ScatterColumns & columns, const IColumn::Selector & selector) const override
{
    this->template scatterToImpl<Self>(columns, selector);
}

void gather(ColumnGathererStream & gatherer_stream) override;

//bool structureEquals(const IColumn & rhs) const override
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Columns/ColumnFixedString.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ class ColumnFixedString final : public COWPtrHelper<IColumn, ColumnFixedString>
return scatterImpl<ColumnFixedString>(num_columns, selector);
}

/// Appends the rows of this fixed-string column to the already existing `columns`,
/// routing each row by `selector`. Delegates to the generic scatterToImpl helper,
/// instantiated for ColumnFixedString.
void scatterTo(ScatterColumns & columns, const Selector & selector) const override
{
    this->scatterToImpl<ColumnFixedString>(columns, selector);
}

void gather(ColumnGathererStream & gatherer_stream) override;

void reserve(size_t size) override
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Columns/ColumnFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ std::vector<MutableColumnPtr> ColumnFunction::scatter(
return columns;
}

void ColumnFunction::scatterTo(ScatterColumns & columns [[maybe_unused]], const Selector & selector [[maybe_unused]]) const
{
throw TiFlashException("ColumnFunction does not support scatterTo", Errors::Coprocessor::Unimplemented);
}

void ColumnFunction::insertDefault()
{
for (auto & column : captured_columns)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Columns/ColumnFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ class ColumnFunction final : public COWPtrHelper<IColumn, ColumnFunction>
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
void insertDefault() override;
void popBack(size_t n) override;
std::vector<MutableColumnPtr> scatter(
ScatterColumns scatter(
IColumn::ColumnIndex num_columns,
const IColumn::Selector & selector) const override;
void scatterTo(ScatterColumns & columns, const Selector & selector) const override;

void getExtremes(Field &, Field &) const override {}

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Columns/ColumnNothing.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <Columns/IColumnDummy.h>


namespace DB
{
class ColumnNothing final : public COWPtrHelper<IColumnDummy, ColumnNothing>
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Columns/ColumnNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
return scatterImpl<ColumnNullable>(num_columns, selector);
}

/// Appends the rows of this nullable column to the already existing `columns`,
/// routing each row by `selector`. Delegates to the generic scatterToImpl helper,
/// instantiated for ColumnNullable.
void scatterTo(ScatterColumns & columns, const Selector & selector) const override
{
    this->scatterToImpl<ColumnNullable>(columns, selector);
}

void gather(ColumnGathererStream & gatherer_stream) override;

void forEachSubcolumn(ColumnCallback callback) override
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Columns/ColumnString.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
return scatterImpl<ColumnString>(num_columns, selector);
}

/// Appends the rows of this string column to the already existing `columns`,
/// routing each row by `selector`. Delegates to the generic scatterToImpl helper,
/// instantiated for ColumnString.
void scatterTo(ScatterColumns & columns, const Selector & selector) const override
{
    this->scatterToImpl<ColumnString>(columns, selector);
}

void gather(ColumnGathererStream & gatherer_stream) override;

void reserve(size_t n) override;
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Columns/ColumnTuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,30 @@ MutableColumns ColumnTuple::scatter(ColumnIndex num_columns, const Selector & se
return res;
}

/// Appends the rows of this tuple column to the existing destination columns in
/// `scatterColumns`, routing rows by `selector`. The work is done element-wise:
/// each tuple element column of `this` is scattered into the matching element
/// columns of every destination, and the destinations are then re-assembled.
///
/// NOTE(review): every entry of `scatterColumns` is static_cast to ColumnTuple
/// without a check and indexed with this tuple's element indices, so the caller
/// presumably must pass ColumnTuple destinations with at least `columns.size()`
/// elements -- confirm against callers.
void ColumnTuple::scatterTo(ScatterColumns & scatterColumns, const Selector & selector) const
{
const size_t tuple_size = columns.size();
ColumnIndex scattered_num_columns = scatterColumns.size();
/// scattered_tuple_elements[e][d] holds element `e` of destination tuple `d`.
std::vector<MutableColumns> scattered_tuple_elements(tuple_size);
for (size_t tuple_element_idx = 0; tuple_element_idx < tuple_size; ++tuple_element_idx)
{
/// Gather element `tuple_element_idx` from every destination tuple ...
for (size_t scatter_idx = 0; scatter_idx < scattered_num_columns; ++scatter_idx)
{
auto col = static_cast<ColumnTuple &>(scatterColumns[scatter_idx]->assumeMutableRef()).columns[tuple_element_idx]->assumeMutable();
scattered_tuple_elements[tuple_element_idx].push_back(std::move(col));
}
/// ... and let this tuple's own element column scatter its rows into them.
columns[tuple_element_idx]->scatterTo(scattered_tuple_elements[tuple_element_idx], selector);
}

/// Re-assemble each destination: collect its element columns back into one
/// vector and replace the destination entry with a tuple created from them.
for (size_t scattered_idx = 0; scattered_idx < scattered_num_columns; ++scattered_idx)
{
MutableColumns new_columns(tuple_size);
for (size_t tuple_element_idx = 0; tuple_element_idx < tuple_size; ++tuple_element_idx)
new_columns[tuple_element_idx] = std::move(scattered_tuple_elements[tuple_element_idx][scattered_idx]);
scatterColumns[scattered_idx] = ColumnTuple::create(std::move(new_columns));
}
}

int ColumnTuple::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const
{
const size_t tuple_size = columns.size();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Columns/ColumnTuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class ColumnTuple final : public COWPtrHelper<IColumn, ColumnTuple>
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void scatterTo(ScatterColumns & scatterColumns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
void getExtremes(Field & min, Field & max) const override;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Columns/ColumnVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ class ColumnVector final : public COWPtrHelper<ColumnVectorHelper, ColumnVector<
return this->template scatterImpl<Self>(num_columns, selector);
}

/// Appends the rows of this vector column to the already existing `columns`,
/// routing each row by `selector`. Delegates to the generic scatterToImpl helper
/// instantiated for this column type (`Self`).
void scatterTo(IColumn::ScatterColumns & columns, const IColumn::Selector & selector) const override
{
    Self::template scatterToImpl<Self>(columns, selector);
}

void gather(ColumnGathererStream & gatherer_stream) override;

bool canBeInsideNullable() const override { return true; }
Expand Down
48 changes: 35 additions & 13 deletions dbms/src/Columns/IColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,25 @@ class IColumn : public COWPtr<IColumn>
* For default implementation, see scatterImpl.
*/
using ColumnIndex = UInt64;
using ScatterColumns = std::vector<MutablePtr>;
using Selector = PaddedPODArray<ColumnIndex>;
virtual std::vector<MutablePtr> scatter(ColumnIndex num_columns, const Selector & selector) const = 0;
virtual ScatterColumns scatter(ColumnIndex num_columns, const Selector & selector) const = 0;

/// Appends `num_columns` empty clones of this column to `columns` and pre-reserves
/// space in every column of `columns` for the expected share of `num_rows`.
///
/// @param columns      Destination vector; the new empty columns are appended to it.
/// @param num_columns  Number of empty columns to create.
/// @param num_rows     Total number of rows that are going to be scattered.
///
/// When `num_columns` is 0 nothing is created and nothing is reserved. The guard
/// also keeps the reserve-size computation below from dividing by zero, which would
/// yield inf/NaN and make the conversion to size_t undefined behaviour.
void initializeScatterColumns(ScatterColumns & columns, ColumnIndex num_columns, size_t num_rows) const
{
    if (num_columns == 0)
        return;

    columns.reserve(num_columns);
    for (ColumnIndex i = 0; i < num_columns; ++i)
        columns.emplace_back(cloneEmpty());

    size_t reserve_size = num_rows * 1.1 / num_columns; /// 1.1 is just a guess. Better to use n-sigma rule.

    /// Reserving 0 or 1 rows is not worth a call per column.
    if (reserve_size > 1)
        for (auto & column : columns)
            column->reserve(reserve_size);
}

/// Different from scatter, scatterTo appends the scattered data to 'columns' instead of creating ScatterColumns
virtual void scatterTo(ScatterColumns & columns, const Selector & selector) const = 0;

/// Insert data from several other columns according to source mask (used in vertical merge).
/// For now it is a helper to de-virtualize calls to insert*() functions inside gather loop
Expand Down Expand Up @@ -388,7 +405,7 @@ class IColumn : public COWPtr<IColumn>
String dumpStructure() const;

protected:
/// Template is to devirtualize calls to insertFrom method.
/// Template is to de-virtualize calls to insertFrom method.
/// In derived classes (that use final keyword), implement scatter method as call to scatterImpl.
template <typename Derived>
std::vector<MutablePtr> scatterImpl(ColumnIndex num_columns, const Selector & selector) const
Expand All @@ -400,23 +417,28 @@ class IColumn : public COWPtr<IColumn>
fmt::format("Size of selector: {} doesn't match size of column: {}", selector.size(), num_rows),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);

std::vector<MutablePtr> columns(num_columns);
for (auto & column : columns)
column = cloneEmpty();

{
size_t reserve_size = num_rows * 1.1 / num_columns; /// 1.1 is just a guess. Better to use n-sigma rule.

if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}
ScatterColumns columns;
initializeScatterColumns(columns, num_columns, num_rows);

for (size_t i = 0; i < num_rows; ++i)
static_cast<Derived &>(*columns[selector[i]]).insertFrom(*this, i);

return columns;
}

/// Generic implementation of scatterTo: appends row `i` of this column to
/// `columns[selector[i]]` for every row.
/// The template parameter lets the insertFrom call be resolved on the concrete
/// column type; derived classes forward their scatterTo override here.
///
/// Throws Exception(SIZES_OF_COLUMNS_DOESNT_MATCH) when the selector length differs
/// from the number of rows. Selector values are used as indexes into `columns`
/// without a range check, and the destinations are static_cast to `Derived`.
template <typename Derived>
void scatterToImpl(ScatterColumns & columns, const Selector & selector) const
{
    const size_t num_rows = size();
    const auto selector_size = selector.size();

    if (selector_size != num_rows)
    {
        throw Exception(
            fmt::format("Size of selector: {} doesn't match size of column: {}", selector_size, num_rows),
            ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
    }

    size_t row = 0;
    for (const auto destination_idx : selector)
    {
        auto & destination = static_cast<Derived &>(*columns[destination_idx]);
        destination.insertFrom(*this, row);
        ++row;
    }
}
};

using ColumnPtr = IColumn::Ptr;
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Columns/IColumnDummy.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ class IColumnDummy : public IColumn
return res;
}

/// Appends this dummy column's rows to the existing `columns` according to `selector`.
///
/// The selector is first reduced to a per-destination row count; each destination
/// then receives that many rows through one insertRangeFrom call starting at offset 0.
///
/// Throws Exception(SIZES_OF_COLUMNS_DOESNT_MATCH) when the selector length
/// differs from the column size `s`. Selector values are not range-checked here.
void scatterTo(ScatterColumns & columns, const Selector & selector) const override
{
    const auto selector_size = selector.size();
    if (selector_size != s)
        throw Exception("Size of selector doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);

    /// Tally how many rows are routed to each destination column.
    std::vector<size_t> rows_per_destination(columns.size());
    for (size_t row = 0; row < selector_size; ++row)
        ++rows_per_destination[selector[row]];

    size_t destination_idx = 0;
    for (auto & destination : columns)
    {
        destination->insertRangeFrom(*this, 0, rows_per_destination[destination_idx]);
        ++destination_idx;
    }
}

void gather(ColumnGathererStream &) override
{
throw Exception("Method gather is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
Expand Down
Loading

0 comments on commit c73daf6

Please sign in to comment.