Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-402] Skip unneeded columns when decode row #173

Merged
merged 45 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6abfa1d
skip unneeded column
lidezhu Aug 10, 2019
b83a4b5
small fix
lidezhu Aug 11, 2019
445fc4d
check unknown column id when decode row
lidezhu Aug 12, 2019
fce6f15
add initializer for boolean variable
lidezhu Aug 13, 2019
0231b95
fix the case when tikv value contains just a nill flag
lidezhu Aug 13, 2019
0366142
remove unnecessary copy
lidezhu Aug 13, 2019
f5013e9
small improvement
lidezhu Aug 13, 2019
b0d3e9c
fix comment
lidezhu Aug 14, 2019
93a10a7
don't store column id in field
lidezhu Aug 14, 2019
8a5bb7b
small improvement
lidezhu Aug 15, 2019
dcc4265
remove unnecessary blank line
lidezhu Aug 15, 2019
a92d878
add log for performance debug
lidezhu Aug 15, 2019
1881a5c
adjust time point
lidezhu Aug 15, 2019
eae234b
small fix and adjust log
lidezhu Aug 15, 2019
43472ba
small improvement
lidezhu Aug 15, 2019
1ddb5d2
small improvement
lidezhu Aug 16, 2019
1952272
remove unnecessary construction
lidezhu Aug 16, 2019
d051cbb
small improvement
lidezhu Aug 16, 2019
7aee3e5
fix conflict with master
lidezhu Aug 16, 2019
9191506
uncomment flushregion
lidezhu Aug 16, 2019
8befbd6
Merge branch 'master' into FLASH402
solotzg Aug 19, 2019
a64e2c5
skip uint by calling DecodeVarUInt
lidezhu Aug 19, 2019
68a688d
Merge branch 'FLASH402' of github.com:lidezhu/tics into FLASH402
lidezhu Aug 19, 2019
33b14ef
Merge branch 'master' into FLASH402
lidezhu Aug 19, 2019
171b2f9
avoid insert column id
lidezhu Aug 20, 2019
dfabdb2
Merge branch 'FLASH402' of github.com:lidezhu/tics into FLASH402
lidezhu Aug 20, 2019
a8a25ca
small fix
lidezhu Aug 20, 2019
72beacc
add exception message
lidezhu Aug 20, 2019
237b7f2
comment flushRegion
lidezhu Aug 20, 2019
ad152f8
uncomment flushRegion and other minor fix
lidezhu Aug 20, 2019
c2afabb
modify push_back to emplace_back
lidezhu Aug 20, 2019
f50a6f4
add const
lidezhu Aug 20, 2019
9daf974
fix comment
lidezhu Aug 20, 2019
b6cb23f
fix comment
lidezhu Aug 20, 2019
8b89ea1
optimize by using dense_hash_map&dense_hash_set
solotzg Aug 20, 2019
8970e15
fix
solotzg Aug 20, 2019
1422d0a
fix
solotzg Aug 20, 2019
8b2e256
fix comment
lidezhu Aug 20, 2019
b574f5b
small fix
lidezhu Aug 20, 2019
12db36f
Merge branch 'FLASH402' of github.com:lidezhu/tics into lidezhu-FLASH…
solotzg Aug 20, 2019
96d62ed
Merge branch 'FLASH402' of github.com:lidezhu/tics into lidezhu-FLASH…
solotzg Aug 20, 2019
5119717
remove useless comment
lidezhu Aug 20, 2019
ad9bb51
format
solotzg Aug 20, 2019
e361ea5
Merge pull request #1 from solotzg/FLASH-402-optimize-hash
lidezhu Aug 20, 2019
f886ff3
[FLASH-402] optimize column_map (#2)
solotzg Aug 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions dbms/src/Storages/Transaction/Codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ String DecodeBytes(size_t & cursor, const String & raw_value)
return ss.str();
}

void SkipBytes(size_t & cursor, const String & raw_value)
{
while (true)
{
size_t next_cursor = cursor + 9;
if (next_cursor > raw_value.size())
throw Exception("Wrong format, cursor over buffer size. (DecodeBytes)", ErrorCodes::LOGICAL_ERROR);
UInt8 marker = (UInt8)raw_value[cursor + 8];
UInt8 pad_size = ENC_MARKER - marker;

if (pad_size > 8)
throw Exception("Wrong format, too many padding bytes. (DecodeBytes)", ErrorCodes::LOGICAL_ERROR);
cursor = next_cursor;
if (pad_size != 0)
break;
}
}

