Skip to content

Commit

Permalink
Merge branch 'master' into feature/add_module_schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored May 30, 2022
2 parents 9ac0e19 + 7dff463 commit 0161857
Show file tree
Hide file tree
Showing 103 changed files with 3,139 additions and 1,703 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
37 changes: 20 additions & 17 deletions dbms/src/Columns/ColumnNullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <DataStreams/ColumnGathererStream.h>
#include <fmt/core.h>


namespace DB
Expand All @@ -41,10 +42,10 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
nested_column = nested_column_materialized;

if (!getNestedColumn().canBeInsideNullable())
throw Exception{getNestedColumn().getName() + " cannot be inside Nullable column", ErrorCodes::ILLEGAL_COLUMN};
throw Exception(fmt::format("{} cannot be inside Nullable column", getNestedColumn().getName()), ErrorCodes::ILLEGAL_COLUMN);

if (null_map->isColumnConst())
throw Exception{"ColumnNullable cannot have constant null map", ErrorCodes::ILLEGAL_COLUMN};
throw Exception("ColumnNullable cannot have constant null map", ErrorCodes::ILLEGAL_COLUMN);
}


Expand Down Expand Up @@ -106,7 +107,7 @@ void ColumnNullable::updateWeakHash32(WeakHash32 & hash, const TiDB::TiDBCollato
auto s = size();

if (hash.getData().size() != s)
throw Exception("Size of WeakHash32 does not match size of column: column size is " + std::to_string(s) + ", hash size is " + std::to_string(hash.getData().size()), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Size of WeakHash32 does not match size of column: column size is {}, hash size is {}", s, hash.getData().size()), ErrorCodes::LOGICAL_ERROR);

WeakHash32 old_hash = hash;
nested_column->updateWeakHash32(hash, collator, sort_key_container);
Expand Down Expand Up @@ -158,12 +159,12 @@ void ColumnNullable::get(size_t n, Field & res) const

StringRef ColumnNullable::getDataAt(size_t /*n*/) const
{
throw Exception{"Method getDataAt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED};
throw Exception(fmt::format("Method getDataAt is not supported for {}", getName()), ErrorCodes::NOT_IMPLEMENTED);
}

void ColumnNullable::insertData(const char * /*pos*/, size_t /*length*/)
{
throw Exception{"Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED};
throw Exception(fmt::format("Method insertData is not supported for {}", getName()), ErrorCodes::NOT_IMPLEMENTED);
}

bool ColumnNullable::decodeTiDBRowV2Datum(size_t cursor, const String & raw_value, size_t length, bool force_decode)
Expand Down Expand Up @@ -212,7 +213,7 @@ const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos, con

void ColumnNullable::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(src);
const auto & nullable_col = static_cast<const ColumnNullable &>(src);
getNullMapColumn().insertRangeFrom(*nullable_col.null_map, start, length);
getNestedColumn().insertRangeFrom(*nullable_col.nested_column, start, length);
}
Expand All @@ -233,7 +234,7 @@ void ColumnNullable::insert(const Field & x)

void ColumnNullable::insertFrom(const IColumn & src, size_t n)
{
const ColumnNullable & src_concrete = static_cast<const ColumnNullable &>(src);
const auto & src_concrete = static_cast<const ColumnNullable &>(src);
getNestedColumn().insertFrom(src_concrete.getNestedColumn(), n);
getNullMapData().push_back(src_concrete.getNullMapData()[n]);
}
Expand Down Expand Up @@ -285,40 +286,40 @@ std::tuple<bool, int> ColumnNullable::compareAtCheckNull(size_t n, size_t m, con
return std::make_tuple(has_null, res);
}

int ColumnNullable::compareAtWithCollation(
int ColumnNullable::compareAt(
size_t n,
size_t m,
const IColumn & rhs_,
int null_direction_hint,
const ICollator & collator) const
{
const ColumnNullable & nullable_rhs = static_cast<const ColumnNullable &>(rhs_);
const auto & nullable_rhs = static_cast<const ColumnNullable &>(rhs_);
auto [has_null, res] = compareAtCheckNull(n, m, nullable_rhs, null_direction_hint);
if (has_null)
return res;
const IColumn & nested_rhs = nullable_rhs.getNestedColumn();
return getNestedColumn().compareAtWithCollation(n, m, nested_rhs, null_direction_hint, collator);
return getNestedColumn().compareAt(n, m, nested_rhs, null_direction_hint, collator);
}

int ColumnNullable::compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const
{
const ColumnNullable & nullable_rhs = static_cast<const ColumnNullable &>(rhs_);
const auto & nullable_rhs = static_cast<const ColumnNullable &>(rhs_);
auto [has_null, res] = compareAtCheckNull(n, m, nullable_rhs, null_direction_hint);
if (has_null)
return res;
const IColumn & nested_rhs = nullable_rhs.getNestedColumn();
return getNestedColumn().compareAt(n, m, nested_rhs, null_direction_hint);
}

void ColumnNullable::getPermutationWithCollation(
void ColumnNullable::getPermutation(
const ICollator & collator,
bool reverse,
size_t limit,
int null_direction_hint,
DB::IColumn::Permutation & res) const
{
/// Cannot pass limit because of unknown amount of NULLs.
getNestedColumn().getPermutationWithCollation(collator, reverse, 0, null_direction_hint, res);
getNestedColumn().getPermutation(collator, reverse, 0, null_direction_hint, res);
adjustPermutationWithNullDirection(reverse, limit, null_direction_hint, res);
}

Expand Down Expand Up @@ -538,7 +539,7 @@ void ColumnNullable::applyNullMapImpl(const ColumnUInt8 & map)
const NullMap & arr2 = map.getData();

if (arr1.size() != arr2.size())
throw Exception{"Inconsistent sizes of ColumnNullable objects", ErrorCodes::LOGICAL_ERROR};
throw Exception("Inconsistent sizes of ColumnNullable objects", ErrorCodes::LOGICAL_ERROR);

for (size_t i = 0, size = arr1.size(); i < size; ++i)
arr1[i] |= negative ^ arr2[i];
Expand All @@ -565,9 +566,11 @@ void ColumnNullable::applyNullMap(const ColumnNullable & other)
void ColumnNullable::checkConsistency() const
{
if (null_map->size() != getNestedColumn().size())
throw Exception("Logical error: Sizes of nested column and null map of Nullable column are not equal: null size is : "
+ std::to_string(null_map->size()) + " column size is : " + std::to_string(getNestedColumn().size()),
ErrorCodes::SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT);
throw Exception(
fmt::format("Logical error: Sizes of nested column and null map of Nullable column are not equal: null size is : {} column size is : {}",
null_map->size(),
getNestedColumn().size()),
ErrorCodes::SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT);
}


Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
std::tuple<bool, int> compareAtCheckNull(size_t n, size_t m, const ColumnNullable & rhs, int null_direction_hint) const;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const override;
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint, const ICollator & collator) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint, const ICollator & collator) const override;
void getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void getPermutationWithCollation(const ICollator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void getPermutation(const ICollator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void adjustPermutationWithNullDirection(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const;
void reserve(size_t n) override;
size_t byteSize() const override;
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Columns/ColumnString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Columns/ColumnsCommon.h>
#include <Common/HashTable/Hash.h>
#include <DataStreams/ColumnGathererStream.h>
#include <fmt/core.h>

/// Used in the `reserve` method, when the number of rows is known, but sizes of elements are not.
#define APPROX_STRING_SIZE 64
Expand Down Expand Up @@ -80,7 +81,7 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len
if (length == 0)
return;

const ColumnString & src_concrete = static_cast<const ColumnString &>(src);
const auto & src_concrete = static_cast<const ColumnString &>(src);

if (start + length > src_concrete.offsets.size())
throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.",
Expand Down Expand Up @@ -310,7 +311,7 @@ void ColumnString::getExtremes(Field & min, Field & max) const

int ColumnString::compareAtWithCollationImpl(size_t n, size_t m, const IColumn & rhs_, const ICollator & collator) const
{
const ColumnString & rhs = static_cast<const ColumnString &>(rhs_);
const auto & rhs = static_cast<const ColumnString &>(rhs_);

return collator.compare(
reinterpret_cast<const char *>(&chars[offsetAt(n)]),
Expand Down Expand Up @@ -374,7 +375,7 @@ void ColumnString::updateWeakHash32(WeakHash32 & hash, const TiDB::TiDBCollatorP
auto s = offsets.size();

if (hash.getData().size() != s)
throw Exception("Size of WeakHash32 does not match size of column: column size is " + std::to_string(s) + ", hash size is " + std::to_string(hash.getData().size()), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Size of WeakHash32 does not match size of column: column size is {}, hash size is {}", s, hash.getData().size()), ErrorCodes::LOGICAL_ERROR);

const UInt8 * pos = chars.data();
UInt32 * hash_data = hash.getData().data();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnString.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
return size > rhs_size ? 1 : (size < rhs_size ? -1 : 0);
}

int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs_, int, const ICollator & collator) const override
int compareAt(size_t n, size_t m, const IColumn & rhs_, int, const ICollator & collator) const override
{
return compareAtWithCollationImpl(n, m, rhs_, collator);
}
Expand All @@ -324,7 +324,7 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>

void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;

void getPermutationWithCollation(const ICollator & collator, bool reverse, size_t limit, int, Permutation & res) const override
void getPermutation(const ICollator & collator, bool reverse, size_t limit, int, Permutation & res) const override
{
getPermutationWithCollationImpl(collator, reverse, limit, res);
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Columns/IColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ class IColumn : public COWPtr<IColumn>
*/
virtual int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const = 0;

virtual int compareAtWithCollation(size_t, size_t, const IColumn &, int, const ICollator &) const
virtual int compareAt(size_t, size_t, const IColumn &, int, const ICollator &) const
{
throw Exception("Method compareAtWithCollation is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(fmt::format("Method compareAt with collation is not supported for {}" + getName()), ErrorCodes::NOT_IMPLEMENTED);
}

/** Returns a permutation that sorts elements of this column,
Expand All @@ -252,9 +252,9 @@ class IColumn : public COWPtr<IColumn>
*/
virtual void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const = 0;

virtual void getPermutationWithCollation(const ICollator &, bool, size_t, int, Permutation &) const
virtual void getPermutation(const ICollator &, bool, size_t, int, Permutation &) const
{
throw Exception("Method getPermutationWithCollation is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(fmt::format("Method getPermutation with collation is not supported for {}", getName()), ErrorCodes::NOT_IMPLEMENTED);
}

/** Copies each element according offsets parameter.
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Core/SortCursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ struct SortCursorWithCollation
int nulls_direction = impl->desc[i].nulls_direction;
int res;
if (impl->need_collation[i])
res = impl->sort_columns[i]->compareAtWithCollation(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction, *impl->desc[i].collator);
res = impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction, *impl->desc[i].collator);
else
res = impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction);

Expand All @@ -241,7 +241,7 @@ struct SortCursorWithCollation
int res;
if (impl->need_collation[i])
{
res = impl->sort_columns[i]->compareAtWithCollation(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction, *impl->desc[i].collator);
res = impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction, *impl->desc[i].collator);
}
else
res = impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/FilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ FilterBlockInputStream::FilterBlockInputStream(

Block FilterBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
expression->executeOnTotals(totals);
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.


#include <Common/FmtUtils.h>
#include <DataStreams/HashJoinBuildBlockInputStream.h>
namespace DB
{
Expand All @@ -25,4 +26,26 @@ Block HashJoinBuildBlockInputStream::readImpl()
return block;
}

void HashJoinBuildBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
static const std::unordered_map<ASTTableJoin::Kind, String> join_type_map{
{ASTTableJoin::Kind::Inner, "Inner"},
{ASTTableJoin::Kind::Left, "Left"},
{ASTTableJoin::Kind::Right, "Right"},
{ASTTableJoin::Kind::Full, "Full"},
{ASTTableJoin::Kind::Cross, "Cross"},
{ASTTableJoin::Kind::Comma, "Comma"},
{ASTTableJoin::Kind::Anti, "Anti"},
{ASTTableJoin::Kind::LeftSemi, "Left_Semi"},
{ASTTableJoin::Kind::LeftAnti, "Left_Anti"},
{ASTTableJoin::Kind::Cross_Left, "Cross_Left"},
{ASTTableJoin::Kind::Cross_Right, "Cross_Right"},
{ASTTableJoin::Kind::Cross_Anti, "Cross_Anti"},
{ASTTableJoin::Kind::Cross_LeftSemi, "Cross_LeftSemi"},
{ASTTableJoin::Kind::Cross_LeftAnti, "Cross_LeftAnti"}};
auto join_type_it = join_type_map.find(join->getKind());
if (join_type_it == join_type_map.end())
throw TiFlashException("Unknown join type", Errors::Coprocessor::Internal);
buffer.fmtAppend(", join_kind = {}", join_type_it->second);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class HashJoinBuildBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
JoinPtr join;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/IBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
return res + 1;
}


void IBlockInputStream::dumpTree(FmtBuffer & buffer, size_t indent, size_t multiplier)
{
// todo append getHeader().dumpStructure()
buffer.fmtAppend(
"{}{}{}\n",
"{}{}{}",
String(indent, ' '),
getName(),
multiplier > 1 ? fmt::format(" x {}", multiplier) : "");
if (!extra_info.empty())
buffer.fmtAppend(": <{}>", extra_info);
appendInfo(buffer);
buffer.append("\n");
++indent;

/// If the subtree is repeated several times, then we output it once with the multiplier.
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/DataStreams/IBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class IBlockInputStream : private boost::noncopyable
*/
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

void setExtraInfo(String info) { extra_info = info; }

template <typename F>
void forEachChild(F && f)
Expand Down Expand Up @@ -176,6 +177,8 @@ class IBlockInputStream : private boost::noncopyable
}
}

virtual void appendInfo(FmtBuffer & /*buffer*/) const {};

protected:
BlockInputStreams children;
mutable std::shared_mutex children_mutex;
Expand All @@ -188,6 +191,9 @@ class IBlockInputStream : private boost::noncopyable
mutable std::mutex tree_id_mutex;
mutable String tree_id;

/// The info that hints why the inputStream is needed to run.
String extra_info;

/// Get text with names of this source and the entire subtree, this function should only be called after the
/// InputStream tree is constructed.
String getTreeID() const;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ Block LimitBlockInputStream::readImpl()
return res;
}

void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
size_t limit;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,5 +287,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
return blocks[0].cloneWithColumns(std::move(merged_columns));
}

void MergeSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
SortDescription description;
Expand Down
Loading

0 comments on commit 0161857

Please sign in to comment.