Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#5899
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
JaySon-Huang authored and ti-chi-bot committed Sep 16, 2022
1 parent 4079241 commit 603ff72
Show file tree
Hide file tree
Showing 5 changed files with 1,094 additions and 3 deletions.
313 changes: 310 additions & 3 deletions dbms/src/Storages/Transaction/RowCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,18 @@ struct RowEncoderV2
/// Cache encoded individual columns.
for (size_t i_col = 0, i_val = 0; i_col < table_info.columns.size(); i_col++)
{
if (i_val == fields.size())
break;

const auto & column_info = table_info.columns[i_col];
const auto & field = fields[i_val];
if ((table_info.pk_is_handle || table_info.is_common_handle) && column_info.hasPriKeyFlag())
{
// for common handle/pk is handle table,
// the field with primary key flag is usually encoded to key instead of value
continue;
}

if (column_info.id > std::numeric_limits<typename RowV2::Types<false>::ColumnIDType>::max())
is_big = true;
if (!field.isNull())
Expand All @@ -452,9 +460,6 @@ struct RowEncoderV2
null_column_ids.emplace(column_info.id);
}
i_val++;

if (i_val == fields.size())
break;
}
is_big = is_big || value_length > std::numeric_limits<RowV2::Types<false>::ValueOffsetType>::max();

Expand Down Expand Up @@ -524,4 +529,306 @@ void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector<Field> &
RowEncoderV2(table_info, fields).encode(ss);
}

<<<<<<< HEAD
=======
bool appendRowToBlock(
const TiKVValue::Base & raw_value,
SortedColumnIDWithPosConstIter column_ids_iter,
SortedColumnIDWithPosConstIter column_ids_iter_end,
Block & block,
size_t block_column_pos,
const ColumnInfos & column_infos,
ColumnID pk_handle_id,
bool force_decode)
{
switch (static_cast<UInt8>(raw_value[0]))
{
case static_cast<UInt8>(RowCodecVer::ROW_V2):
return appendRowV2ToBlock(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode);
default:
return appendRowV1ToBlock(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode);
}
}

bool appendRowV2ToBlock(
const TiKVValue::Base & raw_value,
SortedColumnIDWithPosConstIter column_ids_iter,
SortedColumnIDWithPosConstIter column_ids_iter_end,
Block & block,
size_t block_column_pos,
const ColumnInfos & column_infos,
ColumnID pk_handle_id,
bool force_decode)
{
auto row_flag = readLittleEndian<UInt8>(&raw_value[1]);
bool is_big = row_flag & RowV2::BigRowMask;
return is_big ? appendRowV2ToBlockImpl<true>(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode)
: appendRowV2ToBlockImpl<false>(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode);
}

inline bool addDefaultValueToColumnIfPossible(const ColumnInfo & column_info, Block & block, size_t block_column_pos, bool force_decode)
{
// We consider a missing column could be safely filled with NULL, unless it has not default value and is NOT NULL.
// This could saves lots of unnecessary schema syncs for old data with a schema that has newly added columns.
// for clustered index, if the pk column does not exists, it can still be decoded from the key
if (column_info.hasPriKeyFlag())
return true;

if (column_info.hasNoDefaultValueFlag() && column_info.hasNotNullFlag())
{
if (!force_decode)
return false;
}
// not null or has no default value, tidb will fill with specific value.
auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
raw_column->insert(column_info.defaultValueToField());
return true;
}

