Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve KVStore tests #7537

Merged
merged 28 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 136 additions & 44 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ MockReadIndexTask * MockRaftStoreProxy::makeReadIndexTask(kvrpcpb::ReadIndexRequ
{
auto _ = genLockGuard();

wake();
wakeNotifier();

auto region = doGetRegion(req.context().region_id());
if (region)
Expand All @@ -309,7 +309,7 @@ MockReadIndexTask * MockRaftStoreProxy::makeReadIndexTask(kvrpcpb::ReadIndexRequ
r->data = std::make_shared<RawMockReadIndexTask>();
r->data->req = std::move(req);
r->data->region = region;
tasks.push_back(r->data);
read_index_tasks.push_back(r->data);
return r;
}
return nullptr;
Expand All @@ -330,7 +330,7 @@ size_t MockRaftStoreProxy::size() const
return regions.size();
}

void MockRaftStoreProxy::wake()
void MockRaftStoreProxy::wakeNotifier()
{
notifier.wake();
}
Expand All @@ -347,17 +347,18 @@ void MockRaftStoreProxy::testRunNormal(const std::atomic_bool & over)
void MockRaftStoreProxy::runOneRound()
{
auto _ = genLockGuard();
while (!tasks.empty())
while (!read_index_tasks.empty())
{
auto & t = *tasks.front();
if (!region_id_to_drop.count(t.req.context().region_id()))
auto & t = *read_index_tasks.front();
auto region_id = t.req.context().region_id();
if (!region_id_to_drop.contains(region_id))
{
if (region_id_to_error.count(t.req.context().region_id()))
if (region_id_to_error.contains(region_id))
t.update(false, true);
else
t.update(false, false);
}
tasks.pop_front();
read_index_tasks.pop_front();
}
}

Expand All @@ -367,24 +368,40 @@ void MockRaftStoreProxy::unsafeInvokeForTest(std::function<void(MockRaftStorePro
cb(*this);
}

void MockRaftStoreProxy::bootstrap(
void MockRaftStoreProxy::bootstrapWithRegion(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
std::optional<std::pair<std::string, std::string>> maybe_range)
{
{
auto _ = genLockGuard();
RUNTIME_CHECK_MSG(regions.empty(), "Mock Proxy regions are not cleared");
auto task_lock = kvs.genTaskLock();
auto lock = kvs.genRegionWriteLock(task_lock);
RUNTIME_CHECK_MSG(lock.regions.empty(), "KVStore regions are not cleared");
}
auto start = RecordKVFormat::genKey(table_id, 0);
auto end = RecordKVFormat::genKey(table_id + 1, 0);
debugAddRegions(kvs, tmt, {region_id}, {maybe_range.value_or(std::make_pair(start.toString(), end.toString()))});
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
}

void MockRaftStoreProxy::debugAddRegions(
KVStore & kvs,
TMTContext & tmt,
std::vector<UInt64> region_ids,
std::vector<std::pair<std::string, std::string>> && ranges)
{
UNUSED(tmt);
int n = ranges.size();
auto _ = genLockGuard();
regions.emplace(region_id, std::make_shared<MockProxyRegion>(region_id));

auto task_lock = kvs.genTaskLock();
auto lock = kvs.genRegionWriteLock(task_lock);
for (int i = 0; i < n; ++i)
{
auto start = RecordKVFormat::genKey(table_id, 0);
auto end = RecordKVFormat::genKey(table_id + 1, 0);
auto range = maybe_range.value_or(std::make_pair(start.toString(), end.toString()));
auto region = tests::makeRegion(region_id, range.first, range.second);
lock.regions.emplace(region_id, region);
regions.emplace(region_ids[i], std::make_shared<MockProxyRegion>(region_ids[i]));
auto region = tests::makeRegion(region_ids[i], ranges[i].first, ranges[i].second, kvs.getProxyHelper());
lock.regions.emplace(region_ids[i], region);
lock.index.add(region);
}
}
Expand Down Expand Up @@ -464,7 +481,7 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::rawWrite(
}


std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index)
std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest && request, raft_cmdpb::AdminResponse && response)
{
uint64_t index = 0;
uint64_t term = 0;
Expand All @@ -477,16 +494,6 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id,
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);
// We record them, as persisted raft log, for potential recovery.
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.mutable_compact_log();
request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog);
request.mutable_compact_log()->set_compact_index(compact_index);
// Find compact term, otherwise log must have been compacted.
if (region->commands.count(compact_index))
{
request.mutable_compact_log()->set_compact_term(region->commands[index].term);
}
region->commands[index] = {
term,
MockProxyRegion::AdminCommand{
Expand All @@ -497,6 +504,100 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id,
return std::make_tuple(index, term);
}

std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index)
{
auto region = getRegion(region_id);
assert(region != nullptr);
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog);
request.mutable_compact_log()->set_compact_index(compact_index);
// Find compact term, otherwise log must have been compacted.
if (region->commands.contains(compact_index))
{
request.mutable_compact_log()->set_compact_term(region->commands[compact_index].term);
}
return adminCommand(region_id, std::move(request), std::move(response));
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeChangePeer(metapb::Region && meta, std::vector<UInt64> peer_ids, bool is_v2)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
if (is_v2)
{
request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2);
}
else
{
request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeer);
}
meta.mutable_peers()->Clear();
for (auto i : peer_ids)
{
meta.add_peers()->set_id(i);
}
*response.mutable_change_peer()->mutable_region() = meta;
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composePrepareMerge(metapb::Region && target, UInt64 min_index)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge);
auto * prepare_merge = request.mutable_prepare_merge();
prepare_merge->set_min_index(min_index);
*prepare_merge->mutable_target() = target;
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeCommitMerge(metapb::Region && source, UInt64 commit)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge);
auto * commit_merge = request.mutable_commit_merge();
commit_merge->set_commit(commit);
*commit_merge->mutable_source() = source;
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeRollbackMerge(UInt64 commit)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge);
auto * rollback_merge = request.mutable_rollback_merge();
rollback_merge->set_commit(commit);
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeBatchSplit(std::vector<UInt64> && region_ids, std::vector<std::pair<std::string, std::string>> && ranges, metapb::RegionEpoch old_epoch)
{
RUNTIME_CHECK_MSG(region_ids.size() == ranges.size(), "error composeBatchSplit input");
auto n = region_ids.size();
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::BatchSplit);
metapb::RegionEpoch new_epoch;
new_epoch.set_version(old_epoch.version() + 1);
new_epoch.set_conf_ver(old_epoch.conf_ver());
{
raft_cmdpb::BatchSplitResponse * splits = response.mutable_splits();
for (size_t i = 0; i < n; ++i)
{
auto * region = splits->add_regions();
region->set_id(region_ids[i]);
region->set_start_key(ranges[i].first);
region->set_end_key(ranges[i].second);
region->add_peers();
*region->mutable_region_epoch() = new_epoch;
}
}
return std::make_tuple(request, response);
}

