Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-204] Wrong Process About Applying Snapshot And Deleting Data #27

Merged
merged 3 commits into from
Mar 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,28 +272,4 @@ size_t executeQueryAndCountRows(Context & context,const std::string & query)
return count;
}

std::vector<std::tuple<HandleID, HandleID, RegionID>> getRegionRanges(
Context& context, TableID table_id, std::vector<RegionRange>* vec = nullptr)
{
std::vector<std::tuple<HandleID, HandleID, RegionID>> handle_ranges;
auto callback = [&](Regions regions)
{
for (auto region : regions)
{
auto [start_key, end_key] = region->getRange();
HandleID start_handle = TiKVRange::getRangeHandle<true>(start_key, table_id);
HandleID end_handle = TiKVRange::getRangeHandle<false>(end_key, table_id);
handle_ranges.push_back({start_handle, end_handle, region->id()});
if (vec)
{
vec->push_back(region->getRange());
}
}
};

TMTContext & tmt = context.getTMTContext();
tmt.region_table.traverseRegionsByTable(table_id, callback);
return handle_ranges;
}

}
10 changes: 8 additions & 2 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,14 @@ Int64 concurrentRangeOperate(const TiDB::TableInfo & table_info, HandleID start_

{
TMTContext & tmt = context.getTMTContext();
tmt.region_table.traverseRegionsByTable(table_info.id, [&](Regions d) {
regions.insert(regions.end(), d.begin(), d.end());
tmt.region_table.traverseRegionsByTable(table_info.id, [&](std::vector<std::pair<RegionID, RegionPtr>> & d) {
for (auto && [_, r] : d)
{
std::ignore = _;
if (r == nullptr)
continue;
regions.push_back(r);
}
});
}

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,12 +661,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
ranges.push_back(region.range_in_table);
});

// ranged may overlap, should merge them
std::sort(ranges.begin(), ranges.end());
size_t size = 0;
for (size_t i = 1; i < ranges.size(); ++i)
{
if (ranges[i].first == ranges[size].second)
ranges[size].second = ranges[i].second;
if (ranges[i].first <= ranges[size].second)
ranges[size].second = std::max(ranges[i].second, ranges[size].second);
else
ranges[++size] = ranges[i];
}
Expand Down
95 changes: 58 additions & 37 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
bool is_txn_engine = data.merging_params.mode == MergeTreeData::MergingParams::Txn;

std::vector<RegionQueryInfo> regions_query_info = query_info.regions_query_info;
std::vector<bool> regions_query_res;
RegionMap kvstore_region;
std::vector<RegionTable::RegionReadStatus> regions_query_res;
BlockInputStreams region_block_data;
String handle_col_name;
size_t region_cnt = 0;
Expand Down Expand Up @@ -277,18 +278,23 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(

if (!select.no_kvstore && regions_query_info.empty())
{
tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) {
for (const auto & region : regions)
tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](std::vector<std::pair<RegionID, RegionPtr>> & regions) {
for (const auto & [id, region]: regions)
{
regions_query_info.push_back({region->id(), region->version(), region->confVer(), region->getHandleRangeByTable(data.table_info.id)});
kvstore_region.emplace(id, region);
if (region == nullptr)
// maybe region is removed.
regions_query_info.push_back({id, InvalidRegionVersion, InvalidRegionVersion, {0, 0}});
else
regions_query_info.push_back({id, region->version(), region->confVer(), region->getHandleRangeByTable(data.table_info.id)});
}
});
}

std::sort(regions_query_info.begin(), regions_query_info.end());
region_cnt = regions_query_info.size();
region_range_parts.assign(region_cnt, {});
regions_query_res.assign(region_cnt, true);
regions_query_res.assign(region_cnt, RegionTable::OK);
region_block_data.assign(region_cnt, nullptr);
rows_in_mem.assign(region_cnt, 0);

