Skip to content

Commit

Permalink
Issue percona#28: Change PK storage format to support unpack_info
Browse files Browse the repository at this point in the history
Summary:
This changes the PK storage format so that it can contain unpack_info.

Currently, the storage format looks like this:
  | null bits | records for non-packable or non-PK columns | checksum |

I am changing the format so that it looks like this:
  | null bits | unpack_info | records for non-packable or non-PK columns | checksum |

The `unpack_info` field looks exactly the same as the one in secondary keys. It is a one-byte tag, followed by two bytes of length information, followed by the actual unpack information.

Note that this field may be missing if none of the key columns have a make unpack function set.

The main change that needed to be done to support this is to have the `m_make_unpack_info_func` function be available in both `Rdb_field_packing` for `Rdb_key_def::pack_record` and `Rdb_field_encoder` for `ha_rocksdb::convert_record_to_storage_format`.

Note that this change isn't active since `PRIMARY_FORMAT_VERSION_UNPACK_INFO` is not the latest.

Reviewed By: jkedgar

Differential Revision: https://reviews.facebook.net/D59469
  • Loading branch information
lth authored and inikep committed Apr 9, 2024
1 parent a243c2b commit 679b011
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 114 deletions.
136 changes: 118 additions & 18 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3359,6 +3359,7 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *hton,
m_scan_it_skips_bloom(false), m_tbl_def(nullptr),
m_pk_descr(nullptr), m_key_descr_arr(nullptr),
m_pk_can_be_decoded(false),
m_maybe_unpack_info(false),
m_pk_tuple(nullptr), m_pk_packed_tuple(nullptr),
m_sk_packed_tuple(nullptr), m_sk_tails(nullptr),
m_sk_match_prefix(nullptr), m_sk_match_prefix_buf(nullptr),
Expand All @@ -3382,6 +3383,49 @@ const char **ha_rocksdb::bas_ext() const
}


void ha_rocksdb::write_unpack_info()
{
if (m_pk_descr->m_kv_format_version >=
Rdb_key_def::PRIMARY_FORMAT_VERSION_UNPACK_INFO &&
m_maybe_unpack_info)
{
uint offset= m_null_bytes_in_rec;
// We need to iterate over fields that require unpack_info first, so that
// unpack_info appears at the beginning of the storage record.
uint packed_key_len= m_pk_descr->max_storage_fmt_length();
m_storage_record.reserve(packed_key_len + RDB_UNPACK_HEADER_SIZE);
m_storage_record[offset]= RDB_UNPACK_DATA_TAG;
uint unpack_info_len_offset= offset + 1;
offset+= RDB_UNPACK_HEADER_SIZE;

uint unpack_len= 0;
for (uint i= 0; i < table->s->fields; i++)
{
if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_SOME)
{
continue;
}

Rdb_make_unpack_field& upf= m_encoder_arr[i].m_make_unpack_field;

DBUG_ASSERT(upf.m_make_unpack_info_func != nullptr);

Field *field= table->field[i];
uint len= upf.m_make_unpack_info_func(&upf, field,
reinterpret_cast<uchar *>(&m_storage_record[offset]));
DBUG_ASSERT(len <= (uint)m_encoder_arr[i].m_pack_length_in_rec);
unpack_len += len;
offset+= len;
}
DBUG_ASSERT(offset ==
m_null_bytes_in_rec + unpack_len + RDB_UNPACK_HEADER_SIZE);
m_storage_record.length(offset);
rdb_netbuf_store_uint16(
reinterpret_cast<uchar *>(&m_storage_record[unpack_info_len_offset]),
unpack_len + RDB_UNPACK_HEADER_SIZE);
}
}

/**
Convert record from table->record[0] form into a form that can be written
into rocksdb.
Expand All @@ -3399,11 +3443,16 @@ void ha_rocksdb::convert_record_to_storage_format(
/* All NULL bits are initially 0 */
m_storage_record.fill(m_null_bytes_in_rec, 0);

// Write unpack info (if needed).
write_unpack_info();

for (uint i=0; i < table->s->fields; i++)
{
/* Don't pack decodable PK key parts */
if (m_encoder_arr[i].m_dont_store)
if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL)
{
continue;
}

Field *field= table->field[i];
if (m_encoder_arr[i].maybe_null())
Expand Down Expand Up @@ -3502,8 +3551,8 @@ void ha_rocksdb::setup_read_decoders()

