Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng committed Oct 27, 2023
1 parent cad3d8f commit f57b39b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
18 changes: 11 additions & 7 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1943,9 +1943,10 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac
// check request and get info
TabletSharedPtr tablet;
DataDir* dest_store = nullptr;

auto status = _check_migrate_request(storage_medium_migrate_req, tablet, &dest_store);
if (status.ok()) {

bool need_deal = true;
auto status = _check_migrate_request(storage_medium_migrate_req, tablet, &dest_store, &need_deal);
if (status.ok() && need_deal) {
EngineStorageMigrationTask engine_task(tablet, dest_store);
status = StorageEngine::instance()->execute_task(&engine_task);
}
Expand Down Expand Up @@ -1973,7 +1974,8 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac

Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
DataDir** dest_store) {
DataDir** dest_store,
bool* need_deal) {
int64_t tablet_id = req.tablet_id;
tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
Expand Down Expand Up @@ -2011,8 +2013,10 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
*dest_store = stores[0];
}
if (tablet->data_dir()->path() == (*dest_store)->path()) {
return Status::InternalError("tablet is already on specified path {}",
tablet->data_dir()->path());
LOG_WARNING("tablet is already on specified path")
.tag("path", tablet->data_dir()->path());
*need_deal = false;
return Status::OK();
}

// check local disk capacity
Expand All @@ -2021,7 +2025,7 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
}

*need_deal = true;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class StorageMediumMigrateTaskPool : public TaskWorkerPool {
public:
StorageMediumMigrateTaskPool(ExecEnv* env, ThreadModel thread_model);
Status _check_migrate_request(const TStorageMediumMigrateReq& req, TabletSharedPtr& tablet,
DataDir** dest_store);
DataDir** dest_store, bool* need_deal);
void _storage_medium_migrate_worker_thread_callback();

DISALLOW_COPY_AND_ASSIGN(StorageMediumMigrateTaskPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -615,6 +616,15 @@ private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws Sched
}
}

public void updateDestPathHash(TabletSchedCtx tabletCtx) {
// find dest replica
Optional<Replica> destReplica = tabletCtx.getReplicas()
.stream().filter(replica -> replica.getBackendId() == tabletCtx.getDestBackendId()).findAny();
if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) {
destReplica.get().setPathHash(tabletCtx.getDestPathHash());
}
}

public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
PathSlot pathSlot = backendsWorkingSlots.get(beId);
if (pathSlot == null) {
Expand Down Expand Up @@ -1642,6 +1652,7 @@ public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask migrati
// if we have a success task, then stat must be refreshed before schedule a new task
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
updateDestPathHash(tabletCtx);
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
} else {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
Expand Down

0 comments on commit f57b39b

Please sign in to comment.