Expand All @@ -298,36 +304,42 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(

// get data block from region first.

ThreadPool pool(num_streams);
ThreadPool pool(std::min(region_cnt, num_streams));
for (size_t region_begin = 0, size = std::max(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size)
{
pool.schedule([&, region_begin, size] {
for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index)

std::vector<RegionID> region_ids;

for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index)
{
const RegionQueryInfo & region_query_info = regions_query_info[region_index];

auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion(
tmt, data.table_info.id, region_query_info.region_id, region_query_info.version, region_query_info.conf_version,
data.table_info, data.getColumns(), column_names_to_read,
auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion(
data.table_info.id, kvstore_region[region_query_info.region_id], region_query_info.version,
region_query_info.conf_version, data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);

regions_query_res[region_index] = status;

if (status != RegionTable::OK)
{
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
std::vector<RegionID> region_ids;
for (size_t region_index = 0; region_index < region_cnt; ++region_index)
{
region_ids.push_back(regions_query_info[region_index].region_id);
}
throw RegionException(region_ids);
} else {
LOG_WARNING(log, "Region " << region_query_info.region_id << ", version "
<< region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
region_ids.push_back(region_query_info.region_id);
}
else
{
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
}
}

if (!region_ids.empty())
throw RegionException(region_ids);
});
}

Expand Down Expand Up @@ -706,32 +718,38 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
else
{
TMTContext & tmt = context.getTMTContext();
std::vector<RegionID> region_ids;

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
{
if (select.no_kvstore)
continue;

if (!regions_query_res[region_index])
if (regions_query_res[region_index] != RegionTable::OK)
continue;

const RegionQueryInfo & region_query_info = regions_query_info[region_index];

if (tmt.kvstore->getRegion(region_query_info.region_id) == nullptr)
{
// Region may be removed.
// If region is in kvstore, even if its state is pending_remove, the new parts with del data are not flushed into ch.
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(RegionTable::RegionReadStatus::NOT_FOUND));
std::vector<RegionID> region_ids;
for (size_t region_index = 0; region_index < region_cnt; ++region_index)
auto region = tmt.kvstore->getRegion(region_query_info.region_id);
RegionTable::RegionReadStatus status = RegionTable::OK;
if (region != kvstore_region[region_query_info.region_id])
status = RegionTable::NOT_FOUND;
else if (region->version() != region_query_info.version)
status = RegionTable::VERSION_ERROR;

if (status != RegionTable::OK)
{
region_ids.push_back(regions_query_info[region_index].region_id);
regions_query_res[region_index] = status;
// ABA problem may cause because one region is removed and inserted back.
// if the version of region is changed, the part may has less data because of compaction.
LOG_WARNING(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
region_ids.push_back(region_query_info.region_id);
continue;
}
throw RegionException(region_ids);
}

size_t sum_marks = 0;
Expand Down Expand Up @@ -761,6 +779,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
<< region_range_parts[region_index].size() << " parts, " << sum_marks << " marks to read from "
<< sum_ranges << " ranges, read " << rows_in_mem[region_index] << " rows from memory");
}

if (!region_ids.empty())
throw RegionException(region_ids);
}

if (parts_with_ranges.empty() && !is_txn_engine)
Expand Down Expand Up @@ -799,7 +820,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
{
for (size_t region_index = 0; region_index < region_cnt; ++region_index)
{
if (!regions_query_res[region_index])
if (regions_query_res[region_index] != RegionTable::OK)
continue;
auto region_input_stream = region_block_data[region_index];
if (region_input_stream)
Expand All @@ -814,7 +835,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
BlockInputStreams union_regions_stream;
for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index)
{
if (!regions_query_res[region_index])
if (regions_query_res[region_index] != RegionTable::OK)
continue;

const RegionQueryInfo & region_query_info = regions_query_info[region_index];
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context)
{
TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr;

auto table_ids = RegionTable::getRegionTableIds(new_region);

{
std::lock_guard<std::mutex> lock(task_mutex);

Expand All @@ -75,12 +77,18 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context)
std::lock_guard<std::mutex> lock(mutex);
regions[region_id] = new_region;
}

if (tmt_ctx)
tmt_ctx->region_table.applySnapshotRegion(new_region, table_ids);

if (new_region->isPendingRemove())
{
removeRegion(region_id, context);
return;
}
}

region_persister.persist(new_region);

if (tmt_ctx)
tmt_ctx->region_table.applySnapshotRegion(new_region);
}

