Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTracing PR4] Store FSWritableFilePtr object in WritableFileWriter #7193

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {

Statistics* const statistics = immutable_cf_options_->statistics;

std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, *file_options_,
env_, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), blob_file_path, *file_options_, env_,
nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));

std::unique_ptr<BlobLogWriter> blob_log_writer(
new BlobLogWriter(std::move(file_writer), env_, statistics,
Expand Down
8 changes: 5 additions & 3 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ Status BuildTable(
uint64_t sample_for_compression, const CompressionOptions& compression_opts,
bool paranoid_file_checks, InternalStats* internal_stats,
TableFileCreationReason reason, IOStatus* io_status,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
const std::shared_ptr<IOTracer>& io_tracer, EventLogger* event_logger,
int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
const uint64_t file_creation_time, const std::string& db_id,
Expand Down Expand Up @@ -143,8 +144,9 @@ Status BuildTable(
file->SetWriteLifeTimeHint(write_hint);

file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, env, ioptions.statistics,
ioptions.listeners, ioptions.file_checksum_gen_factory));
std::move(file), fname, file_options, env, io_tracer,
ioptions.statistics, ioptions.listeners,
ioptions.file_checksum_gen_factory));

builder = NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,
Expand Down
3 changes: 2 additions & 1 deletion db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ extern Status BuildTable(
const uint64_t sample_for_compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
IOStatus* io_status, EventLogger* event_logger = nullptr, int job_id = 0,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
Expand Down
9 changes: 5 additions & 4 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ CompactionJob::CompactionJob(
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
io_tracer_(io_tracer),
fs_(db_options.fs, io_tracer),
file_options_for_read_(
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
Expand Down Expand Up @@ -1592,10 +1593,10 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->compaction->OutputFilePreallocationSize()));
const auto& listeners =
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, file_options_,
env_, db_options_.statistics.get(), listeners,
db_options_.file_checksum_gen_factory.get()));
sub_compact->outfile.reset(new WritableFileWriter(
std::move(writable_file), fname, file_options_, env_, io_tracer_,
db_options_.statistics.get(), listeners,
db_options_.file_checksum_gen_factory.get()));

// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class CompactionJob {
const FileOptions file_options_;

Env* env_;
std::shared_ptr<IOTracer> io_tracer_;
FileSystemPtr fs_;
// env_option optimized for compaction table reads
FileOptions file_options_for_read_;
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Status DBImpl::FlushMemTableToOutputFile(
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
db_id_, db_session_id_);
io_tracer_, db_id_, db_session_id_);
FileMetaData file_meta;

TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
Expand Down Expand Up @@ -359,7 +359,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */,
thread_pri, db_id_, db_session_id_));
thread_pri, io_tracer_, db_id_, db_session_id_));
jobs.back()->PickMemTable();
}

Expand Down
17 changes: 9 additions & 8 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), manifest, file_options, env_, nullptr /* stats */,
immutable_db_options_.listeners));
std::move(file), manifest, file_options, env_, io_tracer_,
nullptr /* stats */, immutable_db_options_.listeners));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
Expand Down Expand Up @@ -1330,9 +1330,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
mutable_cf_options.sample_for_compression,
mutable_cf_options.compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
0 /* file_creation_time */, db_id_, db_session_id_);
io_tracer_, &event_logger_, job_id, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time,
0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
db_id_, db_session_id_);
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
Expand Down Expand Up @@ -1436,9 +1437,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
lfile->SetPreallocationBlockSize(preallocate_block_size);

const auto& listeners = immutable_db_options_.listeners;
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), log_fname, opt_file_options,
env_, nullptr /* stats */, listeners));
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_file_options, env_, io_tracer_,
nullptr /* stats */, listeners));
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush);
Expand Down
37 changes: 18 additions & 19 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,21 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
}
}

FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const uint64_t* max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::string& db_id,
const std::string& db_session_id)
FlushJob::FlushJob(
const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const uint64_t* max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id)
: dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
Expand Down Expand Up @@ -126,7 +124,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
edit_(nullptr),
base_(nullptr),
pick_memtable_called(false),
thread_pri_(thread_pri) {
thread_pri_(thread_pri),
io_tracer_(io_tracer) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
Expand Down Expand Up @@ -400,7 +399,7 @@ Status FlushJob::WriteLevel0Table() {
output_compression_, mutable_cf_options_.sample_for_compression,
mutable_cf_options_.compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, &io_s, event_logger_,
TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,
creation_time, oldest_key_time, write_hint, current_time, db_id_,
db_session_id_);
Expand Down
5 changes: 4 additions & 1 deletion db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class FlushJob {
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::string& db_id = "",
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id = "",
const std::string& db_session_id = "");

~FlushJob();
Expand Down Expand Up @@ -161,6 +162,8 @@ class FlushJob {
bool pick_memtable_called;
Env::Priority thread_pri_;
IOStatus io_status_;

const std::shared_ptr<IOTracer> io_tracer_;
};

} // namespace ROCKSDB_NAMESPACE
66 changes: 33 additions & 33 deletions db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ TEST_F(FlushJobTest, Empty) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, nullptr, &event_logger, false,
true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, {},
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, nullptr, &event_logger, false,
true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
{
InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable();
Expand Down Expand Up @@ -216,14 +216,14 @@ TEST_F(FlushJobTest, NonEmpty) {

EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, db_options_.statistics.get(),
&event_logger, true, true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, {},
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);

HistogramData hist;
FileMetaData file_meta;
Expand Down Expand Up @@ -278,14 +278,14 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
assert(memtable_ids.size() == num_mems);
uint64_t smallest_memtable_id = memtable_ids.front();
uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
&flush_memtable_id, env_options_, versions_.get(), &mutex_,
&shutting_down_, {}, kMaxSequenceNumber, snapshot_checker,
&job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_,
versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
HistogramData hist;
FileMetaData file_meta;
mutex_.Lock();
Expand Down Expand Up @@ -357,7 +357,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
&job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
false /* sync_output_directory */, false /* write_manifest */,
Env::Priority::USER));
Env::Priority::USER, nullptr /*IOTracer*/));
k++;
}
HistogramData hist;
Expand Down Expand Up @@ -466,14 +466,14 @@ TEST_F(FlushJobTest, Snapshots) {

EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, db_options_.statistics.get(),
&event_logger, true, true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots,
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
true, true /* sync_output_directory */, true /* write_manifest */,
Env::Priority::USER, nullptr /*IOTracer*/);
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run());
Expand Down
10 changes: 5 additions & 5 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,11 @@ class Repairer {
cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber,
snapshot_checker, kNoCompression, 0 /* sample_for_compression */,
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery, &io_s, nullptr /* event_logger */,
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
0 /* file_creation_time */, "DB Repairer" /* db_id */,
db_session_id_);
TableFileCreationReason::kRecovery, &io_s, nullptr /*IOTracer*/,
nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time,
0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
"DB Repairer" /* db_id */, db_session_id_);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),
Expand Down
Loading