From baa3ae59dec65bcf613e7da35576c6225ed0c102 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 28 Feb 2023 00:00:27 +0800 Subject: [PATCH 1/2] feat: add flush method for trait --- src/storage/src/region.rs | 18 ++++++++++++++++++ src/storage/src/region/writer.rs | 16 ++++++++++++++++ src/store-api/src/storage/region.rs | 2 ++ src/table/src/table.rs | 5 +++++ 4 files changed, 41 insertions(+) diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index d91fad2de0c6..ddf456956785 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -125,6 +125,10 @@ impl Region for RegionImpl { async fn close(&self) -> Result<()> { self.inner.close().await } + + async fn flush(&self) -> Result<()> { + self.inner.flush().await + } } /// Storage related config for region. @@ -550,4 +554,18 @@ impl RegionInner { async fn close(&self) -> Result<()> { self.writer.close().await } + + async fn flush(&self) -> Result<()> { + let writer_ctx = WriterContext { + shared: &self.shared, + flush_strategy: &self.flush_strategy, + flush_scheduler: &self.flush_scheduler, + compaction_scheduler: &self.compaction_scheduler, + sst_layer: &self.sst_layer, + wal: &self.wal, + writer: &self.writer, + manifest: &self.manifest, + }; + self.writer.flush(writer_ctx).await + } } diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 56af329c040f..64e3e1d46e51 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -260,6 +260,15 @@ impl RegionWriter { Ok(()) } + /// Flush task manually + pub async fn flush(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> { + let mut inner = self.inner.lock().await; + + ensure!(!inner.is_closed(), error::ClosedRegionSnafu); + + inner.manual_flush(writer_ctx).await + } + /// Cancel flush task if any async fn cancel_flush(&self) -> Result<()> { let mut inner = self.inner.lock().await; @@ -680,6 +689,13 @@ impl WriterInner { Some(schedule_compaction_cb) } + async fn manual_flush( + &mut self, + writer_ctx: WriterContext<'_, S>, + ) -> Result<()> { + self.trigger_flush(&writer_ctx).await?; + Ok(()) + } #[inline] fn is_closed(&self) -> bool { self.closed diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index 032fbc5c2f57..483443710fd9 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -74,6 +74,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>; async fn close(&self) -> Result<(), Self::Error>; + + async fn flush(&self) -> Result<(), Self::Error>; } /// Context for write operations. diff --git a/src/table/src/table.rs b/src/table/src/table.rs index f440d6b9501d..e555d5c1520c 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -93,6 +93,11 @@ pub trait Table: Send + Sync { } .fail()? } + + /// Flush table. + async fn flush(&self) -> Result<()> { + UnsupportedSnafu { operation: "FLUSH" }.fail()? + } } pub type TableRef = Arc; From 832f74de7dc60d4dd73a056ebd9bc7b3adc3d6bd Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 28 Feb 2023 00:00:57 +0800 Subject: [PATCH 2/2] feat: implement flush for mito table --- src/mito/src/table.rs | 15 +++++++++++++++ src/mito/src/table/test_util/mock_engine.rs | 4 ++++ 2 files changed, 19 insertions(+) diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 1b176bf274ba..9f222c448de9 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -322,6 +322,21 @@ impl Table for MitoTable { } Ok(rows_deleted) } + + async fn flush(&self) -> TableResult<()> { + let futs = self + .regions + .iter() + .map(|(_, region)| region.flush()) + .collect::>(); + + futures::future::try_join_all(futs) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + Ok(()) + } } struct ChunkStream { diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index 7725a955a928..e6520c72af81 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -196,6 +196,10 @@ impl Region for MockRegion { async fn close(&self) -> Result<()> { Ok(()) } + + async fn flush(&self) -> Result<()> { + unimplemented!() + } } impl MockRegionInner {