Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: refactor shard version logic #1286

Merged
merged 8 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,8 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.21"
ceresdbproto = "1.0.22"
codec = { path = "components/codec" }
notifier = { path = "components/notifier" }
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
Expand All @@ -114,21 +113,22 @@ generic_error = { path = "components/generic_error" }
hash_ext = { path = "components/hash_ext" }
hex = "0.4.3"
hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
lazy_static = "1.4.0"
logger = { path = "components/logger" }
lru = "0.7.6"
id_allocator = { path = "components/id_allocator" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
lazy_static = "1.4.0"
logger = { path = "components/logger" }
lru = "0.7.6"
macros = { path = "components/macros" }
message_queue = { path = "components/message_queue" }
meta_client = { path = "meta_client" }
metric_ext = { path = "components/metric_ext" }
notifier = { path = "components/notifier" }
object_store = { path = "components/object_store" }
panic_ext = { path = "components/panic_ext" }
partitioned_lock = { path = "components/partitioned_lock" }
Expand Down
16 changes: 4 additions & 12 deletions catalog_impls/src/volatile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,20 +326,12 @@ impl Schema for SchemaImpl {
// Do real create table.
// Partition table is not stored in ShardTableManager.
if request.params.partition_info.is_none() {
let shard =
self.shard_set
.get(request.shard_id)
.with_context(|| schema::CreateTable {
request: request.clone(),
msg: "shard not found".to_string(),
})?;

// TODO: seems unnecessary?
let _ = shard
.find_table(&request.params.schema_name, &request.params.table_name)
let _ = self
.shard_set
.get(request.shard_id)
.with_context(|| schema::CreateTable {
request: request.clone(),
msg: "table not found in shard".to_string(),
msg: "shard not found".to_string(),
})?;
}
let request = request.into_engine_create_request(None, self.schema_id);
Expand Down
154 changes: 94 additions & 60 deletions cluster/src/shard_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use catalog::{
},
table_operator::TableOperator,
};
use common_types::table::ShardVersion;
use generic_error::BoxError;
use logger::info;
use snafu::ResultExt;
Expand Down Expand Up @@ -219,24 +220,16 @@ impl ShardOperator {
Ok(())
}

pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> {
let shard_info = ctx.updated_table_info.shard_info.clone();
let table_info = ctx.updated_table_info.table_info.clone();
pub async fn create_table(&self, ctx: CreateTableContext) -> Result<ShardVersion> {
let shard_info = &ctx.updated_table_info.shard_info;
let table_info = &ctx.updated_table_info.table_info;

info!(
"ShardOperator create table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
"ShardOperator create table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
shard_info.id,
table_info.name,
);

// FIXME: maybe should insert table from cluster after having created table.
{
let mut data = self.data.write().unwrap();
data.try_insert_table(ctx.updated_table_info)
.box_err()
.with_context(|| CreateTableWithCause {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?;
}

// Create the table by operator afterwards.
let (table_engine, partition_info) = match table_info.partition_info.clone() {
Some(v) => (ctx.partition_table_engine.clone(), Some(v)),
Expand Down Expand Up @@ -275,30 +268,37 @@ impl ShardOperator {
})?;

info!(
"ShardOperator create table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
"ShardOperator table is created by operator, shard_id:{}, table:{}",
shard_info.id, table_info.name,
);

Ok(())
let latest_version = {
let mut data = self.data.write().unwrap();
data.try_create_table(ctx.updated_table_info.clone())
.box_err()
.with_context(|| CreateTableWithCause {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?
};

info!(
"ShardOperator create table sequentially finish, shard_id:{}, shard_version:{}, table:{}",
shard_info.id, shard_info.version, table_info.name,
);

Ok(latest_version)
}

pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> {
let shard_info = ctx.updated_table_info.shard_info.clone();
let table_info = ctx.updated_table_info.table_info.clone();
pub async fn drop_table(&self, ctx: DropTableContext) -> Result<ShardVersion> {
let shard_info = &ctx.updated_table_info.shard_info;
let table_info = &ctx.updated_table_info.table_info;

info!(
"ShardOperator drop table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
"ShardOperator drop table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
shard_info.id,
table_info.name,
);

// FIXME: maybe should insert table from cluster after having dropped table.
{
let mut data = self.data.write().unwrap();
data.try_remove_table(ctx.updated_table_info)
.box_err()
.with_context(|| DropTableWithCause {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?;
}

// Drop the table by operator afterwards.
let drop_table_request = DropTableRequest {
catalog_name: ctx.catalog,
Expand All @@ -319,31 +319,41 @@ impl ShardOperator {
})?;

info!(
"ShardOperator drop table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
"ShardOperator table is dropped, shard_id:{}, table:{}",
shard_info.id, table_info.name,
);

Ok(())
// Update the shard info after the table is dropped.
let latest_version = {
let mut data = self.data.write().unwrap();
data.try_drop_table(ctx.updated_table_info.clone())
.box_err()
.with_context(|| DropTableWithCause {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?
};

info!(
"ShardOperator drop table sequentially finish, latest_version:{latest_version}, shard_id:{}, old_shard_version:{}, table:{}",
shard_info.id,
shard_info.version,
table_info.name,
);

Ok(latest_version)
}

pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> {
let shard_info = ctx.updated_table_info.shard_info.clone();
let table_info = ctx.updated_table_info.table_info.clone();
let shard_info = &ctx.updated_table_info.shard_info;
let table_info = &ctx.updated_table_info.table_info;

info!(
"ShardOperator open table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
"ShardOperator open table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
shard_info.id,
table_info.name,
);

// FIXME: maybe should insert table from cluster after having opened table.
{
let mut data = self.data.write().unwrap();
data.try_insert_table(ctx.updated_table_info)
.box_err()
.with_context(|| OpenTableWithCause {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?;
}

// Open the table by operator afterwards.
// Open the table by operator.
let open_table_request = OpenTableRequest {
catalog_name: ctx.catalog,
schema_name: table_info.schema_name.clone(),
Expand All @@ -366,28 +376,34 @@ impl ShardOperator {
})?;

info!(
"ShardOperator open table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
"ShardOperator table is opened by operator, shard_id:{}, table:{}",
shard_info.id, table_info.name
);

Ok(())
}

pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
let shard_info = ctx.updated_table_info.shard_info.clone();
let table_info = ctx.updated_table_info.table_info.clone();

info!("ShardOperator close table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}");

// FIXME: maybe should remove table from cluster after having closed table.
// Update the shard info after the table is opened.
{
let mut data = self.data.write().unwrap();
data.try_remove_table(ctx.updated_table_info)
data.try_open_table(ctx.updated_table_info.clone())
.box_err()
.with_context(|| CloseTableWithCause {
.with_context(|| OpenTableWithCause {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?;
}

info!(
"ShardOperator open table sequentially finish, shard_id:{}, table:{}",
shard_info.id, table_info.name
);

Ok(())
}

pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
let shard_info = &ctx.updated_table_info.shard_info;
let table_info = &ctx.updated_table_info.table_info;

info!("ShardOperator close table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", shard_info.id, table_info.name);

// Close the table by catalog manager afterwards.
let close_table_request = CloseTableRequest {
catalog_name: ctx.catalog,
Expand All @@ -409,7 +425,25 @@ impl ShardOperator {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?;

info!("ShardOperator close table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}");
info!(
"ShardOperator table is closed by operator, shard_id:{}, table:{}",
shard_info.id, table_info.name
);

// Update the shard info after the table is closed.
{
let mut data = self.data.write().unwrap();
data.try_close_table(ctx.updated_table_info.clone())
.box_err()
.with_context(|| CloseTableWithCause {
msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
})?;
}

info!(
"ShardOperator close table sequentially finish, shard_id:{}, table:{}",
shard_info.id, table_info.name
);

Ok(())
}
Expand Down
Loading
Loading