Commit d4e53a0

WITH CHERRY PICK CONFLICTS THAT NEED FIXING !!!!!! Prevent flush entry that has a delete entry (#411)

Currently, during memtable flush, if a key has a matching key in the
delete-range table and the record has no snapshot related to it,
we still write the key with its value to the SST file.
This feature keeps only the delete record, reducing SST size for later
compaction.
ayulas authored and udi-speedb committed Nov 19, 2023
1 parent e41758d commit d4e53a0
Showing 16 changed files with 200 additions and 17 deletions.
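
Before the diff, the idea in miniature: a put covered by a newer range tombstone can be dropped at flush time, provided no live snapshot can still observe the put. The sketch below is a self-contained illustration with simplified stand-in types, not the actual Speedb/RocksDB API:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct Entry {
  std::string user_key;
  uint64_t seq;
  bool is_value;  // true = put; deletes are never dropped by this rule
};

struct RangeTombstone {
  std::string start_key;  // inclusive
  std::string end_key;    // exclusive
  uint64_t seq;
};

// A put may be dropped at flush time when a newer range tombstone covers it
// and no live snapshot sits between the put and the tombstone.
bool CanDropAtFlush(const Entry& e, const RangeTombstone& t,
                    const std::vector<uint64_t>& snapshots) {
  if (!e.is_value) return false;
  const bool covered = e.user_key >= t.start_key && e.user_key < t.end_key &&
                       e.seq < t.seq;
  if (!covered) return false;
  for (uint64_t snap : snapshots) {
    // A snapshot taken at or after the put but before the tombstone still
    // sees the put, so the value must be written to the SST.
    if (snap >= e.seq && snap < t.seq) return false;
  }
  return true;
}

int main() {
  RangeTombstone t{"a", "m", /*seq=*/100};
  Entry put{"c", /*seq=*/40, /*is_value=*/true};
  std::cout << CanDropAtFlush(put, t, {}) << "\n";    // 1: no snapshot, drop
  std::cout << CanDropAtFlush(put, t, {50}) << "\n";  // 0: snapshot 50 sees it
}
```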
7 changes: 7 additions & 0 deletions HISTORY.md
@@ -11,6 +11,13 @@ To use this feature, pass allow_delays_and_stalls = true to the ctor of WBM (ren
setup delay requests starting from (start_delay_percent * _buffer_size) / 100 (default value is 70) (start_delay_percent is another WBM ctor parameter). Changes to the WBM's memory are tracked in WriteBufferManager::ReserveMem and FreeMem.
Once the WBM reached its capacity, writes will be stopped using the old ShouldStall() and WBMStallWrites(). (#423)

* Prevent flushing an entry that has a matching delete entry
Currently, during memtable flush, if a key has a matching key in the
delete range table and the record has no snapshot related to it,
we still write the key with its value to the SST file.
This feature keeps only the delete record, reducing SST size for later compaction.
(#411)

### Enhancements
* CI: add a workflow for building and publishing jar to maven central (#507)
* LOG: Compaction job traces - report cf name and job id (#511)
83 changes: 82 additions & 1 deletion db/builder.cc
@@ -205,18 +205,35 @@ Status BuildTable(
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
true /* must_count_input_entries */,
/*compaction=*/nullptr, compaction_filter.get(),
<<<<<<< HEAD
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);

const size_t ts_sz = ucmp->timestamp_size();
const bool strip_timestamp =
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

std::string key_after_flush_buf;
=======
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low,
ioptions.use_clean_delete_during_flush);
const InternalKeyComparator& icmp = tboptions.internal_comparator;
auto range_del_it = range_del_agg->NewIterator();
range_del_it->SeekToFirst();
Slice last_tombstone_start_user_key{};
>>>>>>> 594f177b6... Prevent flush entry that has a delete entry (#411)
c_iter.SeekToFirst();
RangeTombstone tombstone;
std::pair<InternalKey, Slice> kv;
if (range_del_it->Valid()) {
tombstone = range_del_it->Tombstone();
kv = tombstone.Serialize();
}

for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey();
<<<<<<< HEAD
Slice key_after_flush = key;
// If user defined timestamps will be stripped from the user key after flush,
// the in-memory version of the key acts logically the same as one with a
@@ -232,6 +249,64 @@ Status BuildTable(
// Note:
// Here "key" integrates 'sequence_number'+'kType'+'user key'.
s = output_validator.Add(key_after_flush, value);
=======
auto internal_key = InternalKey(key, ikey.sequence, ikey.type);
// Generate a rolling 64-bit hash of the key and values
// Note:
// Here "key" integrates 'sequence_number'+'kType'+'user key'.
if (ioptions.use_clean_delete_during_flush &&
tboptions.reason == TableFileCreationReason::kFlush &&
ikey.type == kTypeValue) {
bool was_skipped = false;
while (range_del_it->Valid()) {
if (icmp.Compare(kv.first, internal_key) > 0) {
// the record is smaller than the current range-delete iter; proceed
// as usual
break;
}
if ((icmp.Compare(kv.first, internal_key) <= 0) &&
(icmp.Compare(internal_key, tombstone.SerializeEndKey()) <= 0)) {
// the key is in the delete range... check if we can skip it...
if (c_iter.CanBeSkipped()) {
was_skipped = true;
}
break;
} else {
// the record is past the current range-delete iter: emit the current
// tombstone and update the file boundaries for it, then advance the
// range-delete iter and check again
builder->Add(kv.first.Encode(), kv.second);
InternalKey tombstone_end = tombstone.SerializeEndKey();
meta->UpdateBoundariesForRange(kv.first, tombstone_end,
tombstone.seq_, icmp);
if (version) {
if (last_tombstone_start_user_key.empty() ||
ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key,
range_del_it->start_key()) <
0) {
SizeApproximationOptions approx_opts;
approx_opts.files_size_error_margin = 0.1;
meta->compensated_range_deletion_size +=
versions->ApproximateSize(
approx_opts, version, kv.first.Encode(),
tombstone_end.Encode(), 0 /* start_level */,
-1 /* end_level */, TableReaderCaller::kFlush);
}
last_tombstone_start_user_key = range_del_it->start_key();
}
range_del_it->Next();
if (range_del_it->Valid()) {
tombstone = range_del_it->Tombstone();
kv = tombstone.Serialize();
}
}
}
if (was_skipped) {
continue;
}
}
s = output_validator.Add(key, value);
>>>>>>> 594f177b6... Prevent flush entry that has a delete entry (#411)
if (!s.ok()) {
break;
}
@@ -257,17 +332,23 @@ Status BuildTable(
}

if (s.ok()) {
<<<<<<< HEAD
auto range_del_it = range_del_agg->NewIterator();
Slice last_tombstone_start_user_key{};
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize();
// TODO(yuzhangyu): handle range deletion for UDT in memtables only.
=======
for (; range_del_it->Valid(); range_del_it->Next()) {
tombstone = range_del_it->Tombstone();
kv = tombstone.Serialize();
>>>>>>> 594f177b6... Prevent flush entry that has a delete entry (#411)
builder->Add(kv.first.Encode(), kv.second);
InternalKey tombstone_end = tombstone.SerializeEndKey();
meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_,
tboptions.internal_comparator);
icmp);
if (version) {
if (last_tombstone_start_user_key.empty() ||
ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key,
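
Both conflicts above look like keep-both-sides merges: HEAD contributes the user-defined-timestamp stripping, the feature branch contributes the range-delete scan (that reading is an editorial assumption, not the commit's recorded resolution). The scan itself is a two-pointer merge of the sorted point entries against the sorted tombstones, emitting each tombstone as the loop walks past it. A self-contained toy of that control flow, with simplified types and the snapshot checks omitted (those live in CanBeSkipped(), shown below in db/compaction/compaction_iterator.cc):

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct Tombstone {
  std::string start, end;  // covers [start, end)
};

// Keys and tombstones arrive sorted, as they do from the memtable iterators.
void FlushMerge(const std::vector<std::string>& keys,
                const std::vector<Tombstone>& tombstones) {
  std::size_t t = 0;
  for (const auto& key : keys) {
    // The key is past the current tombstone: emit the tombstone and advance,
    // mirroring the builder->Add(...) / range_del_it->Next() branch above.
    while (t < tombstones.size() && tombstones[t].end <= key) {
      std::cout << "tombstone [" << tombstones[t].start << ","
                << tombstones[t].end << ")\n";
      ++t;
    }
    if (t < tombstones.size() && tombstones[t].start <= key) {
      std::cout << "skip " << key << "\n";  // covered by a range delete
    } else {
      std::cout << "emit " << key << "\n";
    }
  }
  // Leftover tombstones, as in the trailing range_del_it loop above.
  for (; t < tombstones.size(); ++t) {
    std::cout << "tombstone [" << tombstones[t].start << ","
              << tombstones[t].end << ")\n";
  }
}

int main() {
  FlushMerge({"a", "c", "p", "z"}, {{"b", "f"}, {"o", "q"}});
}
```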
1 change: 1 addition & 0 deletions db/builder.h
@@ -50,6 +50,7 @@ TableBuilder* NewTableBuilder(const TableBuilderOptions& tboptions,
//
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown.

extern Status BuildTable(
const std::string& dbname, VersionSet* versions,
const ImmutableDBOptions& db_options, const TableBuilderOptions& tboptions,
73 changes: 71 additions & 2 deletions db/compaction/compaction_iterator.cc
@@ -37,7 +37,7 @@ CompactionIterator::CompactionIterator(
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low,
const SequenceNumber preserve_time_min_seqno,
const SequenceNumber preclude_last_level_min_seqno)
const SequenceNumber preclude_last_level_min_seqno, bool use_skip_delete)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
@@ -46,9 +46,15 @@ CompactionIterator::CompactionIterator(
manual_compaction_canceled,
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
<<<<<<< HEAD
must_count_input_entries, compaction_filter, shutting_down, info_log,
full_history_ts_low, preserve_time_min_seqno,
preclude_last_level_min_seqno) {}
=======
compaction_filter, shutting_down, info_log, full_history_ts_low,
preserve_time_min_seqno, preclude_last_level_min_seqno,
use_skip_delete) {}
>>>>>>> 594f177b6... Prevent flush entry that has a delete entry (#411)

CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@@ -66,8 +72,14 @@ CompactionIterator::CompactionIterator(
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low,
const SequenceNumber preserve_time_min_seqno,
<<<<<<< HEAD
const SequenceNumber preclude_last_level_min_seqno)
: input_(input, cmp, must_count_input_entries),
=======
const SequenceNumber preclude_last_level_min_seqno, bool use_skip_delete)
: input_(input, cmp,
!compaction || compaction->DoesInputReferenceBlobFiles()),
>>>>>>> 594f177b6... Prevent flush entry that has a delete entry (#411)
cmp_(cmp),
merge_helper_(merge_helper),
snapshots_(snapshots),
@@ -110,7 +122,8 @@ CompactionIterator::CompactionIterator(
cmp_with_history_ts_low_(0),
level_(compaction_ == nullptr ? 0 : compaction_->level()),
preserve_time_min_seqno_(preserve_time_min_seqno),
preclude_last_level_min_seqno_(preclude_last_level_min_seqno) {
preclude_last_level_min_seqno_(preclude_last_level_min_seqno),
use_skip_delete_(use_skip_delete) {
assert(snapshots_ != nullptr);
assert(preserve_time_min_seqno_ <= preclude_last_level_min_seqno_);

@@ -455,6 +468,58 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
return true;
}

bool CompactionIterator::CanBeSkipped() {
if (!use_skip_delete_) {
return false;
}
key_ = input_.key();
value_ = input_.value();

// If there are no snapshots, then this kv affects visibility at tip.
// Otherwise, search through all existing snapshots to find the earliest
// snapshot that is affected by this kv.

current_user_key_sequence_ = ikey_.sequence;
SequenceNumber last_snapshot = current_user_key_snapshot_;
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
current_user_key_snapshot_ =
visible_at_tip_
? earliest_snapshot_
: findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

const bool is_timestamp_eligible_for_gc =
(timestamp_size_ == 0 ||
(full_history_ts_low_ && cmp_with_history_ts_low_ < 0));

if (prev_snapshot == 0 ||
DefinitelyNotInSnapshot(ikey_.sequence, prev_snapshot)) {
if (!is_timestamp_eligible_for_gc) {
// We cannot drop the record: timestamps are enabled, and the
// timestamp of this key is greater than or equal to
// *full_history_ts_low_.
return false;
} else if (DefinitelyInSnapshot(ikey_.sequence,
earliest_write_conflict_snapshot_) ||
(earliest_snapshot_ < earliest_write_conflict_snapshot_ &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_))) {
// Found a matching value; we can drop it.
// It is safe to drop the record since we've already
// output a key in this snapshot, or there is no earlier
// snapshot.
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_obsolete;
return true;
}
}

if (last_snapshot == current_user_key_snapshot_ ||
(last_snapshot > 0 && last_snapshot < current_user_key_snapshot_)) {
++iter_stats_.num_record_drop_hidden;
return true;
}
return false;
}

void CompactionIterator::NextFromInput() {
at_next_ = false;
validity_info_.Invalidate();
@@ -694,6 +759,10 @@ void CompactionIterator::NextFromInput() {
// try to compact out as much as we can in these cases.
// We will report counts on these anomalous cases.
//
// Optimization 4:
// Skip a value key that is covered by a delete entry. Note that the
// delete entry itself remains...
//
// Note: If timestamp is enabled, then the record will be eligible for
// deletion only if, along with the above conditions (Rule 1 and Rule 2),
// full_history_ts_low_ is specified and the timestamp for that key is less
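
CanBeSkipped() reduces to a snapshot-stripe question: a covered value may be dropped only when no snapshot separates it from the deletion, i.e. the earliest snapshot visible to the put and to the delete is the same one. A hedged, self-contained sketch of that rule (the real function also handles user-defined timestamps and the earliest write-conflict snapshot):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Smallest snapshot sequence number >= seq, or UINT64_MAX if none; a
// simplified stand-in for findEarliestVisibleSnapshot().
uint64_t EarliestVisibleSnapshot(const std::vector<uint64_t>& snapshots,
                                 uint64_t seq) {
  auto it = std::lower_bound(snapshots.begin(), snapshots.end(), seq);
  return it == snapshots.end() ? UINT64_MAX : *it;
}

int main() {
  const std::vector<uint64_t> snapshots = {25, 60};  // must be sorted
  const uint64_t put_seq = 40, del_seq = 100;
  // Same stripe => no snapshot sees the put without the delete => droppable.
  const bool droppable = EarliestVisibleSnapshot(snapshots, put_seq) ==
                         EarliestVisibleSnapshot(snapshots, del_seq);
  std::cout << (droppable ? "drop" : "keep") << "\n";  // keep: 60 intervenes
}
```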
8 changes: 6 additions & 2 deletions db/compaction/compaction_iterator.h
@@ -214,7 +214,8 @@ class CompactionIterator {
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber,
bool use_skip_delete = false);

// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
@@ -234,7 +235,8 @@
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber,
bool use_skip_delete = false);

~CompactionIterator();

@@ -271,6 +273,7 @@
return output_to_penultimate_level_;
}
Status InputStatus() const { return input_.status(); }
bool CanBeSkipped();

bool IsDeleteRangeSentinelKey() const { return is_range_del_; }

@@ -502,6 +505,7 @@
// min seqno to preclude the data from the last level, if the key seqno larger
// than this, it will be output to penultimate level
const SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
bool use_skip_delete_;

void AdvanceInputIter() { input_.Next(); }

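
Note the API pattern here: both constructor overloads gain use_skip_delete as a defaulted trailing parameter, so existing call sites compile unchanged while the flush path can opt in. A minimal illustration of the pattern (toy names, not the real constructor):

```cpp
#include <iostream>

struct ToyIterator {
  // New behavior flags are appended with defaults, keeping old callers valid.
  explicit ToyIterator(int level, bool use_skip_delete = false)
      : level_(level), use_skip_delete_(use_skip_delete) {}
  int level_;
  bool use_skip_delete_;
};

int main() {
  ToyIterator compaction(1);   // existing caller: flag defaults to false
  ToyIterator flush(0, true);  // flush path opts in explicitly
  std::cout << compaction.use_skip_delete_ << " "
            << flush.use_skip_delete_ << "\n";  // 0 1
}
```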
12 changes: 6 additions & 6 deletions db/memtable.cc
@@ -88,7 +88,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
ioptions.logger, column_family_id)),
range_del_table_(SkipListFactory().CreateMemTableRep(
del_table_(SkipListFactory().CreateMemTableRep(
comparator_, &arena_, nullptr /* transform */, ioptions.logger,
column_family_id)),
is_range_del_table_empty_(true),
@@ -160,7 +160,7 @@ MemTable::~MemTable() {
size_t MemTable::ApproximateMemoryUsage() {
autovector<size_t> usages = {
arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
range_del_table_->ApproximateMemoryUsage(),
del_table_->ApproximateMemoryUsage(),
ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)};
size_t total_usage = 0;
for (size_t usage : usages) {
@@ -197,7 +197,7 @@ bool MemTable::ShouldFlushNow() {
// If arena still have room for new block allocation, we can safely say it
// shouldn't flush.
auto allocated_memory = table_->ApproximateMemoryUsage() +
range_del_table_->ApproximateMemoryUsage() +
del_table_->ApproximateMemoryUsage() +
arena_.MemoryAllocatedBytes();

approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed);
@@ -375,7 +375,7 @@ class MemTableIterator : public InternalIterator {
logger_(mem.moptions_.info_log),
ts_sz_(mem.ts_sz_) {
if (use_range_del_table) {
iter_ = mem.range_del_table_->GetIterator(arena);
iter_ = mem.del_table_->GetIterator(arena);
} else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
!read_options.auto_prefix_mode) {
// Auto prefix mode is not implemented in memtable yet.
@@ -620,7 +620,7 @@ port::RWMutex* MemTable::GetLock(const Slice& key) {
MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
const Slice& end_ikey) {
uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
entry_count += del_table_->ApproximateNumEntries(start_ikey, end_ikey);
if (entry_count == 0) {
return {0, 0};
}
@@ -717,7 +717,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
val_size + moptions_.protection_bytes_per_key;
char* buf = nullptr;
std::unique_ptr<MemTableRep>& table =
type == kTypeRangeDeletion ? range_del_table_ : table_;
type == kTypeRangeDeletion ? del_table_ : table_;
KeyHandle handle = table->Allocate(encoded_len, &buf);

char* p = EncodeVarint32(buf, internal_key_size);
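
The memtable.cc changes are a mechanical rename of range_del_table_ to del_table_; the interesting line is the routing in MemTable::Add, where range deletions go to the dedicated rep and everything else to the main table. A toy version of that routing with stand-in types:

```cpp
#include <iostream>
#include <map>
#include <string>

enum ValueType { kTypeValue, kTypeDeletion, kTypeRangeDeletion };

struct ToyMemTable {
  std::map<std::string, std::string> table_;      // point entries
  std::map<std::string, std::string> del_table_;  // range tombstones
  void Add(ValueType type, const std::string& key, const std::string& value) {
    // Mirrors: type == kTypeRangeDeletion ? del_table_ : table_;
    auto& table = (type == kTypeRangeDeletion) ? del_table_ : table_;
    table[key] = value;
  }
};

int main() {
  ToyMemTable m;
  m.Add(kTypeValue, "a", "1");
  m.Add(kTypeRangeDeletion, "b", "f");  // tombstone covering [b, f)
  std::cout << m.table_.size() << " " << m.del_table_.size() << "\n";  // 1 1
}
```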
7 changes: 3 additions & 4 deletions db/memtable.h
@@ -151,8 +151,7 @@ class MemTable {
// used by MemTableListVersion::MemoryAllocatedBytesExcludingLast
size_t MemoryAllocatedBytes() const {
return table_->ApproximateMemoryUsage() +
range_del_table_->ApproximateMemoryUsage() +
arena_.MemoryAllocatedBytes();
del_table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();
}

// Returns a vector of unique random memtable entries of size 'sample_size'.
@@ -588,7 +587,7 @@ class MemTable {
AllocTracker mem_tracker_;
ConcurrentArena arena_;
std::unique_ptr<MemTableRep> table_;
std::unique_ptr<MemTableRep> range_del_table_;
std::unique_ptr<MemTableRep> del_table_;
std::atomic_bool is_range_del_table_empty_;

// Total data size of all data inserted
@@ -653,7 +652,7 @@
// writes with sequence number smaller than seq are flushed.
SequenceNumber atomic_flush_seqno_;

// keep track of memory usage in table_, arena_, and range_del_table_.
// keep track of memory usage in table_, arena_, and del_table_.
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_;

1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
@@ -264,6 +264,7 @@ DECLARE_int32(continuous_verification_interval);
DECLARE_int32(get_property_one_in);
DECLARE_string(file_checksum_impl);
DECLARE_bool(use_dynamic_delay);
DECLARE_bool(use_clean_delete_during_flush);

// Options for transaction dbs.
// Use TransactionDB (a.k.a. Pessimistic Transaction DB)
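
The DECLARE_bool here pairs with a DEFINE_bool in the stress tool's flag-definition file; a plausible shape for that definition is sketched below, with the default value and help text being assumptions rather than text from this commit:

```cpp
#include <gflags/gflags.h>

// Hypothetical matching definition; default and description are assumptions.
DEFINE_bool(use_clean_delete_during_flush, false,
            "If true, a memtable flush writes only the delete record for a "
            "value covered by a range deletion when no snapshot needs it.");
```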