for (uint i= 0; i < table->s->fields; i++)
{
/* PK decoding is not handled, yet */
if (m_encoder_arr[i].m_dont_store)
// We only need the decoder if the whole record is stored.
if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL)
{
continue;
}
Expand Down Expand Up @@ -3637,23 +3686,46 @@ int ha_rocksdb::convert_record_from_storage_format(const rocksdb::Slice *key,
DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
dbug_modify_key_varchar8(m_last_rowkey););

rocksdb::Slice rowkey_slice(m_last_rowkey.ptr(), m_last_rowkey.length());

const rocksdb::Slice rowkey_slice(m_last_rowkey.ptr(),
m_last_rowkey.length());
const char *unpack_info= nullptr;
uint16 unpack_info_len= 0;
rocksdb::Slice unpack_slice;

if (m_pk_descr->unpack_record(table, buf, &rowkey_slice,
nullptr, m_verify_checksums))
/* Other fields are decoded from the value */
const char *null_bytes= nullptr;
if (m_null_bytes_in_rec && !(null_bytes= reader.read(m_null_bytes_in_rec)))
{
return HA_ERR_INTERNAL_ERROR;
}

/* Other fields are decoded from the value */
const char * UNINIT_VAR(null_bytes);
if (m_null_bytes_in_rec && !(null_bytes= reader.read(m_null_bytes_in_rec)))
if (m_maybe_unpack_info && m_pk_descr->m_kv_format_version >=
Rdb_key_def::PRIMARY_FORMAT_VERSION_UNPACK_INFO)
{
unpack_info= reader.read(RDB_UNPACK_HEADER_SIZE);

if (!unpack_info || unpack_info[0] != RDB_UNPACK_DATA_TAG)
{
return HA_ERR_INTERNAL_ERROR;
}

unpack_info_len= rdb_netbuf_to_uint16(
reinterpret_cast<const uchar *>(unpack_info + 1));
unpack_slice= rocksdb::Slice(unpack_info, unpack_info_len);

reader.read(unpack_info_len - RDB_UNPACK_HEADER_SIZE);
}

if (m_pk_descr->unpack_record(table, buf, &rowkey_slice,
unpack_info ? &unpack_slice : nullptr,
false /* verify_checksum */))
{
return HA_ERR_INTERNAL_ERROR;
}

for (auto it= m_decoders_vect.begin(); it != m_decoders_vect.end(); it++)
{
const FIELD_ENCODER* const field_dec= it->m_field_enc;
const Rdb_field_encoder* const field_dec= it->m_field_enc;
const bool decode= it->m_decode;
bool isNull = field_dec->maybe_null() &&
((null_bytes[field_dec->m_null_offset] & field_dec->m_null_mask) != 0);
Expand Down Expand Up @@ -3813,6 +3885,37 @@ int ha_rocksdb::convert_record_from_storage_format(const rocksdb::Slice *key,
return 0;
}

void ha_rocksdb::get_storage_type(Rdb_field_encoder *encoder, uint kp)
{
if (m_pk_descr->m_kv_format_version >=
Rdb_key_def::PRIMARY_FORMAT_VERSION_UNPACK_INFO)
{
// STORE_SOME uses unpack_info, so our key must be of version
// higher than PRIMARY_FORMAT_VERSION_UNPACK_INFO to use this
// feature.
if (m_pk_descr->has_unpack_info(kp))
{
DBUG_ASSERT(m_pk_descr->can_unpack(kp));
encoder->m_storage_type= Rdb_field_encoder::STORE_SOME;
encoder->m_make_unpack_field=
m_pk_descr->get_make_unpack_field(kp);
m_maybe_unpack_info= true;
}
else if (m_pk_descr->can_unpack(kp))
{
encoder->m_storage_type= Rdb_field_encoder::STORE_NONE;
}
}
else
{
// For old versions, we can only store none if there is no
// unpack_info (and it's unpackable).
if (m_pk_descr->can_unpack(kp) && !m_pk_descr->has_unpack_info(kp))
{
encoder->m_storage_type= Rdb_field_encoder::STORE_NONE;
}
}
}

/*
Setup data needed to convert table->record[] to and from record storage format.
Expand All @@ -3829,8 +3932,8 @@ void ha_rocksdb::setup_field_converters()
uchar cur_null_mask= 0x1;

DBUG_ASSERT(m_encoder_arr == nullptr);
m_encoder_arr= static_cast<FIELD_ENCODER*>(
my_malloc(table->s->fields * sizeof(FIELD_ENCODER), MYF(0)));
m_encoder_arr= static_cast<Rdb_field_encoder*>(
my_malloc(table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
if (m_encoder_arr == nullptr)
{
return;
Expand All @@ -3839,7 +3942,7 @@ void ha_rocksdb::setup_field_converters()
for (i= 0; i < table->s->fields; i++)
{
Field *field= table->field[i];
m_encoder_arr[i].m_dont_store= false;
m_encoder_arr[i].m_storage_type= Rdb_field_encoder::STORE_ALL;

/*
Check if this field is
Expand All @@ -3860,10 +3963,7 @@ void ha_rocksdb::setup_field_converters()
/* key_part->fieldnr is counted from 1 */
if (field->field_index + 1 == pk_info->key_part[kp].fieldnr)
{
if (m_pk_descr->can_unpack(kp))
{
m_encoder_arr[i].m_dont_store= true;
}
get_storage_type(&m_encoder_arr[i], kp);
break;
}
}
Expand Down
53 changes: 15 additions & 38 deletions storage/rocksdb/ha_rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ struct Rdb_table_handler
class Rdb_key_def;
class Rdb_tbl_def;
class Rdb_transaction;
class Rdb_field_encoder;

