Skip to content

Commit

Permalink
curvefs/client: bug fix about memcache
Browse files Browse the repository at this point in the history
1. When running the vdbech task, the memory usage of the mount point is very high
2. When running the vdbech task, the mount will coredump or the task will have data inconsistency
3. The memcache server fails over and then restarts and cannot be used by the client anymore

Signed-off-by: ilixiaocui <ilixiaocui@163.com>
  • Loading branch information
ilixiaocui authored and opencurveadmin committed Mar 2, 2023
1 parent 5ccb537 commit 9d5a18a
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 33 deletions.
28 changes: 11 additions & 17 deletions curvefs/src/client/kvclient/kvclient_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ namespace client {
KVClientManager *g_kvClientManager = nullptr;
KVClientMetric *g_kvClientMetric = nullptr;

#define ONRETURN(TYPE, RES, KEY, ERRORLOG) \
#define ONRETURN(TYPE, RES) \
if (RES) { \
g_kvClientMetric->kvClient##TYPE.qps.count << 1; \
VLOG(9) << "##TYPE key = " << KEY << " OK"; \
} else { \
g_kvClientMetric->kvClient##TYPE.eps.count << 1; \
LOG(ERROR) << "##TYPE key = " << KEY << " error = " << ERRORLOG; \
}

bool KVClientManager::Init(const KVClientManagerOpt &config,
Expand All @@ -63,27 +61,23 @@ void KVClientManager::Set(std::shared_ptr<SetKVCacheTask> task) {
std::string error_log;
auto res =
client_->Set(task->key, task->value, task->length, &error_log);
ONRETURN(Set, res, task->key, error_log);
ONRETURN(Set, res);

task->done(task);
});
}

bool KVClientManager::Get(std::shared_ptr<GetKvCacheContext> task) {
assert(nullptr != task->value);
return Get(task->key, task->value, task->offset, task->length);
}

void KVClientManager::Get(std::shared_ptr<GetKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency);

bool KVClientManager::Get(const std::string &key, char *value, uint64_t offset,
uint64_t length) {
LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency);
std::string error_log;
task->res = client_->Get(task->key, task->value, task->offset,
task->length, &error_log);
ONRETURN(Get, task->res);

