Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support new reading process #13

Merged
merged 8 commits into from
Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ namespace ErrorCodes
extern const int DECIMAL_OVERFLOW_ERROR = 9004;
extern const int FILE_SIZE_NOT_MATCH = 9005;
extern const int LOCK_EXCEPTION = 10000;
extern const int VERSION_ERROR = 10001;
extern const int REGION_MISS = 10002;
}

}
1 change: 1 addition & 0 deletions dbms/src/Core/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ namespace Protocol
Extremes = 8, /// A block with minimums and maximums (compressed or not).
TablesStatusResponse = 9, /// A response to TablesStatus request.
LockInfos = 100, /// Lock infos of some pending transactions.
RegionException = 101, /// Lock infos of some pending transactions.
};

/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/Transaction/TiKVKeyValue.h>

#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
Expand All @@ -45,6 +46,7 @@
#include <Columns/Collator.h>
#include <Common/typeid_cast.h>

#include <google/protobuf/text_format.h>

namespace ProfileEvents
{
Expand Down Expand Up @@ -663,6 +665,27 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
query_info.resolve_locks = settings.resolve_locks;
query_info.read_tso = settings.read_tso;

String request_str = settings.regions;

if (request_str.size() > 0) {
::flashpb::FlashRequest req;
::google::protobuf::TextFormat::ParseFromString(request_str, &req);
for (int i = 0; i < req.regions_size(); i++) {
auto region = req.regions(i);
RegionQueryInfo info;
info.region_id = region.region().id();
auto epoch = region.region().region_epoch();
info.version = epoch.version();
info.conf_version = epoch.conf_ver();

auto table_id = static_cast<StorageMergeTree*>(storage.get()) -> getTableInfo().id;
Int64 start_key = TiKVRange::getRangeHandle<true, true>(TiKVKey(region.region().start_key()), table_id);
Int64 end_key = TiKVRange::getRangeHandle<false, true>(TiKVKey(region.region().end_key()), table_id);
info.range_in_table = HandleRange(start_key, end_key);
query_info.regions_query_info.push_back(info);
}
}

/// PREWHERE optimization
{
auto optimize_prewhere = [&](auto & merge_tree)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct Settings
*/

#define APPLY_FOR_SETTINGS(M) \
M(SettingString, regions, "", "the region need to be read.") \
M(SettingBool, resolve_locks, false, "tmt read tso.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <Storages/Transaction/LockException.h>
#include <Interpreters/SharedQueries.h>
#include <Storages/Transaction/RegionException.h>

namespace ProfileEvents
{
Expand Down Expand Up @@ -154,6 +155,7 @@ void TCPHandler::runImpl()
*/
std::unique_ptr<Exception> exception;
Region::LockInfos lock_infos;
std::vector<UInt64> region_ids;
bool network_error = false;

String shared_query_id;
Expand Down Expand Up @@ -228,6 +230,10 @@ void TCPHandler::runImpl()
state.io.onException();
lock_infos = std::move(e.lock_infos);
}
catch (RegionException & e)
{
sendRegionException(e.region_ids);
}
catch (const Exception & e)
{
state.io.onException();
Expand Down Expand Up @@ -846,6 +852,13 @@ void TCPHandler::sendException(const Exception & e)
out->next();
}

void TCPHandler::sendRegionException(const std::vector<UInt64> & region_ids) {
writeVarUInt(Protocol::Server::RegionException, *out);
writeVarUInt(region_ids.size(), *out);
for (size_t i = 0; i < region_ids.size(); i++)
writeVarUInt(region_ids[i], *out);
out->next();
}

void TCPHandler::sendLockInfos(const Region::LockInfos & lock_infos)
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/TCPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class TCPHandler : public Poco::Net::TCPServerConnection
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendException(const Exception & e);
void sendRegionException(const std::vector<UInt64> & region_ids);
void sendLockInfos(const Region::LockInfos & lock_infos);
void sendProgress();
void sendEndOfStream();
Expand Down
23 changes: 11 additions & 12 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace std
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/Transaction/TMTContext.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/Transaction/RegionException.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSampleRatio.h>

Expand Down Expand Up @@ -79,6 +80,8 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int ILLEGAL_COLUMN;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int VERSION_ERROR;
extern const int REGION_MISS;
}


Expand Down Expand Up @@ -601,23 +604,14 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(

// TODO: set regions_query_info from setting.

std::vector<RegionQueryInfo> regions_query_info;
std::vector<RegionQueryInfo> regions_query_info = query_info.regions_query_info;
std::vector<bool> regions_query_res;
BlockInputStreams region_block_data;
String handle_col_name;

if (is_txn_engine)
{
handle_col_name = data.primary_expr_ast->children[0]->getColumnName();

TMTContext & tmt = context.getTMTContext();

tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) {
for (const auto & region : regions)
{
regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)});
}
});
}

