Skip to content

Commit

Permalink
Merge branch 'master' into bundle-grpc
Browse files (browse the repository at this point in the history)
  • Loading branch information
zanmato1984 authored Jan 21, 2022
2 parents 9016536 + ac440bb commit 571a628
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 188 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/GRPCReceiverContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void GRPCReceiverContext::fillSchema(DAGSchema & schema) const
{
String name = "exchange_receiver_" + std::to_string(i);
ColumnInfo info = TiDB::fieldTypeToColumnInfo(exchange_receiver_meta.field_types(i));
schema.push_back(std::make_pair(name, info));
schema.emplace_back(std::move(name), std::move(info));
}
}

Expand Down
36 changes: 23 additions & 13 deletions dbms/src/Flash/tests/exchange_perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ struct MockReceiverContext
return "{Request}";
}

int source_index = 0;
int send_task_id = 0;
int recv_task_id = -1;
};

struct Reader
Expand Down Expand Up @@ -95,22 +97,31 @@ struct MockReceiverContext
PacketQueuePtr queue;
};

// Constructor of the mock receiver context used by the exchange perf test.
// NOTE(review): this is a diff view without +/- markers. The single-argument `explicit` signature
// on the next line is presumably the removed version and the two-argument form after it the added
// one — confirm against the commit.
explicit MockReceiverContext(const std::vector<PacketQueuePtr> & queues_)
MockReceiverContext(
const std::vector<PacketQueuePtr> & queues_,
const std::vector<tipb::FieldType> & field_types_)
: queues(queues_)
// Stored by value (the member is a std::vector); fillSchema() iterates over this copy.
, field_types(field_types_)
{
}

/// Replaces the contents of `schema` with one (name, ColumnInfo) entry per stored field type.
/// Entry k is named "exchange_receiver_<k>" and its ColumnInfo is produced by
/// TiDB::fieldTypeToColumnInfo from field_types[k].
void fillSchema(DAGSchema & schema) const
{
    schema.clear();

    size_t ordinal = 0;
    for (const auto & field_type : field_types)
    {
        String column_name = "exchange_receiver_" + std::to_string(ordinal);
        ColumnInfo column_info = TiDB::fieldTypeToColumnInfo(field_type);
        schema.emplace_back(std::move(column_name), std::move(column_info));
        ++ordinal;
    }
}

// Builds the mock Request for the source identified by `index`.
// NOTE(review): this diff view shows two signatures back to back. The three-parameter form with
// [[maybe_unused]] arguments is presumably the removed version and the one-parameter form the
// added one — confirm against the commit.
Request makeRequest(
int index [[maybe_unused]],
const tipb::ExchangeReceiver & pb_exchange_receiver [[maybe_unused]],
const ::mpp::TaskMeta & task_meta [[maybe_unused]]) const
Request makeRequest(int index) const
{
return {index};
// Three-value form: presumably initializes source_index, send_task_id and recv_task_id in that
// order, i.e. the same index for source and sender and -1 for the receiver task — TODO confirm
// against the full Request definition.
return {index, index, -1};
}

// Creates a Reader over the packet queue selected by `request.send_task_id`.
// NOTE(review): two signatures appear back to back in this diff view. The form taking an unused
// `target_addr` is presumably the removed version — confirm against the commit.
std::shared_ptr<Reader> makeReader(
const Request & request,
const String & target_addr [[maybe_unused]])
std::shared_ptr<Reader> makeReader(const Request & request)
{
// send_task_id indexes `queues` directly via operator[]; no bounds check is performed here.
return std::make_shared<Reader>(queues[request.send_task_id]);
}
Expand All @@ -121,6 +132,7 @@ struct MockReceiverContext
}

std::vector<PacketQueuePtr> queues;
std::vector<tipb::FieldType> field_types;
};

