Skip to content

Commit

Permalink
Better bloom filter / iterator bounds
Browse files Browse the repository at this point in the history
Summary: We can handle bloom filter case for both forward / reverse order in the same way by finding the longest prefix of eq and initial_pos_slice - this should handle some edge cases better. Also set up iterator bounds using MyRocks helpers when bloom filter can't be used, for faster access.

Reviewed By: lth

Differential Revision: D26025928

fbshipit-source-id: f876ff5bfc4
  • Loading branch information
yizhang82 authored and facebook-github-bot committed Feb 19, 2021
1 parent fbe1dde commit 3bfd0f9
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 80 deletions.
18 changes: 11 additions & 7 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3356,7 +3356,6 @@ class Rdb_transaction {
rocksdb::Status *statuses,
const bool sorted_input) const = 0;


rocksdb::Iterator *get_iterator(
rocksdb::ColumnFamilyHandle *const column_family, bool skip_bloom_filter,
bool fill_cache, const rocksdb::Slice &eq_cond_lower_bound,
Expand Down Expand Up @@ -14845,9 +14844,9 @@ bool ha_rocksdb::check_bloom_and_set_bounds(
@param use_all_keys True if all key parts are set with equal conditions.
This is aware of extended keys.
*/
bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
const rocksdb::Slice &eq_cond,
const bool use_all_keys) {
bool ha_rocksdb::can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
const rocksdb::Slice &eq_cond,
const bool use_all_keys) {
bool can_use = false;

if (THDVAR(thd, skip_bloom_filter_on_read)) {
Expand Down Expand Up @@ -15813,9 +15812,14 @@ const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx) {
}

rocksdb::Iterator *rdb_tx_get_iterator(
Rdb_transaction *tx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *const column_family) {
return tx->get_iterator(options, column_family);
Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family,
bool skip_bloom_filter, bool fill_cache,
const rocksdb::Slice &lower_bound_slice,
const rocksdb::Slice &upper_bound_slice, bool read_current,
bool create_snapshot) {
return tx->get_iterator(column_family, skip_bloom_filter, fill_cache,
lower_bound_slice, upper_bound_slice, read_current,
create_snapshot);
}

// Report whether the wrapped RocksDB transaction has already been started.
bool rdb_tx_started(Rdb_transaction *tx) {
  return tx->is_tx_started();
}
Expand Down
37 changes: 19 additions & 18 deletions storage/rocksdb/ha_rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,18 +314,15 @@ class ha_rocksdb : public my_core::handler {
bool is_ascending(const Rdb_key_def &keydef,
enum ha_rkey_function find_flag) const
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
void setup_iterator_bounds(const Rdb_key_def &kd,
const rocksdb::Slice &eq_cond, size_t bound_len,
uchar *const lower_bound, uchar *const upper_bound,
rocksdb::Slice *lower_bound_slice,
rocksdb::Slice *upper_bound_slice);
bool check_bloom_and_set_bounds(THD *thd, const Rdb_key_def &kd,
const rocksdb::Slice &eq_cond,
const bool use_all_keys, size_t bound_len,
uchar *const lower_bound,
uchar *const upper_bound,
rocksdb::Slice *lower_bound_slice,
rocksdb::Slice *upper_bound_slice);
static void setup_iterator_bounds(const Rdb_key_def &kd,
const rocksdb::Slice &eq_cond,
size_t bound_len, uchar *const lower_bound,
uchar *const upper_bound,
rocksdb::Slice *lower_bound_slice,
rocksdb::Slice *upper_bound_slice);
static bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
const rocksdb::Slice &eq_cond,
const bool use_all_keys);
void setup_scan_iterator(const Rdb_key_def &kd, rocksdb::Slice *slice,
const bool use_all_keys, const uint eq_cond_len)
MY_ATTRIBUTE((__nonnull__));
Expand Down Expand Up @@ -664,6 +661,12 @@ class ha_rocksdb : public my_core::handler {
// Note: the following is only used by DS-MRR, so not needed for MyRocks:
// longlong get_memory_buffer_size() const override { return 1024; }

static bool check_bloom_and_set_bounds(
THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond,
const bool use_all_keys, size_t bound_len, uchar *const lower_bound,
uchar *const upper_bound, rocksdb::Slice *lower_bound_slice,
rocksdb::Slice *upper_bound_slice);

private:
// true <=> The scan uses the default MRR implementation, just redirect all
// calls to it
Expand Down Expand Up @@ -1120,8 +1123,10 @@ Rdb_transaction *&get_tx_from_thd(THD *const thd);
const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx);

rocksdb::Iterator *rdb_tx_get_iterator(
Rdb_transaction *tx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *const column_family);
Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family,
bool skip_bloom, bool fill_cache, const rocksdb::Slice &lower_bound_slice,
const rocksdb::Slice &upper_bound_slice, bool read_current = false,
bool create_snapshot = true);

rocksdb::Status rdb_tx_get(Rdb_transaction *tx,
rocksdb::ColumnFamilyHandle *const column_family,
Expand Down Expand Up @@ -1158,10 +1163,6 @@ inline void rocksdb_smart_next(bool seek_backward,
// https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
bool is_valid_iterator(rocksdb::Iterator *scan_it);

bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
const rocksdb::Slice &eq_cond,
const bool use_all_keys);

bool rdb_tx_started(Rdb_transaction *tx);

extern std::atomic<uint64_t> rocksdb_select_bypass_executed;
Expand Down
114 changes: 59 additions & 55 deletions storage/rocksdb/nosql_access.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,23 +659,17 @@ class select_exec {
}

bool start() {
// We need get ReadOptions from tx in case the snapshot was already
// created by someone else
m_ro = rdb_tx_acquire_snapshot(m_tx);
rdb_tx_acquire_snapshot(m_tx);

return false;
}

void set_seek_mode(bool use_bloom) {
if (use_bloom) {
m_ro.prefix_same_as_start = true;
} else {
m_ro.total_order_seek = true;
}
}

rocksdb::Iterator *get_iterator(rocksdb::ColumnFamilyHandle *cf) {
return rdb_tx_get_iterator(m_tx, m_ro, cf);
rocksdb::Iterator *get_iterator(rocksdb::ColumnFamilyHandle *cf,
bool use_bloom,
const rocksdb::Slice &lower_bound,
const rocksdb::Slice &upper_bound) {
return rdb_tx_get_iterator(m_tx, cf, !use_bloom, true /* fill_cache */,
lower_bound, upper_bound);
}

rocksdb::Status get(rocksdb::ColumnFamilyHandle *cf,
Expand Down Expand Up @@ -705,7 +699,6 @@ class select_exec {
private:
THD *m_thd;
Rdb_transaction *m_tx;
rocksdb::ReadOptions m_ro;
};

struct key_index_tuple_writer {
Expand Down Expand Up @@ -747,7 +740,8 @@ class select_exec {
const Field *field, Item *item);
bool pack_cond(uint key_part_no, const sql_cond &cond, bool is_start = true);
bool send_row();
bool use_bloom_filter(rocksdb::Slice eq_slice);

bool setup_iterator(txn_wrapper *txn, const rocksdb::Slice &eq_slice);

bool handle_killed() {
if (m_thd->killed) {
Expand Down Expand Up @@ -845,6 +839,12 @@ class select_exec {
// Whether the range query (begin, end) is inclusive in begin/end
bool m_start_inclusive;
bool m_end_inclusive;

// Upper and lower bounds for iterators
std::vector<uchar> m_lower_bound_buf;
std::vector<uchar> m_upper_bound_buf;
rocksdb::Slice m_lower_bound_slice;
rocksdb::Slice m_upper_bound_slice;
};

bool select_exec::pack_index_tuple(uint key_part_no, Rdb_string_writer *writer,
Expand Down Expand Up @@ -1222,10 +1222,6 @@ bool INLINE_ATTR select_exec::scan_where() {
return false;
}

bool select_exec::use_bloom_filter(rocksdb::Slice eq_slice) {
return can_use_bloom_filter(m_thd, *m_key_def, eq_slice, m_use_full_key);
}

/*
Scan all the fields that going to be unpacked in SELECT, and figure out
do we need to just unpack the index or need to unpack the value as well
Expand Down Expand Up @@ -1485,20 +1481,36 @@ bool INLINE_ATTR select_exec::run_pk_point_query(txn_wrapper *txn) {
return false;
}

/*
  Create the scan iterator for this query, attaching lower/upper bounds and
  deciding whether the prefix bloom filter can be used for eq_slice.

  @param txn      Transaction wrapper used to create the iterator.
  @param eq_slice The equal-condition prefix of the key being sought.
  @return true on failure (iterator could not be created), false on success.
*/
bool INLINE_ATTR select_exec::setup_iterator(txn_wrapper *txn,
                                             const rocksdb::Slice &eq_slice) {
  // Bounds need to be at least Rdb_key_def::INDEX_NUMBER_SIZE so the index
  // number prefix always fits.
  size_t bound_len =
      std::max<size_t>(eq_slice.size(), Rdb_key_def::INDEX_NUMBER_SIZE);
  // Use resize(), not reserve(): check_bloom_and_set_bounds() writes through
  // data(), and writing into merely-reserved storage beyond size() is
  // undefined behavior.
  m_lower_bound_buf.resize(bound_len);
  m_upper_bound_buf.resize(bound_len);
  bool use_bloom = ha_rocksdb::check_bloom_and_set_bounds(
      m_thd, *m_key_def, eq_slice, m_use_full_key, bound_len,
      m_lower_bound_buf.data(), m_upper_bound_buf.data(), &m_lower_bound_slice,
      &m_upper_bound_slice);
  rocksdb::Iterator *it = txn->get_iterator(
      m_key_def->get_cf(), use_bloom, m_lower_bound_slice, m_upper_bound_slice);
  if (it == nullptr) {
    // Iterator creation failed (e.g. snapshot/allocation problem).
    return true;
  }
  m_scan_it.reset(it);
  return false;
}

bool INLINE_ATTR select_exec::run_sk_point_query(txn_wrapper *txn) {
auto cf = m_key_def->get_cf();
for (auto &writer : m_key_index_tuples) {
if (unlikely(handle_killed())) {
return true;
}

rocksdb::Slice key_slice = writer.get_key_slice();
txn->set_seek_mode(use_bloom_filter(writer.get_eq_slice()));
m_scan_it.reset(txn->get_iterator(cf));
if (m_scan_it == nullptr) {
if (unlikely(setup_iterator(txn, key_slice))) {
return true;
}

m_scan_it->Seek(key_slice);

auto rkey_slice = m_scan_it->key();
Expand Down Expand Up @@ -1656,12 +1668,8 @@ bool INLINE_ATTR select_exec::run_range_query(txn_wrapper *txn) {
// If the start key is not full key - either it is missing some columns
// or it is a secondary key (which by definition is never full as it
// has PK at the end). This impacts the seeking behavior below
bool partial_start_key =
!m_index_is_pk || start_key_slice.size() == eq_slice.size();

// If we need to start at a prefix (unbounded) that is "shorter" than
// the end key
bool is_start_prefix_key = start_key_slice.size() == eq_slice.size();
bool partial_start_key = !m_index_is_pk || is_start_prefix_key;

// Range query need to support 4 combinations
// * forward cf, ascending order
Expand All @@ -1687,8 +1695,6 @@ bool INLINE_ATTR select_exec::run_range_query(txn_wrapper *txn) {
rocksdb::Slice end_pos_slice(reinterpret_cast<char *>(end_pos.data()),
end_key_slice.size());
if (m_parser.is_order_desc()) {
int eq_bytes_changed = 0;

// In reverse order, both forward cf (SeekForPrev) and reverse cf
// (Seek) have the same matrix:
// PK, A >= n --> start at n
Expand All @@ -1698,11 +1704,7 @@ bool INLINE_ATTR select_exec::run_range_query(txn_wrapper *txn) {
// >= Prefix n --> start at succ(n)
if (m_start_inclusive && partial_start_key) {
// SK A >= n and prefix
int bytes_changed =
m_key_def->successor(initial_pos.data(), initial_pos.size());
if (is_start_prefix_key) {
eq_bytes_changed = bytes_changed;
}
m_key_def->successor(initial_pos.data(), initial_pos.size());
} else if (m_index_is_pk && !m_start_inclusive) {
// PK A > n
m_key_def->predecessor(initial_pos.data(), initial_pos.size());
Expand All @@ -1721,16 +1723,6 @@ bool INLINE_ATTR select_exec::run_range_query(txn_wrapper *txn) {
m_key_def->successor(end_pos.data(), end_pos.size());
}
}

txn->set_seek_mode(use_bloom_filter(
rocksdb::Slice(eq_slice.data(), eq_slice.size() - eq_bytes_changed)));
m_scan_it.reset(txn->get_iterator(m_key_def->get_cf()));
if (m_scan_it == nullptr) {
return true;
}

rocksdb_smart_seek(!m_key_def->m_is_reverse_cf, m_scan_it.get(),
initial_pos_slice);
} else {
// In forward order, both forward cf (SeekForPrev) and reverse cf
// (Seek) have the same matrix:
Expand Down Expand Up @@ -1764,18 +1756,30 @@ bool INLINE_ATTR select_exec::run_range_query(txn_wrapper *txn) {
m_key_def->successor(end_pos.data(), end_pos.size());
}
}
}

txn->set_seek_mode(
use_bloom_filter(rocksdb::Slice(eq_slice.data(), eq_slice.size())));
m_scan_it.reset(txn->get_iterator(m_key_def->get_cf()));
if (m_scan_it == nullptr) {
return true;
}

rocksdb_smart_seek(m_key_def->m_is_reverse_cf, m_scan_it.get(),
initial_pos_slice);
// During seek, if we were to use bloom filter, we need to find the
// largest common prefix between start / eq (prefix). This is needed
// to ensure seek w/ bloom filter can correctly locate the key in
// a descending scan. For example, for such a query on SK where we
// need to issue SeekForPrev('a-c'):
// SELECT * WHERE A = a and B = b ORDER BY C DESC
// (a-b) <--- eq / prefix
// a-b-a-PK
// ...
// a-b-z
// a-b-z-PK <--- SeekForPrev('a-c') lands here
// (a-c) <--- start slice
// So in order to seek correctly, prefix bloom filter can only use 'a'
but not 'a-c' since we want to actually land on the largest 'a-b'
rocksdb::Slice prefix_slice(initial_pos_slice.data(),
initial_pos_slice.difference_offset(eq_slice));
if (unlikely(setup_iterator(txn, prefix_slice))) {
return true;
}

rocksdb_smart_seek(reverse_seek, m_scan_it.get(), initial_pos_slice);

// Make sure the slice is alive as we'll point into the slice during
// unpacking
while (true) {
Expand Down

0 comments on commit 3bfd0f9

Please sign in to comment.