std::sort(regions_query_info.begin(), regions_query_info.end());
Expand Down Expand Up @@ -671,7 +665,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
const RegionQueryInfo & region_query_info = regions_query_info[region_index];

auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion(
data.table_info.id, region_query_info.region_id, region_query_info.version,
data.table_info.id, region_query_info.region_id, region_query_info.version, region_query_info.conf_version,
data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);
if (status != RegionTable::OK)
Expand All @@ -681,7 +675,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
<< ", handle range [" << regions_query_info[region_index].range_in_table.first
<< ", " << regions_query_info[region_index].range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
continue;
std::vector<RegionID> region_ids;
for (size_t region_index = 0; region_index < region_cnt; ++region_index)
{
region_ids.push_back(regions_query_info[region_index].region_id);
}
throw RegionException(region_ids);
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
}
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/SelectQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

#include <memory>
#include <unordered_map>

#include <kvproto/flash.pb.h>
#include <Storages/Transaction/Region.h>

namespace DB
{
Expand Down Expand Up @@ -32,6 +33,8 @@ struct SelectQueryInfo
bool resolve_locks = false;

UInt64 read_tso;

std::vector<RegionQueryInfo> regions_query_info;
};

}
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTab
TableID table_id,
const RegionID region_id,
const RegionVersion region_version,
const RegionVersion conf_version,
const TiDB::TableInfo & table_info,
const ColumnsDescription & columns,
const Names & ordered_columns,
Expand Down Expand Up @@ -43,7 +44,7 @@ std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTab
if (region->isPendingRemove())
return {nullptr, PENDING_REMOVE, 0};

if (region_version != InvalidRegionVersion && region->version() != region_version)
if (region_version != InvalidRegionVersion && region->version() != region_version && region->confVer() != conf_version)
return {nullptr, VERSION_ERROR, 0};

{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_
{
continue;
}
std::cout<<"got primary lock: "<<primary<<std::endl;
return std::make_unique<LockInfo>(LockInfo{primary, ts, decode_key, ttl});
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct RegionQueryInfo
{
RegionID region_id;
UInt64 version;
UInt64 conf_version;
HandleRange range_in_table;

bool operator < (const RegionQueryInfo & o) const
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Storages/Transaction/RegionException.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include <Common/Exception.h>
#include <Storages/Transaction/Region.h>

namespace DB
{

class RegionException : public Exception
{
public:
explicit RegionException(std::vector<RegionID> region_ids_): region_ids(region_ids_) {}

std::vector<RegionID> region_ids;
};

}
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & res
// TODO: confirm names is right
Names names = columns.getNamesOfPhysical();
auto [input, status, tol] = getBlockInputStreamByRegion(
table_id, region_id, InvalidRegionVersion, table_info, columns, names, false, false, 0, &keys_to_remove);
table_id, region_id, InvalidRegionVersion, InvalidRegionVersion, table_info, columns, names, false, false, 0, &keys_to_remove);
if (input == nullptr)
return;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class RegionTable : private boost::noncopyable
std::tuple<BlockInputStreamPtr, RegionReadStatus, size_t> getBlockInputStreamByRegion(TableID table_id,
const RegionID region_id,
const RegionVersion region_version,
const RegionVersion conf_version,
const TiDB::TableInfo & table_info,
const ColumnsDescription & columns,
const Names & ordered_columns,
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/Transaction/TiKVKeyValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ inline UInt64 getTsFromWriteCf(const TiKVValue & value)
namespace TiKVRange
{

template <bool start>
template <bool start, bool decoded = false>
inline HandleID getRangeHandle(const TiKVKey & tikv_key, const TableID table_id)
{
constexpr HandleID min = std::numeric_limits<HandleID>::min();
Expand All @@ -445,7 +445,12 @@ inline HandleID getRangeHandle(const TiKVKey & tikv_key, const TableID table_id)
return max;
}

const auto key = std::get<0>(RecordKVFormat::decodeTiKVKey(tikv_key));
String key;
if constexpr (decoded) {
key = tikv_key.getStr();
} else {
key = std::get<0>(RecordKVFormat::decodeTiKVKey(tikv_key));
}

if (key <= RecordKVFormat::genRawKey(table_id, min))
return min;
Expand Down