Skip to content

Commit

Permalink
feat: avoid flush when drop table (apache#1257)
Browse files Browse the repository at this point in the history
## Rationale
When drop a table, flush is unnecessary, and this cause the drop take a
long time.

## Detailed Changes
Mark WAL as deleted directly when drop table.

## Test Plan
Manually
  • Loading branch information
jiacai2050 authored Oct 16, 2023
1 parent fdbf38e commit ad70bc1
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 33 deletions.
37 changes: 19 additions & 18 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

//! Drop table logic of instance

use common_types::MAX_SEQUENCE_NUMBER;
use logger::{info, warn};
use snafu::ResultExt;
use table_engine::engine::DropTableRequest;

use crate::{
instance::{
engine::{FlushTable, Result, WriteManifest},
flush_compaction::{Flusher, TableFlushOptions},
engine::{PurgeWal, Result, WriteManifest},
SpaceStoreRef,
},
manifest::meta_edit::{DropTableMeta, MetaEdit, MetaEditRequest, MetaUpdate},
Expand All @@ -31,12 +31,13 @@ use crate::{
pub(crate) struct Dropper {
pub space: SpaceRef,
pub space_store: SpaceStoreRef,

pub flusher: Flusher,
}

impl Dropper {
/// Drop a table under given space
// TODO: Currently we first delete WAL then manifest, if wal is deleted but
// manifest failed to delete, it could cause the table in a unknown state, we
// should find a better way to deal with this.
pub async fn drop(&self, request: DropTableRequest) -> Result<bool> {
info!("Try to drop table, request:{:?}", request);

Expand All @@ -48,8 +49,6 @@ impl Dropper {
}
};

let mut serial_exec = table_data.serial_exec.lock().await;

if table_data.is_dropped() {
warn!(
"Process drop table command tries to drop a dropped table, table:{:?}",
Expand All @@ -58,19 +57,21 @@ impl Dropper {
return Ok(false);
}

// Fixme(xikai): Trigger a force flush so that the data of the table in the wal
// is marked for deletable. However, the overhead of the flushing can
// be avoided.

let opts = TableFlushOptions::default();
let flush_scheduler = serial_exec.flush_scheduler();
self.flusher
.do_flush(flush_scheduler, &table_data, opts)
// Mark table's WAL for deletable, memtable will also get freed automatically
// when table_data is dropped.
let table_location = table_data.table_location();
let wal_location =
crate::instance::create_wal_location(table_location.id, table_location.shard_info);
// Use max to represent delete all WAL.
// TODO: add a method in wal_manager to delete all WAL with same prefix.
let sequence = MAX_SEQUENCE_NUMBER;
self.space_store
.wal_manager
.mark_delete_entries_up_to(wal_location, sequence)
.await
.context(FlushTable {
space_id: self.space.id,
table: &table_data.name,
table_id: table_data.id,
.context(PurgeWal {
wal_location,
sequence,
})?;

// Store the dropping information into meta
Expand Down
15 changes: 13 additions & 2 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::sync::Arc;

use common_types::schema::Version;
use common_types::{schema::Version, SequenceNumber};
use generic_error::GenericError;
use macros::define_result;
use snafu::{Backtrace, OptionExt, Snafu};
Expand Down Expand Up @@ -237,6 +237,17 @@ pub enum Error {
"Try to create a random partition table in overwrite mode, table:{table}.\nBacktrace:\n{backtrace}",
))]
TryCreateRandomPartitionTableInOverwriteMode { table: String, backtrace: Backtrace },

#[snafu(display(
"Failed to purge wal, wal_location:{:?}, sequence:{}",
wal_location,
sequence
))]
PurgeWal {
wal_location: WalLocation,
sequence: SequenceNumber,
source: wal::manager::Error,
},
}

define_result!(Error);
Expand Down Expand Up @@ -273,6 +284,7 @@ impl From<Error> for table_engine::engine::Error {
| Error::TableNotExist { .. }
| Error::OpenTablesOfShard { .. }
| Error::ReplayWalNoCause { .. }
| Error::PurgeWal { .. }
| Error::ReplayWalWithCause { .. } => Self::Unexpected {
source: Box::new(err),
},
Expand Down Expand Up @@ -368,7 +380,6 @@ impl Instance {
let dropper = Dropper {
space,
space_store: self.space_store.clone(),
flusher: self.make_flusher(),
};

dropper.drop(request).await
Expand Down
12 changes: 11 additions & 1 deletion analytic_engine/src/memtable/skiplist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct Metrics {
}

/// MemTable implementation based on skiplist
pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone> {
pub struct SkiplistMemTable<A> {
/// Schema of this memtable, is immutable.
schema: Schema,
skiplist: Skiplist<BytewiseComparator, A>,
Expand All @@ -62,6 +62,16 @@ pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone> {
max_time: AtomicI64,
}

impl<A> Drop for SkiplistMemTable<A> {
fn drop(&mut self) {
logger::debug!(
"Drop skiplist memtable, last_seq:{}, schema:{:?}",
self.last_sequence.load(atomic::Ordering::Relaxed),
self.schema
);
}
}

impl<A: Arena<Stats = BasicStats> + Clone> SkiplistMemTable<A> {
fn new(
schema: Schema,
Expand Down
4 changes: 2 additions & 2 deletions components/skiplist/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl Node {
}
}

struct SkiplistCore<A: Arena<Stats = BasicStats>> {
struct SkiplistCore<A> {
height: AtomicUsize,
head: NonNull<Node>,
arena: A,
Expand All @@ -220,7 +220,7 @@ struct SkiplistCore<A: Arena<Stats = BasicStats>> {
/// FIXME(yingwen): Modify the skiplist to support arena that supports growth,
/// otherwise it is hard to avoid memory usage not out of the arena capacity
#[derive(Clone)]
pub struct Skiplist<C, A: Arena<Stats = BasicStats> + Clone> {
pub struct Skiplist<C, A> {
core: Arc<SkiplistCore<A>>,
c: C,
}
Expand Down
13 changes: 3 additions & 10 deletions wal/src/table_kv_impl/table_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,12 @@ impl TableUnit {
// may be greater than the `actual last sequence of written logs`.
//
// Such as following case:
// + Write wal logs failed(last sequence stored in memory will increase when write failed).
// + Get last sequence from memory(greater then actual last sequence now).
// + Write wal logs failed(last sequence stored in memory will increase when write failed).
// + Get last sequence from memory(greater then actual last sequence now).
// + Mark the got last sequence as flushed sequence.
let actual_next_sequence = sequence + 1;
if actual_next_sequence < start_sequence {
warn!("TableKv WAL found start_sequence greater than actual_next_sequence,
warn!("TableKv WAL found start_sequence greater than actual_next_sequence,
start_sequence:{start_sequence}, actual_next_sequence:{actual_next_sequence}, table_id:{table_id}, region_id:{region_id}");

break;
Expand Down Expand Up @@ -996,13 +996,6 @@ impl TableUnitWriter {
table_unit_state.table_id, sequence_num, table_unit_meta_table
);

ensure!(
sequence_num < common_types::MAX_SEQUENCE_NUMBER,
SequenceOverflow {
table_id: table_unit_state.table_id,
}
);

let last_sequence = table_unit_state.last_sequence();
if sequence_num > last_sequence {
sequence_num = last_sequence;
Expand Down

0 comments on commit ad70bc1

Please sign in to comment.