Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export Import sst files #5495

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ set(SOURCES
db/flush_job.cc
db/flush_scheduler.cc
db/forward_iterator.cc
db/import_column_family_job.cc
db/internal_stats.cc
db/logs_with_prep_tracker.cc
db/log_reader.cc
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ TESTS = \
plain_table_db_test \
comparator_db_test \
external_sst_file_test \
import_column_family_test \
prefix_test \
skiplist_test \
write_buffer_manager_test \
Expand Down Expand Up @@ -575,6 +576,7 @@ PARALLEL_TEST = \
db_universal_compaction_test \
db_wal_test \
external_sst_file_test \
import_column_family_test \
fault_injection_test \
inlineskiplist_test \
manual_compaction_test \
Expand Down Expand Up @@ -1272,6 +1274,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.
external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

import_column_family_test: db/import_column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ cpp_library(
"db/flush_job.cc",
"db/flush_scheduler.cc",
"db/forward_iterator.cc",
"db/import_column_family_job.cc",
"db/internal_stats.cc",
"db/log_reader.cc",
"db/log_writer.cc",
Expand Down
9 changes: 9 additions & 0 deletions db/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ class CompactedDBImpl : public DBImpl {
const IngestExternalFileOptions& /*ingestion_options*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const ExportImportFilesMetaData& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}

private:
friend class DB;
Expand Down
122 changes: 122 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/import_column_family_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/job_context.h"
Expand Down Expand Up @@ -3877,6 +3878,127 @@ Status DBImpl::IngestExternalFiles(
return status;
}

Status DBImpl::CreateColumnFamilyWithImport(
const ColumnFamilyOptions& options, const std::string& column_family_name,
const ImportColumnFamilyOptions& import_options,
const ExportImportFilesMetaData& metadata,
ColumnFamilyHandle** handle) {
assert(handle != nullptr);
assert(*handle == nullptr);
std::string cf_comparator_name = options.comparator->Name();
if (cf_comparator_name != metadata.db_comparator_name) {
return Status::InvalidArgument("Comparator name mismatch");
}

// Create column family.
auto status = CreateColumnFamily(options, column_family_name, handle);
if (!status.ok()) {
return status;
}

// Import sst files from metadata.
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
auto cfd = cfh->cfd();
ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
import_options, metadata.files);

SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
uint64_t next_file_number = 0;
std::list<uint64_t>::iterator pending_output_elem;
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);
if (error_handler_.IsDBStopped()) {
// Don't import files when there is a bg_error
status = error_handler_.GetBGError();
}

// Make sure that bg cleanup wont delete the files that we are importing
pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();

if (status.ok()) {
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
next_file_number =
versions_->FetchAddFileNumber(metadata.files.size());
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
}
}
}
dummy_sv_ctx.Clean();

if (status.ok()) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
status = import_job.Prepare(next_file_number, sv);
CleanupSuperVersion(sv);
}

if (status.ok()) {
SuperVersionContext sv_context(true /*create_superversion*/);
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);

// Stop writes to the DB by entering both write threads
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}

num_running_ingest_file_++;
assert(!cfd->IsDropped());
status = import_job.Run();

// Install job edit [Mutex will be unlocked here]
if (status.ok()) {
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
}
}

// Resume writes to the DB
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w);

num_running_ingest_file_--;
if (num_running_ingest_file_ == 0) {
bg_cv_.SignalAll();
}
}
// mutex_ is unlocked here

sv_context.Clean();
}

{
InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
}

import_job.Cleanup(status);
if (!status.ok()) {
DropColumnFamily(*handle);
DestroyColumnFamilyHandle(*handle);
*handle = nullptr;
}
return status;
}

Status DBImpl::VerifyChecksum() {
Status s;
std::vector<ColumnFamilyData*> cfd_list;
Expand Down
11 changes: 10 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/import_column_family_job.h"
#include "db/internal_stats.h"
#include "db/log_writer.h"
#include "db/logs_with_prep_tracker.h"
Expand Down Expand Up @@ -324,6 +325,13 @@ class DBImpl : public DB {
virtual Status IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) override;

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& options, const std::string& column_family_name,
const ImportColumnFamilyOptions& import_options,
const ExportImportFilesMetaData& metadata,
ColumnFamilyHandle** handle) override;

virtual Status VerifyChecksum() override;

using DB::StartTrace;
Expand Down Expand Up @@ -1783,7 +1791,8 @@ class DBImpl : public DB {

std::string db_absolute_path_;

// Number of running IngestExternalFile() calls.
// Number of running IngestExternalFile() or CreateColumnFamilyWithImport()
// calls.
// REQUIRES: mutex held
int num_running_ingest_file_;

Expand Down
10 changes: 10 additions & 0 deletions db/db_impl/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const ExportImportFilesMetaData& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}

private:
friend class DB;

Expand Down
10 changes: 10 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2492,6 +2492,16 @@ class ModelDB : public DB {
return Status::NotSupported("Not implemented");
}

using DB::CreateColumnFamilyWithImport;
virtual Status CreateColumnFamilyWithImport(
const ColumnFamilyOptions& /*options*/,
const std::string& /*column_family_name*/,
const ImportColumnFamilyOptions& /*import_options*/,
const ExportImportFilesMetaData& /*metadata*/,
ColumnFamilyHandle** /*handle*/) override {
return Status::NotSupported("Not implemented.");
}

Status VerifyChecksum() override {
return Status::NotSupported("Not implemented.");
}
Expand Down
Loading