drop column primitive (#210)
* drop column primitive

* option driven api

* remove new code, use old one.

* fmt

* test to new naming

* renaming
cheme authored May 15, 2023
1 parent 084ef6a commit b697c77
Showing 4 changed files with 157 additions and 42 deletions.
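
The "option driven api" mentioned in the commit message comes down to three public helpers on a closed database: the existing `Db::add_column` plus the new `Db::drop_last_column` and `Db::reset_column`. Each takes the `Options` describing the on-disk database, briefly opens it to validate the metadata and flush any pending WAL logs, then rewrites the metadata (the drop and reset variants also delete the column's index and value-table files). A minimal usage sketch, assuming a database previously created at `path` with two default columns; the helper name is illustrative only:

use parity_db::{ColumnOptions, Db, Options};

// Append a column to a closed database, then remove the last column again.
// Sketch only: assumes the on-disk database matches `Options::with_columns(path, 2)`.
fn add_then_drop_last(path: &std::path::Path) -> Result<(), parity_db::Error> {
    let mut options = Options::with_columns(path, 2);

    // Appends a third column and persists the updated metadata.
    Db::add_column(&mut options, ColumnOptions::default())?;

    // Removes the last column (the one just added), deleting its files
    // and writing the reduced metadata.
    Db::drop_last_column(&mut options)?;

    Ok(())
}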
30 changes: 27 additions & 3 deletions src/column.rs
@@ -6,7 +6,7 @@ use crate::{
compress::Compress,
db::{check::CheckDisplay, Operation, RcValue},
display::hex,
error::{Error, Result},
error::{try_io, Error, Result},
index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
log::{Log, LogAction, LogOverlays, LogQuery, LogReader, LogWriter},
options::{ColumnOptions, Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
@@ -20,6 +20,7 @@ use crate::{
};
use std::{
collections::VecDeque,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@@ -94,7 +95,7 @@ pub struct HashColumn {
col: ColId,
tables: RwLock<Tables>,
reindex: RwLock<Reindex>,
path: std::path::PathBuf,
path: PathBuf,
preimage: bool,
uniform_keys: bool,
collect_stats: bool,
@@ -325,7 +326,7 @@ impl Column {
}

fn open_table(
path: Arc<std::path::PathBuf>,
path: Arc<PathBuf>,
col: ColId,
tier: u8,
options: &ColumnOptions,
@@ -335,6 +336,29 @@ impl Column {
let entry_size = SIZES.get(tier as usize).cloned();
ValueTable::open(path, id, entry_size, options, db_version)
}

pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
// It is not specified how read_dir behaves when deleting and iterating in the same loop,
// so we collect a list of paths to be deleted first.
let mut to_delete = Vec::new();
for entry in try_io!(std::fs::read_dir(&path)) {
let entry = try_io!(entry);
if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
if crate::index::TableId::is_file_name(column, file) ||
crate::table::TableId::is_file_name(column, file)
{
to_delete.push(PathBuf::from(file));
}
}
}

for file in to_delete {
let mut path = path.clone();
path.push(file);
try_io!(std::fs::remove_file(path));
}
Ok(())
}
}

impl HashColumn {
138 changes: 127 additions & 11 deletions src/db.rs
@@ -773,7 +773,7 @@ impl DbInner {
Ok(())
}

fn replay_all_logs(&mut self) -> Result<()> {
fn replay_all_logs(&self) -> Result<()> {
while let Some(id) = self.log.replay_next()? {
log::debug!(target: "parity-db", "Replaying database log {}", id);
while self.enact_logs(true)? {}
@@ -920,7 +920,7 @@ impl Db {

fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
assert!(options.is_valid());
let mut db = DbInner::open(options, opening_mode)?;
let db = DbInner::open(options, opening_mode)?;
// This needs to be called before the log thread starts, so that the first
// reindexing will run in the correct state.
if let Err(e) = db.replay_all_logs() {
@@ -1124,24 +1124,70 @@ impl Db {
self.inner.stats()
}

/// Add a new column with options specified by `new_column_options`.
pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
// We open the DB beforehand to check metadata validity and to make sure there are no
// pending WAL logs.
// We open the DB beforehand to check metadata validity and to make sure there are no
// pending WAL logs.
fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
let db = Db::open(options)?;
let salt = db.inner.options.salt;
drop(db);
Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
}

/// Add a new column with options specified by `new_column_options`.
pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
let salt = Self::precheck_column_operation(options)?;

options.columns.push(new_column_options);
options.write_metadata_with_version(
&options.path,
&salt.expect("`salt` is always `Some` after opening the DB; qed"),
Some(CURRENT_VERSION),
)?;
options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

Ok(())
}

/// Remove the last column from the database.
/// The Db must be closed when called.
pub fn drop_last_column(options: &mut Options) -> Result<()> {
let salt = Self::precheck_column_operation(options)?;
let nb_column = options.columns.len();
if nb_column == 0 {
return Ok(())
}
let index = options.columns.len() - 1;
Self::remove_column_files(options, index as u8)?;
options.columns.pop();
options.write_metadata(&options.path, &salt)?;
Ok(())
}

/// Truncate a column of the database, optionally changing its options.
/// The Db must be closed when called.
pub fn reset_column(
options: &mut Options,
index: u8,
new_options: Option<ColumnOptions>,
) -> Result<()> {
let salt = Self::precheck_column_operation(options)?;
Self::remove_column_files(options, index)?;

if let Some(new_options) = new_options {
options.columns[index as usize] = new_options;
options.write_metadata(&options.path, &salt)?;
}

Ok(())
}

fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
if index as usize >= options.columns.len() {
return Err(Error::IncompatibleColumnConfig {
id: index,
reason: "Column not found".to_string(),
})
}

Column::drop_files(index, options.path.clone())?;
Ok(())
}

#[cfg(feature = "instrumentation")]
pub fn process_reindex(&self) -> Result<()> {
self.inner.process_reindex()?;
@@ -1175,6 +1221,12 @@ impl Db {

impl Drop for Db {
fn drop(&mut self) {
self.drop_inner()
}
}

impl Db {
fn drop_inner(&mut self) {
self.inner.shutdown();
if let Some(t) = self.log_thread.take() {
if let Err(e) = t.join() {
@@ -2622,4 +2674,68 @@ mod tests {
assert_eq!(db.inner.columns[0].index_bits(), Some(17));
}
}

#[test]
fn test_remove_column() {
let tmp = tempdir().unwrap();
let db_test_file = EnableCommitPipelineStages::DbFile;
let mut options_db_files = db_test_file.options(tmp.path(), 2);
options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
options_std.salt = options_db_files.salt.clone();

let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();

let payload: Vec<(u8, _, _)> = (0u16..100)
.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
.collect();

db.commit(payload.clone()).unwrap();

db_test_file.run_stages(&db);
drop(db);

let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
for (col, key, value) in payload.iter() {
assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
}
drop(db);
Db::reset_column(&mut options_db_files, 1, None).unwrap();

let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
for (col, key, _value) in payload.iter() {
assert_eq!(db.get(*col, key).unwrap(), None);
}

let payload: Vec<(u8, _, _)> = (0u16..10)
.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
.collect();

db.commit(payload.clone()).unwrap();

db_test_file.run_stages(&db);
drop(db);

let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
let payload: Vec<(u8, _, _)> = (10u16..100)
.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
.collect();

db.commit(payload.clone()).unwrap();
assert!(db.iter(1).is_err());

drop(db);

let mut col_option = options_std.columns[1].clone();
col_option.btree_index = true;
Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();

let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
let payload: Vec<(u8, _, _)> = (0u16..10)
.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
.collect();

db.commit(payload.clone()).unwrap();
assert!(db.iter(1).is_ok());
}
}
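
The last part of the test shows the other intended use of `reset_column`: truncating a column while changing its options, here enabling the btree index so the column becomes iterable. A standalone sketch of the same flow, assuming `Options::columns` and `ColumnOptions::btree_index` are public fields and that a two-column database created with these same options already exists at `path`; the function name is illustrative:

use parity_db::{Db, Options};

// Wipe column 1 of an existing database and re-create it as a btree-indexed column.
fn convert_column_to_btree(path: &std::path::Path) -> Result<(), parity_db::Error> {
    let mut options = Options::with_columns(path, 2);

    // Start from column 1's current configuration and enable the ordered index.
    let mut col_options = options.columns[1].clone();
    col_options.btree_index = true;

    // Deletes column 1's index and value files and writes the new metadata.
    Db::reset_column(&mut options, 1, Some(col_options))?;

    // Reopening yields an empty column 1 that now supports iteration.
    let db = Db::open(&options)?;
    assert!(db.iter(1).is_ok());
    Ok(())
}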
2 changes: 1 addition & 1 deletion src/log.rs
@@ -681,7 +681,7 @@ impl Log {
Ok(false)
}

pub fn replay_next(&mut self) -> Result<Option<u32>> {
pub fn replay_next(&self) -> Result<Option<u32>> {
let mut reading = self.reading.write();
{
if let Some(reading) = reading.take() {
29 changes: 2 additions & 27 deletions src/migration.rs
@@ -9,7 +9,7 @@ use crate::{
Error, Result,
};
/// Database migration.
use std::path::{Path, PathBuf};
use std::path::Path;

const COMMIT_SIZE: usize = 10240;
const OVERWRITE_TMP_PATH: &str = "to_revert_overwrite";
@@ -163,33 +163,8 @@ pub fn clear_column(path: &Path, column: ColId) -> Result<()> {
return Err(Error::Migration("Invalid column index".into()))
}

// Validate the database by opening. This also makes sure all the logs are enacted,
// so that after deleting a column there are no leftover commits that may write to it.
let mut options = Options::with_columns(path, meta.columns.len() as u8);
options.columns = meta.columns;
options.salt = Some(meta.salt);
let _db = Db::open(&options)?;
drop(_db);
crate::column::Column::drop_files(column, path.to_path_buf())?;

// It is not specified how read_dir behaves when deleting and iterating in the same loop
// We collect a list of paths to be deleted first.
let mut to_delete = Vec::new();
for entry in try_io!(std::fs::read_dir(path)) {
let entry = try_io!(entry);
if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
if crate::index::TableId::is_file_name(column, file) ||
crate::table::TableId::is_file_name(column, file)
{
to_delete.push(PathBuf::from(file));
}
}
}

for file in to_delete {
let mut path = path.to_path_buf();
path.push(file);
try_io!(std::fs::remove_file(path));
}
Ok(())
}

