Skip to content

Commit

Permalink
snapshotcloneserver: fixed cancel task lost
Browse files Browse the repository at this point in the history
  • Loading branch information
Wine93 authored and xu-chaojie committed Jul 22, 2021
1 parent d115dd4 commit e39cedf
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 73 deletions.
14 changes: 14 additions & 0 deletions src/snapshotcloneserver/common/snapshotclone_meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
namespace curve {
namespace snapshotcloneserver {

using CASFunc = std::function<SnapshotInfo*(SnapshotInfo*)>;

class SnapshotCloneMetaStore {
public:
SnapshotCloneMetaStore() {}
Expand All @@ -60,6 +62,18 @@ class SnapshotCloneMetaStore {
* @return: 0 更新成功/ -1 更新失败
*/
virtual int UpdateSnapshot(const SnapshotInfo &snapinfo) = 0;

/**
* @brief Compare and set snapshot
* @param[in] uuid the uuid for snapshot
* @param[in] cas the function for compare and set snapshot,
* return nullptr if not needed to set snapshot,
* else return the pointer of snapshot to set
* @return 0 if set snapshot success or not needed to set snapshot,
* else return -1
*/
virtual int CASSnapshot(const UUID& uuid, CASFunc cas) = 0;

/**
* 获取指定快照的快照信息
* @param 快照的uuid
Expand Down
29 changes: 29 additions & 0 deletions src/snapshotcloneserver/common/snapshotclone_meta_store_etcd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,35 @@ int SnapshotCloneMetaStoreEtcd::UpdateSnapshot(const SnapshotInfo &info) {
return 0;
}

int SnapshotCloneMetaStoreEtcd::CASSnapshot(const UUID& uuid, CASFunc cas) {
WriteLockGuard guard(snapInfos_mutex);
auto iter = snapInfos_.find(uuid);
auto info = cas(iter == snapInfos_.end() ? nullptr : &(iter->second));
if (nullptr == info) { // Not needed to update snapshot
return 0;
}

int retCode;
std::string value;
auto key = codec_->EncodeSnapshotKey(uuid);
if (!codec_->EncodeSnapshotData(*info, &value)) {
LOG(ERROR) << "EncodeSnapshotData failed, snapshotInfo: " << *info;
return -1;
} else if ((retCode = client_->Put(key, value)) != EtcdErrCode::EtcdOK) {
LOG(ERROR) << "Put snapshotInfo into etcd failed"
<<", errCode: " << retCode << ", snapshotInfo: " << *info;
return -1;
}

if (iter != snapInfos_.end()) {
iter->second = *info;
} else {
snapInfos_.emplace(uuid, *info);
}

return 0;
}

int SnapshotCloneMetaStoreEtcd::GetSnapshotInfo(
const UUID &uuid, SnapshotInfo *info) {
ReadLockGuard guard(snapInfos_mutex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class SnapshotCloneMetaStoreEtcd : public SnapshotCloneMetaStore {

int UpdateSnapshot(const SnapshotInfo &info) override;

int CASSnapshot(const UUID& uuid, CASFunc cas) override;

int GetSnapshotInfo(const UUID &uuid, SnapshotInfo *info) override;

int GetSnapshotList(const std::string &filename,
Expand Down
70 changes: 46 additions & 24 deletions src/snapshotcloneserver/snapshot/snapshot_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void SnapshotCoreImpl::HandleCreateSnapshotTask(
UUID uuid = task->GetUuid();
uint64_t seqNum = info->GetSeqNum();
bool existIndexData = false;
if (kUnInitializeSeqNum == seqNum) {
if (kUnInitializeSeqNum == seqNum) {
ret = CreateSnapshotOnCurvefs(fileName, info, task);
if (ret < 0) {
LOG(ERROR) << "CreateSnapshotOnCurvefs error, "
Expand Down Expand Up @@ -212,11 +212,7 @@ void SnapshotCoreImpl::HandleCreateSnapshotTask(
task->SetProgress(kProgressCreateSnapshotOnCurvefsComplete);
task->UpdateMetric();
if (task->IsCanceled()) {
ret = StartCancel(task);
if (kErrCodeSuccess == ret) {
CancelAfterCreateSnapshotOnCurvefs(task);
}
return;
return CancelAfterCreateSnapshotOnCurvefs(task);
}

ChunkIndexData indexData;
Expand Down Expand Up @@ -270,11 +266,7 @@ void SnapshotCoreImpl::HandleCreateSnapshotTask(
}

if (task->IsCanceled()) {
ret = StartCancel(task);
if (kErrCodeSuccess == ret) {
CancelAfterCreateChunkIndexData(task);
}
return;
return CancelAfterCreateChunkIndexData(task);
}

FileSnapMap fileSnapshotMap;
Expand Down Expand Up @@ -320,11 +312,8 @@ void SnapshotCoreImpl::HandleCreateSnapshotTask(
task->UpdateMetric();

if (task->IsCanceled()) {
ret = StartCancel(task);
if (kErrCodeSuccess == ret) {
CancelAfterTransferSnapshotData(task, indexData, fileSnapshotMap);
}
return;
return CancelAfterTransferSnapshotData(
task, indexData, fileSnapshotMap);
}

ret = DeleteSnapshotOnCurvefs(*info);
Expand All @@ -337,12 +326,8 @@ void SnapshotCoreImpl::HandleCreateSnapshotTask(

LockGuard lockGuard(task->GetLockRef());
if (task->IsCanceled()) {
// Cancel的逻辑与前面一致
ret = StartCancel(task);
if (kErrCodeSuccess == ret) {
CancelAfterTransferSnapshotData(task, indexData, fileSnapshotMap);
}
return;
return CancelAfterTransferSnapshotData(
task, indexData, fileSnapshotMap);
}

HandleCreateSnapshotSuccess(task);
Expand Down Expand Up @@ -572,9 +557,20 @@ int SnapshotCoreImpl::CreateSnapshotOnCurvefs(
info->SetStripeCount(snapInfo.stripeCount);
info->SetCreateTime(snapInfo.ctime);

ret = metaStore_->UpdateSnapshot(*info);
auto compareAndSet = [&](SnapshotInfo* snapinfo) {
if (nullptr != snapinfo) {
auto status = snapinfo->GetStatus();
if (info->GetStatus() != status) {
info->SetStatus(status);
}
}
return info;
};

auto uuid = info->GetUuid();
ret = metaStore_->CASSnapshot(uuid, compareAndSet);
if (ret < 0) {
LOG(ERROR) << "UpdateSnapshot error, "
LOG(ERROR) << "CASSnapshot error, "
<< " ret = " << ret
<< ", fileName = " << fileName
<< ", uuid = " << task->GetUuid();
Expand Down Expand Up @@ -1159,6 +1155,32 @@ int SnapshotCoreImpl::HandleCancelUnSchduledSnapshotTask(
return kErrCodeSuccess;
}


int SnapshotCoreImpl::HandleCancelScheduledSnapshotTask(
std::shared_ptr<SnapshotTaskInfo> task) {
LockGuard lockGuard(task->GetLockRef());

if (task->IsFinish()) {
return kErrCodeCannotCancelFinished;
}

auto ret = StartCancel(task);
if (kErrCodeSuccess == ret) {
task->Cancel();
} else {
auto& snapInfo = task->GetSnapshotInfo();
LOG(ERROR) << "HandleCancelSchduledSnapshotTask failed: "
<< ", ret = " << ret
<< ", uuid = " << snapInfo.GetUuid()
<< ", fileName = " << snapInfo.GetFileName()
<< ", snapshotName = " << snapInfo.GetSnapshotName()
<< ", seqNum = " << snapInfo.GetSeqNum()
<< ", createTime = " << snapInfo.GetCreateTime();
}

return ret;
}

} // namespace snapshotcloneserver
} // namespace curve

13 changes: 13 additions & 0 deletions src/snapshotcloneserver/snapshot/snapshot_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ class SnapshotCore {

virtual int HandleCancelUnSchduledSnapshotTask(
std::shared_ptr<SnapshotTaskInfo> task) = 0;

/**
* @brief Handle cancel snapshot task which is scheduled
* @param[in] task pointer to snapshot task
* @return kErrCodeCannotCancelFinished if task has finished,
* kErrCodeSuccess if cancel success,
* else return kErrCodeInternalError
*/
virtual int HandleCancelScheduledSnapshotTask(
std::shared_ptr<SnapshotTaskInfo> task) = 0;
};

class SnapshotCoreImpl : public SnapshotCore {
Expand Down Expand Up @@ -224,6 +234,9 @@ class SnapshotCoreImpl : public SnapshotCore {
int HandleCancelUnSchduledSnapshotTask(
std::shared_ptr<SnapshotTaskInfo> task) override;

int HandleCancelScheduledSnapshotTask(
std::shared_ptr<SnapshotTaskInfo> task) override;

private:
/**
* @brief 构建快照文件映射
Expand Down
6 changes: 1 addition & 5 deletions src/snapshotcloneserver/snapshot/snapshot_task_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ int SnapshotTaskManager::CancelTask(const TaskIdType &taskId) {
auto it = taskMap_.find(taskId);
if (it != taskMap_.end()) {
auto taskInfo = it->second->GetTaskInfo();
LockGuard lockGuard(taskInfo->GetLockRef());
if (!taskInfo->IsFinish()) {
taskInfo->Cancel();
return kErrCodeSuccess;
}
return core_->HandleCancelScheduledSnapshotTask(taskInfo);
}
return kErrCodeCannotCancelFinished;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ int FakeSnapshotCloneMetaStore::UpdateSnapshot(const SnapshotInfo &info) {
return 0;
}

int FakeSnapshotCloneMetaStore::CASSnapshot(const UUID& uuid, CASFunc cas) {
fiu_return_on(
"test/integration/snapshotcloneserver/FakeSnapshotCloneMetaStore.UpdateSnapshot", -1); // NOLINT
std::lock_guard<std::mutex> guard(snapInfos_mutex);
auto iter = snapInfos_.find(uuid);
if (iter == snapInfos_.end()) {
return -1;
}

auto info = cas(&(iter->second));
if (nullptr != info) {
iter->second = *info;
}

return 0;
}

int FakeSnapshotCloneMetaStore::GetSnapshotInfo(
const UUID &uuid, SnapshotInfo *info) {
std::lock_guard<std::mutex> guard(snapInfos_mutex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class FakeSnapshotCloneMetaStore : public SnapshotCloneMetaStore {
int AddSnapshot(const SnapshotInfo &snapinfo) override;
int DeleteSnapshot(const UUID &uuid) override;
int UpdateSnapshot(const SnapshotInfo &snapinfo) override;
int CASSnapshot(const UUID& uuid, CASFunc cas) override;
int GetSnapshotInfo(const UUID &uuid, SnapshotInfo *info) override;
int GetSnapshotList(const std::string &filename,
std::vector<SnapshotInfo> *v) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ TEST_F(SnapshotCloneServerTest, TestCancelAndMakeSnaphotConcurrent) {
CancelSnapshot(testUser1_, testFile1_, uuid1);
isCancel = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
std::this_thread::sleep_for(std::chrono::seconds(30));
continue;
} else if (info1.GetSnapshotInfo().GetStatus() == Status::done) {
success1 = false;
Expand Down
4 changes: 4 additions & 0 deletions test/snapshotcloneserver/mock_snapshot_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,17 @@ class MockSnapshotCore : public SnapshotCore {

MOCK_METHOD1(HandleCancelUnSchduledSnapshotTask,
int(std::shared_ptr<SnapshotTaskInfo> task));

MOCK_METHOD1(HandleCancelScheduledSnapshotTask,
int(std::shared_ptr<SnapshotTaskInfo> task));
};

class MockSnapshotCloneMetaStore : public SnapshotCloneMetaStore {
public:
MOCK_METHOD1(AddSnapshot, int(const SnapshotInfo &snapinfo));
MOCK_METHOD1(DeleteSnapshot, int(const UUID &uuid));
MOCK_METHOD1(UpdateSnapshot, int(const SnapshotInfo &snapinfo));
MOCK_METHOD2(CASSnapshot, int(const UUID&, CASFunc));
MOCK_METHOD2(GetSnapshotInfo,
int(const UUID &uuid, SnapshotInfo *info));
MOCK_METHOD2(GetSnapshotList,
Expand Down
Loading

0 comments on commit e39cedf

Please sign in to comment.