Skip to content

Commit

Permalink
dcompact_worker: ROCKS_LOG_DEBUG for top stages
Browse files Browse the repository at this point in the history
  • Loading branch information
rockeet committed Jan 12, 2024
1 parent f6bbde8 commit 1e35877
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions tools/dcompact/dcompact_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ int RunCompact(FILE* in) const {
WARN("env TOPLINGDB_CACHE_SST_FILE_ITER is true, sst would not be closed asap!");
}
mut_dbo.max_open_files = 1; // capacity is not strict
ROCKS_LOG_DEBUG(info_log, "Initialize workspace VersionSet");
shared_ptr<Cache> table_cache = NewLRUCache(mut_dbo.max_open_files);
BlockCacheTracer* block_cache_tracer = nullptr;
const std::shared_ptr<IOTracer> io_tracer(nullptr);
Expand Down Expand Up @@ -1156,6 +1157,7 @@ int RunCompact(FILE* in) const {
&edit, &mutex, null_dbdir, new_descriptor_log, &cfo);
VERIFY_STATUS_OK(s);
}
ROCKS_LOG_DEBUG(info_log, "Add compaction input SSTs to VersionSet");
auto cfd = versions->GetColumnFamilySet()->GetColumnFamily(params.cf_id);
VERIFY_S(nullptr != cfd, "cf: id = %d name = %s", params.cf_id, params.cf_name);
VERIFY_S_EQ(params.cf_name, cfd->GetName());
Expand Down Expand Up @@ -1194,6 +1196,7 @@ int RunCompact(FILE* in) const {
onelevel.level, attempt_dbname, params.DebugString());
onelevel.files = std::move(populated_files);
}
ROCKS_LOG_DEBUG(info_log, "Construct Compaction object");
std::string trim_ts = "";
Compaction compaction(storage_info,
*cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), mut_dbo, inputs,
Expand Down Expand Up @@ -1231,6 +1234,7 @@ int RunCompact(FILE* in) const {
std::atomic<bool> manual_compaction_canceled{false};
#endif
BlobFileCompletionCallback* blob_callback = nullptr;
ROCKS_LOG_DEBUG(info_log, "Construct CompactionJob object");
CompactionJob compaction_job(
params.job_id, &compaction, imm_dbo, mut_dbo, file_options,
versions.get(),
Expand Down Expand Up @@ -1260,6 +1264,7 @@ int RunCompact(FILE* in) const {
#endif
blob_callback);

ROCKS_LOG_DEBUG(info_log, "Calling compaction_job.Prepare()");
compaction_job.Prepare();
MutexUnlock();
VERIFY_S_EQ(compaction.GetSmallestUserKey(), params.smallest_user_key);
Expand All @@ -1273,8 +1278,10 @@ int RunCompact(FILE* in) const {
input_version->props_of_all_tables_[std::move(fpath)] = props;
}
}
ROCKS_LOG_DEBUG(info_log, "Calling ShowCompactionParams() for start");
const std::string start_time = StrDateTimeNow();
ShowCompactionParams(params, cfd->current(), cfd, &start_time);
ROCKS_LOG_DEBUG(info_log, "Calling compaction_job.Run()");
{
Status s1 = compaction_job.Run();
IOStatus s2 = compaction_job.io_status();
Expand Down Expand Up @@ -1313,6 +1320,7 @@ auto writeObjResult = [&]{
SetResultSerDe1(cfo, table_properties_collector_factories[i]);
}
};
ROCKS_LOG_DEBUG(info_log, "Collecting compaction result");
vector<vector<const FileMetaData*> > output_files;
compaction_job.GetSubCompactOutputs(&output_files);
results->output_files.resize(output_files.size());
Expand Down Expand Up @@ -1342,6 +1350,7 @@ auto writeObjResult = [&]{
imm_dbo.statistics->GetAggregated(results->statistics.tickers,
results->statistics.histograms);
auto t1 = pf.now();
ROCKS_LOG_DEBUG(info_log, "Writing rpc.result");
results->work_time_usec = pf.us(t0, t1);
try {
string outFname = MakePath(attempt_dir, "rpc.results");
Expand All @@ -1365,6 +1374,7 @@ auto writeObjResult = [&]{
fclose(out); // must close before write compact_done_file
//INFO("after fclose(out)");
auto t2 = pf.now();
ROCKS_LOG_DEBUG(info_log, "Calling compaction_job.Install()");
{
MutexLock();
Status s = compaction_job.Install(*cfd->GetLatestMutableCFOptions());
Expand All @@ -1384,8 +1394,10 @@ auto writeObjResult = [&]{
// compact end time
auto t3 = pf.now();
const std::string end_time = StrDateTimeNow();
ROCKS_LOG_DEBUG(info_log, "Calling ShowCompactionParams() for end");
ShowCompactionParams(params, cfd->current(), cfd, &start_time, &end_time, pf.sf(t0, t3));
if (!shutting_down.load(std::memory_order_acquire)) {
ROCKS_LOG_DEBUG(info_log, "Write compact.done");
std::string compact_done_file = attempt_dir + "/compact.done";
int fd = ::creat(compact_done_file.c_str(), 0644);
if (fd < 0) {
Expand All @@ -1408,6 +1420,7 @@ auto writeObjResult = [&]{
}
//INFO("after close(compact.done)");
if (!FEE_URL.empty()) {
ROCKS_LOG_DEBUG(info_log, "Report fee");
DcompactFeeReport fee;
fee.provider = CLOUD_PROVIDER;
fee.dbId = params.db_id;
Expand All @@ -1429,6 +1442,7 @@ auto writeObjResult = [&]{
}
}
if (terark::getEnvBool("DEL_WORKER_TEMP_DB", false)) {
ROCKS_LOG_DEBUG(info_log, "Deleting %s", attempt_dbname.c_str());
std::error_code ec;
std::filesystem::remove_all(attempt_dbname, ec);
// if cur attempt is last attempt, DeleteDir will success
Expand All @@ -1443,6 +1457,8 @@ auto writeObjResult = [&]{
attempt_dbname, params.output_level, pf.sf(t0,t1), pf.mf(t1,t2), pf.mf(t2,t3),
SizeToString(inputBytes[0]), SizeToString(inputBytes[1]));
}
ROCKS_LOG_DEBUG(info_log, "Compaction done");
info_log->Flush();

return 0;
}
Expand Down

0 comments on commit 1e35877

Please sign in to comment.