String DecodeCompactBytes(size_t & cursor, const String & raw_value)
{
size_t size = DecodeVarInt(cursor, raw_value);
Expand All @@ -73,13 +91,24 @@ String DecodeCompactBytes(size_t & cursor, const String & raw_value)
return res;
}

void SkipCompactBytes(size_t & cursor, const String & raw_value)
{
size_t size = DecodeVarInt(cursor, raw_value);
cursor += size;
}

Int64 DecodeVarInt(size_t & cursor, const String & raw_value)
{
UInt64 v = DecodeVarUInt(cursor, raw_value);
Int64 vx = v >> 1;
return (v & 1) ? ~vx : vx;
}

void SkipVarInt(size_t & cursor, const String & raw_value)
solotzg marked this conversation as resolved.
Show resolved Hide resolved
{
SkipVarUInt(cursor, raw_value);
}

UInt64 DecodeVarUInt(size_t & cursor, const String & raw_value)
{
UInt64 res = 0;
Expand All @@ -99,6 +128,11 @@ UInt64 DecodeVarUInt(size_t & cursor, const String & raw_value)
throw Exception("Wrong format. (DecodeVarUInt)", ErrorCodes::LOGICAL_ERROR);
}

void SkipVarUInt(size_t & cursor, const String & raw_value)
{
std::ignore = DecodeVarUInt(cursor, raw_value);
}

inline Int8 getWords(PrecType prec, ScaleType scale)
{
Int8 scale_word = scale / 9 + (scale % 9 > 0);
Expand Down Expand Up @@ -204,6 +238,15 @@ Decimal DecodeDecimal(size_t & cursor, const String & raw_value)
return Decimal(value, prec, frac);
}

void SkipDecimal(size_t & cursor, const String & raw_value)
{
PrecType prec = raw_value[cursor++];
ScaleType frac = raw_value[cursor++];

int binSize = getBytes(prec, frac);
cursor += binSize;
}

Field DecodeDatum(size_t & cursor, const String & raw_value)
{
switch (raw_value[cursor++])
Expand Down Expand Up @@ -233,6 +276,43 @@ Field DecodeDatum(size_t & cursor, const String & raw_value)
}
}

void SkipDatum(size_t & cursor, const String & raw_value)
{
switch (raw_value[cursor++])
{
case TiDB::CodecFlagNil:
return;
case TiDB::CodecFlagInt:
cursor += sizeof(Int64);
return;
case TiDB::CodecFlagUInt:
cursor += sizeof(UInt64);
return;
case TiDB::CodecFlagBytes:
SkipBytes(cursor, raw_value);
return;
case TiDB::CodecFlagCompactBytes:
SkipCompactBytes(cursor, raw_value);
return;
case TiDB::CodecFlagFloat:
cursor += sizeof(UInt64);
return;
case TiDB::CodecFlagVarUInt:
SkipVarUInt(cursor, raw_value);
return;
case TiDB::CodecFlagVarInt:
SkipVarInt(cursor, raw_value);
return;
case TiDB::CodecFlagDuration:
throw Exception("Not implented yet. DecodeDatum: CodecFlagDuration", ErrorCodes::LOGICAL_ERROR);
case TiDB::CodecFlagDecimal:
SkipDecimal(cursor, raw_value);
return;
default:
throw Exception("Unknown Type:" + std::to_string(raw_value[cursor - 1]), ErrorCodes::LOGICAL_ERROR);
}
}