const char *const rocksdb_hton_name= "ROCKSDB";

Expand Down Expand Up @@ -285,6 +286,11 @@ class ha_rocksdb: public my_core::handler
*/
mutable bool m_pk_can_be_decoded;

/*
TRUE <=> Some fields in the PK may require unpack_info.
*/
bool m_maybe_unpack_info;

uchar *m_pk_tuple; /* Buffer for storing PK in KeyTupleFormat */
uchar *m_pk_packed_tuple; /* Buffer for storing PK in StorageFormat */
// ^^ todo: change it to 'char*'? TODO: ^ can we join this with last_rowkey?
Expand Down Expand Up @@ -428,53 +434,18 @@ class ha_rocksdb: public my_core::handler

void set_last_rowkey(const uchar *old_data);

/*
Descriptor telling how to decode/encode a field to on-disk record storage
format. Not all information is in the structure yet, but eventually we
want to have as much as possible there to avoid virtual calls.
For encoding/decoding of index tuples, see Rdb_key_def.
*/
typedef struct st_field_encoder
{
/*
This is set to true for columns of Primary Key that can be decoded from
their mem-comparable form.
Since we can get them from the key part of RocksDB key->value pair, we
don't need to store them in the value part.
*/
bool m_dont_store;

uint m_null_offset;
uint16 m_field_index;

uchar m_null_mask; // 0 means the field cannot be null

my_core::enum_field_types m_field_type;

uint m_pack_length_in_rec;

bool maybe_null() const { return m_null_mask != 0; }

bool uses_variable_len_encoding() const
{
return (m_field_type == MYSQL_TYPE_BLOB ||
m_field_type == MYSQL_TYPE_VARCHAR);
}
} FIELD_ENCODER;

/*
Array of table->s->fields elements telling how to store fields in the
record.
*/
FIELD_ENCODER *m_encoder_arr;
Rdb_field_encoder *m_encoder_arr;

/* Describes instructions on how to decode the field */
class READ_FIELD
{
public:
/* Points to FIELD_ENCODER describing the field */
FIELD_ENCODER* m_field_enc;
/* Points to Rdb_field_encoder describing the field */
Rdb_field_encoder* m_field_enc;
/* if true, decode the field, otherwise skip it */
bool m_decode;
/* Skip this many bytes before reading (or skipping) this field */
Expand All @@ -497,6 +468,7 @@ class ha_rocksdb: public my_core::handler
*/
uint m_null_bytes_in_rec;

void get_storage_type(Rdb_field_encoder *encoder, uint kp);
void setup_field_converters();

// the buffer size should be at least 2*Rdb_key_def::INDEX_NUMBER_SIZE
Expand All @@ -518,6 +490,11 @@ class ha_rocksdb: public my_core::handler
*/
void update_stats(void);

/*
Helper for convert_record_to_storage_format for writing unpack_info.
*/
void write_unpack_info(void);

public:
/*
Controls whether writes include checksums. This is updated from the session variable
Expand Down
Loading

0 comments on commit 679b011

Please sign in to comment.