template <bool is_big>
bool appendRowV2ToBlockImpl(
const TiKVValue::Base & raw_value,
SortedColumnIDWithPosConstIter column_ids_iter,
SortedColumnIDWithPosConstIter column_ids_iter_end,
Block & block,
size_t block_column_pos,
const ColumnInfos & column_infos,
ColumnID pk_handle_id,
bool force_decode)
{
size_t cursor = 2; // Skip the initial codec ver and row flag.
size_t num_not_null_columns = decodeUInt<UInt16>(cursor, raw_value);
size_t num_null_columns = decodeUInt<UInt16>(cursor, raw_value);
std::vector<ColumnID> not_null_column_ids;
std::vector<ColumnID> null_column_ids;
std::vector<size_t> value_offsets;
decodeUInts<ColumnID, typename RowV2::Types<is_big>::ColumnIDType>(cursor, raw_value, num_not_null_columns, not_null_column_ids);
decodeUInts<ColumnID, typename RowV2::Types<is_big>::ColumnIDType>(cursor, raw_value, num_null_columns, null_column_ids);
decodeUInts<size_t, typename RowV2::Types<is_big>::ValueOffsetType>(cursor, raw_value, num_not_null_columns, value_offsets);
size_t values_start_pos = cursor;
size_t idx_not_null = 0;
size_t idx_null = 0;
// Merge ordered not null/null columns to keep order.
while (idx_not_null < not_null_column_ids.size() || idx_null < null_column_ids.size())
{
if (column_ids_iter == column_ids_iter_end)
{
// extra column
return force_decode;
}

bool is_null;
if (idx_not_null < not_null_column_ids.size() && idx_null < null_column_ids.size())
is_null = not_null_column_ids[idx_not_null] > null_column_ids[idx_null];
else
is_null = idx_null < null_column_ids.size();

auto next_datum_column_id = is_null ? null_column_ids[idx_null] : not_null_column_ids[idx_not_null];
const auto next_column_id = column_ids_iter->first;
if (next_column_id > next_datum_column_id)
{
// The next column id to read is bigger than the column id of next datum in encoded row.
// It means this is the datum of extra column. May happen when reading after dropping
// a column.
if (!force_decode)
return false;
// Ignore the extra column and continue to parse other datum
if (is_null)
idx_null++;
else
idx_not_null++;
}
else if (next_column_id < next_datum_column_id)
{
// The next column id to read is less than the column id of next datum in encoded row.
// It means this is the datum of missing column. May happen when reading after adding
// a column.
// Fill with default value and continue to read data for next column id.
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
column_ids_iter++;
block_column_pos++;
}
else
{
// If pk_handle_id is a valid column id, then it means the table's pk_is_handle is true
// we can just ignore the pk value encoded in value part
if (unlikely(next_column_id == pk_handle_id))
{
column_ids_iter++;
block_column_pos++;
if (is_null)
{
idx_null++;
}
else
{
idx_not_null++;
}
continue;
}

// Parse the datum.
auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
const auto & column_info = column_infos[column_ids_iter->second];
if (is_null)
{
if (!raw_column->isColumnNullable())
{
if (!force_decode)
{
return false;
}
else
{
throw Exception("Detected invalid null when decoding data of column " + column_info.name + " with column type " + raw_column->getName(),
ErrorCodes::LOGICAL_ERROR);
}
}
// ColumnNullable::insertDefault just insert a null value
raw_column->insertDefault();
idx_null++;
}
else
{
size_t start = idx_not_null ? value_offsets[idx_not_null - 1] : 0;
size_t length = value_offsets[idx_not_null] - start;
if (!raw_column->decodeTiDBRowV2Datum(values_start_pos + start, raw_value, length, force_decode))
return false;
idx_not_null++;
}
column_ids_iter++;
block_column_pos++;
}
}
while (column_ids_iter != column_ids_iter_end)
{
if (column_ids_iter->first != pk_handle_id)
{
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
}
column_ids_iter++;
block_column_pos++;
}
return true;
}

using TiDB::DatumFlat;
bool appendRowV1ToBlock(
const TiKVValue::Base & raw_value,
SortedColumnIDWithPosConstIter column_ids_iter,
SortedColumnIDWithPosConstIter column_ids_iter_end,
Block & block,
size_t block_column_pos,
const ColumnInfos & column_infos,
ColumnID pk_handle_id,
bool force_decode)
{
size_t cursor = 0;
std::map<ColumnID, Field> decoded_fields;
while (cursor < raw_value.size())
{
Field f = DecodeDatum(cursor, raw_value);
if (f.isNull())
break;
ColumnID col_id = f.get<ColumnID>();
decoded_fields.emplace(col_id, DecodeDatum(cursor, raw_value));
}
if (cursor != raw_value.size())
throw Exception(std::string(__PRETTY_FUNCTION__) + ": cursor is not end, remaining: " + raw_value.substr(cursor),
ErrorCodes::LOGICAL_ERROR);

auto decoded_field_iter = decoded_fields.begin();
while (decoded_field_iter != decoded_fields.end())
{
if (column_ids_iter == column_ids_iter_end)
{
// extra column
return force_decode;
}

auto next_field_column_id = decoded_field_iter->first;
if (column_ids_iter->first > next_field_column_id)
{
// extra column
if (!force_decode)
return false;
decoded_field_iter++;
}
else if (column_ids_iter->first < next_field_column_id)
{
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
column_ids_iter++;
block_column_pos++;
}
else
{
// if pk_handle_id is a valid column id, then it means the table's pk_is_handle is true
// we can just ignore the pk value encoded in value part
if (unlikely(column_ids_iter->first == pk_handle_id))
{
decoded_field_iter++;
column_ids_iter++;
block_column_pos++;
continue;
}

auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
const auto & column_info = column_infos[column_ids_iter->second];
DatumFlat datum(decoded_field_iter->second, column_info.tp);
const Field & unflattened = datum.field();
if (datum.overflow(column_info))
{
// Overflow detected, fatal if force_decode is true,
// as schema being newer and narrow shouldn't happen.
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
{
throw Exception("Detected overflow when decoding data " + std::to_string(unflattened.get<UInt64>()) + " of column "
+ column_info.name + " with column " + raw_column->getName(),
ErrorCodes::LOGICAL_ERROR);
}

return false;
}
if (datum.invalidNull(column_info))
{
// Null value with non-null type detected, fatal if force_decode is true,
// as schema being newer and with invalid null shouldn't happen.
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
{
throw Exception("Detected invalid null when decoding data " + std::to_string(unflattened.get<UInt64>())
+ " of column " + column_info.name + " with type " + raw_column->getName(),
ErrorCodes::LOGICAL_ERROR);
}

return false;
}
raw_column->insert(unflattened);
decoded_field_iter++;
column_ids_iter++;
block_column_pos++;
}
}
while (column_ids_iter != column_ids_iter_end)
{
if (column_ids_iter->first != pk_handle_id)
{
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
}
column_ids_iter++;
block_column_pos++;
}
return true;
}

>>>>>>> aae88b120d (tests: Fix RegionBlockReaderTest helper functions (#5899))
} // namespace DB
Loading

0 comments on commit 603ff72

Please sign in to comment.