Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs/client: s3 adaptor unit test optimization #1227

Merged
merged 1 commit into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,30 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
::curvefs::client::common::S3Info2FsS3Option(s3Info, &fsS3Option);
SetFuseClientS3Option(&opt, fsS3Option);

s3Client_ = std::make_shared<S3ClientImpl>();
s3Client_->Init(opt.s3Opt.s3AdaptrOpt);
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client_.get(),
inodeManager_, mdsClient_);
auto s3Client = std::make_shared<S3ClientImpl>();
s3Client->Init(opt.s3Opt.s3AdaptrOpt);
auto fsCacheManager = std::make_shared<FsCacheManager>(
dynamic_cast<S3ClientAdaptorImpl *>(s3Adaptor_.get()),
opt.s3Opt.s3ClientAdaptorOpt.readCacheMaxByte,
opt.s3Opt.s3ClientAdaptorOpt.writeCacheMaxByte);
if (opt.s3Opt.s3ClientAdaptorOpt.diskCacheOpt.diskCacheType !=
baijiaruo marked this conversation as resolved.
Show resolved Hide resolved
DiskCacheType::Disable) {
auto wrapper = std::make_shared<PosixWrapper>();
auto diskCacheRead = std::make_shared<DiskCacheRead>();
auto diskCacheWrite = std::make_shared<DiskCacheWrite>();
auto diskCacheManager = std::make_shared<DiskCacheManager>(
wrapper, diskCacheWrite, diskCacheRead);
auto diskCacheManagerImpl = std::make_shared<DiskCacheManagerImpl>(
diskCacheManager, s3Client.get());
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
diskCacheManagerImpl, true);
} else {
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
nullptr);
}

return ret;
}

Expand Down
11 changes: 3 additions & 8 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
#include <memory>

#include "curvefs/src/client/fuse_client.h"

