Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-402] Skip unneeded columns when decode row #173

Merged
merged 45 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6abfa1d
skip unneeded column
lidezhu Aug 10, 2019
b83a4b5
small fix
lidezhu Aug 11, 2019
445fc4d
check unknown column id when decode row
lidezhu Aug 12, 2019
fce6f15
add initializer for boolean variable
lidezhu Aug 13, 2019
0231b95
fix the case when tikv value contains just a nill flag
lidezhu Aug 13, 2019
0366142
remove unnecessary copy
lidezhu Aug 13, 2019
f5013e9
small improvement
lidezhu Aug 13, 2019
b0d3e9c
fix comment
lidezhu Aug 14, 2019
93a10a7
don't store column id in field
lidezhu Aug 14, 2019
8a5bb7b
small improvement
lidezhu Aug 15, 2019
dcc4265
remove unnecessary blank line
lidezhu Aug 15, 2019
a92d878
add log for performance debug
lidezhu Aug 15, 2019
1881a5c
adjust time point
lidezhu Aug 15, 2019
eae234b
small fix and adjust log
lidezhu Aug 15, 2019
43472ba
small improvement
lidezhu Aug 15, 2019
1ddb5d2
small improvement
lidezhu Aug 16, 2019
1952272
remove unnecessary construction
lidezhu Aug 16, 2019
d051cbb
small improvement
lidezhu Aug 16, 2019
7aee3e5
fix conflict with master
lidezhu Aug 16, 2019
9191506
uncomment flushregion
lidezhu Aug 16, 2019
8befbd6
Merge branch 'master' into FLASH402
solotzg Aug 19, 2019
a64e2c5
skip uint by calling DecodeVarUInt
lidezhu Aug 19, 2019
68a688d
Merge branch 'FLASH402' of github.com:lidezhu/tics into FLASH402
lidezhu Aug 19, 2019
33b14ef
Merge branch 'master' into FLASH402
lidezhu Aug 19, 2019
171b2f9
avoid insert column id
lidezhu Aug 20, 2019
dfabdb2
Merge branch 'FLASH402' of github.com:lidezhu/tics into FLASH402
lidezhu Aug 20, 2019
a8a25ca
small fix
lidezhu Aug 20, 2019
72beacc
add exception message
lidezhu Aug 20, 2019
237b7f2
comment flushRegion
lidezhu Aug 20, 2019
ad152f8
uncomment flushRegion and other minor fix
lidezhu Aug 20, 2019
c2afabb
modify push_back to emplace_back
lidezhu Aug 20, 2019
f50a6f4
add const
lidezhu Aug 20, 2019
9daf974
fix comment
lidezhu Aug 20, 2019
b6cb23f
fix comment
lidezhu Aug 20, 2019
8b89ea1
optimize by using dense_hash_map&dense_hash_set
solotzg Aug 20, 2019
8970e15
fix
solotzg Aug 20, 2019
1422d0a
fix
solotzg Aug 20, 2019
8b2e256
fix comment
lidezhu Aug 20, 2019
b574f5b
small fix
lidezhu Aug 20, 2019
12db36f
Merge branch 'FLASH402' of github.com:lidezhu/tics into lidezhu-FLASH…
solotzg Aug 20, 2019
96d62ed
Merge branch 'FLASH402' of github.com:lidezhu/tics into lidezhu-FLASH…
solotzg Aug 20, 2019
5119717
remove useless comment
lidezhu Aug 20, 2019
ad9bb51
format
solotzg Aug 20, 2019
e361ea5
Merge pull request #1 from solotzg/FLASH-402-optimize-hash
lidezhu Aug 20, 2019
f886ff3
[FLASH-402] optimize column_map (#2)
solotzg Aug 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions dbms/src/Storages/Transaction/Codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,96 @@ inline Field DecodeDatum(size_t & cursor, const String & raw_value)
}
}

inline void SkipBytes(size_t & cursor, const String & raw_value)
{
while (true)
{
size_t next_cursor = cursor + 9;
if (next_cursor > raw_value.size())
throw Exception("Wrong format, cursor over buffer size. (DecodeBytes)", ErrorCodes::LOGICAL_ERROR);
UInt8 marker = (UInt8)raw_value[cursor + 8];
UInt8 pad_size = ENC_MARKER - marker;

if (pad_size > 8)
throw Exception("Wrong format, too many padding bytes. (DecodeBytes)", ErrorCodes::LOGICAL_ERROR);
cursor = next_cursor;
if (pad_size != 0)
break;
}
}

