diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 82745ab8317c..9c1119e05042 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -30,6 +30,7 @@ #include "rdb_rowmods.h" #include "rocksdb/table.h" +#include void dbug_dump_database(rocksdb::DB *db); @@ -44,6 +45,128 @@ handlerton *rocksdb_hton; rocksdb::DB *rdb= NULL; +rocksdb::ColumnFamilyOptions default_cf_opts; + + +/* + We need a column family manager. Its functions: + - create column families (synchronized, don't create the same twice) + - keep count in each column family. + = the count is kept on-disk. + = there are no empty CFs. initially count=1. + = then, when doing DDL, we increase or decrease it. + (atomicity is maintained by being in the same WriteBatch with DDLs) + = if DROP discovers that now count=0, it removes the CF. + + Current state is: + - CFs are created in a synchronized way. We can't remove them, yet. +*/ + +class Column_family_manager +{ + typedef std::map ColumnFamilyHandleMap; + + ColumnFamilyHandleMap cf_map; + + rocksdb::ColumnFamilyHandle *default_cf; + + mysql_mutex_t cfm_mutex; +public: + /* + This is called right after the DB::Open() call. The parameters describe column + families that are present in the database. The first CF is the default CF. + */ + void init(std::vector *names, + std::vector *handles); + void cleanup(); + + /* Used by CREATE TABLE. name=NULL means use default column family */ + rocksdb::ColumnFamilyHandle* get_or_create_cf(const char *name); + + /* Used by table open */ + rocksdb::ColumnFamilyHandle* get_cf(const char *name); + + void drop_cf(); +}; + +Column_family_manager cf_manager; + + +void Column_family_manager::init(std::vector *names, + std::vector *handles) +{ + mysql_mutex_init(NULL, &cfm_mutex, MY_MUTEX_INIT_FAST); + DBUG_ASSERT(names->size() == handles->size()); + DBUG_ASSERT(names->size() > 0); + + default_cf= (*handles)[0]; + for (size_t i = 0; i < names->size(); ++i) + cf_map[(*names)[i]]= (*handles)[i]; +} + + +void Column_family_manager::cleanup() +{ + ColumnFamilyHandleMap::iterator it; + for (it= cf_map.begin(); it!=cf_map.end(); it++) + { + delete it->second; + } + mysql_mutex_destroy(&cfm_mutex); +} + + +rocksdb::ColumnFamilyHandle* +Column_family_manager::get_or_create_cf(const char *name) +{ + rocksdb::ColumnFamilyHandle* cf_handle; + ColumnFamilyHandleMap::iterator it; + + mysql_mutex_lock(&cfm_mutex); + if (name == NULL) + { + cf_handle= default_cf; + } + else if ((it= cf_map.find(name)) != cf_map.end()) + cf_handle= it->second; + else + { + /* Create a Column Family. */ + std::string cf_name(name); + rocksdb::Status s= rdb->CreateColumnFamily(default_cf_opts, name, + &cf_handle); + if (s.ok()) + cf_map[cf_name]= cf_handle; + else + cf_handle= NULL; + } + mysql_mutex_unlock(&cfm_mutex); + + return cf_handle; +} + + +rocksdb::ColumnFamilyHandle* +Column_family_manager::get_cf(const char *name) +{ + rocksdb::ColumnFamilyHandle* cf_handle; + ColumnFamilyHandleMap::iterator it; + + mysql_mutex_lock(&cfm_mutex); + if (name == NULL) + { + cf_handle= default_cf; + } + else if ((it= cf_map.find(name)) != cf_map.end()) + cf_handle= it->second; + else + cf_handle= NULL; + mysql_mutex_unlock(&cfm_mutex); + + return cf_handle; +} + + Table_ddl_manager ddl_manager; LockTable row_locks; @@ -193,7 +316,7 @@ class Primary_key_comparator : public rocksdb::Comparator /* The following is not needed by RocksDB, but conceptually should be here: */ static ulong get_hashnr(const char *key, size_t key_len); - const char* Name() const { return "RocksDB_SE_v2.2"; } + const char* Name() const { return "RocksDB_SE_v3.0"; } //TODO: advanced funcs: // - FindShortestSeparator @@ -313,11 +436,11 @@ class Rdb_transaction { if (iter.is_tombstone()) { - batch.Delete(iter.key()); + batch.Delete(iter.cf_handle(), iter.key()); } else { - batch.Put(iter.key(), iter.value()); + batch.Put(iter.cf_handle(), iter.key(), iter.value()); } } rocksdb::Status s= rdb->Write(rocksdb::WriteOptions(), &batch); @@ -495,20 +618,66 @@ static int rocksdb_init_func(void *p) Primary_key_comparator::get_hashnr); rocksdb_stats= rocksdb::CreateDBStatistics(); - rocksdb::Options main_opts; - main_opts.create_if_missing = true; - main_opts.comparator= &primary_key_comparator; - main_opts.statistics= rocksdb_stats; - main_opts.write_buffer_size= rocksdb_write_buffer_size; - main_opts.target_file_size_base= rocksdb_target_file_size_base; + std::string rocksdb_db_name= "./rocksdb"; + + std::vector cf_names; + + rocksdb::DBOptions db_opts; + db_opts.create_if_missing = true; + db_opts.statistics= rocksdb_stats; + + rocksdb::Status status; + + status= rocksdb::DB::ListColumnFamilies(db_opts, rocksdb_db_name, + &cf_names); + if (!status.ok()) + { + /* + When we start on an empty datadir, ListColumnFamilies returns IOError, + and RocksDB doesn't provide any way to check what kind of error it was. + Checking system errno happens to work right now. + */ + if (status.IsIOError() && errno == ENOENT) + { + sql_print_information("RocksDB: column families not found, starting new"); + } + else + { + std::string err_text= status.ToString(); + sql_print_error("RocksDB: Error listing column families: %s", err_text.c_str()); + DBUG_RETURN(1); + } + } + else + sql_print_information("RocksDB: %ld column families found", cf_names.size()); + + std::vector cf_descr; + std::vector cf_handles; + + default_cf_opts.comparator= &primary_key_comparator; + default_cf_opts.write_buffer_size= rocksdb_write_buffer_size; + default_cf_opts.target_file_size_base= rocksdb_target_file_size_base; rocksdb::BlockBasedTableOptions table_options; table_options.block_cache = rocksdb::NewLRUCache(rocksdb_block_cache_size); - main_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); + default_cf_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); - rocksdb::Status status; - status= rocksdb::DB::Open(main_opts, "./rocksdb", &rdb); + // TODO: is RocksDB's default CF always named "default"? + if (cf_names.size() == 0) + cf_names.push_back("default"); + + for (size_t i = 0; i < cf_names.size(); ++i) + { + cf_descr.push_back(rocksdb::ColumnFamilyDescriptor(cf_names[i], + default_cf_opts)); + } + + rocksdb::Options main_opts(db_opts, default_cf_opts); + status= rocksdb::DB::Open(main_opts, rocksdb_db_name, cf_descr, + &cf_handles, &rdb); + + cf_manager.init(&cf_names, &cf_handles); if (!status.ok()) { @@ -541,6 +710,8 @@ static int rocksdb_done_func(void *p) row_locks.cleanup(); ddl_manager.cleanup(); + cf_manager.cleanup(); + delete rdb; rdb= NULL; @@ -956,7 +1127,15 @@ int ha_rocksdb::open(const char *name, int mode, uint test_if_locked) pk_key_parts= table->key_info[table->s->primary_key].user_defined_key_parts; uint key_len= table->key_info[table->s->primary_key].key_length; + + const char *comment= table->key_info[table->s->primary_key].comment.str; + rocksdb::ColumnFamilyHandle *cf_handle; + if (!(cf_handle= cf_manager.get_cf(comment))) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + pk_descr->setup(table); // move this into get_share() ?? + pk_descr->set_cf_handle(cf_handle); + uint packed_key_len= pk_descr->max_storage_fmt_length(); if (!(pk_tuple= (uchar*)my_malloc(key_len, MYF(0))) || @@ -970,7 +1149,12 @@ int ha_rocksdb::open(const char *name, int mode, uint test_if_locked) if (i == table->s->primary_key) /* Primary key was processed above */ continue; + comment= table->key_info[i].comment.str; + if (!(cf_handle= cf_manager.get_cf(comment))) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + key_descr[i]->setup(table); // move this into get_share() ?? + key_descr[i]->set_cf_handle(cf_handle); uint packed_len= key_descr[i]->max_storage_fmt_length(); if (packed_len > max_packed_sec_key_len) @@ -1083,7 +1267,16 @@ int ha_rocksdb::create_key_defs(TABLE *table_arg, const char *db_table, uint len for (i= 0; i < table_arg->s->keys; i++) { - if (!(key_descr[i]= new RDBSE_KEYDEF(ddl_manager.get_next_number(), i))) + /* + index comment has Column Family name. If there was no comment, we get + NULL, and it means use default column family. + */ + const char *comment= table_arg->key_info[i].comment.str; + rocksdb::ColumnFamilyHandle* cf_handle= cf_manager.get_or_create_cf(comment); + if (!cf_handle) + goto error; + + if (!(key_descr[i]= new RDBSE_KEYDEF(ddl_manager.get_next_number(), i, cf_handle))) goto error; } pk_descr= key_descr[table_arg->s->primary_key]; @@ -1382,7 +1575,7 @@ int ha_rocksdb::index_read_map(uchar *buf, const uchar *key, This will open the iterator and position it at a record that's equal or greater than the lookup tuple. */ - setup_index_scan(&slice); + setup_index_scan(kd->get_cf(), &slice); bool move_forward= true; @@ -1691,7 +1884,8 @@ int ha_rocksdb::get_row_by_rowid(uchar *buf, const char *rowid, uint rowid_size) bool found; if (!trx->changes.Get(key_slice, &retrieved_record, &found)) { - rocksdb::Status s= rdb->Get(options, key_slice, &retrieved_record); + rocksdb::Status s= rdb->Get(options, pk_descr->get_cf(), key_slice, + &retrieved_record); found= !s.IsNotFound(); } @@ -1765,7 +1959,7 @@ int ha_rocksdb::index_first(uchar *buf) key_descr[active_index]->get_infimum_key(key, &key_size); rocksdb::Slice index_key((const char*)key, key_size); - setup_index_scan(&index_key); + setup_index_scan(key_descr[active_index]->get_cf(), &index_key); skip_scan_it_next_call= TRUE; rc= index_next(buf); @@ -1785,7 +1979,7 @@ int ha_rocksdb::index_last(uchar *buf) key_descr[active_index]->get_supremum_key(key, &key_size); rocksdb::Slice index_key((const char*)key, key_size); - setup_index_scan(&index_key); + setup_index_scan(key_descr[active_index]->get_cf(), &index_key); if (!scan_it->Valid()) { @@ -1881,7 +2075,8 @@ int ha_rocksdb::write_row(uchar *buf) bool found; if (!trx->changes.Get(key_slice, &retrieved_record, &found)) { - rocksdb::Status s= rdb->Get(options, key_slice, &retrieved_record); + rocksdb::Status s= rdb->Get(options, pk_descr->get_cf(), key_slice, + &retrieved_record); found= !s.IsNotFound(); } @@ -1913,7 +2108,7 @@ int ha_rocksdb::write_row(uchar *buf) rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, packed_size); rocksdb::Slice value_slice((const char*)sec_key_tails, tail_size); - trx->changes.Put(secondary_key_slice, value_slice); + trx->changes.Put(key_descr[i]->get_cf(), secondary_key_slice, value_slice); trx->n_writes++; } } @@ -1925,7 +2120,7 @@ int ha_rocksdb::write_row(uchar *buf) rocksdb::Slice value_slice; convert_record_to_storage_format(&value_slice); - trx->changes.Put(key_slice, value_slice); + trx->changes.Put(pk_descr->get_cf(), key_slice, value_slice); trx->n_writes++; if (!do_locking && trx->n_writes > THDVAR(table->in_use, bulk_load_size)) @@ -1943,7 +2138,8 @@ int ha_rocksdb::write_row(uchar *buf) Open a cursor and position it at the passed record */ -void ha_rocksdb::setup_index_scan(rocksdb::Slice *slice) +void ha_rocksdb::setup_index_scan(rocksdb::ColumnFamilyHandle *cf, + rocksdb::Slice *slice) { Rdb_transaction *trx= get_or_create_trx(table->in_use); @@ -1956,7 +2152,7 @@ void ha_rocksdb::setup_index_scan(rocksdb::Slice *slice) rocksdb::ReadOptions options; if (!lock_rows) options.snapshot= trx->snapshot; - rocksdb::Iterator* rocksdb_it= rdb->NewIterator(options); + rocksdb::Iterator* rocksdb_it= rdb->NewIterator(options, cf); scan_it= new Apply_changes_iter; scan_it->init(&trx->changes, rocksdb_it); } @@ -1979,7 +2175,7 @@ int ha_rocksdb::rnd_init(bool scan) pk_descr->get_infimum_key(pk_packed_tuple, &key_size); rocksdb::Slice table_key((const char*)pk_packed_tuple, key_size); - setup_index_scan(&table_key); + setup_index_scan(pk_descr->get_cf(), &table_key); skip_scan_it_next_call= true; } else @@ -2150,7 +2346,7 @@ int ha_rocksdb::delete_row(const uchar *buf) rocksdb::Slice key_slice(last_rowkey.ptr(), last_rowkey.length()); Rdb_transaction *trx= get_or_create_trx(table->in_use); trx->prepare_for_write(); - trx->changes.Delete(key_slice); + trx->changes.Delete(pk_descr->get_cf(), key_slice); DBUG_ASSERT(buf == table->record[0]); // Delete the record for every secondary index @@ -2163,7 +2359,7 @@ int ha_rocksdb::delete_row(const uchar *buf) NULL, NULL); rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, packed_size); - trx->changes.Delete(secondary_key_slice); + trx->changes.Delete(key_descr[i]->get_cf(), secondary_key_slice); } } @@ -2243,7 +2439,6 @@ int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data) { DBUG_ENTER("ha_rocksdb::update_row"); DBUG_ASSERT(lock_rows); - /* old_data points to record we're updating. It is the same as the record we've just read (for multi-table UPDATE, too, because SQL layer will make @@ -2251,6 +2446,9 @@ int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data) */ DBUG_ASSERT(new_data == table->record[0]); + //psergey-todo: how come we don't get locks for the 'old_data'? because + // we've already taken them when reading?? + /* Get a rowkey fo the new record */ uint packed_size= pk_descr->pack_record(table, new_data, pk_packed_tuple, NULL, NULL); @@ -2290,7 +2488,7 @@ int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data) trx->prepare_for_write(); trx->add_lock(lock); - trx->changes.Delete(old_key_slice); + trx->changes.Delete(pk_descr->get_cf(), old_key_slice); } trx->prepare_for_write(); @@ -2311,7 +2509,7 @@ int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data) NULL, NULL); rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, packed_size); - trx->changes.Delete(secondary_key_slice); + trx->changes.Delete(key_descr[i]->get_cf(), secondary_key_slice); } // Then, Put(). @@ -2322,7 +2520,8 @@ int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data) rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, packed_size); rocksdb::Slice value_slice((const char*)sec_key_tails, tail_size); - trx->changes.Put(secondary_key_slice, value_slice); + trx->changes.Put(key_descr[i]->get_cf(), secondary_key_slice, + value_slice); } } } @@ -2332,7 +2531,7 @@ int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data) /* Write the table record */ rocksdb::Slice value_slice; convert_record_to_storage_format(&value_slice); - trx->changes.Put(key_slice, value_slice); + trx->changes.Put(pk_descr->get_cf(), key_slice, value_slice); DBUG_RETURN(0); } diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 99a9ffcfef56..34e74eab622c 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -134,7 +134,8 @@ class ha_rocksdb: public handler int create_key_defs(TABLE *table_arg, const char *db_table, uint len); int secondary_index_read(int keyno, uchar *buf); - void setup_index_scan(rocksdb::Slice *slice); + void setup_index_scan(rocksdb::ColumnFamilyHandle *cf, + rocksdb::Slice *slice); int get_row_by_rowid(uchar *buf, const char *pk_tuple, uint pk_tuple_size); void update_auto_incr_val(); diff --git a/storage/rocksdb/rdb_datadic.cc b/storage/rocksdb/rdb_datadic.cc index aedf5a684270..590afe39c9d2 100644 --- a/storage/rocksdb/rdb_datadic.cc +++ b/storage/rocksdb/rdb_datadic.cc @@ -947,9 +947,9 @@ bool Table_ddl_manager::init(rocksdb::DB *rdb_dict) /* We can't fully initialize RDBSE_KEYDEF object here, because full initialization requires that there is an open TABLE* where we could - look at Field* objects and set max_length and other attributes. + look at Field* objects and set max_length and other attributes */ - tdef->key_descr[keyno]= new RDBSE_KEYDEF(index_number, keyno); + tdef->key_descr[keyno]= new RDBSE_KEYDEF(index_number, keyno, NULL); /* Keep track of what was the last index number we saw */ if (max_number < index_number) diff --git a/storage/rocksdb/rdb_datadic.h b/storage/rocksdb/rdb_datadic.h index 50f75b27b8f7..436193bd5f03 100644 --- a/storage/rocksdb/rdb_datadic.h +++ b/storage/rocksdb/rdb_datadic.h @@ -188,8 +188,10 @@ class RDBSE_KEYDEF return maxlength; } - RDBSE_KEYDEF(uint indexnr_arg, uint keyno_arg) : + RDBSE_KEYDEF(uint indexnr_arg, uint keyno_arg, + rocksdb::ColumnFamilyHandle* cf_handle_arg) : index_number(indexnr_arg), + cf_handle(cf_handle_arg), pk_part_no(NULL), pack_info(NULL), keyno(keyno_arg), @@ -206,6 +208,12 @@ class RDBSE_KEYDEF }; void setup(TABLE *table); + void set_cf_handle(rocksdb::ColumnFamilyHandle* cf_handle_arg) + { + cf_handle= cf_handle_arg; + } + + rocksdb::ColumnFamilyHandle *get_cf() { return cf_handle; } private: @@ -214,6 +222,8 @@ class RDBSE_KEYDEF uchar index_number_storage_form[INDEX_NUMBER_SIZE]; + rocksdb::ColumnFamilyHandle* cf_handle; + friend class RDBSE_TABLE_DEF; // for index_number above /* Number of key parts in the primary key*/ diff --git a/storage/rocksdb/rdb_rowmods.cc b/storage/rocksdb/rdb_rowmods.cc index 1d2b79c49616..f680978ae4e5 100644 --- a/storage/rocksdb/rdb_rowmods.cc +++ b/storage/rocksdb/rdb_rowmods.cc @@ -133,8 +133,8 @@ int Row_table::compare_rows(const void* arg, const void *a, const void *b) return res; } - -bool Row_table::Put(rocksdb::Slice& key, rocksdb::Slice& val) +bool Row_table::Put(rocksdb::ColumnFamilyHandle *cf, rocksdb::Slice& key, + rocksdb::Slice& val) { uchar *data = (uchar*)alloc_root(&mem_root, ROW_DATA_SIZE + key.size() + val.size()); @@ -142,6 +142,7 @@ bool Row_table::Put(rocksdb::Slice& key, rocksdb::Slice& val) ROW_DATA *rdata= (ROW_DATA*)data; rdata->key_len= key.size(); rdata->value_len= val.size(); + rdata->cf= cf; rdata->stmt_id= stmt_id; rdata->prev_version= NULL; memcpy(data + ROW_DATA_SIZE, key.data(), key.size()); @@ -174,12 +175,13 @@ bool Row_table::Put(rocksdb::Slice& key, rocksdb::Slice& val) Put a tombstone into the table */ -bool Row_table::Delete(rocksdb::Slice& key) +bool Row_table::Delete(rocksdb::ColumnFamilyHandle *cf, rocksdb::Slice& key) { uchar *data = (uchar*)alloc_root(&mem_root, ROW_DATA_SIZE + key.size()); ROW_DATA *rdata= (ROW_DATA*)data; rdata->key_len= key.size(); rdata->value_len= DATA_IS_TOMBSTONE; + rdata->cf= cf; rdata->stmt_id= stmt_id; rdata->prev_version= NULL; memcpy(data + ROW_DATA_SIZE, key.data(), key.size()); @@ -361,3 +363,11 @@ rocksdb::Slice Row_table_iter::value() return rocksdb::Slice(((char*)row) + ROW_DATA_SIZE + row->key_len, row->value_len); } + + +rocksdb::ColumnFamilyHandle *Row_table_iter::cf_handle() +{ + DBUG_ASSERT(Valid()); + ROW_DATA *row= *row_ptr; + return row->cf; +} diff --git a/storage/rocksdb/rdb_rowmods.h b/storage/rocksdb/rdb_rowmods.h index d1ba4e7ce7a7..f160067ee97e 100644 --- a/storage/rocksdb/rdb_rowmods.h +++ b/storage/rocksdb/rdb_rowmods.h @@ -24,6 +24,8 @@ typedef struct st_row_data /* Can have a special value: DATA_IS_TOMBSTONE */ size_t value_len; + rocksdb::ColumnFamilyHandle *cf; + /* Previous version */ struct st_row_data *prev_version; @@ -84,6 +86,8 @@ class Row_table_iter bool is_tombstone(); rocksdb::Slice key(); rocksdb::Slice value(); + + rocksdb::ColumnFamilyHandle *cf_handle(); }; @@ -115,8 +119,9 @@ class Row_table void reinit(); /* Operations to put a row, or a tombstone */ - bool Put(rocksdb::Slice& key, rocksdb::Slice& val); - bool Delete(rocksdb::Slice& key); + bool Put(rocksdb::ColumnFamilyHandle *cf, rocksdb::Slice& key, + rocksdb::Slice& val); + bool Delete(rocksdb::ColumnFamilyHandle *cf, rocksdb::Slice& key); /* Lookup may find nothing, find row, of find a tombstone */ bool Get(rocksdb::Slice &key, std::string *record, bool *found);