#include "curvefs/src/client/s3/client_s3_cache_manager.h"
namespace curvefs {
namespace client {

class FuseS3Client : public FuseClient {
public:
FuseS3Client()
: FuseClient(),
s3Adaptor_(std::make_shared<S3ClientAdaptorImpl>()),
s3Client_(nullptr) {}
s3Adaptor_(std::make_shared<S3ClientAdaptorImpl>()) {}

FuseS3Client(const std::shared_ptr<MdsClient> &mdsClient,
const std::shared_ptr<MetaServerClient> &metaClient,
Expand All @@ -45,8 +44,7 @@ class FuseS3Client : public FuseClient {
const std::shared_ptr<S3ClientAdaptor> &s3Adaptor)
: FuseClient(mdsClient, metaClient,
inodeManager, dentryManager),
s3Adaptor_(s3Adaptor),
s3Client_(nullptr) {}
s3Adaptor_(s3Adaptor) {}

CURVEFS_ERROR Init(const FuseClientOption &option) override;

Expand Down Expand Up @@ -85,9 +83,6 @@ class FuseS3Client : public FuseClient {
private:
// s3 adaptor
std::shared_ptr<S3ClientAdaptor> s3Adaptor_;

// s3 client
std::shared_ptr<S3ClientImpl> s3Client_;
};


Expand Down
63 changes: 29 additions & 34 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@
namespace curvefs {

namespace client {

CURVEFS_ERROR
S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient) {
S3ClientAdaptorImpl::Init(
const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
bool startBackGround) {
pendingReq_ = 0;
blockSize_ = option.blockSize;
chunkSize_ = option.chunkSize;
Expand All @@ -57,34 +60,10 @@ S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
client_ = client;
inodeManager_ = inodeManager;
mdsClient_ = mdsClient;
fsCacheManager_ = std::make_shared<FsCacheManager>(
this, option.readCacheMaxByte, option.writeCacheMaxByte);
waitIntervalSec_.Init(option.intervalSec * 1000);
LOG(INFO) << "S3ClientAdaptorImpl Init. block size:" << blockSize_
<< ", chunk size: " << chunkSize_
<< ", prefetchBlocks: " << prefetchBlocks_
<< ", prefetchExecQueueNum: " << prefetchExecQueueNum_
<< ", intervalSec: " << option.intervalSec
<< ", flushIntervalSec: " << option.flushIntervalSec
<< ", writeCacheMaxByte: " << option.writeCacheMaxByte
<< ", readCacheMaxByte: " << option.readCacheMaxByte
<< ", nearfullRatio: " << option.nearfullRatio
<< ", baseSleepUs: " << option.baseSleepUs;
toStop_.store(false, std::memory_order_release);
bgFlushThread_ = Thread(&S3ClientAdaptorImpl::BackGroundFlush, this);

fsCacheManager_ = fsCacheManager;
diskCacheManagerImpl_ = diskCacheManagerImpl;
baijiaruo marked this conversation as resolved.
Show resolved Hide resolved
if (HasDiskCache()) {
std::shared_ptr<PosixWrapper> wrapper =
std::make_shared<PosixWrapper>();
std::shared_ptr<DiskCacheRead> diskCacheRead =
std::make_shared<DiskCacheRead>();
std::shared_ptr<DiskCacheWrite> diskCacheWrite =
std::make_shared<DiskCacheWrite>();
std::shared_ptr<DiskCacheManager> diskCacheManager =
std::make_shared<DiskCacheManager>(wrapper, diskCacheWrite,
diskCacheRead);
diskCacheManagerImpl_ =
std::make_shared<DiskCacheManagerImpl>(diskCacheManager, client);
diskCacheManagerImpl_ = diskCacheManagerImpl;
if (diskCacheManagerImpl_->Init(option) < 0) {
LOG(ERROR) << "Init disk cache failed";
return CURVEFS_ERROR::INTERNAL;
Expand All @@ -100,7 +79,21 @@ S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
}
}
}
if (startBackGround) {
toStop_.store(false, std::memory_order_release);
bgFlushThread_ = Thread(&S3ClientAdaptorImpl::BackGroundFlush, this);
}

LOG(INFO) << "S3ClientAdaptorImpl Init. block size:" << blockSize_
<< ", chunk size: " << chunkSize_
<< ", prefetchBlocks: " << prefetchBlocks_
<< ", prefetchExecQueueNum: " << prefetchExecQueueNum_
<< ", intervalSec: " << option.intervalSec
<< ", flushIntervalSec: " << option.flushIntervalSec
<< ", writeCacheMaxByte: " << option.writeCacheMaxByte
<< ", readCacheMaxByte: " << option.readCacheMaxByte
<< ", nearfullRatio: " << option.nearfullRatio
<< ", baseSleepUs: " << option.baseSleepUs;
return CURVEFS_ERROR::OK;
}

Expand All @@ -117,9 +110,11 @@ int S3ClientAdaptorImpl::Write(uint64_t inodeId, uint64_t offset,
VLOG(6) << "pendingReq_ is: " << pendingReq_;
uint64_t pendingReq = pendingReq_.load(std::memory_order_seq_cst);
fsCacheManager_->DataCacheByteInc(length);
if ((fsCacheManager_->GetDataCacheSize() + pendingReq * fuseMaxSize_) >=
fsCacheManager_->GetDataCacheMaxSize()) {
LOG(INFO) << "write cache is full, wait flush";
uint64_t size = fsCacheManager_->GetDataCacheSize();
uint64_t maxSize = fsCacheManager_->GetDataCacheMaxSize();
if ((size + pendingReq * fuseMaxSize_) >= maxSize) {
LOG(INFO) << "write cache is full, wait flush. size:" << size
<< ", maxSize:" << maxSize;
fsCacheManager_->WaitFlush();
}
}
Expand Down
41 changes: 19 additions & 22 deletions curvefs/src/client/s3/client_s3_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ class S3ClientAdaptor {
* @brief Initailize s3 client
* @param[in] options the options for s3 client
*/
virtual CURVEFS_ERROR Init(const S3ClientAdaptorOption& option,
S3Client* client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient) = 0;
virtual CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
bool startBackGround = false) = 0;
/**
* @brief write data to s3
* @param[in] options the options for s3 client
Expand All @@ -94,14 +97,20 @@ class S3ClientAdaptor {
class S3ClientAdaptorImpl : public S3ClientAdaptor {
public:
S3ClientAdaptorImpl() {}
virtual ~S3ClientAdaptorImpl() {}
virtual ~S3ClientAdaptorImpl() {
LOG(INFO) << "delete S3ClientAdaptorImpl";
}
/**
* @brief Initailize s3 client
* @param[in] options the options for s3 client
*/
CURVEFS_ERROR Init(const S3ClientAdaptorOption& option, S3Client* client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient);
CURVEFS_ERROR
Init(const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
bool startBackGround = false);
/**
* @brief write data to s3
* @param[in] options the options for s3 client
Expand All @@ -125,7 +134,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
return fsCacheManager_;
}
uint32_t GetFlushInterval() { return flushIntervalSec_; }
S3Client *GetS3Client() { return client_; }
std::shared_ptr<S3Client> GetS3Client() { return client_; }
uint32_t GetPrefetchBlocks() {
return prefetchBlocks_;
}
Expand Down Expand Up @@ -173,18 +182,6 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
}
void InitMetrics(const std::string &fsName);
void CollectMetrics(InterfaceMetric *interface, int count, uint64_t start);

// for test
void InitForTest(
std::shared_ptr<DiskCacheManagerImpl> diskcacheManagerImpl,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<InodeCacheManager> inodeManager) {
diskCacheManagerImpl_ = diskcacheManagerImpl;
fsCacheManager_ = fsCacheManager;
inodeManager_ = inodeManager;
chunkSize_ = 4 * 1024 * 1024;
}

void SetDiskCache(DiskCacheType type) {
diskCacheType_ = type;
}
Expand Down Expand Up @@ -213,7 +210,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
std::shared_ptr<S3Metric> s3Metric_;

private:
S3Client* client_;
std::shared_ptr<S3Client> client_;
uint64_t blockSize_;
uint64_t chunkSize_;
uint32_t fuseMaxSize_;
Expand Down
10 changes: 10 additions & 0 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,15 @@ void ChunkCacheManager::UpdateWriteCacheMap(uint64_t oldChunkPos,
(void)ret;
}

void ChunkCacheManager::AddWriteDataCacheForTest(DataCachePtr dataCache) {
WriteLockGuard writeLockGuard(rwLockWrite_);

dataWCacheMap_.emplace(dataCache->GetChunkPos(), dataCache);
s3ClientAdaptor_->GetFsCacheManager()->DataCacheNumInc();
s3ClientAdaptor_->GetFsCacheManager()->DataCacheByteInc(
dataCache->GetActualLen());
}

DataCache::DataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
ChunkCacheManagerPtr chunkCacheManager, uint64_t chunkPos,
uint64_t len, const char *data)
Expand Down Expand Up @@ -1832,6 +1841,7 @@ void DataCache::Write(uint64_t chunkPos, uint64_t len, const char *data,
void DataCache::Truncate(uint64_t size) {
uint64_t blockSize = s3ClientAdaptor_->GetBlockSize();
uint32_t pageSize = s3ClientAdaptor_->GetPageSize();
assert(size <= len_);

curve::common::LockGuard lg(mtx_);
uint64_t truncatePos = chunkPos_ + size;
Expand Down
Loading