void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx)
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Storages/Transaction/PartitionDataMover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ErrorCodes

namespace PartitionDataMover
{

/*
BlockInputStreamPtr createBlockInputStreamFromRange(const Context& context, StorageMergeTree* storage,
const HandleID begin, const HandleID excluded_end)
{
Expand Down Expand Up @@ -106,9 +106,9 @@ void increaseVersionInBlock(Block & block, size_t increasement = 1)
block.insert(version_col_pos, ColumnWithTypeAndName{std::move(version_new_col),
version_type, MutableSupport::version_column_name});
}

*/
} // namespace PartitionDataMover

/*
void deleteRange(const Context& context, StorageMergeTree* storage,
const HandleID begin, const HandleID excluded_end)
{
Expand All @@ -132,7 +132,6 @@ void deleteRange(const Context& context, StorageMergeTree* storage,
output.writeSuffix();
}

/*
// TODO: use `new_ver = old_ver+1` to delete data is not a good way, may conflict with data in the future
void moveRangeBetweenPartitions(const Context & context, StorageMergeTree * storage,
UInt64 src_partition_id, UInt64 dest_partition_id, const Field & begin, const Field & excluded_end)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/PartitionDataMover.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
namespace DB
{


/*
/// Remove range from this partition.
/// Note that [begin, excluded_end) is not necessarily to locate in the range of this partition (or table).
void deleteRange(const Context& context, StorageMergeTree* storage,
const HandleID begin, const HandleID excluded_end);

/*

/// Move data in [begin, excluded_end) from src_partition_id to dest_partition_id.
/// FIXME/TODO: currently this function is not atomic and need to fix.
void moveRangeBetweenPartitions(const Context & context, StorageMergeTree * storage,
Expand Down
15 changes: 12 additions & 3 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include <functional>

#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/PartitionDataMover.h>
#include <Storages/Transaction/RegionBlockReader.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/TMTContext.h>
Expand All @@ -15,6 +14,17 @@ namespace DB
std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTable::getBlockInputStreamByRegion(TMTContext & tmt,
TableID table_id,
const RegionID region_id,
const TiDB::TableInfo & table_info,
const ColumnsDescription & columns,
const Names & ordered_columns,
std::vector<TiKVKey> * keys)
{
return getBlockInputStreamByRegion(
table_id, tmt.kvstore->getRegion(region_id), InvalidRegionVersion, InvalidRegionVersion, table_info, columns, ordered_columns, false, false, 0, keys);
}

std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTable::getBlockInputStreamByRegion(TableID table_id,
RegionPtr region,
const RegionVersion region_version,
const RegionVersion conf_version,
const TiDB::TableInfo & table_info,
Expand All @@ -25,7 +35,6 @@ std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTab
UInt64 start_ts,
std::vector<TiKVKey> * keys)
{
auto region = tmt.kvstore->getRegion(region_id);
if (!region)
return {nullptr, NOT_FOUND, 0};

Expand All @@ -42,7 +51,7 @@ std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTab
if (region->isPendingRemove())
return {nullptr, PENDING_REMOVE, 0};

if (region_version != InvalidRegionVersion && region->version() != region_version && region->confVer() != conf_version)
if (region_version != InvalidRegionVersion && (region->version() != region_version || region->confVer() != conf_version))
return {nullptr, VERSION_ERROR, 0};

{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,4 +625,6 @@ void Region::reset(Region && new_region)
meta.notifyAll();
}

bool Region::isPeerRemoved() const { return meta.isPeerRemoved(); }

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class Region : public std::enable_shared_from_this<Region>

bool isPendingRemove() const;
void setPendingRemove();
bool isPeerRemoved() const;

size_t dataSize() const;

Expand Down
Loading