inline void SkipCompactBytes(size_t & cursor, const String & raw_value)
{
size_t size = DecodeVarInt(cursor, raw_value);
cursor += size;
}

inline void SkipVarUInt(size_t & cursor, const String & raw_value)
{
for (int i = 0; cursor < raw_value.size(); i++)
{
UInt64 v = raw_value[cursor++];
if (v < 0x80)
{
if (i > 9 || (i == 9 && v > 1))
throw Exception("Overflow when DecodeVarUInt", ErrorCodes::LOGICAL_ERROR);
return;
}
}
throw Exception("Wrong format. (DecodeVarUInt)", ErrorCodes::LOGICAL_ERROR);
}

inline void SkipVarInt(size_t & cursor, const String & raw_value)
{
SkipVarUInt(cursor, raw_value);
}

inline void SkipDecimal(size_t & cursor, const String & raw_value)
{
PrecType prec = raw_value[cursor++];
ScaleType frac = raw_value[cursor++];

int binSize = getBytes(prec, frac);
cursor += binSize;
}

inline void SkipDatum(size_t & cursor, const String & raw_value)
{
switch (raw_value[cursor++])
{
case TiDB::CodecFlagNil:
return;
case TiDB::CodecFlagInt:
cursor += sizeof(Int64);
return;
case TiDB::CodecFlagUInt:
cursor += sizeof(UInt64);
return;
case TiDB::CodecFlagBytes:
SkipBytes(cursor, raw_value);
return;
case TiDB::CodecFlagCompactBytes:
SkipCompactBytes(cursor, raw_value);
return;
case TiDB::CodecFlagFloat:
cursor += sizeof(UInt64);
return;
case TiDB::CodecFlagVarUInt:
SkipVarUInt(cursor, raw_value);
return;
case TiDB::CodecFlagVarInt:
SkipVarInt(cursor, raw_value);
return;
case TiDB::CodecFlagDuration:
throw Exception("Not implented yet. DecodeDatum: CodecFlagDuration", ErrorCodes::LOGICAL_ERROR);
case TiDB::CodecFlagDecimal:
SkipDecimal(cursor, raw_value);
return;
default:
throw Exception("Unknown Type:" + std::to_string(raw_value[cursor - 1]), ErrorCodes::LOGICAL_ERROR);
}
}

