Skip to content

Commit

Permalink
Merge pull request #60040 from vitlibar/use-multiple-threads-while-re…
Browse files Browse the repository at this point in the history
…ading-metadata-for-restore

Use multiple threads while reading the metadata of tables to restore
  • Loading branch information
vitlibar authored Feb 28, 2024
2 parents a04630e + 0df2a64 commit 926295f
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 216 deletions.
109 changes: 15 additions & 94 deletions src/Backups/BackupsWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ enum class BackupsWorker::ThreadPoolId
/// "RESTORE ASYNC" waits in background while the data of all tables are restored.
RESTORE_ASYNC,

/// Restores the data of tables.
RESTORE_TABLES_DATA,
/// Restores from backups.
RESTORE,
};


Expand Down Expand Up @@ -320,13 +320,13 @@ class BackupsWorker::ThreadPools

case ThreadPoolId::RESTORE_ASYNC:
case ThreadPoolId::RESTORE_ASYNC_ON_CLUSTER:
case ThreadPoolId::RESTORE_TABLES_DATA:
case ThreadPoolId::RESTORE:
{
metric_threads = CurrentMetrics::RestoreThreads;
metric_active_threads = CurrentMetrics::RestoreThreadsActive;
metric_active_threads = CurrentMetrics::RestoreThreadsScheduled;
max_threads = num_restore_threads;
use_queue = (thread_pool_id != ThreadPoolId::RESTORE_TABLES_DATA);
use_queue = (thread_pool_id != ThreadPoolId::RESTORE);
break;
}
}
Expand All @@ -347,7 +347,7 @@ class BackupsWorker::ThreadPools
auto wait_sequence = {
ThreadPoolId::RESTORE_ASYNC_ON_CLUSTER,
ThreadPoolId::RESTORE_ASYNC,
ThreadPoolId::RESTORE_TABLES_DATA,
ThreadPoolId::RESTORE,
ThreadPoolId::BACKUP_ASYNC_ON_CLUSTER,
ThreadPoolId::BACKUP_ASYNC,
ThreadPoolId::BACKUP_COPY_FILES,
Expand Down Expand Up @@ -975,7 +975,7 @@ void BackupsWorker::doRestore(
String addr_database = address->default_database.empty() ? current_database : address->default_database;
for (auto & element : restore_elements)
element.setCurrentDatabase(addr_database);
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context};
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context, getThreadPool(ThreadPoolId::RESTORE), {}};
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
}
}
Expand Down Expand Up @@ -1004,103 +1004,24 @@ void BackupsWorker::doRestore(
{
restore_query->setCurrentDatabase(current_database);

/// Restore metadata and prepare data restoring tasks.
DataRestoreTasks data_restore_tasks;
auto after_task_callback = [&]
{
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context};
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
}

/// Execute the data restoring tasks.
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), getThreadPool(ThreadPoolId::RESTORE_TABLES_DATA), context->getProcessListElement());
maybeSleepForTesting();
setNumFilesAndSize(restore_id, backup->getNumFiles(), backup->getTotalSize(), backup->getNumEntries(),
backup->getUncompressedSize(), backup->getCompressedSize(), backup->getNumReadFiles(), backup->getNumReadBytes());
};

/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(Stage::COMPLETED, "");
/// Restore from the backup.
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context, getThreadPool(ThreadPoolId::RESTORE), after_task_callback};
restorer.run(RestorerFromBackup::RESTORE);
}

LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging);
setStatus(restore_id, BackupStatus::RESTORED);
}


void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool, QueryStatusPtr process_list_element)
{
size_t num_active_jobs = 0;
std::mutex mutex;
std::condition_variable event;
std::exception_ptr exception;

auto thread_group = CurrentThread::getGroup();

for (auto & task : tasks)
{
{
std::unique_lock lock{mutex};
if (exception)
break;
++num_active_jobs;
}

auto job = [&]()
{
SCOPE_EXIT_SAFE(
std::lock_guard lock{mutex};
if (!--num_active_jobs)
event.notify_all();
CurrentThread::detachFromGroupIfNotDetached();
);

try
{
if (thread_group)
CurrentThread::attachToGroup(thread_group);

setThreadName("RestoreWorker");

{
std::lock_guard lock{mutex};
if (exception)
return;
}

if (process_list_element)
process_list_element->checkTimeLimit();

std::move(task)();

maybeSleepForTesting();

setNumFilesAndSize(
restore_id,
backup->getNumFiles(),
backup->getTotalSize(),
backup->getNumEntries(),
backup->getUncompressedSize(),
backup->getCompressedSize(),
backup->getNumReadFiles(),
backup->getNumReadBytes());
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
};

thread_pool.scheduleOrThrowOnError(job);
}

{
std::unique_lock lock{mutex};
event.wait(lock, [&] { return !num_active_jobs; });
if (exception)
std::rethrow_exception(exception);
}
}


void BackupsWorker::addInfo(const OperationID & id, const String & name, const String & base_backup_name, const String & query_id,
bool internal, QueryStatusPtr process_list_element, BackupStatus status)
{
Expand Down
Loading

0 comments on commit 926295f

Please sign in to comment.