@@ -13,7 +13,7 @@ use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec
 use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
 use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE};
 use crate::table::data::{cmp_min_rows, cmp_partition_key};
-use crate::table::parquet::{arrow_schema, ParquetTableStore};
+use crate::table::parquet::{arrow_schema, CubestoreMetadataCacheFactory, ParquetTableStore};
 use crate::table::redistribute::redistribute;
 use crate::table::{Row, TableValue};
 use crate::util::batch_memory::record_batch_buffer_size;
@@ -75,7 +75,7 @@ pub struct CompactionServiceImpl {
     chunk_store: Arc<dyn ChunkDataStore>,
     remote_fs: Arc<dyn RemoteFs>,
     config: Arc<dyn ConfigObj>,
-    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+    metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
 }

 crate::di_service!(CompactionServiceImpl, [CompactionService]);
@@ -86,7 +86,7 @@ impl CompactionServiceImpl {
         chunk_store: Arc<dyn ChunkDataStore>,
         remote_fs: Arc<dyn RemoteFs>,
         config: Arc<dyn ConfigObj>,
-        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+        metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
     ) -> Arc<CompactionServiceImpl> {
         Arc::new(CompactionServiceImpl {
             meta_store,
@@ -658,7 +658,9 @@ impl CompactionService for CompactionServiceImpl {
                 ROW_GROUP_SIZE,
                 1,
                 None,
-                self.metadata_cache_factory.make_noop_cache(),
+                self.metadata_cache_factory
+                    .cache_factory()
+                    .make_noop_cache(),
             )?);

             Arc::new(TraceDataLoadedExec::new(
@@ -680,8 +682,14 @@ impl CompactionService for CompactionServiceImpl {
         };
         let records =
             merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?;
-        let count_and_min =
-            write_to_files(records, total_rows as usize, store, new_local_files2).await?;
+        let count_and_min = write_to_files(
+            records,
+            total_rows as usize,
+            store,
+            &table,
+            new_local_files2,
+        )
+        .await?;

         if let Some(c) = &new_chunk {
             assert_eq!(new_local_files.len(), 1);
@@ -862,7 +870,12 @@ impl CompactionService for CompactionServiceImpl {
         // TODO deactivate corrupt tables
         let files = download_files(&partitions, self.remote_fs.clone()).await?;
         let keys = find_partition_keys(
-            keys_with_counts(&files, self.metadata_cache_factory.as_ref(), key_len).await?,
+            keys_with_counts(
+                &files,
+                self.metadata_cache_factory.cache_factory().as_ref(),
+                key_len,
+            )
+            .await?,
             key_len,
             // TODO should it respect table partition_split_threshold?
             self.config.partition_split_threshold() as usize,
@@ -1108,6 +1121,7 @@ pub(crate) async fn write_to_files(
     records: SendableRecordBatchStream,
     num_rows: usize,
     store: ParquetTableStore,
+    table: &IdRow<Table>,
     files: Vec<String>,
 ) -> Result<Vec<(usize, Vec<TableValue>, Vec<TableValue>)>, CubeError> {
     let rows_per_file = div_ceil(num_rows as usize, files.len());
@@ -1165,7 +1179,7 @@ pub(crate) async fn write_to_files(
         };
     };

-    write_to_files_impl(records, store, files, pick_writer).await?;
+    write_to_files_impl(records, store, files, table, pick_writer).await?;

     let mut stats = take(stats.lock().unwrap().deref_mut());
     if stats.last().unwrap().0 == 0 {
@@ -1185,10 +1199,11 @@ async fn write_to_files_impl(
     records: SendableRecordBatchStream,
     store: ParquetTableStore,
     files: Vec<String>,
+    table: &IdRow<Table>,
     mut pick_writer: impl FnMut(&RecordBatch) -> WriteBatchTo,
 ) -> Result<(), CubeError> {
     let schema = Arc::new(store.arrow_schema());
-    let writer_props = store.writer_props()?;
+    let writer_props = store.writer_props(table).await?;
     let mut writers = files.into_iter().map(move |f| -> Result<_, CubeError> {
         Ok(ArrowWriter::try_new(
             File::create(f)?,
@@ -1254,6 +1269,7 @@ async fn write_to_files_impl(
 async fn write_to_files_by_keys(
     records: SendableRecordBatchStream,
     store: ParquetTableStore,
+    table: &IdRow<Table>,
     files: Vec<String>,
     keys: Vec<Row>,
 ) -> Result<Vec<usize>, CubeError> {
@@ -1297,7 +1313,7 @@ async fn write_to_files_by_keys(
         panic!("impossible")
     };
     let num_files = files.len();
-    write_to_files_impl(records, store, files, pick_writer).await?;
+    write_to_files_impl(records, store, files, table, pick_writer).await?;

     let mut row_counts: Vec<usize> = take(row_counts.lock().unwrap().as_mut());
     assert!(
@@ -1418,6 +1434,7 @@ mod tests {
     use crate::remotefs::LocalDirRemoteFs;
     use crate::store::MockChunkDataStore;
     use crate::table::data::rows_to_columns;
+    use crate::table::parquet::CubestoreMetadataCacheFactoryImpl;
     use crate::table::{cmp_same_types, Row, TableValue};
     use cuberockstore::rocksdb::{Options, DB};
     use datafusion::arrow::array::{Int64Array, StringArray};
@@ -1540,7 +1557,7 @@ mod tests {
             Arc::new(chunk_store),
             remote_fs,
             Arc::new(config),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
         );
         compaction_service
             .compact(1, DataLoadedSize::new())
@@ -1680,7 +1697,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(cluster),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
             10,
         );
         metastore
@@ -1768,7 +1785,7 @@ mod tests {
             chunk_store.clone(),
             remote_fs,
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
         );
         compaction_service
             .compact_in_memory_chunks(partition.get_id())
@@ -1856,7 +1873,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
             50,
         );

@@ -1959,7 +1976,7 @@ mod tests {
             chunk_store.clone(),
             remote_fs.clone(),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
         );
         compaction_service
             .compact(partition.get_id(), DataLoadedSize::new())
@@ -2190,7 +2207,7 @@ mod tests {
 struct MultiSplit {
     meta: Arc<dyn MetaStore>,
     fs: Arc<dyn RemoteFs>,
-    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+    metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
     keys: Vec<Row>,
     key_len: usize,
     multi_partition_id: u64,
@@ -2206,7 +2223,7 @@ impl MultiSplit {
     fn new(
         meta: Arc<dyn MetaStore>,
         fs: Arc<dyn RemoteFs>,
-        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+        metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
         keys: Vec<Row>,
         key_len: usize,
         multi_partition_id: u64,
@@ -2270,6 +2287,10 @@ impl MultiSplit {
             }
         });

+        let table = self
+            .meta
+            .get_table_by_id(p.index.get_row().table_id())
+            .await?;
         let store = ParquetTableStore::new(
             p.index.get_row().clone(),
             ROW_GROUP_SIZE,
@@ -2278,7 +2299,7 @@ impl MultiSplit {
         let records = if !in_files.is_empty() {
             read_files(
                 &in_files.into_iter().map(|(f, _)| f).collect::<Vec<_>>(),
-                self.metadata_cache_factory.as_ref(),
+                self.metadata_cache_factory.cache_factory().as_ref(),
                 self.key_len,
                 None,
             )
@@ -2290,8 +2311,14 @@ impl MultiSplit {
22902311 . execute ( 0 )
22912312 . await ?
22922313 } ;
2293- let row_counts =
2294- write_to_files_by_keys ( records, store, out_files. to_vec ( ) , self . keys . clone ( ) ) . await ?;
2314+ let row_counts = write_to_files_by_keys (
2315+ records,
2316+ store,
2317+ & table,
2318+ out_files. to_vec ( ) ,
2319+ self . keys . clone ( ) ,
2320+ )
2321+ . await ?;
22952322
22962323 for i in 0 ..row_counts. len ( ) {
22972324 mrow_counts[ i] += row_counts[ i] as u64 ;
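For context, here is a minimal sketch of the wrapper-trait shape the call sites in this diff imply: `cache_factory()` hands back the plain `MetadataCacheFactory` (used for `make_noop_cache()` and `as_ref()`), while `CubestoreMetadataCacheFactoryImpl::new(...)` builds the concrete wrapper passed into `CompactionServiceImpl::new`. This is an illustration inferred from the diff, not the actual definitions in crate::table::parquet; the placeholder `MetadataCacheFactory` trait below only stands in for the real DataFusion-side trait so the sketch compiles on its own.

use std::sync::Arc;

// Stand-in for the existing DataFusion-level trait; the real one also exposes
// cache constructors such as `make_noop_cache()`.
pub trait MetadataCacheFactory: Send + Sync {}

// Hypothetical shape of the cubestore-side wrapper used throughout this diff.
pub trait CubestoreMetadataCacheFactory: Send + Sync {
    // Call sites reach the inner factory via
    // `self.metadata_cache_factory.cache_factory()`.
    fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory>;
}

pub struct CubestoreMetadataCacheFactoryImpl {
    cache_factory: Arc<dyn MetadataCacheFactory>,
}

impl CubestoreMetadataCacheFactoryImpl {
    // Mirrors the test-code usage:
    // `CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new()))`.
    pub fn new(cache_factory: Arc<dyn MetadataCacheFactory>) -> Arc<Self> {
        Arc::new(CubestoreMetadataCacheFactoryImpl { cache_factory })
    }
}

impl CubestoreMetadataCacheFactory for CubestoreMetadataCacheFactoryImpl {
    fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory> {
        &self.cache_factory
    }
}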