Skip to content

Commit

Permalink
curvefs/client: s3 adaptor unit test optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
baijiaruo1 committed Apr 2, 2022
1 parent 0f1a454 commit 367a97c
Show file tree
Hide file tree
Showing 19 changed files with 4,411 additions and 3,128 deletions.
28 changes: 24 additions & 4 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,30 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
::curvefs::client::common::S3Info2FsS3Option(s3Info, &fsS3Option);
SetFuseClientS3Option(&opt, fsS3Option);

s3Client_ = std::make_shared<S3ClientImpl>();
s3Client_->Init(opt.s3Opt.s3AdaptrOpt);
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client_.get(),
inodeManager_, mdsClient_);
auto s3Client = std::make_shared<S3ClientImpl>();
s3Client->Init(opt.s3Opt.s3AdaptrOpt);
auto fsCacheManager = std::make_shared<FsCacheManager>(
dynamic_cast<S3ClientAdaptorImpl *>(s3Adaptor_.get()),
opt.s3Opt.s3ClientAdaptorOpt.readCacheMaxByte,
opt.s3Opt.s3ClientAdaptorOpt.writeCacheMaxByte);
if (opt.s3Opt.s3ClientAdaptorOpt.diskCacheOpt.diskCacheType !=
DiskCacheType::Disable) {
auto wrapper = std::make_shared<PosixWrapper>();
auto diskCacheRead = std::make_shared<DiskCacheRead>();
auto diskCacheWrite = std::make_shared<DiskCacheWrite>();
auto diskCacheManager = std::make_shared<DiskCacheManager>(
wrapper, diskCacheWrite, diskCacheRead);
auto diskCacheManagerImpl = std::make_shared<DiskCacheManagerImpl>(
diskCacheManager, s3Client.get());
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
diskCacheManagerImpl, true);
} else {
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
nullptr);
}

return ret;
}

Expand Down
11 changes: 3 additions & 8 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
#include <memory>

#include "curvefs/src/client/fuse_client.h"