void MockRaftStoreProxy::doApply(
KVStore & kvs,
TMTContext & tmt,
Expand Down Expand Up @@ -668,7 +769,7 @@ void MockRaftStoreProxy::snapshot(
term = region->getLatestCommitTerm();
}

auto new_kv_region = kvs.genRegionPtr(old_kv_region->getMetaRegion(), old_kv_region->mutMeta().peerId(), index, term);
auto new_kv_region = kvs.genRegionPtr(old_kv_region->cloneMetaRegion(), old_kv_region->mutMeta().peerId(), index, term);
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);

Expand All @@ -695,16 +796,21 @@ void MockRaftStoreProxy::snapshot(
new_kv_region->setApplied(index, term);
}

TableID MockRaftStoreProxy::bootstrap_table(
TableID MockRaftStoreProxy::bootstrapTable(
Context & ctx,
KVStore & kvs,
TMTContext & tmt)
TMTContext & tmt,
bool drop_at_first)
{
UNUSED(kvs);
ColumnsDescription columns;
auto & data_type_factory = DataTypeFactory::instance();
columns.ordinary = NamesAndTypesList({NameAndTypePair{"a", data_type_factory.get("Int64")}});
auto tso = tmt.getPDClient()->getTS();
if (drop_at_first)
{
MockTiDB::instance().dropDB(ctx, "d", true);
}
MockTiDB::instance().newDataBase("d");
// Make sure there is a table with smaller id.
MockTiDB::instance().newTable("d", "prevt" + toString(random()), columns, tso, "", "dt");
Expand All @@ -716,20 +822,6 @@ TableID MockRaftStoreProxy::bootstrap_table(
return table_id;
}

void MockRaftStoreProxy::clear_tables(
Context & ctx,
KVStore & kvs,
TMTContext & tmt)
{
UNUSED(kvs);
UNUSED(tmt);
if (this->table_id != 1)
{
MockTiDB::instance().dropTable(ctx, "d", "t", false);
}
this->table_id = 1;
}

void GCMonitor::add(RawObjType type, int64_t diff)
{
auto _ = genLockGuard();
Expand Down
47 changes: 35 additions & 12 deletions dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ struct MockRaftStoreProxy : MutexLockWrap

size_t size() const;

void wake();
void wakeNotifier();

void testRunNormal(const std::atomic_bool & over);

/// Handle one read index task.
void runOneRound();

void unsafeInvokeForTest(std::function<void(MockRaftStoreProxy &)> && cb);
Expand All @@ -183,27 +184,33 @@ struct MockRaftStoreProxy : MutexLockWrap
Type type = NORMAL;
};

/// boostrap a region.
void bootstrap(
/// Boostrap with a given region.
/// Similar to TiKV's `bootstrap_region`.
void bootstrapWithRegion(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
std::optional<std::pair<std::string, std::string>> maybe_range);

/// boostrap a table, since applying snapshot needs table schema.
TableID bootstrap_table(
/// Boostrap a table.
/// Must be called if:
/// 1. Applying snapshot which needs table schema
/// 2. Doing row2col.
TableID bootstrapTable(
Context & ctx,
KVStore & kvs,
TMTContext & tmt);
TMTContext & tmt,
bool drop_at_first = true);

/// clear tables.
void clear_tables(
Context & ctx,
/// Manually add a region.
void debugAddRegions(
KVStore & kvs,
TMTContext & tmt);
TMTContext & tmt,
std::vector<UInt64> region_ids,
std::vector<std::pair<std::string, std::string>> && ranges);

/// We assume that we generate one command, and immediately commit.
/// normal write to a region.
/// Normal write to a region.
std::tuple<uint64_t, uint64_t> normalWrite(
UInt64 region_id,
std::vector<HandleID> && keys,
Expand All @@ -221,6 +228,14 @@ struct MockRaftStoreProxy : MutexLockWrap
/// Create a compactLog admin command, returns (index, term) of the admin command itself.
std::tuple<uint64_t, uint64_t> compactLog(UInt64 region_id, UInt64 compact_index);

std::tuple<uint64_t, uint64_t> adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest &&, raft_cmdpb::AdminResponse &&);

static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeChangePeer(metapb::Region && meta, std::vector<UInt64> peer_ids, bool is_v2 = true);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composePrepareMerge(metapb::Region && target, UInt64 min_index);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeCommitMerge(metapb::Region && source, UInt64 commit);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeRollbackMerge(UInt64 commit);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeBatchSplit(std::vector<UInt64> && region_ids, std::vector<std::pair<std::string, std::string>> && ranges, metapb::RegionEpoch old_epoch);

struct Cf
{
Cf(UInt64 region_id_, TableID table_id_, ColumnFamilyType type_);
Expand Down Expand Up @@ -273,16 +288,24 @@ struct MockRaftStoreProxy : MutexLockWrap
uint64_t region_id,
uint64_t to);

void clear()
{
auto _ = genLockGuard();
regions.clear();
}

MockRaftStoreProxy()
{
log = Logger::get("MockRaftStoreProxy");
table_id = 1;
}

// Mock Proxy will drop read index requests to these regions
std::unordered_set<uint64_t> region_id_to_drop;
// Mock Proxy will return error read index response to these regions
std::unordered_set<uint64_t> region_id_to_error;
std::map<uint64_t, MockProxyRegionPtr> regions;
std::list<std::shared_ptr<RawMockReadIndexTask>> tasks;
std::list<std::shared_ptr<RawMockReadIndexTask>> read_index_tasks;
AsyncWaker::Notifier notifier;
TableID table_id;
LoggerPtr log;
Expand Down
Loading