template <typename T>
inline void writeIntBinary(const T & x, std::stringstream & ss)
{
Expand Down
106 changes: 66 additions & 40 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,18 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
ColumnID handle_col_id = InvalidColumnID;

std::unordered_map<ColumnID, std::pair<MutableColumnPtr, NameAndTypePair>> column_map;
std::unordered_set<ColumnID> column_ids_to_read;
std::unordered_set<ColumnID> schema_all_column_ids;
for (const auto & column_info : table_info.columns)
{
ColumnID col_id = column_info.id;
String col_name = column_info.name;
schema_all_column_ids.insert(col_id);
if (std::find(column_names_to_read.begin(), column_names_to_read.end(), col_name) == column_names_to_read.end())
{
continue;
}
column_ids_to_read.emplace(col_id);
auto ch_col = columns.getPhysical(col_name);
column_map[col_id] = std::make_pair(ch_col.type->createColumn(), ch_col);
column_map[col_id].first->reserve(data_list.size());
Expand Down Expand Up @@ -168,17 +176,19 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,

const auto & date_lut = DateLUT::instance();

std::unordered_set<ColumnID> col_id_included;

const size_t target_col_size = (!table_info.pk_is_handle ? table_info.columns.size() : table_info.columns.size() - 1) * 2;
const size_t target_col_size = column_names_to_read.size() - 3;

Block block;

// optimize for only need handle, tso, delmark.
if (column_names_to_read.size() > 3)
{
unsigned long sum_decode = 0;
unsigned long sum_transform = 0;

for (const auto & [handle, write_type, commit_ts, value_ptr] : data_list)
{
auto start_time = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now().time_since_epoch()).count();
std::ignore = handle;

// Ignore data after the start_ts.
Expand All @@ -187,37 +197,47 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,

// TODO: optimize columns' insertion, use better implementation rather than Field, it's terrible.

std::vector<Field> row;
std::vector<ColumnID> col_ids;
std::vector<Field> fields;
std::unordered_set<ColumnID> row_all_column_ids;

if (write_type == Region::DelFlag)
{
row.reserve(table_info.columns.size() * 2);
col_ids.reserve(target_col_size);
fields.reserve(target_col_size);
for (const TiDB::ColumnInfo & column : table_info.columns)
solotzg marked this conversation as resolved.
Show resolved Hide resolved
{
if (handle_col_id == column.id)
continue;

row.push_back(Field(column.id));
row.push_back(GenDecodeRow(column.getCodecFlag()));
if (!column_ids_to_read.count(column.id))
continue;

col_ids.push_back(column.id);
fields.push_back(GenDecodeRow(column.getCodecFlag()));
}
}
else
row = RecordKVFormat::DecodeRow(*value_ptr);

if (row.size() == 1 && row[0].isNull())
{
// all field is null
row.clear();
std::tie(col_ids, fields, row_all_column_ids) = RecordKVFormat::DecodeRow(*value_ptr, column_ids_to_read);
if ((schema_all_column_ids != row_all_column_ids) && !force_decode)
{
return std::make_tuple(block, false);
}
if (col_ids.empty() && fields.size() == 1 && fields[0].isNull())
{
// all field is null
fields.clear();
}
}

if (row.size() & 1)
auto mid_time = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now().time_since_epoch()).count();

if (col_ids.size() != fields.size())
throw Exception("row size is wrong.", ErrorCodes::LOGICAL_ERROR);

/// Modify `row` by adding missing column values or removing useless column values.

col_id_included.clear();
for (size_t i = 0; i < row.size(); i += 2)
col_id_included.emplace(row[i].get<ColumnID>());
std::unordered_set<ColumnID> col_id_included(col_ids.begin(), col_ids.end());

// Fill in missing column values.
for (const TiDB::ColumnInfo & column : table_info.columns)
solotzg marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -227,52 +247,51 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
if (col_id_included.count(column.id))
continue;

if (!force_decode)
return std::make_tuple(block, false);
if (!column_ids_to_read.count(column.id))
continue;

row.emplace_back(Field(column.id));
col_ids.push_back(column.id);
if (column.hasNoDefaultValueFlag())
// Fill `zero` value if NOT NULL specified or else NULL.
row.push_back(column.hasNotNullFlag() ? GenDecodeRow(column.getCodecFlag()) : Field());
fields.push_back(column.hasNotNullFlag() ? GenDecodeRow(column.getCodecFlag()) : Field());
else
// Fill default value.
row.push_back(column.defaultValueToField());
fields.push_back(column.defaultValueToField());
}

// Remove values of non-existing columns, which could be data inserted (but not flushed) before DDLs that drop some columns.
// TODO: May need to log this.
for (int i = int(row.size()) - 2; i >= 0; i -= 2)
for (int i = int(col_ids.size()) - 1; i >= 0; i--)
{
Field & col_id = row[i];
if (column_map.find(col_id.get<ColumnID>()) == column_map.end())
ColumnID col_id = col_ids[i];
if (column_map.find(col_id) == column_map.end())
{
if (!force_decode)
return std::make_tuple(block, false);

row.erase(row.begin() + i, row.begin() + i + 2);
col_ids.erase(col_ids.begin() + i, col_ids.begin() + i + 1);
fields.erase(fields.begin() + i, fields.begin() + i + 1);
}
}

if (row.size() != target_col_size)
if (col_ids.size() != target_col_size || fields.size() != target_col_size)
throw Exception("decode row error.", ErrorCodes::LOGICAL_ERROR);

/// Transform `row` to columnar format.

for (size_t i = 0; i < row.size(); i += 2)

/// Transform `row` to columnar format.
for (size_t i = 0; i < col_ids.size(); i++)
{
Field & col_id = row[i];
auto it = column_map.find(col_id.get<ColumnID>());
ColumnID col_id = col_ids[i];
auto it = column_map.find(col_id);
if (it == column_map.end())
throw Exception("col_id not found in column_map", ErrorCodes::LOGICAL_ERROR);

const auto & tp = it->second.second.type;
if (tp->isDateOrDateTime()
|| (tp->isNullable() && dynamic_cast<const DataTypeNullable *>(tp.get())->getNestedType()->isDateOrDateTime()))
{
Field & field = row[i + 1];
Field & field = fields[i];
if (field.isNull())
{
it->second.first->insert(row[i + 1]);
it->second.first->insert(fields[i]);
continue;
}
UInt64 packed = field.get<UInt64>();
Expand Down Expand Up @@ -317,7 +336,7 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
}
else
{
it->second.first->insert(row[i + 1]);
it->second.first->insert(fields[i]);
solotzg marked this conversation as resolved.
Show resolved Hide resolved

// Check overflow for potential un-synced data type widen,
// i.e. schema is old and narrow, meanwhile data is new and wide.
Expand All @@ -335,14 +354,14 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
{
// Unsigned checking by bitwise compare.
UInt64 inserted = nested_column.get64(inserted_index);
UInt64 orig = row[i + 1].get<UInt64>();
UInt64 orig = fields[i].get<UInt64>();
overflow = inserted != orig;
}
else
{
// Singed checking by arithmetical cast.
Int64 inserted = nested_column.getInt(inserted_index);
Int64 orig = row[i + 1].get<Int64>();
Int64 orig = fields[i].get<Int64>();
overflow = inserted != orig;
}
if (overflow)
Expand All @@ -352,7 +371,7 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
throw Exception(
"Detected overflow for data " + std::to_string(row[i + 1].get<UInt64>()) + " of type " + tp->getName(),
"Detected overflow for data " + std::to_string(fields[i].get<UInt64>()) + " of type " + tp->getName(),
ErrorCodes::LOGICAL_ERROR);

return std::make_tuple(block, false);
Expand All @@ -361,9 +380,16 @@ std::tuple<Block, bool> readRegionBlock(const TiDB::TableInfo & table_info,
// TODO: Consider other kind of type change? I.e. arbitrary type change.
}
}
auto end_time = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now().time_since_epoch()).count();

sum_decode += (mid_time - start_time);
sum_transform += (end_time - mid_time);
}
std::cout << "decode time: " << sum_decode << "ms" << std::endl;
std::cout << "transform time: " << sum_transform << "ms" << std::endl;
}