void EncodeFloat64(Float64 num, std::stringstream & ss)
{
UInt64 u = enforce_cast<UInt64>(num);
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/Transaction/Codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ Decimal DecodeDecimal(size_t & cursor, const String & raw_value);

Field DecodeDatum(size_t & cursor, const String & raw_value);

void SkipBytes(size_t & cursor, const String & raw_value);

void SkipCompactBytes(size_t & cursor, const String & raw_value);

void SkipVarInt(size_t & cursor, const String & raw_value);

void SkipVarUInt(size_t & cursor, const String & raw_value);

void SkipDecimal(size_t & cursor, const String & raw_value);

void SkipDatum(size_t & cursor, const String & raw_value);

template <typename T>
inline std::enable_if_t<std::is_unsigned_v<T>, void> EncodeUInt(T u, std::stringstream & ss)
{
Expand Down
131 changes: 70 additions & 61 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,32 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
ColumnID handle_col_id = InvalidColumnID;

std::unordered_map<ColumnID, std::pair<MutableColumnPtr, NameAndTypePair>> column_map;
for (const auto & column_info : table_info.columns)
std::unordered_map<ColumnID, size_t> column_id_to_info_index_map;
std::unordered_set<ColumnID> column_ids_to_read;
std::unordered_set<ColumnID> schema_all_column_ids;
for (size_t i = 0; i < table_info.columns.size(); i++)
{
auto & column_info = table_info.columns[i];
ColumnID col_id = column_info.id;
String col_name = column_info.name;
schema_all_column_ids.insert(col_id);
if (std::find(column_names_to_read.begin(), column_names_to_read.end(), col_name) == column_names_to_read.end())
{
continue;
}
auto ch_col = columns.getPhysical(col_name);
column_map[col_id] = std::make_pair(ch_col.type->createColumn(), ch_col);
column_map[col_id].first->reserve(data_list.size());
if (table_info.pk_is_handle && column_info.hasPriKeyFlag())
handle_col_id = col_id;
else
{
column_ids_to_read.emplace(col_id);
column_id_to_info_index_map.emplace(std::make_pair(col_id, i));
}
}
if (column_names_to_read.size() - 3 != column_ids_to_read.size())
throw Exception("schema doesn't contain needed columns.", ErrorCodes::LOGICAL_ERROR);

if (!table_info.pk_is_handle)
{
Expand Down Expand Up @@ -168,15 +184,19 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,

const auto & date_lut = DateLUT::instance();

std::unordered_set<ColumnID> col_id_included;

const size_t target_col_size = (!table_info.pk_is_handle ? table_info.columns.size() : table_info.columns.size() - 1) * 2;
const size_t target_col_size = column_names_to_read.size() - 3;

Block block;

// optimize for only need handle, tso, delmark.
if (column_names_to_read.size() > 3)
{
// TODO: optimize columns' insertion, use better implementation rather than Field, it's terrible.
std::vector<ColumnID> decoded_col_ids;
std::vector<Field> decoded_fields;
decoded_col_ids.reserve(target_col_size);
decoded_fields.reserve(target_col_size);

for (const auto & [handle, write_type, commit_ts, value_ptr] : data_list)
{
std::ignore = handle;
Expand All @@ -185,94 +205,82 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
if (commit_ts > start_ts)
continue;

// TODO: optimize columns' insertion, use better implementation rather than Field, it's terrible.

std::vector<Field> row;

decoded_col_ids.clear();
decoded_fields.clear();
if (write_type == Region::DelFlag)
{
row.reserve(table_info.columns.size() * 2);
for (const TiDB::ColumnInfo & column : table_info.columns)
for (auto col_id : column_ids_to_read)
{
if (handle_col_id == column.id)
continue;
const auto & column = table_info.columns[column_id_to_info_index_map[col_id]];

row.push_back(Field(column.id));
row.push_back(GenDecodeRow(column.getCodecFlag()));
decoded_col_ids.push_back(column.id);
decoded_fields.emplace_back(GenDecodeRow(column.getCodecFlag()));
}
}
else
row = RecordKVFormat::DecodeRow(*value_ptr);

if (row.size() == 1 && row[0].isNull())
{
// all field is null
row.clear();
bool schema_matches = RecordKVFormat::DecodeRow(*value_ptr, column_ids_to_read, decoded_col_ids, decoded_fields, schema_all_column_ids);
if (!schema_matches && !force_decode)
{
return std::make_tuple(block, false);
}
if (decoded_col_ids.empty() && decoded_fields.size() == 1 && decoded_fields[0].isNull())
{
// all field is null
decoded_fields.clear();
}
}

if (row.size() & 1)
if (decoded_col_ids.size() != decoded_fields.size())
throw Exception("row size is wrong.", ErrorCodes::LOGICAL_ERROR);

/// Modify `row` by adding missing column values or removing useless column values.

col_id_included.clear();
for (size_t i = 0; i < row.size(); i += 2)
col_id_included.emplace(row[i].get<ColumnID>());

// Fill in missing column values.
for (const TiDB::ColumnInfo & column : table_info.columns)
if (unlikely(decoded_col_ids.size() > column_ids_to_read.size()))
{
if (handle_col_id == column.id)
continue;
if (col_id_included.count(column.id))
continue;

if (!force_decode)
return std::make_tuple(block, false);

row.emplace_back(Field(column.id));
if (column.hasNoDefaultValueFlag())
// Fill `zero` value if NOT NULL specified or else NULL.
row.push_back(column.hasNotNullFlag() ? GenDecodeRow(column.getCodecFlag()) : Field());
else
// Fill default value.
row.push_back(column.defaultValueToField());
throw Exception("read unexpected columns.", ErrorCodes::LOGICAL_ERROR);
}

// Remove values of non-existing columns, which could be data inserted (but not flushed) before DDLs that drop some columns.
// TODO: May need to log this.
for (int i = int(row.size()) - 2; i >= 0; i -= 2)
// redundant column values (column id not in current schema) has been dropped when decoding row
// this branch handles the case when the row doesn't contain all the needed column
if (decoded_col_ids.size() < column_ids_to_read.size())
{
Field & col_id = row[i];
if (column_map.find(col_id.get<ColumnID>()) == column_map.end())
std::unordered_set<ColumnID> decoded_col_ids_set(decoded_col_ids.begin(), decoded_col_ids.end());
solotzg marked this conversation as resolved.
Show resolved Hide resolved

for (auto col_id : column_ids_to_read)
{
if (!force_decode)
return std::make_tuple(block, false);
if (decoded_col_ids_set.count(col_id))
continue;

row.erase(row.begin() + i, row.begin() + i + 2);
const auto & column = table_info.columns[column_id_to_info_index_map[col_id]];
decoded_col_ids.push_back(column.id);
if (column.hasNoDefaultValueFlag())
// Fill `zero` value if NOT NULL specified or else NULL.
decoded_fields.push_back(column.hasNotNullFlag() ? GenDecodeRow(column.getCodecFlag()) : Field());
else
// Fill default value.
decoded_fields.push_back(column.defaultValueToField());
}
}

if (row.size() != target_col_size)
if (decoded_col_ids.size() != target_col_size || decoded_fields.size() != target_col_size)
throw Exception("decode row error.", ErrorCodes::LOGICAL_ERROR);

/// Transform `row` to columnar format.

for (size_t i = 0; i < row.size(); i += 2)
for (size_t i = 0; i < decoded_col_ids.size(); i++)
{
Field & col_id = row[i];
auto it = column_map.find(col_id.get<ColumnID>());
ColumnID col_id = decoded_col_ids[i];
auto it = column_map.find(col_id);
if (it == column_map.end())
throw Exception("col_id not found in column_map", ErrorCodes::LOGICAL_ERROR);

const auto & tp = it->second.second.type;
if (tp->isDateOrDateTime()
|| (tp->isNullable() && dynamic_cast<const DataTypeNullable *>(tp.get())->getNestedType()->isDateOrDateTime()))
{
Field & field = row[i + 1];
Field & field = decoded_fields[i];
if (field.isNull())
{
it->second.first->insert(row[i + 1]);
it->second.first->insert(decoded_fields[i]);
continue;
}
UInt64 packed = field.get<UInt64>();
Expand Down Expand Up @@ -317,7 +325,7 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
}
else
{
it->second.first->insert(row[i + 1]);
it->second.first->insert(decoded_fields[i]);

// Check overflow for potential un-synced data type widen,
// i.e. schema is old and narrow, meanwhile data is new and wide.
Expand All @@ -335,14 +343,14 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
{
// Unsigned checking by bitwise compare.
UInt64 inserted = nested_column.get64(inserted_index);
UInt64 orig = row[i + 1].get<UInt64>();
UInt64 orig = decoded_fields[i].get<UInt64>();
overflow = inserted != orig;
}
else
{
// Singed checking by arithmetical cast.
Int64 inserted = nested_column.getInt(inserted_index);
Int64 orig = row[i + 1].get<Int64>();
Int64 orig = decoded_fields[i].get<Int64>();
overflow = inserted != orig;
}
if (overflow)
Expand All @@ -352,7 +360,7 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
throw Exception(
"Detected overflow for data " + std::to_string(row[i + 1].get<UInt64>()) + " of type " + tp->getName(),
"Detected overflow for data " + std::to_string(decoded_fields[i].get<UInt64>()) + " of type " + tp->getName(),
ErrorCodes::LOGICAL_ERROR);

return std::make_tuple(block, false);
Expand All @@ -364,6 +372,7 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
}
}


for (const auto & name : column_names_to_read)
{
if (name == MutableSupport::delmark_column_name)
Expand Down
Loading