Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-479 select from empty table throw error in tiflash #223

Merged
merged 2 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
}
else
{
region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, region_id);
region = context.getTMTContext().getKVStore()->getRegion(region_id);
if (!region)
throw Exception("No such region", ErrorCodes::BAD_ARGUMENTS);
}
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_,
region_version(region_version_),
region_conf_version(region_conf_version_),
dag_response(dag_response_),
internal(internal_)
internal(internal_),
log(&Logger::get("DAGDriver"))
{}

void DAGDriver::execute()
Expand Down Expand Up @@ -98,10 +99,12 @@ catch (const LockException & e)
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
recordError(e.code(), e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
recordError(ErrorCodes::UNKNOWN_EXCEPTION, e.what());
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class DAGDriver

bool internal;

Poco::Logger * log;

void recordError(Int32 err_code, const String & err_msg);
};
} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
Expand All @@ -16,6 +17,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
Expand Down Expand Up @@ -136,7 +138,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
info.region_id = dag.getRegionID();
info.version = dag.getRegionVersion();
info.conf_version = dag.getRegionConfVersion();
auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id);
auto current_region = context.getTMTContext().getKVStore()->getRegion(info.region_id);
if (!current_region)
{
std::vector<RegionID> region_ids;
Expand All @@ -148,6 +150,11 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
query_info.mvcc_query_info->concurrent = 0.0;
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);

if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
}

pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); });

/// Set the limits and quota for reading data, the speed and time of the query.
Expand Down Expand Up @@ -178,7 +185,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
});
}
ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName();
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand Down
18 changes: 0 additions & 18 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,24 +501,6 @@ std::vector<std::pair<RegionID, RegionPtr>> RegionTable::getRegionsByTable(const
return regions;
}

RegionPtr RegionTable::getRegionByTableAndID(const TableID table_id, const RegionID region_id)
{
auto & kvstore = context.getTMTContext().getKVStore();
{
std::lock_guard<std::mutex> lock(mutex);
auto & table = getOrCreateTable(table_id);

for (const auto & region_info : table.regions)
{
if (region_info.second.region_id == region_id)
{
return kvstore->getRegion(region_info.second.region_id);
}
}
}
return nullptr;
}

void RegionTable::mockDropRegionsInTable(TableID table_id)
{
std::lock_guard<std::mutex> lock(mutex);
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ class RegionTable : private boost::noncopyable
void traverseInternalRegions(std::function<void(TableID, InternalRegion &)> && callback);
void traverseInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegion &)> && callback);
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(const TableID table_id);
RegionPtr getRegionByTableAndID(const TableID table_id, const RegionID region_id);

/// Write the data of the given region into the table with the given table ID, fill the data list for outer to remove.
/// Will trigger schema sync on read error for only once,
Expand Down