#include "curvefs/src/client/s3/client_s3_cache_manager.h"
namespace curvefs {
namespace client {

class FuseS3Client : public FuseClient {
public:
FuseS3Client()
: FuseClient(),
s3Adaptor_(std::make_shared<S3ClientAdaptorImpl>()),
s3Client_(nullptr) {}
s3Adaptor_(std::make_shared<S3ClientAdaptorImpl>()) {}

FuseS3Client(const std::shared_ptr<MdsClient> &mdsClient,
const std::shared_ptr<MetaServerClient> &metaClient,
Expand All @@ -45,8 +44,7 @@ class FuseS3Client : public FuseClient {
const std::shared_ptr<S3ClientAdaptor> &s3Adaptor)
: FuseClient(mdsClient, metaClient,
inodeManager, dentryManager),
s3Adaptor_(s3Adaptor),
s3Client_(nullptr) {}
s3Adaptor_(s3Adaptor) {}

CURVEFS_ERROR Init(const FuseClientOption &option) override;

Expand Down Expand Up @@ -85,9 +83,6 @@ class FuseS3Client : public FuseClient {
private:
// s3 adaptor
std::shared_ptr<S3ClientAdaptor> s3Adaptor_;

// s3 client
std::shared_ptr<S3ClientImpl> s3Client_;
};


Expand Down
106 changes: 89 additions & 17 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,44 @@
namespace curvefs {

namespace client {
/*
CURVEFS_ERROR
S3ClientAdaptorImpl::Init(
const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl) {
auto ret = Init(option, client, inodeManager, mdsClient);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "Init s3 client adaptor failed, ret: " << ret;
return ret;
}
if (HasDiskCache()) {
diskCacheManagerImpl_ = diskCacheManagerImpl;
if (diskCacheManagerImpl_->Init(option) < 0) {
LOG(ERROR) << "Init disk cache failed";
return CURVEFS_ERROR::INTERNAL;
}
// init rpc send exec-queue
downloadTaskQueues_.resize(prefetchExecQueueNum_);
for (auto &q : downloadTaskQueues_) {
int rc = bthread::execution_queue_start(
&q, nullptr, &S3ClientAdaptorImpl::ExecAsyncDownloadTask, this);
if (rc != 0) {
LOG(ERROR) << "Init AsyncRpcQueues failed";
return CURVEFS_ERROR::INTERNAL;
}
}
}
return CURVEFS_ERROR::OK;
}
CURVEFS_ERROR
S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient) {
S3ClientAdaptorImpl::Init(
const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient) {
pendingReq_ = 0;
blockSize_ = option.blockSize;
chunkSize_ = option.chunkSize;
Expand Down Expand Up @@ -73,18 +106,41 @@ S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
toStop_.store(false, std::memory_order_release);
bgFlushThread_ = Thread(&S3ClientAdaptorImpl::BackGroundFlush, this);
return CURVEFS_ERROR::OK;
}
*/
CURVEFS_ERROR
S3ClientAdaptorImpl::Init(
const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
bool startBackGround) {
pendingReq_ = 0;
blockSize_ = option.blockSize;
chunkSize_ = option.chunkSize;
pageSize_ = option.pageSize;
if (chunkSize_ % blockSize_ != 0) {
LOG(ERROR) << "chunkSize:" << chunkSize_
<< " is not integral multiple for the blockSize:"
<< blockSize_;
return CURVEFS_ERROR::INVALIDPARAM;
}
fuseMaxSize_ = option.fuseMaxSize;
prefetchBlocks_ = option.prefetchBlocks;
prefetchExecQueueNum_ = option.prefetchExecQueueNum;
diskCacheType_ = option.diskCacheOpt.diskCacheType;
memCacheNearfullRatio_ = option.nearfullRatio;
throttleBaseSleepUs_ = option.baseSleepUs;
flushIntervalSec_ = option.flushIntervalSec;
client_ = client;
inodeManager_ = inodeManager;
mdsClient_ = mdsClient;
fsCacheManager_ = fsCacheManager;
diskCacheManagerImpl_ = diskCacheManagerImpl;
if (HasDiskCache()) {
std::shared_ptr<PosixWrapper> wrapper =
std::make_shared<PosixWrapper>();
std::shared_ptr<DiskCacheRead> diskCacheRead =
std::make_shared<DiskCacheRead>();
std::shared_ptr<DiskCacheWrite> diskCacheWrite =
std::make_shared<DiskCacheWrite>();
std::shared_ptr<DiskCacheManager> diskCacheManager =
std::make_shared<DiskCacheManager>(wrapper, diskCacheWrite,
diskCacheRead);
diskCacheManagerImpl_ =
std::make_shared<DiskCacheManagerImpl>(diskCacheManager, client);
diskCacheManagerImpl_ = diskCacheManagerImpl;
if (diskCacheManagerImpl_->Init(option) < 0) {
LOG(ERROR) << "Init disk cache failed";
return CURVEFS_ERROR::INTERNAL;
Expand All @@ -100,7 +156,21 @@ S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
}
}
}
if (startBackGround) {
toStop_.store(false, std::memory_order_release);
bgFlushThread_ = Thread(&S3ClientAdaptorImpl::BackGroundFlush, this);
}

LOG(INFO) << "S3ClientAdaptorImpl Init. block size:" << blockSize_
<< ", chunk size: " << chunkSize_
<< ", prefetchBlocks: " << prefetchBlocks_
<< ", prefetchExecQueueNum: " << prefetchExecQueueNum_
<< ", intervalSec: " << option.intervalSec
<< ", flushIntervalSec: " << option.flushIntervalSec
<< ", writeCacheMaxByte: " << option.writeCacheMaxByte
<< ", readCacheMaxByte: " << option.readCacheMaxByte
<< ", nearfullRatio: " << option.nearfullRatio
<< ", baseSleepUs: " << option.baseSleepUs;
return CURVEFS_ERROR::OK;
}

Expand All @@ -117,9 +187,11 @@ int S3ClientAdaptorImpl::Write(uint64_t inodeId, uint64_t offset,
VLOG(6) << "pendingReq_ is: " << pendingReq_;
uint64_t pendingReq = pendingReq_.load(std::memory_order_seq_cst);
fsCacheManager_->DataCacheByteInc(length);
if ((fsCacheManager_->GetDataCacheSize() + pendingReq * fuseMaxSize_) >=
fsCacheManager_->GetDataCacheMaxSize()) {
LOG(INFO) << "write cache is full, wait flush";
uint64_t size = fsCacheManager_->GetDataCacheSize();
uint64_t maxSize = fsCacheManager_->GetDataCacheMaxSize();
if ((size + pendingReq * fuseMaxSize_) >= maxSize) {
LOG(INFO) << "write cache is full, wait flush. size:" << size
<< ", maxSize:" << maxSize;
fsCacheManager_->WaitFlush();
}
}
Expand Down
61 changes: 39 additions & 22 deletions curvefs/src/client/s3/client_s3_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,23 @@ class S3ClientAdaptor {
* @brief Initailize s3 client
* @param[in] options the options for s3 client
*/
virtual CURVEFS_ERROR Init(const S3ClientAdaptorOption& option,
S3Client* client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient) = 0;
/*
virtual CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<DiskCacheManagerImpl> diskcacheManagerImpl) = 0;
virtual CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient) = 0;*/
virtual CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
bool startBackGround = false) = 0;
/**
* @brief write data to s3
* @param[in] options the options for s3 client
Expand All @@ -94,14 +107,30 @@ class S3ClientAdaptor {
class S3ClientAdaptorImpl : public S3ClientAdaptor {
public:
S3ClientAdaptorImpl() {}
virtual ~S3ClientAdaptorImpl() {}
virtual ~S3ClientAdaptorImpl() {
LOG(INFO) << "delete S3ClientAdaptorImpl";
}
/**
* @brief Initailize s3 client
* @param[in] options the options for s3 client
*/
CURVEFS_ERROR Init(const S3ClientAdaptorOption& option, S3Client* client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient);
/*
CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl);
CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient);*/
CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
bool startBackGround = false);
/**
* @brief write data to s3
* @param[in] options the options for s3 client
Expand All @@ -125,7 +154,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
return fsCacheManager_;
}
uint32_t GetFlushInterval() { return flushIntervalSec_; }
S3Client *GetS3Client() { return client_; }
std::shared_ptr<S3Client> GetS3Client() { return client_; }
uint32_t GetPrefetchBlocks() {
return prefetchBlocks_;
}
Expand Down Expand Up @@ -173,18 +202,6 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
}
void InitMetrics(const std::string &fsName);
void CollectMetrics(InterfaceMetric *interface, int count, uint64_t start);

// for test
void InitForTest(
std::shared_ptr<DiskCacheManagerImpl> diskcacheManagerImpl,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<InodeCacheManager> inodeManager) {
diskCacheManagerImpl_ = diskcacheManagerImpl;
fsCacheManager_ = fsCacheManager;
inodeManager_ = inodeManager;
chunkSize_ = 4 * 1024 * 1024;
}

void SetDiskCache(DiskCacheType type) {
diskCacheType_ = type;
}
Expand Down Expand Up @@ -213,7 +230,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
std::shared_ptr<S3Metric> s3Metric_;

private:
S3Client* client_;
std::shared_ptr<S3Client> client_;
uint64_t blockSize_;
uint64_t chunkSize_;
uint32_t fuseMaxSize_;
Expand Down
10 changes: 10 additions & 0 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,15 @@ void ChunkCacheManager::UpdateWriteCacheMap(uint64_t oldChunkPos,
(void)ret;
}

void ChunkCacheManager::AddWriteDataCacheForTest(DataCachePtr dataCache) {
WriteLockGuard writeLockGuard(rwLockWrite_);

dataWCacheMap_.emplace(dataCache->GetChunkPos(), dataCache);
s3ClientAdaptor_->GetFsCacheManager()->DataCacheNumInc();
s3ClientAdaptor_->GetFsCacheManager()->DataCacheByteInc(
dataCache->GetActualLen());
}

DataCache::DataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
ChunkCacheManagerPtr chunkCacheManager, uint64_t chunkPos,
uint64_t len, const char *data)
Expand Down Expand Up @@ -1832,6 +1841,7 @@ void DataCache::Write(uint64_t chunkPos, uint64_t len, const char *data,
void DataCache::Truncate(uint64_t size) {
uint64_t blockSize = s3ClientAdaptor_->GetBlockSize();
uint32_t pageSize = s3ClientAdaptor_->GetPageSize();
assert(size <= len_);

curve::common::LockGuard lg(mtx_);
uint64_t truncatePos = chunkPos_ + size;
Expand Down
Loading

0 comments on commit 367a97c

Please sign in to comment.