for (const auto & name : column_names_to_read)
{
if (name == MutableSupport::delmark_column_name)
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ void RegionTable::tryFlushRegion(RegionID region_id)
if (!status)
return;

flushRegion(table_id, region_id, cache_bytes, false);
//flushRegion(table_id, region_id, cache_bytes, false);

func_update_region([&](InternalRegion & region) -> bool {
region.pause_flush = false;
Expand All @@ -443,8 +443,8 @@ bool RegionTable::tryFlushRegions()
});
}

for (auto & [id, cache_bytes] : to_flush)
flushRegion(id.first, id.second, cache_bytes);
// for (auto & [id, cache_bytes] : to_flush)
// flushRegion(id.first, id.second, cache_bytes);

{ // Now reset status information.
Timepoint now = Clock::now();
Expand Down
26 changes: 22 additions & 4 deletions dbms/src/Storages/Transaction/TiKVRecordFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,38 @@ static const UInt64 SIGN_MARK = UInt64(1) << 63;
static const size_t RAW_KEY_NO_HANDLE_SIZE = 1 + 8 + 2;
static const size_t RAW_KEY_SIZE = RAW_KEY_NO_HANDLE_SIZE + 8;

inline std::vector<Field> DecodeRow(const TiKVValue & value)
inline std::tuple<std::vector<ColumnID>, std::vector<Field>, std::unordered_set<ColumnID>> DecodeRow(const TiKVValue & value, const std::unordered_set<ColumnID> & column_ids_to_read)
{
std::vector<Field> vec;
std::vector<ColumnID> col_ids;
std::vector<Field> fields;
std::unordered_set<ColumnID> row_all_column_ids;
const String & raw_value = value.getStr();
size_t cursor = 0;
while (cursor < raw_value.size())
solotzg marked this conversation as resolved.
Show resolved Hide resolved
{
vec.push_back(DecodeDatum(cursor, raw_value));
Field f = DecodeDatum(cursor, raw_value);
if (f.isNull())
{
fields.push_back(std::move(f));
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
break;
}
ColumnID col_id = f.get<ColumnID>();
row_all_column_ids.insert(col_id);
if (!column_ids_to_read.count(col_id))
{
SkipDatum(cursor, raw_value);
}
else
{
col_ids.push_back(col_id);
fields.push_back(DecodeDatum(cursor, raw_value));
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (cursor != raw_value.size())
throw Exception("DecodeRow cursor is not end", ErrorCodes::LOGICAL_ERROR);

return vec;
return std::make_tuple(std::move(col_ids), std::move(fields), std::move(row_all_column_ids));
}

// Key format is here:
Expand Down