assert(nullptr != value);
std::string error_log;
auto res = client_->Get(key, value, offset, length, &error_log);
ONRETURN(Get, res, key, error_log);
return res;
task->done(task);
});
}

} // namespace client
Expand Down
22 changes: 14 additions & 8 deletions curvefs/src/client/kvclient/kvclient_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace client {

class KVClientManager;
class SetKVCacheTask;
class GetKVCacheTask;
using curve::common::TaskThreadPool;
using curvefs::client::common::KVClientManagerOpt;

Expand All @@ -52,6 +53,8 @@ extern KVClientMetric *g_kvClientMetric;

typedef std::function<void(const std::shared_ptr<SetKVCacheTask> &)>
SetKVCacheDone;
typedef std::function<void(const std::shared_ptr<GetKVCacheTask> &)>
GetKVCacheDone;

struct SetKVCacheTask {
std::string key;
Expand All @@ -60,16 +63,22 @@ struct SetKVCacheTask {
SetKVCacheDone done;
SetKVCacheTask() = default;
SetKVCacheTask(const std::string &k, const char *val, const uint64_t len)
: key(k), value(val), length(len) {}
: key(k), value(val), length(len) {
done = [](const std::shared_ptr<SetKVCacheTask> &) {};
}
};

struct GetKvCacheContext {
struct GetKVCacheTask {
const std::string &key;
char *value;
uint64_t offset;
uint64_t length;
GetKvCacheContext(const std::string &k, char *v, uint64_t off, uint64_t len)
: key(k), value(v), offset(off), length(len) {}
bool res;
GetKVCacheDone done;
GetKVCacheTask(const std::string &k, char *v, uint64_t off, uint64_t len)
: key(k), value(v), offset(off), length(len), res(false) {
done = [](const std::shared_ptr<GetKVCacheTask> &) {};
}
};

class KVClientManager {
Expand All @@ -87,10 +96,7 @@ class KVClientManager {
*/
void Set(std::shared_ptr<SetKVCacheTask> task);

bool Get(const std::string &key, char *value, uint64_t offset,
uint64_t length);

bool Get(std::shared_ptr<GetKvCacheContext> task);
void Get(std::shared_ptr<GetKVCacheTask> task);

private:
void Uninit();
Expand Down
15 changes: 11 additions & 4 deletions curvefs/src/client/kvclient/memcache_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,9 @@ class MemCachedClient : public KVClient {
return false;
}
}

memcached_behavior_set(client_, MEMCACHED_BEHAVIOR_DISTRIBUTION,
MEMCACHED_DISTRIBUTION_CONSISTENT);
memcached_behavior_set(client_, MEMCACHED_BEHAVIOR_RETRY_TIMEOUT, 5);
memcached_behavior_set(client_,
MEMCACHED_BEHAVIOR_REMOVE_FAILED_SERVERS, 1);

return PushServer();
}
Expand All @@ -117,9 +114,11 @@ class MemCachedClient : public KVClient {
auto res = memcached_set(tcli, key.c_str(), key.length(), value,
value_len, 0, 0);
if (MEMCACHED_SUCCESS == res) {
VLOG(9) << "Set key = " << key << " OK";
return true;
}
*errorlog = ResError(res);
LOG(ERROR) << "Set key = " << key << " error = " << *errorlog;
return false;
}

Expand All @@ -135,13 +134,21 @@ class MemCachedClient : public KVClient {
memcached_return_t ue;
char *res = memcached_get(tcli, key.c_str(), key.length(),
&value_length, &flags, &ue);
if (res != nullptr && value) {
if (MEMCACHED_SUCCESS == ue && res != nullptr && value &&
value_length >= length) {
VLOG(9) << "Get key = " << key << " OK";
memcpy(value, res + offset, length);
free(res);
return true;
}

*errorlog = ResError(ue);
if (ue != MEMCACHED_NOTFOUND) {
LOG(ERROR) << "Get key = " << key << " error = " << *errorlog
<< ", get_value_len = " << value_length
<< ", expect_value_len = " << length;
}

return false;
}

Expand Down
11 changes: 10 additions & 1 deletion curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,16 @@ bool FileCacheManager::ReadKVRequestFromRemoteCache(const std::string &name,
return false;
}

return g_kvClientManager->Get(name, databuf, offset, length);
auto task = std::make_shared<GetKVCacheTask>(name, databuf, offset, length);
CountDownEvent event(1);
task->done = [&](const std::shared_ptr<GetKVCacheTask> &task) {
event.Signal();
return;
};
g_kvClientManager->Get(task);
event.Wait();

return task->res;
}

bool FileCacheManager::ReadKVRequestFromS3(const std::string &name,
Expand Down
14 changes: 11 additions & 3 deletions curvefs/test/client/client_memcache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class MemCachedTest : public ::testing::Test {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
retry++;
} while (1);
LOG(INFO) << "=============== memcache start ok";
}

void TearDown() {
Expand Down Expand Up @@ -115,11 +116,18 @@ TEST_F(MemCachedTest, MultiThreadTask) {
// get
for (int i = 0; i < 5; i++) {
workers.emplace_back([&, i]() {
CountDownEvent taskEnvent(1);
char *result = new char[4];
auto context = std::make_shared<GetKvCacheContext>(kvstr[i].first,
result, 0, 4);
ASSERT_EQ(true, manager_.Get(context));
auto task =
std::make_shared<GetKVCacheTask>(kvstr[i].first, result, 0, 4);
task->done =
[&taskEnvent](const std::shared_ptr<GetKVCacheTask> &task) {
taskEnvent.Signal();
};
manager_.Get(task);
taskEnvent.Wait();
ASSERT_EQ(0, memcmp(result, kvstr[i].second.c_str(), 4));
ASSERT_TRUE(task->res);
});
}
for (auto &iter : workers) {
Expand Down

0 comments on commit 9d5a18a

Please sign in to comment.