Skip to content

Commit

Permalink
Fix null region client after region restored (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 authored Mar 21, 2019
1 parent bb461d0 commit 1a19638
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 30 deletions.
8 changes: 6 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

KVStore::KVStore(const std::string & data_dir, Context *, std::vector<RegionID> * regions_to_remove) : region_persister(data_dir), log(&Logger::get("KVStore"))
KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore"))
{
}

void KVStore::restore(const Region::RegionClientCreateFunc & region_client_create, std::vector<RegionID> * regions_to_remove)
{
std::lock_guard<std::mutex> lock(mutex);
region_persister.restore(regions);
region_persister.restore(regions, region_client_create);

// Remove regions which pending_remove = true, those regions still exist because progress crash after persisted and before removal.
if (regions_to_remove != nullptr)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds
class KVStore final : private boost::noncopyable
{
public:
KVStore(const std::string & data_dir, Context * context = nullptr, std::vector<RegionID> * regions_to_remove = nullptr);
KVStore(const std::string & data_dir);
void restore(const Region::RegionClientCreateFunc & region_client_create, std::vector<RegionID> * regions_to_remove = nullptr);

RegionPtr getRegion(RegionID region_id);
void traverseRegions(std::function<void(const RegionID region_id, const RegionPtr & region)> callback);

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta)
RegionPtr new_region;
if (client != nullptr)
new_region = std::make_shared<Region>(
meta, std::make_shared<pingcap::kv::RegionClient>(client->cache, client->client, meta.getRegionVerID()));
meta, [&](pingcap::kv::RegionVerID) { return std::make_shared<pingcap::kv::RegionClient>(client->cache, client->client, meta.getRegionVerID()); });
else
new_region = std::make_shared<Region>(meta);

Expand Down Expand Up @@ -460,14 +460,14 @@ size_t Region::serialize(WriteBuffer & buf)
return total_size;
}

RegionPtr Region::deserialize(ReadBuffer & buf)
RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc & region_client_create)
{
auto version = readBinary2<UInt32>(buf);
if (version != Region::CURRENT_VERSION)
throw Exception("Unexpected region version: " + DB::toString(version) + ", expected: " + DB::toString(CURRENT_VERSION),
ErrorCodes::UNKNOWN_FORMAT_VERSION);

auto region = std::make_shared<Region>(RegionMeta::deserialize(buf));
auto region = std::make_shared<Region>(RegionMeta::deserialize(buf), region_client_create);

auto size = readBinary2<KVMap::size_type>(buf);
for (size_t i = 0; i < size; ++i)
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,14 @@ class Region : public std::enable_shared_from_this<Region>

explicit Region(const RegionMeta & meta_) : meta(meta_), client(nullptr), log(&Logger::get("Region")) {}

explicit Region(RegionMeta && meta_, pingcap::kv::RegionClientPtr client_)
: meta(std::move(meta_)), client(client_), log(&Logger::get("Region"))
using RegionClientCreateFunc = std::function<pingcap::kv::RegionClientPtr(pingcap::kv::RegionVerID)>;

explicit Region(RegionMeta && meta_, const RegionClientCreateFunc & region_client_create)
: meta(std::move(meta_)), client(region_client_create(meta.getRegionVerID ())), log(&Logger::get("Region"))
{}

explicit Region(const RegionMeta & meta_, const pingcap::kv::RegionClientPtr & client_)
: meta(meta_), client(client_), log(&Logger::get("Region"))
explicit Region(const RegionMeta & meta_, const RegionClientCreateFunc & region_client_create)
: meta(meta_), client(region_client_create(meta.getRegionVerID())), log(&Logger::get("Region"))
{}

TableID insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value);
Expand All @@ -174,7 +176,7 @@ class Region : public std::enable_shared_from_this<Region>
std::unique_ptr<CommittedScanRemover> createCommittedScanRemover(TableID expected_table_id);

size_t serialize(WriteBuffer & buf);
static RegionPtr deserialize(ReadBuffer & buf);
static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc & region_client_create);

void calculateCfCrc32(Crc32 & crc32) const;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/RegionFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ RegionID RegionFile::Reader::hasNext()
return next_region_meta->region_id;
}

RegionPtr RegionFile::Reader::next()
RegionPtr RegionFile::Reader::next(const Region::RegionClientCreateFunc & region_create_func)
{
next_region_offset += next_region_meta->region_size;
return Region::deserialize(data_file_buf);
return Region::deserialize(data_file_buf, region_create_func);
}

void RegionFile::Reader::skipNext()
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class RegionFile
void checkHash(const std::vector<bool> & use);

RegionID hasNext();
RegionPtr next();
RegionPtr next(const Region::RegionClientCreateFunc & region_create_func);
void skipNext();

