Skip to content

Commit

Permalink
Refactor reader->writer (#2248)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change

- [x] Refactoring

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN authored Nov 16, 2024
1 parent f9463c8 commit 1dacf47
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 40 deletions.
67 changes: 27 additions & 40 deletions src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,31 @@ Status Storage::ReaderToAdmin() {
return Status::OK();
}

Status Storage::ReaderToWriter() {
if (compact_processor_ != nullptr) {
UnrecoverableError("compact processor was initialized before.");
}

compact_processor_ = MakeUnique<CompactionProcessor>(new_catalog_.get(), txn_mgr_.get());
compact_processor_->Start();

periodic_trigger_thread_->Stop();
i64 compact_interval = config_ptr_->CompactInterval() > 0 ? config_ptr_->CompactInterval() : 0;
i64 optimize_interval = config_ptr_->OptimizeIndexInterval() > 0 ? config_ptr_->OptimizeIndexInterval() : 0;
// i64 cleanup_interval = config_ptr_->CleanupInterval() > 0 ? config_ptr_->CleanupInterval() : 0;
i64 full_checkpoint_interval_sec = config_ptr_->FullCheckpointInterval() > 0 ? config_ptr_->FullCheckpointInterval() : 0;
i64 delta_checkpoint_interval_sec = config_ptr_->DeltaCheckpointInterval() > 0 ? config_ptr_->DeltaCheckpointInterval() : 0;
periodic_trigger_thread_->full_checkpoint_trigger_ = MakeShared<CheckpointPeriodicTrigger>(full_checkpoint_interval_sec, wal_mgr_.get(), true);
periodic_trigger_thread_->delta_checkpoint_trigger_ = MakeShared<CheckpointPeriodicTrigger>(delta_checkpoint_interval_sec, wal_mgr_.get(), false);
periodic_trigger_thread_->compact_segment_trigger_ = MakeShared<CompactSegmentPeriodicTrigger>(compact_interval, compact_processor_.get());
periodic_trigger_thread_->optimize_index_trigger_ = MakeShared<OptimizeIndexPeriodicTrigger>(optimize_interval, compact_processor_.get());
periodic_trigger_thread_->Start();

std::unique_lock<std::mutex> lock(mutex_);
current_storage_mode_ = StorageMode::kWritable;
return Status::OK();
}

ResultCacheManager *Storage::result_cache_manager() const noexcept {
if (config_ptr_->ResultCache() != "on") {
return nullptr;
Expand Down Expand Up @@ -471,47 +496,9 @@ Status Storage::SetStorageMode(StorageMode target_mode) {
case StorageMode::kReadable: {
UnrecoverableError("Attempt to set storage mode from Readable to Readable");
}
default: {
break;
}
}

if (target_mode == StorageMode::kWritable) {

if (config_ptr_->StorageType() == StorageType::kMinio) {
Status status = VirtualStore::CreateBucket();
if (!status.ok()) {
return status;
}
}

if (compact_processor_ != nullptr) {
UnrecoverableError("compact processor was initialized before.");
case StorageMode::kWritable: {
return ReaderToWriter();
}

compact_processor_ = MakeUnique<CompactionProcessor>(new_catalog_.get(), txn_mgr_.get());
compact_processor_->Start();

periodic_trigger_thread_->Stop();
i64 compact_interval = config_ptr_->CompactInterval() > 0 ? config_ptr_->CompactInterval() : 0;
i64 optimize_interval = config_ptr_->OptimizeIndexInterval() > 0 ? config_ptr_->OptimizeIndexInterval() : 0;
// i64 cleanup_interval = config_ptr_->CleanupInterval() > 0 ? config_ptr_->CleanupInterval() : 0;
i64 full_checkpoint_interval_sec = config_ptr_->FullCheckpointInterval() > 0 ? config_ptr_->FullCheckpointInterval() : 0;
i64 delta_checkpoint_interval_sec = config_ptr_->DeltaCheckpointInterval() > 0 ? config_ptr_->DeltaCheckpointInterval() : 0;
periodic_trigger_thread_->full_checkpoint_trigger_ =
MakeShared<CheckpointPeriodicTrigger>(full_checkpoint_interval_sec, wal_mgr_.get(), true);
periodic_trigger_thread_->delta_checkpoint_trigger_ =
MakeShared<CheckpointPeriodicTrigger>(delta_checkpoint_interval_sec, wal_mgr_.get(), false);
periodic_trigger_thread_->compact_segment_trigger_ =
MakeShared<CompactSegmentPeriodicTrigger>(compact_interval, compact_processor_.get());
periodic_trigger_thread_->optimize_index_trigger_ =
MakeShared<OptimizeIndexPeriodicTrigger>(optimize_interval, compact_processor_.get());
periodic_trigger_thread_->Start();
}

{
std::unique_lock<std::mutex> lock(mutex_);
current_storage_mode_ = target_mode;
}
break;
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/storage.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public:
// Used for follower and learner
Status InitToReader();
Status ReaderToAdmin();
Status ReaderToWriter();
Status UnInitFromReader();

void AttachCatalog(const FullCatalogFileInfo &full_ckp_info, const Vector<DeltaCatalogFileInfo> &delta_ckp_infos);
Expand Down

0 comments on commit 1dacf47

Please sign in to comment.