Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: persist AlterSchema and AlterOptions to data WAL #166

Merged
merged 7 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 68 additions & 18 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Alter schema logic of instance
//! Alter [Schema] and [TableOptions] logic of instance.

use std::{collections::HashMap, sync::Arc};

use log::info;
use snafu::{ensure, ResultExt};
use table_engine::table::AlterSchemaRequest;
use tokio::sync::oneshot;
use wal::{
log_batch::{LogWriteBatch, LogWriteEntry},
manager::WriteContext,
};

use crate::{
instance::{
engine::{
AlterDroppedTable, FlushTable, InvalidOptions, InvalidPreVersion, InvalidSchemaVersion,
OperateByWriteWorker, Result, WriteManifest,
OperateByWriteWorker, Result, WriteManifest, WriteWal,
},
flush_compaction::TableFlushOptions,
write_worker,
write_worker::{AlterOptionsCommand, AlterSchemaCommand, WorkerLocal},
Instance,
},
meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate},
payload::WritePayload,
space::SpaceAndTable,
table::data::TableDataRef,
table_options,
Expand Down Expand Up @@ -73,13 +78,16 @@ impl Instance {
// Validate alter schema request.
self.validate_before_alter(table_data, &request)?;

// Now we can persist and update the schema, since this function is called by
// write worker, so there is no other concurrent writer altering the
// schema.

// First trigger a flush before alter schema, to ensure ensure all wal entries
// with old schema are flushed
let opts = TableFlushOptions {
block_on_write_thread: true,
..Default::default()
};
// We are in write thread now and there is no write request being processed, but
// we need to trigger a flush to ensure all wal entries with old schema
// are flushed, so we won't need to handle them during replaying wal.
self.flush_table_in_worker(worker_local, table_data, opts)
.await
.context(FlushTable {
Expand All @@ -88,20 +96,26 @@ impl Instance {
table_id: table_data.id,
})?;

// Now we can persist and update the schema, since this function is called by
// write worker, so there is no other concurrent writer altering the
// schema.
let meta_update = MetaUpdate::AlterSchema(AlterSchemaMeta {
// Build alter op
let manifest_update = AlterSchemaMeta {
space_id: space_table.space().id,
table_id: table_data.id,
schema: request.schema.clone(),
pre_schema_version: request.pre_schema_version,
});
};

// Write AlterSchema to Data Wal
let alter_schema_pb = manifest_update.clone().into_pb();
let payload = WritePayload::AlterSchema(&alter_schema_pb);
let mut log_batch = LogWriteBatch::new(space_table.table_data().wal_region_id());
log_batch.push(LogWriteEntry { payload: &payload });
let write_ctx = WriteContext::default();
self.space_store
.manifest
.store_update(meta_update)
.wal_manager
.write(&write_ctx, &log_batch)
.await
.context(WriteManifest {
.map_err(|e| Box::new(e) as _)
.context(WriteWal {
space_id: space_table.space().id,
table: &table_data.name,
table_id: table_data.id,
Expand All @@ -112,6 +126,18 @@ impl Instance {
request.schema
);

// Write to Manifest
let update = MetaUpdate::AlterSchema(manifest_update);
self.space_store
.manifest
.store_update(update)
.await
.context(WriteManifest {
space_id: space_table.space().id,
table: &table_data.name,
table_id: table_data.id,
})?;

// Update schema in memory.
table_data.set_schema(request.schema);

Expand Down Expand Up @@ -203,6 +229,9 @@ impl Instance {
}
);

// AlterOptions doesn't need a flush.

// Generate options after alter op
let current_table_options = table_data.table_options();
info!(
"Instance alter options, space_id:{}, tables:{:?}, old_table_opts:{:?}, options:{:?}",
Expand All @@ -220,15 +249,35 @@ impl Instance {
table_id: table_data.id,
})?;
table_opts.sanitize();
let manifest_update = AlterOptionsMeta {
space_id: space_table.space().id,
table_id: table_data.id,
options: table_opts.clone(),
};

// Now we can persist and update the options, since this function is called by
// write worker, so there is no other concurrent writer altering the
// options.
let meta_update = MetaUpdate::AlterOptions(AlterOptionsMeta {
space_id: space_table.space().id,
table_id: table_data.id,
options: table_opts.clone(),
});

// Write AlterOptions to Data Wal
let alter_options_pb = manifest_update.clone().into_pb();
let payload = WritePayload::AlterOption(&alter_options_pb);
let mut log_batch = LogWriteBatch::new(space_table.table_data().wal_region_id());
log_batch.push(LogWriteEntry { payload: &payload });
let write_ctx = WriteContext::default();
self.space_store
.wal_manager
.write(&write_ctx, &log_batch)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteWal {
space_id: space_table.space().id,
table: &table_data.name,
table_id: table_data.id,
})?;

// Write to Manifest
let meta_update = MetaUpdate::AlterOptions(manifest_update);
self.space_store
.manifest
.store_update(meta_update)
Expand All @@ -239,6 +288,7 @@ impl Instance {
table_id: table_data.id,
})?;

// Update memory status
table_data.set_table_options(worker_local, table_opts);
Ok(())
}
Expand Down
25 changes: 23 additions & 2 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,20 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
table,
table_id,
source
))]
WriteWal {
space_id: SpaceId,
table: String,
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Invalid options, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
Expand Down Expand Up @@ -171,6 +185,11 @@ pub enum Error {
backtrace
))]
AlterDroppedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to store version edit, err:{}", source))]
StoreVersionEdit {
source: Box<dyn std::error::Error + Send + Sync>,
},
}

