Commit cd9ef34

add stream_count in snapshot

zhyass committed Nov 25, 2023
1 parent cb04956 commit cd9ef34
Showing 14 changed files with 165 additions and 110 deletions.
50 changes: 45 additions & 5 deletions src/query/ee/src/stream/handler.rs
@@ -38,6 +38,7 @@ use common_storages_stream::stream_table::OPT_KEY_TABLE_ID;
 use common_storages_stream::stream_table::OPT_KEY_TABLE_NAME;
 use common_storages_stream::stream_table::OPT_KEY_TABLE_VER;
 use common_storages_stream::stream_table::STREAM_ENGINE;
+use storages_common_table_meta::meta::TableSnapshot;
 use storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
 use stream_handler::StreamHandler;
 use stream_handler::StreamHandlerWrapper;
@@ -52,8 +53,10 @@ impl StreamHandler for RealStreamHandler {
         ctx: Arc<dyn TableContext>,
         plan: &CreateStreamPlan,
     ) -> Result<CreateTableReply> {
-        let table = ctx
-            .get_table(&plan.catalog, &plan.table_database, &plan.table_name)
+        let tenant = plan.tenant.as_str();
+        let catalog = ctx.get_catalog(&plan.catalog).await?;
+        let table = catalog
+            .get_table(tenant, &plan.table_database, &plan.table_name)
             .await?;
         if !table.change_tracking_enabled() {
             return Err(ErrorCode::IllegalStream(format!(
@@ -66,7 +69,45 @@
         let table_info = fuse_table.get_table_info();
         let table_version = table_info.ident.seq;
         let table_id = table_info.ident.table_id;
-        let schema = table_info.schema().clone();

+        match catalog
+            .get_table(tenant, &plan.database, &plan.stream_name)
+            .await
+        {
+            Ok(stream) => {
+                if plan.if_not_exists {
+                    return Ok(CreateTableReply {
+                        table_id: stream.get_id(),
+                        new_table: false,
+                    });
+                }
+                return Err(ErrorCode::StreamAlreadyExists(format!(
+                    "'{}.{}' as stream Already Exists",
+                    plan.database, plan.stream_name
+                )));
+            }
+            Err(e) => {
+                if e.code() != ErrorCode::UNKNOWN_TABLE {
+                    return Err(e);
+                }
+
+                // update table meta.
+                let previous = fuse_table.read_table_snapshot().await?;
+                let mut new_snapshot =
+                    TableSnapshot::from_opt_previous(previous, &table_info.meta.schema);
+                new_snapshot.stream_count += 1;
+                FuseTable::commit_to_meta_server(
+                    ctx.as_ref(),
+                    table_info,
+                    fuse_table.meta_location_generator(),
+                    new_snapshot,
+                    None,
+                    &None,
+                    fuse_table.get_operator_ref(),
+                )
+                .await?;
+            }
+        }
+
         let mut options = BTreeMap::new();
         match &plan.navigation {
@@ -121,12 +162,11 @@
                 engine: STREAM_ENGINE.to_string(),
                 options,
                 comment: plan.comment.clone().unwrap_or("".to_string()),
-                schema,
+                schema: table_info.schema(),
                 ..Default::default()
             },
         };

-        let catalog = ctx.get_catalog(&plan.catalog).await?;
         catalog.create_table(req).await
     }
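The handler change above follows a read-modify-commit pattern: read the table's previous snapshot if one exists, derive the new snapshot from it (or start from an empty one), bump stream_count, and commit the result back to the meta server. The sketch below is a minimal, self-contained illustration of that pattern only; Snapshot and its from_opt_previous here are simplified stand-ins, not the actual FuseTable/TableSnapshot API.

use std::sync::Arc;

// Toy snapshot carrying just the new counter.
#[derive(Clone, Debug, Default)]
struct Snapshot {
    stream_count: u64,
}

impl Snapshot {
    // Mirror of the pattern: derive from the previous snapshot, or start empty.
    fn from_opt_previous(previous: Option<Arc<Snapshot>>) -> Self {
        match previous {
            Some(prev) => prev.as_ref().clone(),
            None => Snapshot::default(),
        }
    }
}

fn main() {
    // First stream on a table that has no snapshot yet: counter starts at 0.
    let mut first = Snapshot::from_opt_previous(None);
    first.stream_count += 1;
    assert_eq!(first.stream_count, 1);

    // Later stream creations carry the counter forward from the last snapshot.
    let mut second = Snapshot::from_opt_previous(Some(Arc::new(first)));
    second.stream_count += 1;
    assert_eq!(second.stream_count, 2);
}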
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/utils.rs
@@ -196,6 +196,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
         &snapshot_0.timestamp,
         Some((snapshot_0.snapshot_id, TableSnapshotV2::VERSION)),
         schema.as_ref().clone(),
+        0,
         Statistics::default(),
         locations,
         None,
4 changes: 3 additions & 1 deletion src/query/service/tests/it/storages/fuse/meta/snapshot.rs
@@ -26,7 +26,7 @@ fn default_snapshot() -> TableSnapshot {
     let uuid = Uuid::new_v4();
     let schema = TableSchema::empty();
     let stats = Default::default();
-    TableSnapshot::new(uuid, &None, None, schema, stats, vec![], None, None)
+    TableSnapshot::new(uuid, &None, None, schema, 0, stats, vec![], None, None)
 }

 #[test]
@@ -45,6 +45,7 @@ fn snapshot_timestamp_monotonic_increase() {
         &prev.timestamp,
         prev.prev_snapshot_id,
         schema,
+        0,
         Default::default(),
         vec![],
         None,
@@ -69,6 +70,7 @@ fn snapshot_timestamp_time_skew_tolerance() {
         &prev.timestamp,
         prev.prev_snapshot_id,
         schema,
+        0,
         Default::default(),
         vec![],
         None,
@@ -243,6 +243,7 @@ async fn test_commit_to_meta_server() -> Result<()> {
         &None,
         None,
         table.schema().as_ref().clone(),
+        0,
         Statistics::default(),
         new_segments,
         None,
@@ -208,6 +208,7 @@ async fn test_safety() -> Result<()> {
         &None,
         None,
         schema.as_ref().clone(),
+        0,
         summary,
         locations.clone(),
         None,
@@ -221,6 +221,7 @@ async fn test_safety_for_recluster() -> Result<()> {
         &None,
         None,
         schema.as_ref().clone(),
+        0,
         summary,
         locations.clone(),
         None,
18 changes: 18 additions & 0 deletions src/query/storages/common/table_meta/src/meta/v4/snapshot.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::io::Cursor;
+use std::sync::Arc;

 use chrono::DateTime;
 use chrono::Utc;
@@ -71,6 +72,10 @@
     /// For each snapshot, we keep a schema for it (in case of schema evolution)
     pub schema: TableSchema,

+    /// The counter of streams ever created on the table, incremented when a stream is created.
+    #[serde(default)]
+    pub stream_count: u64,
+
     /// Summary Statistics
     pub summary: Statistics,

@@ -91,6 +96,7 @@ impl TableSnapshot {
         prev_timestamp: &Option<DateTime<Utc>>,
         prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
         schema: TableSchema,
+        stream_count: u64,
         summary: Statistics,
         segments: Vec<Location>,
         cluster_key_meta: Option<ClusterKey>,
@@ -110,6 +116,7 @@
             timestamp,
             prev_snapshot_id,
             schema,
+            stream_count,
             summary,
             segments,
             cluster_key_meta,
@@ -123,6 +130,7 @@
             &None,
             None,
             schema,
+            0,
             Statistics::default(),
             vec![],
             None,
@@ -139,13 +147,21 @@
             &clone.timestamp,
             Some((clone.snapshot_id, clone.format_version)),
             clone.schema,
+            clone.stream_count,
             clone.summary,
             clone.segments,
             clone.cluster_key_meta,
             clone.table_statistics_location,
         )
     }

+    pub fn from_opt_previous(previous: Option<Arc<TableSnapshot>>, schema: &TableSchema) -> Self {
+        match previous {
+            Some(prev) => Self::from_previous(&prev),
+            None => Self::new_empty_snapshot(schema.clone()),
+        }
+    }
+
     /// Serializes the struct to a byte vector.
     ///
     /// The byte vector contains the format version, encoding, compression, and compressed data. The encoding
@@ -217,6 +233,7 @@ impl From<v2::TableSnapshot> for TableSnapshot {
             timestamp: s.timestamp,
             prev_snapshot_id: s.prev_snapshot_id,
             schema: s.schema,
+            stream_count: 0,
             summary: s.summary,
             segments: s.segments,
             cluster_key_meta: s.cluster_key_meta,
@@ -238,6 +255,7 @@ where T: Into<v3::TableSnapshot>
             timestamp: s.timestamp,
             prev_snapshot_id: s.prev_snapshot_id,
             schema: s.schema.into(),
+            stream_count: 0,
             summary: s.summary.into(),
             segments: s.segments,
             cluster_key_meta: s.cluster_key_meta,
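Note the #[serde(default)] on the new v4 field: v4 snapshots serialized before this commit have no stream_count key, and they must keep deserializing, falling back to 0. The standalone sketch below demonstrates that behavior with serde/serde_json and JSON for brevity; the SnapshotV4 struct and its fields here are hypothetical, and the real snapshot format is a versioned, compressed binary encoding, not JSON.

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct SnapshotV4 {
    format_version: u64,
    // Absent in payloads written before this commit: defaults to 0.
    #[serde(default)]
    stream_count: u64,
}

fn main() {
    // An "old" payload, written before stream_count existed: still parses.
    let old = r#"{ "format_version": 4 }"#;
    let s: SnapshotV4 = serde_json::from_str(old).unwrap();
    assert_eq!(s.stream_count, 0);

    // A "new" payload round-trips the counter unchanged.
    let new = r#"{ "format_version": 4, "stream_count": 3 }"#;
    let s: SnapshotV4 = serde_json::from_str(new).unwrap();
    assert_eq!(s.stream_count, 3);
}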
18 changes: 18 additions & 0 deletions src/query/storages/common/table_meta/src/meta/v5/snapshot.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::io::Cursor;
+use std::sync::Arc;

 use chrono::DateTime;
 use chrono::Utc;
@@ -71,6 +72,9 @@
     /// For each snapshot, we keep a schema for it (in case of schema evolution)
     pub schema: TableSchema,

+    /// The counter of streams ever created on the table, incremented when a stream is created.
+    pub stream_count: u64,
+
     /// Summary Statistics
     pub summary: Statistics,

@@ -91,6 +95,7 @@
         prev_timestamp: &Option<DateTime<Utc>>,
         prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
         schema: TableSchema,
+        stream_count: u64,
         summary: Statistics,
         segments: Vec<Location>,
         cluster_key_meta: Option<ClusterKey>,
@@ -110,6 +115,7 @@
             timestamp,
             prev_snapshot_id,
             schema,
+            stream_count,
             summary,
             segments,
             cluster_key_meta,
@@ -123,6 +129,7 @@
             &None,
             None,
             schema,
+            0,
             Statistics::default(),
             vec![],
             None,
@@ -139,13 +146,21 @@
             &clone.timestamp,
             Some((clone.snapshot_id, clone.format_version)),
             clone.schema,
+            clone.stream_count,
             clone.summary,
             clone.segments,
             clone.cluster_key_meta,
             clone.table_statistics_location,
         )
     }

+    pub fn from_opt_previous(previous: Option<Arc<TableSnapshot>>, schema: &TableSchema) -> Self {
+        match previous {
+            Some(prev) => Self::from_previous(&prev),
+            None => Self::new_empty_snapshot(schema.clone()),
+        }
+    }
+
     /// Serializes the struct to a byte vector.
     ///
     /// The byte vector contains the format version, encoding, compression, and compressed data. The encoding
@@ -218,6 +233,7 @@ impl From<v2::TableSnapshot> for TableSnapshot {
             timestamp: s.timestamp,
             prev_snapshot_id: s.prev_snapshot_id,
             schema: s.schema,
+            stream_count: 0,
             summary: Statistics::from_v2(s.summary),
             segments: s.segments,
             cluster_key_meta: s.cluster_key_meta,
@@ -239,6 +255,7 @@ where T: Into<v3::TableSnapshot>
             timestamp: s.timestamp,
             prev_snapshot_id: s.prev_snapshot_id,
             schema: s.schema.into(),
+            stream_count: 0,
             summary: s.summary.into(),
             segments: s.segments,
             cluster_key_meta: s.cluster_key_meta,
@@ -257,6 +274,7 @@ impl From<v4::TableSnapshot> for TableSnapshot {
             timestamp: s.timestamp,
             prev_snapshot_id: s.prev_snapshot_id,
             schema: s.schema,
+            stream_count: s.stream_count,
             summary: Statistics::from_v2(s.summary),
             segments: s.segments,
             cluster_key_meta: s.cluster_key_meta,
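In v5 the field is required (no serde default), so the From conversions carry the upgrade rule instead: v2/v3 snapshots predate stream tracking and convert with stream_count: 0, while a v4 snapshot keeps its counter. A toy sketch of that rule, with stand-in types rather than the real snapshot structs:

// Stand-ins for the snapshot versions involved in the upgrade.
struct V3;                        // predates stream_count
struct V4 { stream_count: u64 }   // optional on disk via serde(default)
struct V5 { stream_count: u64 }   // required field

impl From<V3> for V5 {
    fn from(_s: V3) -> Self {
        V5 { stream_count: 0 } // the field did not exist yet
    }
}

impl From<V4> for V5 {
    fn from(s: V4) -> Self {
        V5 { stream_count: s.stream_count } // preserved across the upgrade
    }
}

fn main() {
    assert_eq!(V5::from(V3).stream_count, 0);
    assert_eq!(V5::from(V4 { stream_count: 2 }).stream_count, 2);
}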