const std::vector<PersistMeta> & regionMetas() { return metas; }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ std::vector<bool> valid_regions_in_file(std::vector<RegionFile::Reader::PersistM
return use;
}

void RegionPersister::restore(RegionMap & regions)
void RegionPersister::restore(RegionMap & regions, const Region::RegionClientCreateFunc & region_client_create)
{
std::lock_guard<std::mutex> persist_lock(persist_mutex);
std::lock_guard<std::mutex> map_lock(region_map_mutex);
Expand Down Expand Up @@ -171,7 +171,7 @@ void RegionPersister::restore(RegionMap & regions)
{
if (use[index])
{
regions.emplace(region_id, reader.next());
regions.emplace(region_id, reader.next(region_client_create));
file->addRegion(region_id, metas[index].region_size);
}
else
Expand Down Expand Up @@ -251,7 +251,7 @@ bool RegionPersister::gc()
{
if (use[index] && migrate_region_ids.count(region_id))
{
auto region = reader.next();
auto region = reader.next([](pingcap::kv::RegionVerID) -> pingcap::kv::RegionClientPtr { return nullptr; });
auto region_size = gc_file_writer.write(region);
{
std::lock_guard<std::mutex> map_lock(region_map_mutex);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RegionPersister final : private boost::noncopyable

void drop(UInt64 region_id);
void persist(const RegionPtr & region);
void restore(RegionMap &);
void restore(RegionMap &, const Region::RegionClientCreateFunc &);
bool gc();

// For tests.
Expand Down
16 changes: 12 additions & 4 deletions dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ namespace DB
{

TMTContext::TMTContext(Context & context, std::vector<String> addrs)
: kvstore(std::make_shared<KVStore>(context.getPath() + "kvstore/", &context, &regions_to_remove)),
: kvstore(std::make_shared<KVStore>(context.getPath() + "kvstore/")),
region_table(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)),
schema_syncer(std::make_shared<HttpJsonSchemaSyncer>()),
pd_client(addrs.size() == 0 ? static_cast<pingcap::pd::IClient *>(new pingcap::pd::MockPDClient())
: static_cast<pingcap::pd::IClient *>(new pingcap::pd::Client(addrs))),
region_cache(std::make_shared<pingcap::kv::RegionCache>(pd_client))
region_cache(std::make_shared<pingcap::kv::RegionCache>(pd_client)),
rpc_client(std::make_shared<pingcap::kv::RpcClient>())
{
kvstore->restore([&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr {
return this->createRegionClient(id);
}, &regions_to_remove);
for (RegionID id : regions_to_remove)
kvstore->removeRegion(id, &context);
regions_to_remove.clear();
Expand Down Expand Up @@ -44,12 +48,16 @@ void TMTContext::setPDClient(pingcap::pd::ClientPtr rhs)
pd_client = rhs;
}

pingcap::kv::RegionClientPtr TMTContext::createRegionClient(pingcap::kv::RegionVerID region_version_id) const
{
std::lock_guard<std::mutex> lock(mutex);
return pd_client->isMock() ? nullptr : std::make_shared<pingcap::kv::RegionClient>(region_cache, rpc_client, region_version_id);
}

pingcap::kv::RegionCachePtr TMTContext::getRegionCache() const { return region_cache; }

pingcap::kv::RpcClientPtr TMTContext::getRpcClient()
{
if (rpc_client == nullptr)
rpc_client = std::make_shared<pingcap::kv::RpcClient>();
return rpc_client;
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class TMTContext
pingcap::pd::ClientPtr getPDClient() const;
void setPDClient(pingcap::pd::ClientPtr);

pingcap::kv::RegionClientPtr createRegionClient(pingcap::kv::RegionVerID region_version_id) const;

pingcap::kv::RegionCachePtr getRegionCache() const;

pingcap::kv::RpcClientPtr getRpcClient();
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Storages/Transaction/applySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context)
const auto & state = request.state();
pingcap::kv::RegionClientPtr region_client = nullptr;
auto meta = RegionMeta(state.peer(), state.region(), state.apply_state());
if (context) {
auto & tmt_ctx = context->getTMTContext();
auto pd_client = tmt_ctx.getPDClient();
if (!pd_client->isMock()) {
auto region_cache = tmt_ctx.getRegionCache();
region_client = std::make_shared<pingcap::kv::RegionClient>(region_cache, tmt_ctx.getRpcClient(), meta.getRegionVerID());
Region::RegionClientCreateFunc region_client_create = [&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr {
if (context)
{
auto & tmt_ctx = context->getTMTContext();
return tmt_ctx.createRegionClient(id);
}
}
auto region = std::make_shared<Region>(meta, region_client);
return nullptr;
};
auto region = std::make_shared<Region>(meta, region_client_create);

LOG_INFO(log, "Region " << region->id() << " apply snapshot " << region->toString(true));

Expand Down

0 comments on commit 1a19638

Please sign in to comment.