define_result!(Error);
Expand All @@ -187,7 +206,8 @@ impl From<Error> for table_engine::engine::Error {
Error::WriteManifest { .. } => Self::WriteMeta {
source: Box::new(err),
},
Error::InvalidSchemaVersion { .. }
Error::WriteWal { .. }
| Error::InvalidSchemaVersion { .. }
| Error::InvalidPreVersion { .. }
| Error::CreateTableData { .. }
| Error::AlterDroppedTable { .. }
Expand All @@ -196,7 +216,8 @@ impl From<Error> for table_engine::engine::Error {
| Error::ReadWal { .. }
| Error::ApplyMemTable { .. }
| Error::OperateByWriteWorker { .. }
| Error::FlushTable { .. } => Self::Unexpected {
| Error::FlushTable { .. }
| Error::StoreVersionEdit { .. } => Self::Unexpected {
source: Box::new(err),
},
}
Expand Down
6 changes: 6 additions & 0 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,12 @@ impl Instance {
})?;
}
}
ReadPayload::AlterSchema { .. } | ReadPayload::AlterOptions { .. } => {
// Ignore records except Data.
//
// - DDL (AlterSchema and AlterOptions) should be recovered
// from Manifest on start.
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/meta/meta_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ pub struct AlterSchemaMeta {
}

impl AlterSchemaMeta {
fn into_pb(self) -> meta_pb::AlterSchemaMeta {
pub(crate) fn into_pb(self) -> meta_pb::AlterSchemaMeta {
let mut target = meta_pb::AlterSchemaMeta::new();
target.set_space_id(self.space_id);
target.set_table_id(self.table_id.as_u64());
Expand Down Expand Up @@ -394,7 +394,7 @@ pub struct AlterOptionsMeta {
}

impl AlterOptionsMeta {
fn into_pb(self) -> meta_pb::AlterOptionsMeta {
pub(crate) fn into_pb(self) -> meta_pb::AlterOptionsMeta {
let mut target = meta_pb::AlterOptionsMeta::new();
target.set_space_id(self.space_id);
target.set_table_id(self.table_id.as_u64());
Expand Down
Loading