using MockExchangeReceiver = ExchangeReceiverBase<MockReceiverContext>;
Expand Down Expand Up @@ -404,9 +416,8 @@ struct ReceiverHelper
// Constructs a MockExchangeReceiver on top of a newly created MockReceiverContext.
// The meaning of the trailing arguments is defined by the ExchangeReceiverBase constructor,
// which is not visible in this view.
MockExchangeReceiverPtr buildReceiver()
{
return std::make_shared<MockExchangeReceiver>(
// NOTE(review): diff view without +/- markers. The next three lines (context built from `queues`
// only, followed by pb_exchange_receiver and task_meta) are presumably the removed arguments;
// the two lines after them (context built from `queues` and `fields`, then source_num) are
// presumably the added ones — confirm against the commit.
std::make_shared<MockReceiverContext>(queues),
pb_exchange_receiver,
task_meta,
std::make_shared<MockReceiverContext>(queues, fields),
source_num,
source_num * 5,
nullptr);
}
Expand Down Expand Up @@ -495,7 +506,6 @@ struct SenderHelper
task_meta,
task_meta,
std::chrono::seconds(60),
[] { return false; },
concurrency,
false);
tunnel->connect(writer.get());
Expand Down
97 changes: 49 additions & 48 deletions dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteBuffer.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/RoughCheck.h>
Expand Down Expand Up @@ -53,8 +54,8 @@ void MinMaxIndex::addPack(const IColumn & column, const ColumnVector<UInt8> * de
{
const auto * del_mark_data = (!del_mark) ? nullptr : &(del_mark->getData());

auto & nullable_column = static_cast<const ColumnNullable &>(column);
auto & null_mark_data = nullable_column.getNullMapColumn().getData();
const auto & nullable_column = static_cast<const ColumnNullable &>(column);
const auto & null_mark_data = nullable_column.getNullMapColumn().getData();
column_ptr = &nullable_column.getNestedColumn();

for (size_t i = 0; i < size; ++i)
Expand Down Expand Up @@ -89,8 +90,8 @@ void MinMaxIndex::write(const IDataType & type, WriteBuffer & buf)
{
UInt64 size = has_null_marks->size();
DB::writeIntBinary(size, buf);
buf.write((char *)has_null_marks->data(), sizeof(UInt8) * size);
buf.write((char *)has_value_marks->data(), sizeof(UInt8) * size);
buf.write(reinterpret_cast<const char *>(has_null_marks->data()), sizeof(UInt8) * size);
buf.write(reinterpret_cast<const char *>(has_value_marks->data()), sizeof(UInt8) * size);
type.serializeBinaryBulkWithMultipleStreams(*minmaxes, //
[&](const IDataType::SubstreamPath &) { return &buf; },
0,
Expand All @@ -110,8 +111,8 @@ MinMaxIndexPtr MinMaxIndex::read(const IDataType & type, ReadBuffer & buf, size_
auto has_null_marks = std::make_shared<PaddedPODArray<UInt8>>(size);
auto has_value_marks = std::make_shared<PaddedPODArray<UInt8>>(size);
auto minmaxes = type.createColumn();
buf.read((char *)has_null_marks->data(), sizeof(UInt8) * size);
buf.read((char *)has_value_marks->data(), sizeof(UInt8) * size);
buf.read(reinterpret_cast<char *>(has_null_marks->data()), sizeof(UInt8) * size);
buf.read(reinterpret_cast<char *>(has_value_marks->data()), sizeof(UInt8) * size);
type.deserializeBinaryBulkWithMultipleStreams(*minmaxes, //
[&](const IDataType::SubstreamPath &) { return &buf; },
size * 2,
Expand Down Expand Up @@ -143,171 +144,171 @@ std::pair<UInt64, UInt64> MinMaxIndex::getUInt64MinMax(size_t pack_index)
return {minmaxes->get64(pack_index * 2), minmaxes->get64(pack_index * 2 + 1)};
}

/// Rough check of whether pack `pack_index` can contain a row equal to `value`.
///
/// @param pack_index  Index of the pack; its min is stored at `minmaxes[pack_index * 2]` and its
///                    max at `minmaxes[pack_index * 2 + 1]`.
/// @param value       The literal to compare against.
/// @param type        Data type of the indexed column; selects how `minmaxes` is decoded.
/// @return RSResult::Some when the pack has a null mark, `value` is NULL, or the type is not one
///         of the handled ones; RSResult::None when the pack has no value mark; otherwise the
///         result of RoughCheck::checkEqual on the pack's (min, max).
RSResult MinMaxIndex::checkEqual(size_t pack_index, const Field & value, const DataTypePtr & type)
{
    // With a null mark on the pack or a NULL literal, min/max alone cannot decide the result.
    if ((*has_null_marks)[pack_index] || value.isNull())
        return RSResult::Some;
    // No value mark: presumably the pack holds no usable values, so nothing can match.
    if (!(*has_value_marks)[pack_index])
        return RSResult::None;

    const auto * raw_type = type.get();
    // One typed min/max lookup per type enumerated by FOR_NUMERIC_TYPES.
#define DISPATCH(TYPE)                                              \
    if (typeid_cast<const DataType##TYPE *>(raw_type))              \
    {                                                               \
        auto & minmaxes_data = toColumnVectorData<TYPE>(minmaxes);  \
        auto min = minmaxes_data[pack_index * 2];                   \
        auto max = minmaxes_data[pack_index * 2 + 1];               \
        return RoughCheck::checkEqual<TYPE>(value, type, min, max); \
    }
    FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    if (typeid_cast<const DataTypeDate *>(raw_type))
    {
        auto & minmaxes_data = toColumnVectorData<DataTypeDate::FieldType>(minmaxes);
        auto min = minmaxes_data[pack_index * 2];
        auto max = minmaxes_data[pack_index * 2 + 1];
        return RoughCheck::checkEqual<DataTypeDate::FieldType>(value, type, min, max);
    }
    if (typeid_cast<const DataTypeDateTime *>(raw_type))
    {
        auto & minmaxes_data = toColumnVectorData<DataTypeDateTime::FieldType>(minmaxes);
        auto min = minmaxes_data[pack_index * 2];
        auto max = minmaxes_data[pack_index * 2 + 1];
        return RoughCheck::checkEqual<DataTypeDateTime::FieldType>(value, type, min, max);
    }
    if (typeid_cast<const DataTypeMyDateTime *>(raw_type) || typeid_cast<const DataTypeMyDate *>(raw_type))
    {
        // For DataTypeMyDateTime / DataTypeMyDate, simply compare them as comparing UInt64 is OK.
        // Check `struct MyTimeBase` for more details.
        auto & minmaxes_data = toColumnVectorData<DataTypeMyTimeBase::FieldType>(minmaxes);
        auto min = minmaxes_data[pack_index * 2];
        auto max = minmaxes_data[pack_index * 2 + 1];
        return RoughCheck::checkEqual<DataTypeMyTimeBase::FieldType>(value, type, min, max);
    }
    if (typeid_cast<const DataTypeString *>(raw_type))
    {
        const auto * string_column = checkAndGetColumn<ColumnString>(minmaxes.get());
        const auto & chars = string_column->getChars();
        const auto & offsets = string_column->getOffsets();
        size_t pos = pack_index * 2;
        size_t prev_offset = pos == 0 ? 0 : offsets[pos - 1];
        // todo use StringRef instead of String
        // Build the strings from a pointer into `chars` plus a length. Passing the byte value
        // `chars[prev_offset]` itself would select the (count, char) constructor and yield a run of
        // one repeated character instead of the stored min/max. The `- 1` drops the last byte of
        // each entry (presumably the terminating zero kept by ColumnString).
        auto min = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
        pos = pack_index * 2 + 1;
        prev_offset = offsets[pos - 1];
        auto max = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
        return RoughCheck::checkEqual<String>(value, type, min, max);
    }
    // Types without a handled representation cannot be pruned.
    return RSResult::Some;
}
/// Rough check of whether pack `pack_index` can contain a row greater than `value`.
///
/// @param pack_index  Index of the pack; its min is stored at `minmaxes[pack_index * 2]` and its
///                    max at `minmaxes[pack_index * 2 + 1]`.
/// @param value       The literal to compare against.
/// @param type        Data type of the indexed column; selects how `minmaxes` is decoded.
/// @return RSResult::Some when the pack has a null mark, `value` is NULL, or the type is not one
///         of the handled ones; RSResult::None when the pack has no value mark; otherwise the
///         result of RoughCheck::checkGreater on the pack's (min, max).
/// The trailing `int` parameter (nan_direction_hint) is accepted but unused.
RSResult MinMaxIndex::checkGreater(size_t pack_index, const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/)
{
    // With a null mark on the pack or a NULL literal, min/max alone cannot decide the result.
    if ((*has_null_marks)[pack_index] || value.isNull())
        return RSResult::Some;
    // No value mark: presumably the pack holds no usable values, so nothing can match.
    if (!(*has_value_marks)[pack_index])
        return RSResult::None;

    const auto * raw_type = type.get();
    // One typed min/max lookup per type enumerated by FOR_NUMERIC_TYPES.
#define DISPATCH(TYPE)                                                \
    if (typeid_cast<const DataType##TYPE *>(raw_type))                \
    {                                                                 \
        auto & minmaxes_data = toColumnVectorData<TYPE>(minmaxes);    \
        auto min = minmaxes_data[pack_index * 2];                     \
        auto max = minmaxes_data[pack_index * 2 + 1];                 \
        return RoughCheck::checkGreater<TYPE>(value, type, min, max); \
    }
    FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    if (typeid_cast<const DataTypeDate *>(raw_type))
    {
        auto & minmaxes_data = toColumnVectorData<DataTypeDate::FieldType>(minmaxes);
        auto min = minmaxes_data[pack_index * 2];
        auto max = minmaxes_data[pack_index * 2 + 1];
        return RoughCheck::checkGreater<DataTypeDate::FieldType>(value, type, min, max);
    }
    if (typeid_cast<const DataTypeDateTime *>(raw_type))
    {
        auto & minmaxes_data = toColumnVectorData<DataTypeDateTime::FieldType>(minmaxes);
        auto min = minmaxes_data[pack_index * 2];
        auto max = minmaxes_data[pack_index * 2 + 1];
        return RoughCheck::checkGreater<DataTypeDateTime::FieldType>(value, type, min, max);
    }
    if (typeid_cast<const DataTypeMyDateTime *>(raw_type) || typeid_cast<const DataTypeMyDate *>(raw_type))
    {
        // For DataTypeMyDateTime / DataTypeMyDate, simply compare them as comparing UInt64 is OK.
        // Check `struct MyTimeBase` for more details.
        auto & minmaxes_data = toColumnVectorData<DataTypeMyTimeBase::FieldType>(minmaxes);
        auto min = minmaxes_data[pack_index * 2];
        auto max = minmaxes_data[pack_index * 2 + 1];
        return RoughCheck::checkGreater<DataTypeMyTimeBase::FieldType>(value, type, min, max);
    }
    if (typeid_cast<const DataTypeString *>(raw_type))
    {
        const auto * string_column = checkAndGetColumn<ColumnString>(minmaxes.get());
        const auto & chars = string_column->getChars();
        const auto & offsets = string_column->getOffsets();
        size_t pos = pack_index * 2;
        size_t prev_offset = pos == 0 ? 0 : offsets[pos - 1];
        // todo use StringRef instead of String
        // Build the strings from a pointer into `chars` plus a length. Passing the byte value
        // `chars[prev_offset]` itself would select the (count, char) constructor and yield a run of
        // one repeated character instead of the stored min/max. The `- 1` drops the last byte of
        // each entry (presumably the terminating zero kept by ColumnString).
        auto min = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
        pos = pack_index * 2 + 1;
        prev_offset = offsets[pos - 1];
        auto max = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
        return RoughCheck::checkGreater<String>(value, type, min, max);
    }
    // Types without a handled representation cannot be pruned.
    return RSResult::Some;
}
RSResult MinMaxIndex::checkGreaterEqual(size_t pack_id, const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/)
RSResult MinMaxIndex::checkGreaterEqual(size_t pack_index, const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/)
{
if ((*has_null_marks)[pack_id] || value.isNull())
if ((*has_null_marks)[pack_index] || value.isNull())
return RSResult::Some;
if (!(*has_value_marks)[pack_id])
if (!(*has_value_marks)[pack_index])
return RSResult::None;

auto raw_type = type.get();
const auto * raw_type = type.get();
#define DISPATCH(TYPE) \
if (typeid_cast<const DataType##TYPE *>(raw_type)) \
{ \
auto & minmaxes_data = toColumnVectorData<TYPE>(minmaxes); \
auto min = minmaxes_data[pack_id * 2]; \
auto max = minmaxes_data[pack_id * 2 + 1]; \
auto min = minmaxes_data[pack_index * 2]; \
auto max = minmaxes_data[pack_index * 2 + 1]; \
return RoughCheck::checkGreaterEqual<TYPE>(value, type, min, max); \
}
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (typeid_cast<const DataTypeDate *>(raw_type))
{
auto & minmaxes_data = toColumnVectorData<DataTypeDate::FieldType>(minmaxes);
auto min = minmaxes_data[pack_id * 2];
auto max = minmaxes_data[pack_id * 2 + 1];
auto min = minmaxes_data[pack_index * 2];
auto max = minmaxes_data[pack_index * 2 + 1];
return RoughCheck::checkGreaterEqual<DataTypeDate::FieldType>(value, type, min, max);
}
if (typeid_cast<const DataTypeDateTime *>(raw_type))
{
auto & minmaxes_data = toColumnVectorData<DataTypeDateTime::FieldType>(minmaxes);
auto min = minmaxes_data[pack_id * 2];
auto max = minmaxes_data[pack_id * 2 + 1];
auto min = minmaxes_data[pack_index * 2];
auto max = minmaxes_data[pack_index * 2 + 1];
return RoughCheck::checkGreaterEqual<DataTypeDateTime::FieldType>(value, type, min, max);
}
if (typeid_cast<const DataTypeMyDateTime *>(raw_type) || typeid_cast<const DataTypeMyDate *>(raw_type))
{
// For DataTypeMyDateTime / DataTypeMyDate, simply compare them as comparing UInt64 is OK.
// Check `struct MyTimeBase` for more details.
auto & minmaxes_data = toColumnVectorData<DataTypeMyTimeBase::FieldType>(minmaxes);
auto min = minmaxes_data[pack_id * 2];
auto max = minmaxes_data[pack_id * 2 + 1];
auto min = minmaxes_data[pack_index * 2];
auto max = minmaxes_data[pack_index * 2 + 1];
return RoughCheck::checkGreaterEqual<DataTypeMyTimeBase::FieldType>(value, type, min, max);
}
if (typeid_cast<const DataTypeString *>(raw_type))
{
auto * string_column = checkAndGetColumn<ColumnString>(minmaxes.get());
auto & chars = string_column->getChars();
auto & offsets = string_column->getOffsets();
size_t pos = pack_id * 2;
size_t pos = pack_index * 2;
size_t prev_offset = pos == 0 ? 0 : offsets[pos - 1];
// todo use StringRef instead of String
auto min = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
pos = pack_id * 2 + 1;
pos = pack_index * 2 + 1;
prev_offset = offsets[pos - 1];
auto max = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
return RoughCheck::checkGreaterEqual<String>(value, type, min, max);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class MinMaxIndex
}

public:
MinMaxIndex(const IDataType & type)
explicit MinMaxIndex(const IDataType & type)
: has_null_marks(std::make_shared<PaddedPODArray<UInt8>>())
, has_value_marks(std::make_shared<PaddedPODArray<UInt8>>())
, minmaxes(type.createColumn())
Expand All @@ -51,6 +51,7 @@ class MinMaxIndex
void addPack(const IColumn & column, const ColumnVector<UInt8> * del_mark);

void write(const IDataType & type, WriteBuffer & buf);

static MinMaxIndexPtr read(const IDataType & type, ReadBuffer & buf, size_t bytes_limit);

std::pair<Int64, Int64> getIntMinMax(size_t pack_index);
Expand Down
17 changes: 7 additions & 10 deletions dbms/src/Storages/DeltaMerge/Index/RoughCheck.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,15 @@ namespace DM
{
namespace RoughCheck
{
// Integer result codes that the first (int-based) set of macros below compares against.
// NOTE(review): in this diff view these two constants and the int-based macros are presumably the
// removed version, replaced by the ValueCompareResult-based macros — confirm against the commit.
static constexpr int TRUE = 1;
static constexpr int FAILED = 0;

// Shorthand for ValueComparision instantiated with a comparison-operator template.
template <template <typename, typename> class Op>
using Cmp = ValueComparision<Op>;

// Predicate macros over Cmp<Op>::compare(lef_field, type, right_value).
// Each expansion reads a variable named `type` that is not a macro parameter, so a `type` must be
// in scope wherever these macros are used.
// First set: compares the result against the int constants TRUE / FAILED.
#define IS_LEGAL(lef_field, right_value) (Cmp<EqualsOp>::compare(lef_field, type, right_value) != FAILED)
#define EQUAL(lef_field, right_value) (Cmp<EqualsOp>::compare(lef_field, type, right_value) == TRUE)
#define LESS(lef_field, right_value) (Cmp<LessOp>::compare(lef_field, type, right_value) == TRUE)
#define GREATER(lef_field, right_value) (Cmp<GreaterOp>::compare(lef_field, type, right_value) == TRUE)
#define LESS_EQ(lef_field, right_value) (Cmp<LessOrEqualsOp>::compare(lef_field, type, right_value) == TRUE)
#define GREATER_EQ(lef_field, right_value) (Cmp<GreaterOrEqualsOp>::compare(lef_field, type, right_value) == TRUE)
// Second set: same predicates, compared against ValueCompareResult enumerators instead.
// IS_LEGAL holds when the equality comparison did not report CanNotCompare; the others hold only
// when the respective comparison reports True.
#define IS_LEGAL(lef_field, right_value) (Cmp<EqualsOp>::compare(lef_field, type, right_value) != ValueCompareResult::CanNotCompare)
#define EQUAL(lef_field, right_value) (Cmp<EqualsOp>::compare(lef_field, type, right_value) == ValueCompareResult::True)
#define LESS(lef_field, right_value) (Cmp<LessOp>::compare(lef_field, type, right_value) == ValueCompareResult::True)
#define GREATER(lef_field, right_value) (Cmp<GreaterOp>::compare(lef_field, type, right_value) == ValueCompareResult::True)
#define LESS_EQ(lef_field, right_value) (Cmp<LessOrEqualsOp>::compare(lef_field, type, right_value) == ValueCompareResult::True)
#define GREATER_EQ(lef_field, right_value) (Cmp<GreaterOrEqualsOp>::compare(lef_field, type, right_value) == ValueCompareResult::True)


template <typename T>
Expand Down Expand Up @@ -94,4 +91,4 @@ inline RSResult checkGreaterEqual(const Field & v, const DataTypePtr & type, T m

} // namespace RoughCheck
} // namespace DM
} // namespace DB
} // namespace DB
Loading

0 comments on commit 571a628

Please sign in to comment.