From e073de209362f87d44e40a9160ede88a11cc712d Mon Sep 17 00:00:00 2001 From: Luqun Lou Date: Thu, 28 Feb 2019 14:56:29 -0800 Subject: [PATCH] Add new class Rdb_converter to expose decode API Upstream commit ID : fb-mysql-5.6.35/e1516e7601e5a721459ab38054a8d75091736f41 PS-6865 : Merge fb-prod201903 Summary: This is first change for Read/Write Path refactor. The purpose of refactor: 1. Allow others to call encode/decode directly by decoupling encode/decode internal data/function with ha_rocksdb * decode: convert RocksDB storage format to Mysql Format * encode: convert Mysql format to RocksDb storage format 2. Allow others to hook up its own encoder/decoder This change is to move decode related data/function into Rdb_converter and ha_rocksdb will call Rdb_converter to decode. To Use class Rdb_converter to decode RocksDb storage data 1. create Rdb_converter instance 2. call setup_field_decoders(bitmap) or set_is_keyread_only 3. call decode(/*IN*/Rdb_key_def, /*IH*/ key_slice, /*IN*/value_slice, /*OUT*/ buffer); Reviewed By: yizhang82 Differential Revision: D14354790 fbshipit-source-id: c5dac92b6c1 --- storage/rocksdb/CMakeLists.txt | 1 + storage/rocksdb/ha_rocksdb.cc | 504 ++++------------------------ storage/rocksdb/ha_rocksdb.h | 77 +---- storage/rocksdb/rdb_converter.cc | 542 +++++++++++++++++++++++++++++++ storage/rocksdb/rdb_converter.h | 159 +++++++++ 5 files changed, 768 insertions(+), 515 deletions(-) create mode 100644 storage/rocksdb/rdb_converter.cc create mode 100644 storage/rocksdb/rdb_converter.h diff --git a/storage/rocksdb/CMakeLists.txt b/storage/rocksdb/CMakeLists.txt index 4e3b5dbc9931..4b11b4167dee 100644 --- a/storage/rocksdb/CMakeLists.txt +++ b/storage/rocksdb/CMakeLists.txt @@ -205,6 +205,7 @@ SET(ROCKSDB_SOURCES rdb_datadic.cc rdb_datadic.h rdb_cf_options.cc rdb_cf_options.h rdb_cf_manager.cc rdb_cf_manager.h + rdb_converter.cc rdb_converter.h properties_collector.cc properties_collector.h event_listener.cc event_listener.h rdb_i_s.cc rdb_i_s.h diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 405e6046e334..ef8892dfcc01 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -76,6 +76,7 @@ #include "./logger.h" #include "./rdb_cf_manager.h" #include "./rdb_cf_options.h" +#include "./rdb_converter.h" #include "./rdb_datadic.h" #include "./rdb_i_s.h" #include "./rdb_index_merge.h" @@ -5307,7 +5308,7 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() { // appropriately and non-index-only lookup will not read the value) const bool save_keyread_only = m_keyread_only; m_keyread_only = true; - m_key_requested = true; + m_converter->set_is_key_requested(true); if (!index_last(table->record[0])) { Field *field = @@ -5503,14 +5504,13 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton, m_scan_it_skips_bloom(false), m_scan_it_snapshot(nullptr), m_scan_it_lower_bound(nullptr), m_scan_it_upper_bound(nullptr), m_tbl_def(nullptr), m_pk_descr(nullptr), m_key_descr_arr(nullptr), - m_pk_can_be_decoded(false), m_maybe_unpack_info(false), + m_pk_can_be_decoded(false), m_pk_tuple(nullptr), m_pk_packed_tuple(nullptr), m_sk_packed_tuple(nullptr), m_end_key_packed_tuple(nullptr), m_sk_match_prefix(nullptr), m_sk_match_prefix_buf(nullptr), m_sk_packed_tuple_old(nullptr), m_dup_sk_packed_tuple(nullptr), m_dup_sk_packed_tuple_old(nullptr), m_pack_buffer(nullptr), - m_lock_rows(RDB_LOCK_NONE), m_keyread_only(false), m_encoder_arr(nullptr), - m_row_checksums_checked(0) + m_lock_rows(RDB_LOCK_NONE), m_keyread_only(false) #if defined(ROCKSDB_INCLUDE_RFR) && ROCKSDB_INCLUDE_RFR , m_in_rpl_delete_rows(false), m_in_rpl_update_rows(false) @@ -5535,9 +5535,9 @@ bool ha_rocksdb::init_with_fields() { const uint key_parts = table_share->key_info[pk].user_defined_key_parts; check_keyread_allowed(m_pk_can_be_decoded, table_share, pk /*PK*/, key_parts - 1, true); - } else + } else { m_pk_can_be_decoded = false; - + } cached_table_flags = table_flags(); DBUG_RETURN(false); /* Ok */ @@ -5642,7 +5642,8 @@ void ha_rocksdb::rocksdb_skip_expired_records(const Rdb_key_def &kd, int ha_rocksdb::convert_record_to_storage_format( const struct update_row_info &row_info, rocksdb::Slice *const packed_rec) { - DBUG_ASSERT_IMP(m_maybe_unpack_info, row_info.new_pk_unpack_info); + bool maybe_unpack_info = m_converter->get_maybe_unpack_info(); + DBUG_ASSERT_IMP(maybe_unpack_info, row_info.new_pk_unpack_info); DBUG_ASSERT(m_pk_descr != nullptr); const rocksdb::Slice &pk_packed_slice = row_info.new_pk_slice; @@ -5650,12 +5651,13 @@ int ha_rocksdb::convert_record_to_storage_format( bool has_ttl = m_pk_descr->has_ttl(); bool has_ttl_column = !m_pk_descr->m_ttl_column.empty(); bool ttl_in_pk = has_ttl_column && (row_info.ttl_pk_offset != UINT_MAX); + int null_bytes_in_record = m_converter->get_null_bytes_in_record(); m_storage_record.length(0); if (has_ttl) { /* If it's a TTL record, reserve space for 8 byte TTL value in front. */ - m_storage_record.fill(ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_in_rec, 0); + m_storage_record.fill(ROCKSDB_SIZEOF_TTL_RECORD + null_bytes_in_record, 0); m_ttl_bytes_updated = false; /* @@ -5713,39 +5715,39 @@ int ha_rocksdb::convert_record_to_storage_format( } } else { /* All NULL bits are initially 0 */ - m_storage_record.fill(m_null_bytes_in_rec, 0); + m_storage_record.fill(null_bytes_in_record, 0); } // If a primary key may have non-empty unpack_info for certain values, // (m_maybe_unpack_info=true), we write the unpack_info block. The block // itself was prepared in Rdb_key_def::pack_record. - if (m_maybe_unpack_info) { + if (maybe_unpack_info) { m_storage_record.append(reinterpret_cast(pk_unpack_info->ptr()), pk_unpack_info->get_current_pos()); } - + auto encoder_arr = m_converter->get_encoder_arr(); for (uint i = 0; i < table->s->fields; i++) { /* Don't pack decodable PK key parts */ - if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) { + if (encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) { continue; } Field *const field = table->field[i]; - if (m_encoder_arr[i].maybe_null()) { + if (encoder_arr[i].maybe_null()) { char *data = const_cast(m_storage_record.ptr()); if (has_ttl) { data += ROCKSDB_SIZEOF_TTL_RECORD; } if (field->is_null()) { - data[m_encoder_arr[i].m_null_offset] |= m_encoder_arr[i].m_null_mask; + data[encoder_arr[i].m_null_offset] |= encoder_arr[i].m_null_mask; /* Don't write anything for NULL values */ continue; } } - if (m_encoder_arr[i].m_field_type == MYSQL_TYPE_BLOB || - m_encoder_arr[i].m_field_type == MYSQL_TYPE_JSON) { + if (encoder_arr[i].m_field_type == MYSQL_TYPE_BLOB || + encoder_arr[i].m_field_type == MYSQL_TYPE_JSON) { my_core::Field_blob *blob = (my_core::Field_blob *)field; /* Get the number of bytes needed to store length*/ const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; @@ -5758,7 +5760,7 @@ int ha_rocksdb::convert_record_to_storage_format( char *data_ptr; memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **)); m_storage_record.append(data_ptr, blob->get_length()); - } else if (m_encoder_arr[i].m_field_type == MYSQL_TYPE_VARCHAR) { + } else if (encoder_arr[i].m_field_type == MYSQL_TYPE_VARCHAR) { Field_varstring *const field_var = (Field_varstring *)field; uint data_len; /* field_var->length_bytes is 1 or 2 */ @@ -5825,71 +5827,6 @@ int ha_rocksdb::convert_record_to_storage_format( return HA_EXIT_SUCCESS; } -/* - @brief - Setup which fields will be unpacked when reading rows - - @detail - Three special cases when we still unpack all fields: - - When this table is being updated (m_lock_rows==RDB_LOCK_WRITE). - - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to - read all fields to find whether there is a row checksum at the end. We could - skip the fields instead of decoding them, but currently we do decoding.) - - On index merge as bitmap is cleared during that operation - - @seealso - ha_rocksdb::setup_field_converters() - ha_rocksdb::convert_record_from_storage_format() -*/ -void ha_rocksdb::setup_read_decoders() { - m_decoders_vect.clear(); - m_key_requested = false; - - int last_useful = 0; - int skip_size = 0; - - for (uint i = 0; i < table->s->fields; i++) { - // bitmap is cleared on index merge, but it still needs to decode columns - const bool field_requested = - m_lock_rows == RDB_LOCK_WRITE || m_verify_row_debug_checksums || - bitmap_is_clear_all(table->read_set) || - bitmap_is_set(table->read_set, table->field[i]->field_index); - - // We only need the decoder if the whole record is stored. - if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) { - // the field potentially needs unpacking - if (field_requested) { - // the field is in the read set - m_key_requested = true; - } - continue; - } - - if (field_requested) { - // We will need to decode this field - m_decoders_vect.emplace_back(&m_encoder_arr[i], true, skip_size); - last_useful = m_decoders_vect.size(); - skip_size = 0; - } else { - if (m_encoder_arr[i].uses_variable_len_encoding() || - m_encoder_arr[i].maybe_null()) { - // For variable-length field, we need to read the data and skip it - m_decoders_vect.emplace_back(&m_encoder_arr[i], false, skip_size); - skip_size = 0; - } else { - // Fixed-width field can be skipped without looking at it. - // Add appropriate skip_size to the next field. - skip_size += m_encoder_arr[i].m_pack_length_in_rec; - } - } - } - - // It could be that the last few elements are varchars that just do - // skipping. Remove them. - m_decoders_vect.erase(m_decoders_vect.begin() + last_useful, - m_decoders_vect.end()); -} - #if !defined(DBUG_OFF) void dbug_append_garbage_at_end(rocksdb::PinnableSlice *on_disk_rec) { std::string str(on_disk_rec->data(), on_disk_rec->size()); @@ -5915,17 +5852,6 @@ void dbug_modify_rec_varchar12(rocksdb::PinnableSlice *on_disk_rec) { on_disk_rec->PinSelf(rocksdb::Slice(res)); } -void dbug_modify_key_varchar8(String &on_disk_rec) { - std::string res; - // The key starts with index number - res.append(on_disk_rec.ptr(), Rdb_key_def::INDEX_NUMBER_SIZE); - - // Then, a mem-comparable form of a varchar(8) value. - res.append("ABCDE\0\0\0\xFC", 9); - on_disk_rec.length(0); - on_disk_rec.append(res.data(), res.size()); -} - void dbug_create_err_inplace_alter() { my_printf_error(ER_UNKNOWN_ERROR, "Intentional failure in inplace alter occurred.", MYF(0)); @@ -5945,85 +5871,6 @@ int ha_rocksdb::convert_record_from_storage_format( return convert_record_from_storage_format(key, &m_retrieved_record, buf); } -int ha_rocksdb::convert_blob_from_storage_format( - my_core::Field_blob *const blob, Rdb_string_reader *const reader, - bool decode) { - /* Get the number of bytes needed to store length */ - const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; - - const char *data_len_str; - if (!(data_len_str = reader->read(length_bytes))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - memcpy(blob->ptr, data_len_str, length_bytes); - - const uint32 data_len = - blob->get_length(reinterpret_cast(data_len_str), - length_bytes, table->s->db_low_byte_first); - const char *blob_ptr; - if (!(blob_ptr = reader->read(data_len))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (decode) { - // set 8-byte pointer to 0, like innodb does (relevant for 32-bit - // platforms) - memset(blob->ptr + length_bytes, 0, 8); - memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **)); - } - - return HA_EXIT_SUCCESS; -} - -int ha_rocksdb::convert_varchar_from_storage_format( - my_core::Field_varstring *const field_var, Rdb_string_reader *const reader, - bool decode) { - const char *data_len_str; - if (!(data_len_str = reader->read(field_var->length_bytes))) - return HA_ERR_ROCKSDB_CORRUPT_DATA; - - uint data_len; - /* field_var->length_bytes is 1 or 2 */ - if (field_var->length_bytes == 1) { - data_len = (uchar)data_len_str[0]; - } else { - DBUG_ASSERT(field_var->length_bytes == 2); - data_len = uint2korr(data_len_str); - } - - if (data_len > field_var->field_length) { - /* The data on disk is longer than table DDL allows? */ - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (!reader->read(data_len)) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (decode) { - memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len); - } - - return HA_EXIT_SUCCESS; -} - -int ha_rocksdb::convert_field_from_storage_format( - my_core::Field *const field, Rdb_string_reader *const reader, bool decode, - uint len) { - const char *data_bytes; - if (len > 0) { - if ((data_bytes = reader->read(len)) == nullptr) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (decode) - memcpy(field->ptr, data_bytes, len); - } - - return HA_EXIT_SUCCESS; -} - /* @brief Unpack the record in this->m_retrieved_record and this->m_last_rowkey from @@ -6040,8 +5887,8 @@ int ha_rocksdb::convert_field_from_storage_format( m_retrieved_record). @seealso - ha_rocksdb::setup_read_decoders() Sets up data structures which tell which - columns to decode. + rdb_converter::setup_read_decoders() Sets up data structures which tell + which columns to decode. @return 0 OK @@ -6054,249 +5901,7 @@ int ha_rocksdb::convert_record_from_storage_format( DBUG_ASSERT(key != nullptr); DBUG_ASSERT(buf != nullptr); - Rdb_string_reader reader(value); - - /* - Decode PK fields from the key - */ - DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1", - dbug_modify_key_varchar8(m_last_rowkey);); - - const rocksdb::Slice rowkey_slice(m_last_rowkey.ptr(), - m_last_rowkey.length()); - const char *unpack_info = nullptr; - uint16 unpack_info_len = 0; - rocksdb::Slice unpack_slice; - - /* If it's a TTL record, skip the 8 byte TTL value */ - const char *ttl_bytes; - if (m_pk_descr->has_ttl()) { - if ((ttl_bytes = reader.read(ROCKSDB_SIZEOF_TTL_RECORD))) { - memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD); - } else { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - } - - /* Other fields are decoded from the value */ - const char *null_bytes = nullptr; - if (m_null_bytes_in_rec && !(null_bytes = reader.read(m_null_bytes_in_rec))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (m_maybe_unpack_info) { - unpack_info = reader.get_current_ptr(); - if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) || - !reader.read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - unpack_info_len = - rdb_netbuf_to_uint16(reinterpret_cast(unpack_info + 1)); - unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len); - - reader.read(unpack_info_len - - Rdb_key_def::get_unpack_header_size(unpack_info[0])); - } - - int err = HA_EXIT_SUCCESS; - if (m_key_requested) { - err = m_pk_descr->unpack_record(table, buf, &rowkey_slice, - unpack_info ? &unpack_slice : nullptr, - false /* verify_checksum */); - } - - if (err != HA_EXIT_SUCCESS) { - return err; - } - - for (auto it = m_decoders_vect.begin(); it != m_decoders_vect.end(); it++) { - const Rdb_field_encoder *const field_dec = it->m_field_enc; - const bool decode = it->m_decode; - const bool isNull = - field_dec->maybe_null() && - ((null_bytes[field_dec->m_null_offset] & field_dec->m_null_mask) != 0); - - Field *const field = table->field[field_dec->m_field_index]; - - /* Skip the bytes we need to skip */ - if (it->m_skip && !reader.read(it->m_skip)) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - uint field_offset = field->ptr - table->record[0]; - uint null_offset = field->null_offset(); - bool maybe_null = field->real_maybe_null(); - field->move_field(buf + field_offset, - maybe_null ? buf + null_offset : nullptr, - field->null_bit); - // WARNING! - Don't return before restoring field->ptr and field->null_ptr! - - if (isNull) { - if (decode) { - /* This sets the NULL-bit of this record */ - field->set_null(); - /* - Besides that, set the field value to default value. CHECKSUM TABLE - depends on this. - */ - memcpy(field->ptr, table->s->default_values + field_offset, - field->pack_length()); - } - } else { - if (decode) { - field->set_notnull(); - } - - if (field_dec->m_field_type == MYSQL_TYPE_BLOB || - field_dec->m_field_type == MYSQL_TYPE_JSON) { - err = convert_blob_from_storage_format((my_core::Field_blob *)field, - &reader, decode); - } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) { - err = convert_varchar_from_storage_format( - (my_core::Field_varstring *)field, &reader, decode); - } else { - err = convert_field_from_storage_format( - field, &reader, decode, field_dec->m_pack_length_in_rec); - } - } - - // Restore field->ptr and field->null_ptr - field->move_field(table->record[0] + field_offset, - maybe_null ? table->record[0] + null_offset : nullptr, - field->null_bit); - - if (err != HA_EXIT_SUCCESS) { - return err; - } - } - - if (m_verify_row_debug_checksums) { - if (reader.remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE && - reader.read(1)[0] == RDB_CHECKSUM_DATA_TAG) { - uint32_t stored_key_chksum = - rdb_netbuf_to_uint32((const uchar *)reader.read(RDB_CHECKSUM_SIZE)); - uint32_t stored_val_chksum = - rdb_netbuf_to_uint32((const uchar *)reader.read(RDB_CHECKSUM_SIZE)); - - const ha_checksum computed_key_chksum = - my_core::my_checksum(0, rdb_slice_to_uchar_ptr(key), key->size()); - const ha_checksum computed_val_chksum = - my_core::my_checksum(0, rdb_slice_to_uchar_ptr(value), - value->size() - RDB_CHECKSUM_CHUNK_SIZE); - - DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", - stored_key_chksum++;); - - if (stored_key_chksum != computed_key_chksum) { - m_pk_descr->report_checksum_mismatch(true, key->data(), key->size()); - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", - stored_val_chksum++;); - if (stored_val_chksum != computed_val_chksum) { - m_pk_descr->report_checksum_mismatch(false, value->data(), - value->size()); - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - m_row_checksums_checked++; - } - if (reader.remaining_bytes()) - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - return HA_EXIT_SUCCESS; -} - -void ha_rocksdb::get_storage_type(Rdb_field_encoder *const encoder, - const uint kp) { - // STORE_SOME uses unpack_info. - if (m_pk_descr->has_unpack_info(kp)) { - DBUG_ASSERT(m_pk_descr->can_unpack(kp)); - encoder->m_storage_type = Rdb_field_encoder::STORE_SOME; - m_maybe_unpack_info = true; - } else if (m_pk_descr->can_unpack(kp)) { - encoder->m_storage_type = Rdb_field_encoder::STORE_NONE; - } -} - -/* - Setup data needed to convert table->record[] to and from record storage - format. - - @seealso - ha_rocksdb::convert_record_to_storage_format, - ha_rocksdb::convert_record_from_storage_format -*/ - -void ha_rocksdb::setup_field_converters() { - uint i; - uint null_bytes = 0; - uchar cur_null_mask = 0x1; - - DBUG_ASSERT(m_encoder_arr == nullptr); -#ifdef HAVE_PSI_INTERFACE - m_encoder_arr = static_cast( - my_malloc(rdb_handler_memory_key, - table->s->fields * sizeof(Rdb_field_encoder), MYF(0))); -#else - m_encoder_arr = static_cast( - my_malloc(PSI_NOT_INSTRUMENTED, - table->s->fields * sizeof(Rdb_field_encoder), MYF(0))); -#endif - if (m_encoder_arr == nullptr) { - return; - } - - for (i = 0; i < table->s->fields; i++) { - Field *const field = table->field[i]; - m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL; - - /* - Check if this field is - - a part of primary key, and - - it can be decoded back from its key image. - If both hold, we don't need to store this field in the value part of - RocksDB's key-value pair. - - If hidden pk exists, we skip this check since the field will never be - part of the hidden pk. - */ - if (!has_hidden_pk(table)) { - KEY *const pk_info = &table->key_info[table->s->primary_key]; - for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) { - /* key_part->fieldnr is counted from 1 */ - if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) { - get_storage_type(&m_encoder_arr[i], kp); - break; - } - } - } - - m_encoder_arr[i].m_field_type = field->real_type(); - m_encoder_arr[i].m_field_index = i; - m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec(); - - if (field->real_maybe_null()) { - m_encoder_arr[i].m_null_mask = cur_null_mask; - m_encoder_arr[i].m_null_offset = null_bytes; - if (cur_null_mask == 0x80) { - cur_null_mask = 0x1; - null_bytes++; - } else - cur_null_mask = cur_null_mask << 1; - } else { - m_encoder_arr[i].m_null_mask = 0; - } - } - - /* Count the last, unfinished NULL-bits byte */ - if (cur_null_mask != 0x1) - null_bytes++; - - m_null_bytes_in_rec = null_bytes; + return m_converter->decode(m_pk_descr, buf, key, value); } int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, @@ -6504,8 +6109,8 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked, "dictionary"); DBUG_RETURN(HA_ERR_ROCKSDB_INVALID_TABLE); } - m_lock_rows = RDB_LOCK_NONE; + m_lock_rows = RDB_LOCK_NONE; m_key_descr_arr = m_tbl_def->m_key_descr_arr; /* @@ -6534,7 +6139,15 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked, */ init_with_fields(); - setup_field_converters(); + /* Initialize decoder */ + m_converter = std::make_shared(ha_thd(), m_tbl_def, table); + + /* + Update m_ttl_bytes address to same as Rdb_converter's m_ttl_bytes. + Remove this code after moving convert_record_to_storage_format() into + Rdb_converter class. + */ + m_ttl_bytes = m_converter->get_ttl_bytes_buffer(); info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST); @@ -6542,11 +6155,10 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked, The following load_XXX code calls row decode functions, and they do that without having done ::external_lock() or index_init()/rnd_init(). (Note: this also means we're doing a read when there was no - setup_field_converters() call) + rdb_converter::setup_field_encoders() call) Initialize the necessary variables for them: */ - m_verify_row_debug_checksums = false; /* Load auto_increment value only once on first use. */ if (table->found_next_number_field && m_tbl_def->m_auto_incr_val == 0) { @@ -6571,12 +6183,9 @@ int ha_rocksdb::close(void) { m_pk_descr = nullptr; m_key_descr_arr = nullptr; - + m_converter = nullptr; free_key_buffers(); - my_free(m_encoder_arr); - m_encoder_arr = nullptr; - if (m_table_handler != nullptr) { rdb_open_tables.release_table_handler(m_table_handler); m_table_handler = nullptr; @@ -7882,7 +7491,7 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf, rc = HA_ERR_ROCKSDB_CORRUPT_DATA; } else { rc = kd.unpack_record(table, buf, &rkey, &value, - m_verify_row_debug_checksums); + m_converter->get_verify_row_debug_checksums()); global_stats.covered_secondary_key_lookups.inc(); } } else { @@ -8002,7 +7611,8 @@ int ha_rocksdb::secondary_index_read(const int keyno, uchar *const buf) { m_key_descr_arr[keyno]->covers_lookup(&value, &m_lookup_bitmap); if (covered_lookup && m_lock_rows == RDB_LOCK_NONE) { rc = m_key_descr_arr[keyno]->unpack_record( - table, buf, &key, &value, m_verify_row_debug_checksums); + table, buf, &key, &value, + m_converter->get_verify_row_debug_checksums()); global_stats.covered_secondary_key_lookups.inc(); } else { DEBUG_SYNC(ha_thd(), "rocksdb_concurrent_delete_sk"); @@ -8292,7 +7902,7 @@ int ha_rocksdb::find_icp_matching_index_rec(const bool move_forward, const rocksdb::Slice value = m_scan_it->value(); int err = kd.unpack_record(table, buf, &rkey, &value, - m_verify_row_debug_checksums); + m_converter->get_verify_row_debug_checksums()); if (err != HA_EXIT_SUCCESS) { return err; } @@ -8355,9 +7965,9 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { // Rdb_transaction *tx= get_or_create_tx(table->in_use); // tx->snapshot= nullptr; - const bool save_verify_row_debug_checksums = m_verify_row_debug_checksums; - m_verify_row_debug_checksums = true; - + bool save_verify_row_debug_checksums = + m_converter->get_verify_row_debug_checksums(); + m_converter->set_verify_row_debug_checksums(true); /* For each secondary index, check that we can get a PK value from it */ LogPluginErrMsg(INFORMATION_LEVEL, 0, "CHECKTABLE %s: Checking table %s", table_name, table_name); @@ -8372,16 +7982,17 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { ha_rows rows = 0; ha_rows checksums = 0; if (first_index) - row_checksums_at_start = m_row_checksums_checked; + row_checksums_at_start = m_converter->get_row_checksums_checked(); int res; LogPluginErrMsg(INFORMATION_LEVEL, 0, "CHECKTABLE %s: Checking index %s", table_name, table->key_info[keyno].name); while (1) { - if (!rows) + if (!rows) { res = index_first(table->record[0]); - else + } else { res = index_next(table->record[0]); + } if (res == HA_ERR_END_OF_FILE) break; @@ -8469,7 +8080,8 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { table_name, rows, checksums); if (first_index) { - row_checksums = m_row_checksums_checked - row_checksums_at_start; + row_checksums = + m_converter->get_row_checksums_checked() - row_checksums_at_start; first_index = false; } ha_index_end(); @@ -8482,7 +8094,7 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { } extra(HA_EXTRA_NO_KEYREAD); - m_verify_row_debug_checksums = save_verify_row_debug_checksums; + m_converter->set_verify_row_debug_checksums(save_verify_row_debug_checksums); /* TODO: we should check also for PK records that are missing in the secondary indexes. @@ -8491,7 +8103,7 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { */ DBUG_RETURN(HA_ADMIN_OK); error: - m_verify_row_debug_checksums = save_verify_row_debug_checksums; + m_converter->set_verify_row_debug_checksums(save_verify_row_debug_checksums); ha_index_or_rnd_end(); extra(HA_EXTRA_NO_KEYREAD); @@ -10039,7 +9651,10 @@ int ha_rocksdb::rnd_init(bool scan) { DBUG_ENTER_FUNC(); Rdb_transaction *const tx = get_or_create_tx(table->in_use); - setup_read_decoders(); + + // when this table is being updated, decode all fields + m_converter->setup_field_decoders(table->read_set, + m_lock_rows == RDB_LOCK_WRITE); if (scan) { m_rnd_scan_is_new_snapshot = !tx->has_snapshot(); @@ -10205,7 +9820,9 @@ int ha_rocksdb::index_init(uint idx, bool sorted) { Rdb_transaction *const tx = get_or_create_tx(table->in_use); DBUG_ASSERT(tx != nullptr); - setup_read_decoders(); + // when this table is being updated, decode all fields + m_converter->setup_field_decoders(table->read_set, + m_lock_rows == RDB_LOCK_WRITE); if (!m_keyread_only) { m_key_descr_arr[idx]->get_lookup_bitmap(table, &m_lookup_bitmap); @@ -10716,7 +10333,8 @@ THR_LOCK_DATA **ha_rocksdb::store_lock(THD *const thd, THR_LOCK_DATA **to, void ha_rocksdb::read_thd_vars(THD *const thd) { m_store_row_debug_checksums = THDVAR(thd, store_row_debug_checksums); - m_verify_row_debug_checksums = THDVAR(thd, verify_row_debug_checksums); + m_converter->set_verify_row_debug_checksums( + THDVAR(thd, verify_row_debug_checksums)); m_checksums_pct = THDVAR(thd, checksums_pct); } @@ -12196,9 +11814,9 @@ int ha_rocksdb::inplace_populate_sk( is used inside print_keydup_error so that the error message shows the duplicate record. */ - if (index->unpack_record(new_table_arg, new_table_arg->record[0], - &merge_key, &merge_val, - m_verify_row_debug_checksums)) { + if (index->unpack_record( + new_table_arg, new_table_arg->record[0], &merge_key, + &merge_val, m_converter->get_verify_row_debug_checksums())) { /* Should never reach here */ DBUG_ASSERT(0); } diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index b206430e8c59..e652ebe036eb 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -73,6 +73,7 @@ namespace myrocks { +class Rdb_converter; class Rdb_key_def; class Rdb_tbl_def; class Rdb_transaction; @@ -173,11 +174,6 @@ class ha_rocksdb : public my_core::handler { */ mutable bool m_pk_can_be_decoded; - /* - true <=> Some fields in the PK may require unpack_info. - */ - bool m_maybe_unpack_info; - uchar *m_pk_tuple; /* Buffer for storing PK in KeyTupleFormat */ uchar *m_pk_packed_tuple; /* Buffer for storing PK in StorageFormat */ // ^^ todo: change it to 'char*'? TODO: ^ can we join this with last_rowkey? @@ -221,10 +217,13 @@ class ha_rocksdb : public my_core::handler { */ uchar *m_pack_buffer; + /* class to convert between Mysql format and RocksDB format*/ + std::shared_ptr m_converter; + /* Pointer to the original TTL timestamp value (8 bytes) during UPDATE. */ - char m_ttl_bytes[ROCKSDB_SIZEOF_TTL_RECORD]; + char *m_ttl_bytes; /* The TTL timestamp value can change if the explicit TTL column is updated. If we detect this when updating the PK, we indicate it here so @@ -348,41 +347,6 @@ class ha_rocksdb : public my_core::handler { void set_last_rowkey(const uchar *const old_data); - /* - Array of table->s->fields elements telling how to store fields in the - record. - */ - Rdb_field_encoder *m_encoder_arr; - - /* Describes instructions on how to decode the field */ - class READ_FIELD { - public: - READ_FIELD(Rdb_field_encoder *field_enc, bool decode, int skip_size) - : m_field_enc(field_enc), m_decode(decode), m_skip(skip_size) {} - /* Points to Rdb_field_encoder describing the field */ - Rdb_field_encoder *m_field_enc; - /* if true, decode the field, otherwise skip it */ - bool m_decode; - /* Skip this many bytes before reading (or skipping) this field */ - int m_skip; - }; - - /* - This tells which table fields should be decoded (or skipped) when - decoding table row from (pk, encoded_row) pair. (Secondary keys are - just always decoded in full currently) - */ - std::vector m_decoders_vect; - - /* - This tells if any field which is part of the key needs to be unpacked and - decoded. - */ - bool m_key_requested = false; - - /* Setup field_decoders based on type of scan and table->read_set */ - void setup_read_decoders(); - /* For the active index, indicates which columns must be covered for the current lookup to be covered. If the bitmap field is null, that means this @@ -390,14 +354,6 @@ class ha_rocksdb : public my_core::handler { */ MY_BITMAP m_lookup_bitmap; - /* - Number of bytes in on-disk (storage) record format that are used for - storing SQL NULL flags. - */ - uint m_null_bytes_in_rec; - - void get_storage_type(Rdb_field_encoder *const encoder, const uint kp); - void setup_field_converters(); int alloc_key_buffers(const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg, bool alloc_alter_buffers = false) @@ -412,12 +368,6 @@ class ha_rocksdb : public my_core::handler { */ Rdb_io_perf m_io_perf; - /* - A counter of how many row checksums were checked for this table. Note that - this does not include checksums for secondary index entries. - */ - my_core::ha_rows m_row_checksums_checked; - /* Update stats */ @@ -431,8 +381,6 @@ class ha_rocksdb : public my_core::handler { */ bool m_store_row_debug_checksums; - /* Same as above but for verifying checksums when reading */ - bool m_verify_row_debug_checksums; int m_checksums_pct; ha_rocksdb(my_core::handlerton *const hton, @@ -516,21 +464,6 @@ class ha_rocksdb : public my_core::handler { dd::Table *to_table_def) override MY_ATTRIBUTE((__warn_unused_result__)); - int convert_blob_from_storage_format(my_core::Field_blob *const blob, - Rdb_string_reader *const reader, - bool decode) - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - - int convert_varchar_from_storage_format( - my_core::Field_varstring *const field_var, - Rdb_string_reader *const reader, bool decode) - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - - int convert_field_from_storage_format(my_core::Field *const field, - Rdb_string_reader *const reader, - bool decode, uint len) - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - int convert_record_from_storage_format(const rocksdb::Slice *const key, const rocksdb::Slice *const value, uchar *const buf) diff --git a/storage/rocksdb/rdb_converter.cc b/storage/rocksdb/rdb_converter.cc new file mode 100644 index 000000000000..0a730fd47f08 --- /dev/null +++ b/storage/rocksdb/rdb_converter.cc @@ -0,0 +1,542 @@ +/* + Copyright (c) 2015, Facebook, Inc. + + This program is f + i the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* This C++ file's header file */ +#include "./rdb_converter.h" + +/* Standard C++ header files */ +#include +#include +#include +#include + +/* MySQL header files */ +#include "./log.h" +#include "./my_stacktrace.h" +#include "./sql_array.h" + +/* MyRocks header files */ +#include "./ha_rocksdb.h" +#include "./rdb_datadic.h" +#include "./rdb_utils.h" +#include "./rdb_psi.h" + +namespace myrocks { + +void dbug_modify_key_varchar8(String *on_disk_rec) { + std::string res; + // The key starts with index number + res.append(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE); + + // Then, a mem-comparable form of a varchar(8) value. + res.append("ABCDE\0\0\0\xFC", 9); + on_disk_rec->length(0); + on_disk_rec->append(res.data(), res.size()); +} + +/* + Initialize Rdb_converter with table data + @thd IN Thread context + @tbl_def IN MyRocks table definition + @table IN Current open table +*/ +Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def, + TABLE *table) + : m_thd(thd), m_tbl_def(tbl_def), m_table(table) { + DBUG_ASSERT(thd != nullptr); + DBUG_ASSERT(tbl_def != nullptr); + DBUG_ASSERT(table != nullptr); + + m_key_requested = false; + m_verify_row_debug_checksums = false; + m_maybe_unpack_info = false; + m_row_checksums_checked = 0; + + setup_field_encoders(); +} + +Rdb_converter::~Rdb_converter() { + my_free(m_encoder_arr); + m_encoder_arr = nullptr; +} + +/* + Decide storage type for each encoder +*/ +void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder, + const uint kp) { + auto pk_descr = + m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)]; + // STORE_SOME uses unpack_info. + if (pk_descr->has_unpack_info(kp)) { + DBUG_ASSERT(pk_descr->can_unpack(kp)); + encoder->m_storage_type = Rdb_field_encoder::STORE_SOME; + m_maybe_unpack_info = true; + } else if (pk_descr->can_unpack(kp)) { + encoder->m_storage_type = Rdb_field_encoder::STORE_NONE; + } +} + +/* + @brief + Setup which fields will be unpacked when reading rows + + @detail + Three special cases when we still unpack all fields: + - When client requires decode_all_fields, such as this table is being + updated (m_lock_rows==RDB_LOCK_WRITE). + - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to + read all fields to find whether there is a row checksum at the end. We could + skip the fields instead of decoding them, but currently we do decoding.) + - On index merge as bitmap is cleared during that operation + + @seealso + Rdb_converter::setup_field_encoders() + Rdb_converter::convert_record_from_storage_format() +*/ +void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map, + bool decode_all_fields) { + m_key_requested = false; + m_decoders_vect.clear(); + int last_useful = 0; + int skip_size = 0; + + for (uint i = 0; i < m_table->s->fields; i++) { + // bitmap is cleared on index merge, but it still needs to decode columns + bool field_requested = + decode_all_fields || m_verify_row_debug_checksums || + bitmap_is_clear_all(field_map) || + bitmap_is_set(field_map, m_table->field[i]->field_index); + + // We only need the decoder if the whole record is stored. + if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) { + // the field potentially needs unpacking + if (field_requested) { + // the field is in the read set + m_key_requested = true; + } + continue; + } + + if (field_requested) { + // We will need to decode this field + m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size}); + last_useful = m_decoders_vect.size(); + skip_size = 0; + } else { + if (m_encoder_arr[i].uses_variable_len_encoding() || + m_encoder_arr[i].maybe_null()) { + // For variable-length field, we need to read the data and skip it + m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size}); + skip_size = 0; + } else { + // Fixed-width field can be skipped without looking at it. + // Add appropriate skip_size to the next field. + skip_size += m_encoder_arr[i].m_pack_length_in_rec; + } + } + } + + // It could be that the last few elements are varchars that just do + // skipping. Remove them. + m_decoders_vect.erase(m_decoders_vect.begin() + last_useful, + m_decoders_vect.end()); +} + +void Rdb_converter::setup_field_encoders() { + uint null_bytes = 0; + uchar cur_null_mask = 0x1; + + m_encoder_arr = static_cast( +#ifdef HAVE_PSI_INTERFACE + my_malloc(rdb_handler_memory_key, m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0))); +#else + my_malloc(PSI_NOT_INSTRUMENTED, m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0))); +#endif + if (m_encoder_arr == nullptr) { + return; + } + + for (uint i = 0; i < m_table->s->fields; i++) { + Field *const field = m_table->field[i]; + m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL; + + /* + Check if this field is + - a part of primary key, and + - it can be decoded back from its key image. + If both hold, we don't need to store this field in the value part of + RocksDB's key-value pair. + + If hidden pk exists, we skip this check since the field will never be + part of the hidden pk. + */ + if (!Rdb_key_def::table_has_hidden_pk(m_table)) { + KEY *const pk_info = &m_table->key_info[m_table->s->primary_key]; + for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) { + /* key_part->fieldnr is counted from 1 */ + if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) { + get_storage_type(&m_encoder_arr[i], kp); + break; + } + } + } + + m_encoder_arr[i].m_field_type = field->real_type(); + m_encoder_arr[i].m_field_index = i; + m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec(); + + if (field->real_maybe_null()) { + m_encoder_arr[i].m_null_mask = cur_null_mask; + m_encoder_arr[i].m_null_offset = null_bytes; + if (cur_null_mask == 0x80) { + cur_null_mask = 0x1; + null_bytes++; + } else { + cur_null_mask = cur_null_mask << 1; + } + } else { + m_encoder_arr[i].m_null_mask = 0; + } + } + + /* Count the last, unfinished NULL-bits byte */ + if (cur_null_mask != 0x1) { + null_bytes++; + } + + m_null_bytes_in_record = null_bytes; +} + +/* + EntryPoint for Decode: + Decode key slice(if requested) and value slice using built-in field + decoders + @key_def IN key definition to decode + @dst IN,OUT Mysql buffer to fill decoded content + @key_slice IN RocksDB key slice to decode + @value_slice IN RocksDB value slice to decode +*/ +int Rdb_converter::decode(const std::shared_ptr &key_def, + uchar *dst, // address to fill data + const rocksdb::Slice *key_slice, + const rocksdb::Slice *value_slice) { + // Currently only support decode primary key, Will add decode secondary later + DBUG_ASSERT(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY || + key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY); + + const rocksdb::Slice *updated_key_slice = key_slice; +#ifndef NDEBUG + String last_rowkey; + last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin); + DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1", + { dbug_modify_key_varchar8(&last_rowkey); }); + rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length()); + updated_key_slice = &rowkey_slice; +#endif + return convert_record_from_storage_format(key_def, updated_key_slice, + value_slice, dst); +} + +/* + Convert RocksDb key slice and value slice to Mysql format + @key_def IN key definition to decode + @key_slice IN RocksDB key slice + @value_slice IN RocksDB value slice + @dst OUT MySql format address +*/ +int Rdb_converter::convert_record_from_storage_format( + const std::shared_ptr &pk_def, + const rocksdb::Slice *const key_slice, + const rocksdb::Slice *const value_slice, uchar *const dst) { + Rdb_string_reader reader(value_slice); + + const char *unpack_info = nullptr; + uint16 unpack_info_len = 0; + rocksdb::Slice unpack_slice; + + /* If it's a TTL record, skip the 8 byte TTL value */ + const char *ttl_bytes; + if (pk_def->has_ttl()) { + if ((ttl_bytes = reader.read(ROCKSDB_SIZEOF_TTL_RECORD))) { + memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD); + } else { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + } + + /* Other fields are decoded from the value */ + const char *null_bytes = nullptr; + if (m_null_bytes_in_record && + !(null_bytes = reader.read(m_null_bytes_in_record))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (m_maybe_unpack_info) { + unpack_info = reader.get_current_ptr(); + if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) || + !reader.read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + unpack_info_len = + rdb_netbuf_to_uint16(reinterpret_cast(unpack_info + 1)); + unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len); + + reader.read(unpack_info_len - + Rdb_key_def::get_unpack_header_size(unpack_info[0])); + } + + int err = HA_EXIT_SUCCESS; + if (m_key_requested) { + /* + Decode PK fields from the key + */ + err = pk_def->unpack_record(m_table, dst, key_slice, + unpack_info ? &unpack_slice : nullptr, + false /* verify_checksum */); + } + + if (err != HA_EXIT_SUCCESS) { + return err; + } + + err = convert_fields_from_storage_format(pk_def, &reader, dst, null_bytes); + if (err != HA_EXIT_SUCCESS) { + return err; + } + + if (m_verify_row_debug_checksums) { + return verify_row_debug_checksum(pk_def, &reader, key_slice, value_slice); + } + + return HA_EXIT_SUCCESS; +} + +/* + Convert MyRocks Value slice back to Mysql format + @pk_def IN Key definition + @reader IN RocksDB value slice reader + @key IN RocksDB key slice + @value IN RocksDB value slice + @dst OUT Mysql table record buffer address + @null_bytes IN Null bits mark + */ +int Rdb_converter::convert_fields_from_storage_format( + const std::shared_ptr &pk_def, Rdb_string_reader *reader, + uchar *dst, const char *null_bytes) { + int err = HA_EXIT_SUCCESS; + for (auto it = m_decoders_vect.begin(); it != m_decoders_vect.end(); it++) { + const Rdb_field_encoder *const field_dec = it->m_field_enc; + bool decode = it->m_decode; + bool isNull = + field_dec->maybe_null() && + ((null_bytes[field_dec->m_null_offset] & field_dec->m_null_mask) != 0); + + Field *const field = m_table->field[field_dec->m_field_index]; + + /* Skip the bytes we need to skip */ + if (it->m_skip && !reader->read(it->m_skip)) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + uint field_offset = field->ptr - m_table->record[0]; + uint null_offset = field->null_offset(); + bool maybe_null = field->real_maybe_null(); + field->move_field(dst + field_offset, + maybe_null ? dst + null_offset : nullptr, + field->null_bit); + // WARNING! - Don't return before restoring field->ptr and field->null_ptr! + + if (isNull) { + if (decode) { + /* This sets the NULL-bit of this record */ + field->set_null(); + /* + Besides that, set the field value to default value. CHECKSUM TABLE + depends on this. + */ + memcpy(field->ptr, m_table->s->default_values + field_offset, + field->pack_length()); + } + } else { + if (decode) { + field->set_notnull(); + } + + if (field_dec->m_field_type == MYSQL_TYPE_BLOB || + field_dec->m_field_type == MYSQL_TYPE_JSON) { + err = convert_blob_from_storage_format((my_core::Field_blob *)field, + reader, decode); + } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) { + err = convert_varchar_from_storage_format( + (my_core::Field_varstring *)field, reader, decode); + } else { + err = convert_field_from_storage_format( + field, reader, decode, field_dec->m_pack_length_in_rec); + } + } + + // Restore field->ptr and field->null_ptr + field->move_field(m_table->record[0] + field_offset, + maybe_null ? m_table->record[0] + null_offset : nullptr, + field->null_bit); + + if (err != HA_EXIT_SUCCESS) { + return err; + } + } + + return HA_EXIT_SUCCESS; +} + +/* + Convert RocksDB value slice into Mysql Record format + @blob IN,OUT Mysql blob field + @reader IN RocksDB value slice reader + @decode IN Control whether decode current field +*/ +int Rdb_converter::convert_blob_from_storage_format( + my_core::Field_blob *const blob, Rdb_string_reader *const reader, + bool decode) { + /* Get the number of bytes needed to store length*/ + const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; + + const char *data_len_str; + if (!(data_len_str = reader->read(length_bytes))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + memcpy(blob->ptr, data_len_str, length_bytes); + + const uint32 data_len = + blob->get_length(reinterpret_cast(data_len_str), + length_bytes, m_table->s->db_low_byte_first); + const char *blob_ptr; + if (!(blob_ptr = reader->read(data_len))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (decode) { + // set 8-byte pointer to 0, like innodb does (relevant for 32-bit + // platforms) + memset(blob->ptr + length_bytes, 0, 8); + memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **)); + } + + return HA_EXIT_SUCCESS; +} + +/* + Convert varchar field in RocksDB storage format into Mysql Record format + @field_var IN,OUT Mysql varchar field + @reader IN RocksDB value slice reader + @decode IN Control whether decode current field +*/ +int Rdb_converter::convert_varchar_from_storage_format( + my_core::Field_varstring *const field_var, Rdb_string_reader *const reader, + bool decode) { + const char *data_len_str; + if (!(data_len_str = reader->read(field_var->length_bytes))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + uint data_len; + /* field_var->length_bytes is 1 or 2 */ + if (field_var->length_bytes == 1) { + data_len = (uchar)data_len_str[0]; + } else { + DBUG_ASSERT(field_var->length_bytes == 2); + data_len = uint2korr(data_len_str); + } + + if (data_len > field_var->field_length) { + /* The data on disk is longer than table DDL allows? */ + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (!reader->read(data_len)) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (decode) { + memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len); + } + + return HA_EXIT_SUCCESS; +} + +/* + Convert field in RocksDB storage format into Mysql Record format + @field IN,OUT Mysql field + @reader IN RocksDB value slice reader + @decode IN Control whether decode current field +*/ +int Rdb_converter::convert_field_from_storage_format( + my_core::Field *const field, Rdb_string_reader *const reader, bool decode, + uint len) { + const char *data_bytes; + if (len > 0) { + if ((data_bytes = reader->read(len)) == nullptr) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + if (decode) { + memcpy(field->ptr, data_bytes, len); + } + } + + return HA_EXIT_SUCCESS; +} + +int Rdb_converter::verify_row_debug_checksum( + const std::shared_ptr &pk_def, Rdb_string_reader *reader, + const rocksdb::Slice *key, const rocksdb::Slice *value) { + if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE && + reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) { + uint32_t stored_key_chksum = + rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE)); + uint32_t stored_val_chksum = + rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE)); + + const ha_checksum computed_key_chksum = + my_core::my_checksum(0, rdb_slice_to_uchar_ptr(key), key->size()); + const ha_checksum computed_val_chksum = + my_core::my_checksum(0, rdb_slice_to_uchar_ptr(value), + value->size() - RDB_CHECKSUM_CHUNK_SIZE); + + DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;); + + if (stored_key_chksum != computed_key_chksum) { + pk_def->report_checksum_mismatch(true, key->data(), key->size()); + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;); + if (stored_val_chksum != computed_val_chksum) { + pk_def->report_checksum_mismatch(false, value->data(), value->size()); + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + + m_row_checksums_checked++; + } + if (reader->remaining_bytes()) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + return HA_EXIT_SUCCESS; +} + +} // namespace myrocks diff --git a/storage/rocksdb/rdb_converter.h b/storage/rocksdb/rdb_converter.h new file mode 100644 index 000000000000..ff2d3efded2f --- /dev/null +++ b/storage/rocksdb/rdb_converter.h @@ -0,0 +1,159 @@ +/* + Copyright (c) 2018, Facebook, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#pragma once + +/* C++ standard header files */ +#include +#include + +/* MySQL header files */ +#include "./handler.h" /* handler */ +#include "./my_global.h" /* ulonglong */ +#include "./sql_string.h" +#include "./ib_ut0counter.h" + +/* MyRocks header files */ +#include "./ha_rocksdb.h" +#include "./rdb_datadic.h" + +namespace myrocks { +class Rdb_field_encoder; + +/* Describes instructions on how to decode the field */ +class READ_FIELD { + public: + /* Points to Rdb_field_encoder describing the field */ + Rdb_field_encoder *m_field_enc; + /* if true, decode the field, otherwise skip it */ + bool m_decode; + /* Skip this many bytes before reading (or skipping) this field */ + int m_skip; +}; + +/* + Class to convert Mysql format to rocksdb storage format, and vice versa. +*/ +class Rdb_converter { + public: + /* + Initialize converter with table data + */ + Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def, TABLE *table); + Rdb_converter(const Rdb_converter &decoder) = delete; + Rdb_converter &operator=(const Rdb_converter &decoder) = delete; + ~Rdb_converter(); + + void setup_field_decoders(const MY_BITMAP *field_map, + bool decode_all_fields = false); + + int decode(const std::shared_ptr &key_def, uchar *dst, + const rocksdb::Slice *key_slice, + const rocksdb::Slice *value_slice); + + my_core::ha_rows get_row_checksums_checked() const { + return m_row_checksums_checked; + } + bool get_verify_row_debug_checksums() const { + return m_verify_row_debug_checksums; + } + void set_verify_row_debug_checksums(bool verify_row_debug_checksums) { + m_verify_row_debug_checksums = verify_row_debug_checksums; + } + const Rdb_field_encoder *get_encoder_arr() const { return m_encoder_arr; } + int get_null_bytes_in_record() { return m_null_bytes_in_record; } + + void set_is_key_requested(bool key_requested) { + m_key_requested = key_requested; + } + bool get_maybe_unpack_info() const { return m_maybe_unpack_info; } + + char *get_ttl_bytes_buffer() { return m_ttl_bytes; } + + private: + void setup_field_encoders(); + + void get_storage_type(Rdb_field_encoder *const encoder, const uint kp); + + int convert_record_from_storage_format( + const std::shared_ptr &pk_def, + const rocksdb::Slice *const key, const rocksdb::Slice *const value, + uchar *const buf); + + int convert_fields_from_storage_format( + const std::shared_ptr &pk_def, Rdb_string_reader *reader, + uchar *buf, const char *null_bytes); + + int convert_field_from_storage_format(my_core::Field *const field, + Rdb_string_reader *const reader, + bool decode, uint len); + int convert_varchar_from_storage_format( + my_core::Field_varstring *const field_var, + Rdb_string_reader *const reader, bool decode); + + int convert_blob_from_storage_format(my_core::Field_blob *const blob, + Rdb_string_reader *const reader, + bool decode); + + int verify_row_debug_checksum(const std::shared_ptr &pk_def, + Rdb_string_reader *reader, + const rocksdb::Slice *key, + const rocksdb::Slice *value); + + private: + /* + This tells if any field which is part of the key needs to be unpacked and + decoded. + */ + bool m_key_requested; + /* Controls whether verifying checksums during reading, This is updated from + the session variable at the start of each query. */ + bool m_verify_row_debug_checksums; + /* Thread handle*/ + const THD *m_thd; + /* MyRocks table definition*/ + const Rdb_tbl_def *m_tbl_def; + /* The current open table */ + TABLE *m_table; + /* + Number of bytes in on-disk (storage) record format that are used for + storing SQL NULL flags. + */ + int m_null_bytes_in_record; + /* + TRUE <=> Some fields in the PK may require unpack_info. + */ + bool m_maybe_unpack_info; + /* + Pointer to the original TTL timestamp value (8 bytes) during UPDATE. + */ + char m_ttl_bytes[ROCKSDB_SIZEOF_TTL_RECORD]; + /* + Array of table->s->fields elements telling how to store fields in the + record. + */ + Rdb_field_encoder *m_encoder_arr; + /* + Array of request fields telling how to decode data in RocksDB format + */ + std::vector m_decoders_vect; + /* + A counter of how many row checksums were checked for this table. Note that + this does not include checksums for secondary index entries. + */ + my_core::ha_rows m_row_checksums_checked; +}; +} // namespace myrocks