-
Notifications
You must be signed in to change notification settings - Fork 171
/
Copy pathblob_index_merge_operator.h
95 lines (85 loc) · 2.79 KB
/
blob_index_merge_operator.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#pragma once
#include "rocksdb/merge_operator.h"
#include "blob_file_set.h"
namespace rocksdb {
namespace titandb {
// Merge operator that resolves an existing value against a list of merge
// operands, each of which is decoded as a MergeBlobIndex.
//
// NOTE(review): the registered name is "BlobGCOperator", so the operands are
// presumably produced by blob garbage collection -- confirm against the
// writers of these merge records.
class BlobIndexMergeOperator : public MergeOperator {
 public:
  BlobIndexMergeOperator() = default;

  // Combines `merge_in.existing_value` with `merge_in.operand_list`.
  //
  // Outcomes, in the order they are checked:
  //  1. The existing value is a plain kTypeValue: it is passed through
  //     unchanged and the operands are not looked at.
  //  2. There is no existing value, or the existing blob index is a deletion
  //     marker: the result is a deletion marker of type kTypeBlobIndex.
  //  3. Otherwise every operand is decoded in list order. The existing index
  //     is treated as stale as soon as an operand's `source_file_number`
  //     equals the existing index's `file_number`. From that operand onwards
  //     (inclusive) the operand with the greatest `sequence` is selected;
  //     on equal sequences the later operand wins. Operands seen while the
  //     existing index is still valid are decoded but otherwise ignored.
  //     - If the existing index stayed valid, it is passed through unchanged.
  //     - Otherwise the selected operand is encoded into `new_value`.
  //
  // Returns false if the existing blob index or any operand fails to decode;
  // returns true otherwise.
  bool FullMergeV2(const MergeOperationInput& merge_in,
                   MergeOperationOutput* merge_out) const override {
    // Case 1: plain values win outright.
    if (merge_in.existing_value && merge_in.value_type == kTypeValue) {
      merge_out->new_type = kTypeValue;
      merge_out->existing_operand = *merge_in.existing_value;
      return true;
    }

    BlobIndex existing_index;
    bool existing_index_valid = false;
    if (merge_in.existing_value) {
      assert(merge_in.value_type == kTypeBlobIndex);
      // DecodeFrom takes a pointer to the slice, so hand it a local copy
      // rather than the caller's slice.
      Slice existing_slice = *merge_in.existing_value;
      Status s = existing_index.DecodeFrom(&existing_slice);
      if (!s.ok()) {
        return false;
      }
      existing_index_valid = !BlobIndex::IsDeletionMarker(existing_index);
    }

    // Case 2: nothing to merge onto -- this must be a deleted key.
    if (!existing_index_valid) {
      merge_out->new_type = kTypeBlobIndex;
      merge_out->new_value.clear();
      BlobIndex::EncodeDeletionMarkerTo(&merge_out->new_value);
      return true;
    }

    // Case 3: walk the operands looking for the one that supersedes the
    // existing index.
    MergeBlobIndex index;
    BlobIndex merge_index;
    SequenceNumber latest_sequence = 0;  // only meaningful once `filled`
    bool filled = false;
    // Iterate by value: DecodeFrom receives a pointer to the operand, and
    // the operand list itself is reached through a const reference.
    for (auto operand : merge_in.operand_list) {
      Status s = index.DecodeFrom(&operand);
      if (!s.ok()) {
        return false;
      }
      // If any merge is sourced from the base index's file, then the base
      // index must be stale.
      if (existing_index_valid &&
          index.source_file_number == existing_index.file_number) {
        existing_index_valid = false;
      }
      // While the base index is still valid, merges must be sourced from
      // older modifies, so they are skipped.
      if (existing_index_valid) {
        continue;
      }
      // Keep the operand with the greatest sequence. Note the equal case:
      // merge on merge is possible, and the later operand wins the tie.
      // An operand with a smaller sequence comes from an older modify and
      // is ignored.
      if (!filled || latest_sequence <= index.sequence) {
        // NOTE(review): assigning a MergeBlobIndex to a BlobIndex presumably
        // keeps only the BlobIndex part -- confirm against the type
        // definitions.
        merge_index = index;
        latest_sequence = index.sequence;
        filled = true;
      }
    }

    merge_out->new_type = kTypeBlobIndex;
    if (existing_index_valid) {
      merge_out->existing_operand = *merge_in.existing_value;
    } else {
      // The operand that invalidated the base index is always recorded in
      // the same iteration, so a candidate must exist here.
      assert(filled);
      merge_out->new_value.clear();
      merge_index.EncodeTo(&merge_out->new_value);
    }
    return true;
  }

  // Partial merging of operands is not supported: always returns false and
  // leaves `new_value` untouched.
  bool PartialMergeMulti(const Slice& key,
                         const std::deque<Slice>& operand_list,
                         std::string* new_value,
                         Logger* logger) const override {
    return false;
  }

  // Identifier under which this merge operator is registered.
  const char* Name() const override { return "BlobGCOperator"; }
};
} // namespace titandb
} // namespace rocksdb