Skip to content

Commit

Permalink
FLASH-479 select from empty table throw error in tiflash (#223)
Browse files Browse the repository at this point in the history
* 1. select from empty table throw error in tiflash, 2. add some logs, 3. disable timestamp literal in DAG request

* revert unrelated change
  • Loading branch information
windtalker authored and zanmato1984 committed Sep 6, 2019
1 parent 17aacde commit 6b14b38
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 23 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
}
else
{
region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, region_id);
region = context.getTMTContext().getKVStore()->getRegion(region_id);
if (!region)
throw Exception("No such region", ErrorCodes::BAD_ARGUMENTS);
}
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_,
region_version(region_version_),
region_conf_version(region_conf_version_),
dag_response(dag_response_),
internal(internal_)
internal(internal_),
log(&Logger::get("DAGDriver"))
{}

void DAGDriver::execute()
Expand Down Expand Up @@ -98,10 +99,12 @@ catch (const LockException & e)
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
recordError(e.code(), e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
recordError(ErrorCodes::UNKNOWN_EXCEPTION, e.what());
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class DAGDriver

bool internal;

Poco::Logger * log;

void recordError(Int32 err_code, const String & err_msg);
};
} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
Expand All @@ -16,6 +17,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
Expand Down Expand Up @@ -136,7 +138,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
info.region_id = dag.getRegionID();
info.version = dag.getRegionVersion();
info.conf_version = dag.getRegionConfVersion();
auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id);
auto current_region = context.getTMTContext().getKVStore()->getRegion(info.region_id);
if (!current_region)
{
std::vector<RegionID> region_ids;
Expand All @@ -148,6 +150,11 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
query_info.mvcc_query_info->concurrent = 0.0;
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);

if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
}

pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); });

/// Set the limits and quota for reading data, the speed and time of the query.
Expand Down Expand Up @@ -178,7 +185,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
});
}
ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName();
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand Down
18 changes: 0 additions & 18 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,24 +529,6 @@ std::vector<std::pair<RegionID, RegionPtr>> RegionTable::getRegionsByTable(const
return regions;
}

RegionPtr RegionTable::getRegionByTableAndID(const TableID table_id, const RegionID region_id)
{
auto & kvstore = context.getTMTContext().getKVStore();
{
std::lock_guard<std::mutex> lock(mutex);
auto & table = getOrCreateTable(table_id);

for (const auto & region_info : table.regions)
{
if (region_info.second.region_id == region_id)
{
return kvstore->getRegion(region_info.second.region_id);
}
}
}
return nullptr;
}

void RegionTable::mockDropRegionsInTable(TableID table_id)
{
std::lock_guard<std::mutex> lock(mutex);
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ class RegionTable : private boost::noncopyable
void traverseInternalRegions(std::function<void(TableID, InternalRegion &)> && callback);
void traverseInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegion &)> && callback);
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(const TableID table_id);
RegionPtr getRegionByTableAndID(const TableID table_id, const RegionID region_id);

/// Write the data of the given region into the table with the given table ID, fill the data list for outer to remove.
/// Will trigger schema sync on read error for only once,
Expand Down

0 comments on commit 6b14b38

Please sign in to comment.