
Commit e6c8bce (parent: 9ac3a1f)

refactor(cubestore): Move build_writer_props to CubestoreMetadataCacheFactory

Also, have it take an IdRow<Table>.

6 files changed: +197 -77 lines changed
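Taken together with the commit message, the hunks below suggest roughly the following shape for the factory trait. This is a sketch inferred from the call sites in this diff (a `cache_factory()` accessor that returns a borrowed `Arc`, and a `build_writer_props` that is async and takes the table row); the real trait in `crate::table::parquet` may differ in attribute, argument, and return-type details.

    // Sketch only: inferred from this diff's call sites, not copied from the
    // source. The async_trait attribute and the exact WriterProperties/CubeError
    // signatures are assumptions; Arc, IdRow, Table come from cubestore's crates.
    #[async_trait::async_trait]
    pub trait CubestoreMetadataCacheFactory: Send + Sync {
        // Call sites below borrow this and .clone() the Arc when they need
        // ownership.
        fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory>;

        // Moved here from ParquetTableStore per the commit title; now takes
        // the table row so writer settings can vary per table.
        async fn build_writer_props(
            &self,
            table: &IdRow<Table>,
        ) -> Result<WriterProperties, CubeError>;
    }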

rust/cubestore/cubestore/src/config/injection.rs

Lines changed: 8 additions & 0 deletions
@@ -135,6 +135,14 @@ impl Injector {
             .map(|s| s.clone().downcast(s.clone()).unwrap())
     }
 
+    pub async fn try_drop_service(&self, name: &str) -> bool {
+        self.services.write().await.remove(name).is_some()
+    }
+
+    pub async fn try_drop_service_typed<T: ?Sized + Send + Sync + 'static>(&self) -> bool {
+        self.try_drop_service(type_name::<T>()).await
+    }
+
     pub async fn get_service_typed<T: ?Sized + Send + Sync + 'static>(&self) -> Arc<T> {
         self.get_service(type_name::<T>()).await
     }
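`try_drop_service` reports whether a binding existed, which lets callers (tests, reconfiguration paths) drop and re-register a service idempotently. A self-contained toy sketch of the same semantics follows; the real Injector stores its services differently, and the tokio dependency here is an assumption:

    use std::any::type_name;
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    struct Registry {
        services: RwLock<HashMap<String, Arc<dyn std::any::Any + Send + Sync>>>,
    }

    impl Registry {
        // Removing a binding reports whether it existed.
        async fn try_drop_service(&self, name: &str) -> bool {
            self.services.write().await.remove(name).is_some()
        }

        // The typed variant keys the map by the type's name, as the diff does.
        async fn try_drop_service_typed<T: ?Sized + Send + Sync + 'static>(&self) -> bool {
            self.try_drop_service(type_name::<T>()).await
        }
    }

    trait CompactionService: Send + Sync {}

    #[tokio::main]
    async fn main() {
        let reg = Registry { services: RwLock::new(HashMap::new()) };
        reg.services.write().await.insert(
            type_name::<dyn CompactionService>().to_string(),
            Arc::new(()) as Arc<dyn std::any::Any + Send + Sync>,
        );
        // The first drop removes the binding; the second finds nothing.
        assert!(reg.try_drop_service_typed::<dyn CompactionService>().await);
        assert!(!reg.try_drop_service_typed::<dyn CompactionService>().await);
    }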

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 11 additions & 10 deletions
@@ -2007,8 +2007,7 @@ impl Config {
             .register_typed::<dyn ChunkDataStore, _, _, _>(async move |i| {
                 let metadata_cache_factory = i
                     .get_service_typed::<dyn CubestoreMetadataCacheFactory>()
-                    .await
-                    .cache_factory();
+                    .await;
                 ChunkStore::new(
                     i.get_service_typed().await,
                     i.get_service_typed().await,
@@ -2025,10 +2024,10 @@ impl Config {
         self.injector
             .register_typed::<dyn CubestoreParquetMetadataCache, _, _, _>(async move |i| {
                 let c = i.get_service_typed::<dyn ConfigObj>().await;
-                let metadata_cache_factory = i
+                let cubestore_metadata_cache_factory = i
                     .get_service_typed::<dyn CubestoreMetadataCacheFactory>()
-                    .await
-                    .cache_factory();
+                    .await;
+                let metadata_cache_factory: &_ = cubestore_metadata_cache_factory.cache_factory();
                 CubestoreParquetMetadataCacheImpl::new(
                     match c.metadata_cache_max_capacity_bytes() {
                         0 => metadata_cache_factory.make_noop_cache(),
@@ -2045,8 +2044,7 @@ impl Config {
             .register_typed::<dyn CompactionService, _, _, _>(async move |i| {
                 let metadata_cache_factory = i
                     .get_service_typed::<dyn CubestoreMetadataCacheFactory>()
-                    .await
-                    .cache_factory();
+                    .await;
                 CompactionServiceImpl::new(
                     i.get_service_typed().await,
                     i.get_service_typed().await,
@@ -2093,7 +2091,8 @@ impl Config {
                     i.get_service_typed().await,
                     i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
                         .await
-                        .cache_factory(),
+                        .cache_factory()
+                        .clone(),
                 )
             })
             .await;
@@ -2195,7 +2194,8 @@ impl Config {
                 let metadata_cache_factory = i
                     .get_service_typed::<dyn CubestoreMetadataCacheFactory>()
                     .await
-                    .cache_factory();
+                    .cache_factory()
+                    .clone();
                 QueryPlannerImpl::new(
                     i.get_service_typed().await,
                     i.get_service_typed().await,
@@ -2211,7 +2211,8 @@ impl Config {
                 QueryExecutorImpl::new(
                     i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
                         .await
-                        .cache_factory(),
+                        .cache_factory()
+                        .clone(),
                     i.get_service_typed().await,
                     i.get_service_typed().await,
                 )
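Note the recurring `.cache_factory().clone()` at the call sites above, together with the `let metadata_cache_factory: &_` binding: `cache_factory()` now appears to return a borrow of the inner `Arc`, so constructors that keep the factory clone the `Arc` explicitly (a refcount bump, not a copy of the cache). A self-contained illustration of that borrowing pattern, with stand-in type names:

    use std::sync::Arc;

    trait MetadataCacheFactory: Send + Sync {}
    struct BasicFactory;
    impl MetadataCacheFactory for BasicFactory {}

    struct CacheFactoryHolder {
        inner: Arc<dyn MetadataCacheFactory>,
    }

    impl CacheFactoryHolder {
        // Returning &Arc avoids a refcount bump on every accessor call.
        fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory> {
            &self.inner
        }
    }

    fn main() {
        let holder = CacheFactoryHolder { inner: Arc::new(BasicFactory) };
        // Borrow when the factory is only used transiently...
        let borrowed: &Arc<dyn MetadataCacheFactory> = holder.cache_factory();
        // ...and clone the Arc when a constructor needs to own it, mirroring
        // the `.cache_factory().clone()` call sites above.
        let owned: Arc<dyn MetadataCacheFactory> = borrowed.clone();
        let _ = owned;
    }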

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 4 additions & 3 deletions
@@ -1660,6 +1660,7 @@ mod tests {
 
     use crate::metastore::job::JobType;
     use crate::store::compaction::CompactionService;
+    use crate::table::parquet::CubestoreMetadataCacheFactoryImpl;
     use async_compression::tokio::write::GzipEncoder;
     use cuberockstore::rocksdb::{Options, DB};
     use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
@@ -1728,7 +1729,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
             rows_per_chunk,
         );
         let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1807,7 +1808,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
             rows_per_chunk,
         );
         let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1917,7 +1918,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
             rows_per_chunk,
        );
         let limits = Arc::new(ConcurrencyLimits::new(4));
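The tests now wrap DataFusion's `BasicMetadataCacheFactory` in the cubestore-level wrapper before handing it to the store constructors. A plausible shape for that wrapper, inferred from these call sites (the real `CubestoreMetadataCacheFactoryImpl` lives in `crate::table::parquet` and also carries the writer-props logic this commit moves):

    use std::sync::Arc;

    // Stand-ins for the real traits; only the wrapping shape is the point.
    trait MetadataCacheFactory: Send + Sync {}
    trait CubestoreMetadataCacheFactory: Send + Sync {
        fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory>;
    }

    struct BasicMetadataCacheFactory;
    impl BasicMetadataCacheFactory {
        fn new() -> Self {
            BasicMetadataCacheFactory
        }
    }
    impl MetadataCacheFactory for BasicMetadataCacheFactory {}

    struct CubestoreMetadataCacheFactoryImpl {
        cache_factory: Arc<dyn MetadataCacheFactory>,
    }

    impl CubestoreMetadataCacheFactoryImpl {
        // Matches the call shape used in the tests above.
        fn new(cache_factory: Arc<dyn MetadataCacheFactory>) -> Arc<Self> {
            Arc::new(Self { cache_factory })
        }
    }

    impl CubestoreMetadataCacheFactory for CubestoreMetadataCacheFactoryImpl {
        fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory> {
            &self.cache_factory
        }
    }

    fn main() {
        let wrapped =
            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new()));
        let _factory: &Arc<dyn MetadataCacheFactory> = wrapped.cache_factory();
    }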

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 47 additions & 20 deletions
@@ -13,7 +13,7 @@ use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
 use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
 use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE};
 use crate::table::data::{cmp_min_rows, cmp_partition_key};
-use crate::table::parquet::{arrow_schema, ParquetTableStore};
+use crate::table::parquet::{arrow_schema, CubestoreMetadataCacheFactory, ParquetTableStore};
 use crate::table::redistribute::redistribute;
 use crate::table::{Row, TableValue};
 use crate::util::batch_memory::record_batch_buffer_size;
@@ -75,7 +75,7 @@ pub struct CompactionServiceImpl {
     chunk_store: Arc<dyn ChunkDataStore>,
     remote_fs: Arc<dyn RemoteFs>,
     config: Arc<dyn ConfigObj>,
-    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+    metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
 }
 
 crate::di_service!(CompactionServiceImpl, [CompactionService]);
@@ -86,7 +86,7 @@ impl CompactionServiceImpl {
         chunk_store: Arc<dyn ChunkDataStore>,
         remote_fs: Arc<dyn RemoteFs>,
         config: Arc<dyn ConfigObj>,
-        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+        metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
     ) -> Arc<CompactionServiceImpl> {
         Arc::new(CompactionServiceImpl {
             meta_store,
@@ -658,7 +658,9 @@ impl CompactionService for CompactionServiceImpl {
                 ROW_GROUP_SIZE,
                 1,
                 None,
-                self.metadata_cache_factory.make_noop_cache(),
+                self.metadata_cache_factory
+                    .cache_factory()
+                    .make_noop_cache(),
             )?);
 
             Arc::new(TraceDataLoadedExec::new(
@@ -680,8 +682,14 @@ impl CompactionService for CompactionServiceImpl {
         };
         let records =
             merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?;
-        let count_and_min =
-            write_to_files(records, total_rows as usize, store, new_local_files2).await?;
+        let count_and_min = write_to_files(
+            records,
+            total_rows as usize,
+            store,
+            &table,
+            new_local_files2,
+        )
+        .await?;
 
         if let Some(c) = &new_chunk {
             assert_eq!(new_local_files.len(), 1);
@@ -862,7 +870,12 @@ impl CompactionService for CompactionServiceImpl {
         // TODO deactivate corrupt tables
         let files = download_files(&partitions, self.remote_fs.clone()).await?;
         let keys = find_partition_keys(
-            keys_with_counts(&files, self.metadata_cache_factory.as_ref(), key_len).await?,
+            keys_with_counts(
+                &files,
+                self.metadata_cache_factory.cache_factory().as_ref(),
+                key_len,
+            )
+            .await?,
             key_len,
             // TODO should it respect table partition_split_threshold?
             self.config.partition_split_threshold() as usize,
@@ -1108,6 +1121,7 @@ pub(crate) async fn write_to_files(
     records: SendableRecordBatchStream,
     num_rows: usize,
     store: ParquetTableStore,
+    table_config: &IdRow<Table>,
     files: Vec<String>,
 ) -> Result<Vec<(usize, Vec<TableValue>, Vec<TableValue>)>, CubeError> {
     let rows_per_file = div_ceil(num_rows as usize, files.len());
@@ -1165,7 +1179,7 @@ pub(crate) async fn write_to_files(
             };
         };
 
-    write_to_files_impl(records, store, files, pick_writer).await?;
+    write_to_files_impl(records, store, files, table_config, pick_writer).await?;
 
     let mut stats = take(stats.lock().unwrap().deref_mut());
     if stats.last().unwrap().0 == 0 {
@@ -1185,10 +1199,11 @@ async fn write_to_files_impl(
     records: SendableRecordBatchStream,
     store: ParquetTableStore,
     files: Vec<String>,
+    table_config: &IdRow<Table>,
     mut pick_writer: impl FnMut(&RecordBatch) -> WriteBatchTo,
 ) -> Result<(), CubeError> {
     let schema = Arc::new(store.arrow_schema());
-    let writer_props = store.writer_props()?;
+    let writer_props = store.writer_props(table_config).await?;
     let mut writers = files.into_iter().map(move |f| -> Result<_, CubeError> {
         Ok(ArrowWriter::try_new(
             File::create(f)?,
@@ -1254,6 +1269,7 @@
 async fn write_to_files_by_keys(
     records: SendableRecordBatchStream,
     store: ParquetTableStore,
+    table_config: &IdRow<Table>,
     files: Vec<String>,
     keys: Vec<Row>,
 ) -> Result<Vec<usize>, CubeError> {
@@ -1297,7 +1313,7 @@ async fn write_to_files_by_keys(
         panic!("impossible")
     };
     let num_files = files.len();
-    write_to_files_impl(records, store, files, pick_writer).await?;
+    write_to_files_impl(records, store, files, table_config, pick_writer).await?;
 
     let mut row_counts: Vec<usize> = take(row_counts.lock().unwrap().as_mut());
     assert!(
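Threading `table_config: &IdRow<Table>` down through `write_to_files`, `write_to_files_impl`, and `write_to_files_by_keys` is what allows `writer_props` to become async and table-aware. As a purely illustrative example of per-table Parquet writer settings, using the upstream `parquet` crate's builder API rather than cubestore's forked DataFusion (the `suggested_row_group_size` hint is an invented stand-in for whatever the table row actually carries):

    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;

    // Hypothetical policy: derive writer properties from a table-level hint.
    fn build_writer_props(suggested_row_group_size: Option<usize>) -> WriterProperties {
        let mut builder = WriterProperties::builder().set_compression(Compression::SNAPPY);
        if let Some(size) = suggested_row_group_size {
            builder = builder.set_max_row_group_size(size);
        }
        builder.build()
    }

    fn main() {
        let props = build_writer_props(Some(16_384));
        println!("row group size: {}", props.max_row_group_size());
    }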
@@ -1418,6 +1434,7 @@ mod tests {
     use crate::remotefs::LocalDirRemoteFs;
     use crate::store::MockChunkDataStore;
     use crate::table::data::rows_to_columns;
+    use crate::table::parquet::CubestoreMetadataCacheFactoryImpl;
     use crate::table::{cmp_same_types, Row, TableValue};
     use cuberockstore::rocksdb::{Options, DB};
     use datafusion::arrow::array::{Int64Array, StringArray};
@@ -1540,7 +1557,7 @@ mod tests {
             Arc::new(chunk_store),
             remote_fs,
             Arc::new(config),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
         );
         compaction_service
             .compact(1, DataLoadedSize::new())
@@ -1680,7 +1697,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(cluster),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
             10,
         );
         metastore
@@ -1768,7 +1785,7 @@ mod tests {
             chunk_store.clone(),
             remote_fs,
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
         );
         compaction_service
             .compact_in_memory_chunks(partition.get_id())
@@ -1856,7 +1873,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
             50,
         );
 
@@ -1959,7 +1976,7 @@ mod tests {
             chunk_store.clone(),
             remote_fs.clone(),
             config.config_obj(),
-            Arc::new(BasicMetadataCacheFactory::new()),
+            CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
         );
         compaction_service
             .compact(partition.get_id(), DataLoadedSize::new())
@@ -2190,7 +2207,7 @@
 struct MultiSplit {
     meta: Arc<dyn MetaStore>,
     fs: Arc<dyn RemoteFs>,
-    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+    metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
     keys: Vec<Row>,
     key_len: usize,
     multi_partition_id: u64,
@@ -2206,7 +2223,7 @@ impl MultiSplit {
     fn new(
         meta: Arc<dyn MetaStore>,
         fs: Arc<dyn RemoteFs>,
-        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+        metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
         keys: Vec<Row>,
         key_len: usize,
         multi_partition_id: u64,
@@ -2270,6 +2287,10 @@ impl MultiSplit {
             }
         });
 
+        let table = self
+            .meta
+            .get_table_by_id(p.index.get_row().table_id())
+            .await?;
         let store = ParquetTableStore::new(
             p.index.get_row().clone(),
             ROW_GROUP_SIZE,
@@ -2278,7 +2299,7 @@ impl MultiSplit {
         let records = if !in_files.is_empty() {
             read_files(
                 &in_files.into_iter().map(|(f, _)| f).collect::<Vec<_>>(),
-                self.metadata_cache_factory.as_ref(),
+                self.metadata_cache_factory.cache_factory().as_ref(),
                 self.key_len,
                 None,
             )
@@ -2290,8 +2311,14 @@ impl MultiSplit {
             .execute(0)
             .await?
         };
-        let row_counts =
-            write_to_files_by_keys(records, store, out_files.to_vec(), self.keys.clone()).await?;
+        let row_counts = write_to_files_by_keys(
+            records,
+            store,
+            &table,
+            out_files.to_vec(),
+            self.keys.clone(),
+        )
+        .await?;
 
         for i in 0..row_counts.len() {
             mrow_counts[i] += row_counts[i] as u64;
