diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 3a5f9f6b2966..998024a66617 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -836,7 +836,7 @@ class Primary_key_comparator : public rocksdb::Comparator /* The following is not needed by RocksDB, but conceptually should be here: */ static ulong get_hashnr(const char *key, size_t key_len); - const char* Name() const { return "RocksDB_SE_v3.5"; } + const char* Name() const { return "RocksDB_SE_v3.6"; } //TODO: advanced funcs: // - FindShortestSeparator @@ -853,7 +853,7 @@ class Reverse_comparator : public rocksdb::Comparator { return -bytewise_compare(a,b); } - const char* Name() const { return "rev:RocksDB_SE_v3.5"; } + const char* Name() const { return "rev:RocksDB_SE_v3.6"; } void FindShortestSeparator(std::string* start, const rocksdb::Slice& limit) const {} void FindShortSuccessor(std::string* key) const {} }; diff --git a/storage/rocksdb/rdb_datadic.cc b/storage/rocksdb/rdb_datadic.cc index 96396a13a08e..2f8299dd3dfe 100644 --- a/storage/rocksdb/rdb_datadic.cc +++ b/storage/rocksdb/rdb_datadic.cc @@ -173,8 +173,14 @@ void RDBSE_KEYDEF::setup(TABLE *tbl) m_key_parts= key_info->actual_key_parts; if (keyno != tbl->s->primary_key && (key_info->flags & HA_NOSAME)) { - // From SQL layer's point of view, Unique secondary indexes do not - // have primary key columns at the end. Internally, they do. + /* + From SQL layer's point of view, Unique secondary indexes do not + have primary key columns at the end. Internally, they do. + + The loop below will attempt to attach all of the PK columns at the end + of the key, skipping those that are already present in the explicit FK + definition. + */ m_key_parts += n_pk_key_parts; unique_secondary_index= true; } @@ -191,42 +197,68 @@ void RDBSE_KEYDEF::setup(TABLE *tbl) int unpack_len= 0; KEY_PART_INFO *key_part= key_info->key_part; int max_part_len= 0; + bool simulating_extkey_for_unique= false; + uint dst_i= 0; /* this loop also loops over the 'extended key' tail */ - for (uint i= 0; i < m_key_parts; i++) + for (uint src_i= 0; src_i < m_key_parts; src_i++) { Field *field= key_part->field; + if (simulating_extkey_for_unique) + { + /* Check if this field is already present in the key definition */ + bool found= false; + for (uint j= 0; j < key_info->actual_key_parts; j++) + { + if (field->field_index == key_info->key_part[j].field->field_index) + { + found= true; + break; + } + } + + if (found) + { + key_part++; + continue; + } + } + if (field->real_maybe_null()) max_len +=1; // NULL-byte - pack_info[i].setup(field); - pack_info[i].unpack_data_offset= unpack_len; + pack_info[dst_i].setup(field); + pack_info[dst_i].unpack_data_offset= unpack_len; if (pk_info) { - pk_part_no[i]= -1; + pk_part_no[dst_i]= -1; for (uint j= 0; j < n_pk_key_parts; j++) { if (field->field_index == pk_info->key_part[j].field->field_index) { - pk_part_no[i]= j; + pk_part_no[dst_i]= j; break; } } } - max_len += pack_info[i].max_image_len; - unpack_len += pack_info[i].unpack_data_len; + max_len += pack_info[dst_i].max_image_len; + unpack_len += pack_info[dst_i].unpack_data_len; - max_part_len= std::max(max_part_len, pack_info[i].max_image_len); + max_part_len= std::max(max_part_len, pack_info[dst_i].max_image_len); key_part++; /* For "unique" secondary indexes, pretend they have "index extensions" */ - if (unique_secondary_index && i+1 == key_info->actual_key_parts) + if (unique_secondary_index && src_i+1 == key_info->actual_key_parts) { + simulating_extkey_for_unique= true; key_part= pk_info->key_part; } + + dst_i++; } + m_key_parts= dst_i; maxlength= max_len; unpack_data_len= unpack_len; } @@ -372,20 +404,6 @@ void RDBSE_KEYDEF::successor(uchar *packed_tuple, uint len) } } -static Field *get_field_by_keynr(TABLE *tbl, KEY *key_info, uint part) -{ - if (part < key_info->actual_key_parts) - { - return key_info->key_part[part].field; - } - else - { - uint pk= tbl->s->primary_key; - KEY *pk_info= &tbl->key_info[pk]; - return pk_info->key_part[part - key_info->actual_key_parts].field; - } -} - /* Get index columns from the record and pack them into mem-comparable form. @@ -419,7 +437,6 @@ uint RDBSE_KEYDEF::pack_record(TABLE *tbl, { uchar *tuple= packed_tuple; uchar *unpack_end= unpack_info; - KEY *key_info= &tbl->key_info[keyno]; store_index_number(tuple, index_number); tuple += INDEX_NUMBER_SIZE; @@ -436,7 +453,7 @@ uint RDBSE_KEYDEF::pack_record(TABLE *tbl, for (uint i=0; i < n_key_parts; i++) { - Field *field= get_field_by_keynr(tbl, key_info, i); + Field *field= tbl->field[pack_info[i].fieldnr]; // Old Field methods expected the record pointer to be at tbl->record[0]. // The quick and easy way to fix this was to pass along the offset @@ -583,8 +600,6 @@ int RDBSE_KEYDEF::unpack_record(TABLE *table, uchar *buf, const rocksdb::Slice *packed_key, const rocksdb::Slice *unpack_info) { - KEY * const key_info= &table->key_info[keyno]; - Stream_reader reader(packed_key); const uchar * const unpack_ptr= (const uchar*)unpack_info->data(); @@ -603,7 +618,7 @@ int RDBSE_KEYDEF::unpack_record(TABLE *table, uchar *buf, for (uint i= 0; i < m_key_parts ; i++) { Field_pack_info *fpi= &pack_info[i]; - Field *field= get_field_by_keynr(table, key_info, i); + Field *field= table->field[pack_info[i].fieldnr]; if (fpi->unpack_func) { @@ -985,6 +1000,7 @@ bool Field_pack_info::setup(Field *field) int res= false; enum_field_types type= field->real_type(); + fieldnr= field->field_index; maybe_null= field->real_maybe_null(); make_unpack_info_func= NULL; unpack_func= NULL; diff --git a/storage/rocksdb/rdb_datadic.h b/storage/rocksdb/rdb_datadic.h index 09c4aa43d810..5a5153a8cfa8 100644 --- a/storage/rocksdb/rdb_datadic.h +++ b/storage/rocksdb/rdb_datadic.h @@ -440,6 +440,9 @@ class Field_pack_info */ index_field_skip_t skip_func; + /* Number of field in the table */ + int fieldnr; + bool setup(Field *field); };