feat: fine tune for background compaction (#11815)
* fine tune sqls

* f

* fix

* try fix

* rollback

* neat
ZhiHanZ authored Jun 21, 2023
1 parent 71db052 commit a3b6850
Showing 7 changed files with 79 additions and 20 deletions.
29 changes: 18 additions & 11 deletions src/query/ee/src/background_service/compaction_job.rs
@@ -56,12 +56,22 @@ ON t.database = d.name
 WHERE t.database != 'system'
 AND t.database != 'information_schema'
 AND t.engine = 'FUSE'
-AND t.num_rows > 1 * 1000 * 1000
-AND t.data_compressed_size > 1 * 1000 * 1000 * 1000
+AND t.num_rows > 1 * 1000 * 100
+AND t.table_id NOT IN (
+    SELECT
+        table_id
+    FROM
+        system.background_tasks
+    WHERE
+        state = 'DONE'
+        AND table_id = t.table_id
+        AND type = 'COMPACTION'
+        AND updated_on > t.updated_on
+)
 ;
 ";

-const SEGMENT_SIZE: u64 = 10;
+const SEGMENT_SIZE: u64 = 1;
 const PER_SEGMENT_BLOCK: u64 = 100;
 const PER_BLOCK_SIZE: u64 = 50; // MB

@@ -115,14 +125,12 @@ pub fn should_continue_compaction(old: &TableStatistics, new: &TableStatistics)
         old.number_of_blocks.unwrap() as f64 / old.number_of_segments.unwrap() as f64;
     let new_segment_density =
         new.number_of_blocks.unwrap() as f64 / new.number_of_segments.unwrap() as f64;
-    let should_continue_seg_compact = new_segment_density > old_segment_density
-        || old.number_of_blocks != new.number_of_blocks
-        || old.number_of_segments != new.number_of_segments;
+    let should_continue_seg_compact = new_segment_density != old_segment_density
+        && new_segment_density < PER_SEGMENT_BLOCK as f64;
     let old_block_density = old.data_bytes as f64 / old.number_of_blocks.unwrap() as f64;
     let new_block_density = new.data_bytes as f64 / new.number_of_blocks.unwrap() as f64;
-    let should_continue_blk_compact = new_block_density > old_block_density
-        || old.data_bytes != new.data_bytes
-        || old.number_of_blocks != new.number_of_blocks;
+    let should_continue_blk_compact = new_block_density != old_block_density
+        && new_block_density < PER_BLOCK_SIZE as f64 * 1024.0 * 1024.0;
     (should_continue_seg_compact, should_continue_blk_compact)
 }
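The stop condition flips here: the old predicate kept compaction running whenever block or segment counts changed at all, while the new one continues only while density is still moving and still below target (PER_SEGMENT_BLOCK blocks per segment, PER_BLOCK_SIZE MB per block). A standalone sketch of the new rule, checked against the figures from the updated test further down:

// Standalone restatement of the new stop rule, using this commit's targets.
const PER_SEGMENT_BLOCK: u64 = 100; // target blocks per segment
const PER_BLOCK_SIZE: u64 = 50; // target MB per block

/// Each tuple is an (old, new) pair: block count, segment count, data bytes.
fn should_continue(blocks: (f64, f64), segments: (f64, f64), bytes: (f64, f64)) -> (bool, bool) {
    let (old_density, new_density) = (blocks.0 / segments.0, blocks.1 / segments.1);
    // Continue only if the last pass changed segment density AND we are
    // still short of the target blocks-per-segment.
    let seg = new_density != old_density && new_density < PER_SEGMENT_BLOCK as f64;
    let (old_blk, new_blk) = (bytes.0 / blocks.0, bytes.1 / blocks.1);
    let blk = new_blk != old_blk && new_blk < PER_BLOCK_SIZE as f64 * 1024.0 * 1024.0;
    (seg, blk)
}

fn main() {
    // 1000 blocks / 10 segments is already at density 100; compacting to
    // 901 / 9 (~100.1) gains nothing, so both passes stop.
    let b = 50.0 * 1001.0 * 1024.0 * 1024.0;
    assert_eq!(should_continue((1000.0, 901.0), (10.0, 9.0), (b, b)), (false, false));
    // 10_000 blocks / 900 segments (~11.1) compacting to 9_000 / 900 (10.0)
    // is still far below 100 blocks per segment, so both passes continue.
    let b = 50.0 * 50.0 * 1024.0 * 1024.0;
    assert_eq!(should_continue((10000.0, 9000.0), (900.0, 900.0), (b, b)), (true, true));
    println!("stop rule ok");
}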

@@ -543,8 +551,7 @@ impl CompactionJob {
         IF(bytes_uncompressed / block_count / 1024 / 1024 < {} and bytes_uncompressed / 1024 / 1024 > 1000, TRUE, FALSE) AS block_advice,
         row_count, bytes_uncompressed, bytes_compressed, index_size,
         segment_count, block_count,
-        block_count/segment_count,
-        humanize_size(bytes_uncompressed / block_count) AS per_block_uncompressed_size_string
+        block_count/segment_count
         from fuse_snapshot('{}', '{}') order by timestamp ASC LIMIT 1;
         ",
         seg_size, avg_seg, avg_blk, database, table
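For reference, substituting the values the test below asserts (seg_size = 10, avg_seg = 100, avg_blk = 50) yields the two advice predicates. The helper here is a hypothetical sketch covering only the visible placeholders, not the real query builder:

// Hypothetical sketch: how the three thresholds fill the advice predicates.
fn advice_predicates(seg_size: u64, avg_seg: u64, avg_blk: u64) -> (String, String) {
    (
        // Advise segment compaction: many segments, each under-filled.
        format!("segment_count > {seg_size} and block_count / segment_count < {avg_seg}"),
        // Advise block compaction: small average blocks in a big table.
        format!(
            "bytes_uncompressed / block_count / 1024 / 1024 < {avg_blk} and bytes_uncompressed / 1024 / 1024 > 1000"
        ),
    )
}

fn main() {
    let (seg, blk) = advice_predicates(10, 100, 50);
    assert_eq!(seg, "segment_count > 10 and block_count / segment_count < 100");
    assert!(blk.starts_with("bytes_uncompressed / block_count / 1024 / 1024 < 50"));
    println!("predicates ok");
}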
38 changes: 31 additions & 7 deletions src/query/ee/tests/it/background_service/compaction_job.rs
@@ -31,7 +31,7 @@ async fn test_get_compaction_advice_sql() -> Result<()> {
     );
     assert_eq!(
         sql.trim(),
-        "select\n IF(segment_count > 10 and block_count / segment_count < 100, TRUE, FALSE) AS segment_advice,\n IF(bytes_uncompressed / block_count / 1024 / 1024 < 50 and bytes_uncompressed / 1024 / 1024 > 1000, TRUE, FALSE) AS block_advice,\n row_count, bytes_uncompressed, bytes_compressed, index_size,\n segment_count, block_count,\n block_count/segment_count,\n humanize_size(bytes_uncompressed / block_count) AS per_block_uncompressed_size_string\n from fuse_snapshot('db1', 'tbl1') order by timestamp ASC LIMIT 1;"
+        "select\n IF(segment_count > 10 and block_count / segment_count < 100, TRUE, FALSE) AS segment_advice,\n IF(bytes_uncompressed / block_count / 1024 / 1024 < 50 and bytes_uncompressed / 1024 / 1024 > 1000, TRUE, FALSE) AS block_advice,\n row_count, bytes_uncompressed, bytes_compressed, index_size,\n segment_count, block_count,\n block_count/segment_count\n from fuse_snapshot('db1', 'tbl1') order by timestamp ASC LIMIT 1;"
     );
     Ok(())
 }
@@ -84,22 +84,46 @@ async fn test_should_continue_compaction() -> Result<()> {
     };
     let new = TableStatistics {
         number_of_blocks: Some(100),
-        number_of_segments: Some(9),
+        number_of_segments: Some(1),
         ..Default::default()
     };
-    assert_eq!(should_continue_compaction(&old, &new), (true, false));
+    assert_eq!(should_continue_compaction(&old, &new), (false, false));
+    let old = TableStatistics {
+        number_of_blocks: Some(100),
+        number_of_segments: Some(100),
+        ..Default::default()
+    };
+    let new = TableStatistics {
+        number_of_blocks: Some(100),
+        number_of_segments: Some(90),
+        ..Default::default()
+    };
+    assert_eq!(should_continue_compaction(&old, &new), (true, false));
+    let old = TableStatistics {
+        number_of_blocks: Some(10000),
+        number_of_segments: Some(900),
+        data_bytes: 50 * 50 * 1024 * 1024,
+        ..Default::default()
+    };
+    let new = TableStatistics {
+        number_of_blocks: Some(9000),
+        number_of_segments: Some(900),
+        data_bytes: 50 * 50 * 1024 * 1024,
+        ..Default::default()
+    };
+    assert_eq!(should_continue_compaction(&old, &new), (true, true));
     let old = TableStatistics {
         number_of_blocks: Some(1000),
         number_of_segments: Some(10),
-        data_bytes: 100,
+        data_bytes: 50 * 1001 * 1024 * 1024,
         ..Default::default()
     };
     let new = TableStatistics {
-        number_of_blocks: Some(90),
+        number_of_blocks: Some(901),
         number_of_segments: Some(9),
-        data_bytes: 100,
+        data_bytes: 50 * 1001 * 1024 * 1024,
         ..Default::default()
     };
-    assert_eq!(should_continue_compaction(&old, &new), (true, true));
+    assert_eq!(should_continue_compaction(&old, &new), (false, false));
     Ok(())
 }
@@ -126,7 +126,7 @@ async fn test_simple_sql() -> Result<()> {
     assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result);
     assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result);
     assert_eq!(result.data.len(), 10, "{:?}", result);
-    assert_eq!(result.schema.len(), 16, "{:?}", result);
+    assert_eq!(result.schema.len(), 17, "{:?}", result);
 
     // get state
     let uri = make_state_uri(query_id);
@@ -76,6 +76,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'database' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'database' | 'system' | 'tables' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'database' | 'system' | 'tables_with_history' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
+| 'database_id' | 'system' | 'background_tasks' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'database_id' | 'system' | 'databases' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'databases' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'datetime_precision' | 'information_schema' | 'columns' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' |
@@ -261,6 +262,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'table_catalog' | 'information_schema' | 'views' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'table_collation' | 'information_schema' | 'tables' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' |
 | 'table_comment' | 'information_schema' | 'tables' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
+| 'table_id' | 'system' | 'background_tasks' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'table_id' | 'system' | 'tables' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'table_id' | 'system' | 'tables_with_history' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'table_name' | 'information_schema' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -287,6 +289,9 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'type' | 'system' | 'indexes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'type' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'type' | 'system' | 'settings' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
+| 'updated_on' | 'system' | 'background_tasks' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
+| 'updated_on' | 'system' | 'tables' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
+| 'updated_on' | 'system' | 'tables_with_history' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'user' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'vacuum_stats' | 'system' | 'background_tasks' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'value' | 'system' | 'configs' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
14 changes: 14 additions & 0 deletions src/query/storages/system/src/background_tasks_table.rs
@@ -65,11 +65,14 @@ impl AsyncSystemTable for BackgroundTaskTable
         let mut types = Vec::with_capacity(tasks.len());
         let mut stats = Vec::with_capacity(tasks.len());
         let mut messages = Vec::with_capacity(tasks.len());
+        let mut database_ids = Vec::with_capacity(tasks.len());
+        let mut table_ids = Vec::with_capacity(tasks.len());
         let mut compaction_stats = Vec::with_capacity(tasks.len());
         let mut vacuum_stats = Vec::with_capacity(tasks.len());
         let mut task_run_secs = Vec::with_capacity(tasks.len());
         let mut creators = Vec::with_capacity(tasks.len());
         let mut create_timestamps = Vec::with_capacity(tasks.len());
+        let mut update_timestamps = Vec::with_capacity(tasks.len());
         for (_, name, task) in tasks {
             names.push(name.as_bytes().to_vec());
             types.push(task.task_type.to_string().as_bytes().to_vec());
@@ -82,23 +85,31 @@ impl AsyncSystemTable for BackgroundTaskTable
             );
             vacuum_stats.push(task.vacuum_stats.map(|s| s.to_string().as_bytes().to_vec()));
             if let Some(compact_stats) = task.compaction_task_stats.as_ref() {
+                database_ids.push(compact_stats.db_id);
+                table_ids.push(compact_stats.table_id);
                 task_run_secs.push(compact_stats.total_compaction_time.map(|s| s.as_secs()));
             } else {
+                database_ids.push(0);
+                table_ids.push(0);
                 task_run_secs.push(None);
             }
             creators.push(task.creator.map(|s| s.to_string().as_bytes().to_vec()));
             create_timestamps.push(task.created_at.timestamp_micros());
+            update_timestamps.push(task.last_updated.unwrap_or_default().timestamp_micros());
         }
         Ok(DataBlock::new_from_columns(vec![
             StringType::from_data(names),
             StringType::from_data(types),
             StringType::from_data(stats),
             StringType::from_data(messages),
+            NumberType::from_data(database_ids),
+            NumberType::from_data(table_ids),
             StringType::from_opt_data(compaction_stats),
             StringType::from_opt_data(vacuum_stats),
             NumberType::from_opt_data(task_run_secs),
             StringType::from_opt_data(creators),
             TimestampType::from_data(create_timestamps),
+            TimestampType::from_data(update_timestamps),
         ]))
     }
 }
@@ -110,6 +121,8 @@ impl BackgroundTaskTable {
             TableField::new("type", TableDataType::String),
             TableField::new("state", TableDataType::String),
             TableField::new("message", TableDataType::String),
+            TableField::new("database_id", TableDataType::Number(NumberDataType::UInt64)),
+            TableField::new("table_id", TableDataType::Number(NumberDataType::UInt64)),
             TableField::new("compaction_stats", TableDataType::String),
             TableField::new("vacuum_stats", TableDataType::String),
             TableField::new(
@@ -118,6 +131,7 @@ impl BackgroundTaskTable {
             ),
             TableField::new("creator", TableDataType::String),
             TableField::new("created_on", TableDataType::Timestamp),
+            TableField::new("updated_on", TableDataType::Timestamp),
         ]);
 
         let table_info = TableInfo {
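One detail worth noting in the extraction loop above: every row needs a value in every column, so tasks without compaction stats fall back to id 0 and a NULL runtime. A compact sketch of that branch, with illustrative stand-in types (not the actual Databend structs):

// Sketch of the fallback the new code pushes for stat-less tasks.
struct CompactionStats { db_id: u64, table_id: u64, total_secs: Option<u64> }
struct Task { compaction_task_stats: Option<CompactionStats> }

fn ids_and_runtime(task: &Task) -> (u64, u64, Option<u64>) {
    match task.compaction_task_stats.as_ref() {
        Some(s) => (s.db_id, s.table_id, s.total_secs),
        None => (0, 0, None), // same fallback as the new else-branch
    }
}

fn main() {
    let bare = Task { compaction_task_stats: None };
    assert_eq!(ids_and_runtime(&bare), (0, 0, None));
    let full = Task {
        compaction_task_stats: Some(CompactionStats { db_id: 1, table_id: 7, total_secs: Some(12) }),
    };
    assert_eq!(ids_and_runtime(&full), (1, 7, Some(12)));
}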
9 changes: 9 additions & 0 deletions src/query/storages/system/src/tables_table.rs
@@ -23,6 +23,7 @@ use common_exception::Result;
 use common_expression::types::number::UInt64Type;
 use common_expression::types::NumberDataType;
 use common_expression::types::StringType;
+use common_expression::types::TimestampType;
 use common_expression::utils::FromData;
 use common_expression::DataBlock;
 use common_expression::FromOptData;
@@ -221,6 +222,12 @@ where TablesTable<T>: HistoryAware
             .collect();
         let dropped_owns: Vec<Vec<u8>> =
             dropped_owns.iter().map(|s| s.as_bytes().to_vec()).collect();
+
+        let updated_on = database_tables
+            .iter()
+            .map(|v| v.get_table_info().meta.updated_on.timestamp_micros())
+            .collect::<Vec<_>>();
+
         let cluster_bys: Vec<String> = database_tables
             .iter()
             .map(|v| {
@@ -253,6 +260,7 @@ where TablesTable<T>: HistoryAware
             StringType::from_data(is_transient),
             StringType::from_data(created_owns),
             StringType::from_data(dropped_owns),
+            TimestampType::from_data(updated_on),
             UInt64Type::from_opt_data(num_rows),
             UInt64Type::from_opt_data(data_size),
             UInt64Type::from_opt_data(data_compressed_size),
@@ -278,6 +286,7 @@ where TablesTable<T>: HistoryAware
             TableField::new("is_transient", TableDataType::String),
             TableField::new("created_on", TableDataType::String),
             TableField::new("dropped_on", TableDataType::String),
+            TableField::new("updated_on", TableDataType::Timestamp),
             TableField::new(
                 "num_rows",
                 TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
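Both new updated_on columns are materialized with timestamp_micros(), i.e. microseconds since the Unix epoch, which is the representation TimestampType::from_data consumes; that shared representation is what lets the candidate SQL at the top compare background_tasks.updated_on against tables.updated_on directly. A minimal standard-library illustration of producing such a value:

// Illustration only: microseconds since the Unix epoch, the unit both
// updated_on columns use. Plain std; the real code converts chrono values.
use std::time::{SystemTime, UNIX_EPOCH};

fn micros_since_epoch() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_micros() as i64
}

fn main() {
    let updated_on = micros_since_epoch();
    println!("updated_on (microseconds): {updated_on}");
}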
@@ -104,7 +104,7 @@ insert into showtabstat.t4 values(1)
 statement ok
 SHOW TABLE STATUS FROM showtabstat WHERE engine = 'test'
 
-statement ok
+statement ok
 SHOW TABLE STATUS FROM showtabstat WHERE Create_time = 'test'
 
 statement ok

1 comment on commit a3b6850

@vercel vercel bot commented on a3b6850 Jun 21, 2023

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.vercel.app
databend.rs
