From 1dbdfd22e2e21320373a53fbef155885fc103d7b Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 16 Feb 2024 13:53:34 +0800
Subject: [PATCH] refactor: refactor SourceExecutor

---
 .../src/executor/source/executor_core.rs      | 13 +--
 .../src/executor/source/fs_source_executor.rs | 29 +++---
 .../src/executor/source/source_executor.rs    | 94 ++++++++-----------
 3 files changed, 61 insertions(+), 75 deletions(-)

diff --git a/src/stream/src/executor/source/executor_core.rs b/src/stream/src/executor/source/executor_core.rs
index 8857654be3cf2..6b3713cc64af1 100644
--- a/src/stream/src/executor/source/executor_core.rs
+++ b/src/stream/src/executor/source/executor_core.rs
@@ -35,16 +35,17 @@ pub struct StreamSourceCore<S: StateStore> {
     /// Split info for stream source. A source executor might read data from several splits of
     /// external connector.
-    pub(crate) stream_source_splits: HashMap<SplitId, SplitImpl>,
+    pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,
 
     /// Stores information of the splits.
     pub(crate) split_state_store: SourceStateTableHandler<S>,
 
-    /// In-memory cache for the splits.
+    /// Contains the latest offsets for the splits that are updated *in the current epoch*.
+    /// It is cleared after each barrier.
     ///
     /// Source messages will only write the cache.
     /// It is read on split change and rebuild stream reader on error.
-    pub(crate) state_cache: HashMap<SplitId, SplitImpl>,
+    pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
 }
 
 impl<S> StreamSourceCore<S>
 where
@@ -63,14 +64,14 @@ where
             source_name,
             column_ids,
             source_desc_builder: Some(source_desc_builder),
-            stream_source_splits: HashMap::new(),
+            latest_split_info: HashMap::new(),
             split_state_store,
-            state_cache: HashMap::new(),
+            updated_splits_in_epoch: HashMap::new(),
         }
     }
 
     pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
-        self.stream_source_splits = splits
+        self.latest_split_info = splits
             .into_iter()
             .map(|split| (split.id(), split))
             .collect();
diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs
index cbda448712e72..a2478cdb6bb0d 100644
--- a/src/stream/src/executor/source/fs_source_executor.rs
+++ b/src/stream/src/executor/source/fs_source_executor.rs
@@ -149,7 +149,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
         let mut target_state: Vec<SplitImpl> = Vec::new();
         let mut no_change_flag = true;
         for sc in rhs {
-            if let Some(s) = core.state_cache.get(&sc.id()) {
+            if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) {
                 let fs = s
                     .as_fs()
                     .unwrap_or_else(|| panic!("split {:?} is not fs", s));
@@ -173,7 +173,7 @@
                     sc
                 };
 
-                core.state_cache
+                core.updated_splits_in_epoch
                     .entry(state.id())
                     .or_insert_with(|| state.clone());
                 target_state.push(state);
@@ -201,7 +201,7 @@
             .map_err(StreamExecutorError::connector_error);
         stream.replace_data_stream(reader);
 
-        self.stream_source_core.stream_source_splits = target_state
+        self.stream_source_core.latest_split_info = target_state
             .into_iter()
             .map(|split| (split.id(), split))
             .collect();
@@ -215,7 +215,7 @@
     ) -> StreamExecutorResult<()> {
         let core = &mut self.stream_source_core;
         let incompleted = core
-            .state_cache
+            .updated_splits_in_epoch
             .values()
             .filter(|split| {
                 let fs = split
@@ -227,7 +227,7 @@
             .collect_vec();
 
         let completed = core
-            .state_cache
+            .updated_splits_in_epoch
             .values()
             .filter(|split| {
                 let fs = split
@@ -250,7 +250,7 @@
         // commit anyway, even if no message saved
         core.split_state_store.state_store.commit(epoch).await?;
 
-        core.state_cache.clear();
+        core.updated_splits_in_epoch.clear();
 
         Ok(())
     }
@@ -439,17 +439,18 @@ impl<S: StateStore> FsSourceExecutor<S> {
                     let state: Vec<(SplitId, SplitImpl)> = mapping
                         .iter()
                         .flat_map(|(id, offset)| {
-                            let origin_split =
-                                self.stream_source_core.stream_source_splits.get_mut(id);
-
-                            origin_split.map(|split| {
-                                split.update_in_place(offset.clone())?;
-                                Ok::<_, anyhow::Error>((id.clone(), split.clone()))
-                            })
+                            self.stream_source_core.latest_split_info.get_mut(id).map(
+                                |origin_split| {
+                                    origin_split.update_in_place(offset.clone())?;
+                                    Ok::<_, anyhow::Error>((id.clone(), origin_split.clone()))
+                                },
+                            )
                         })
                         .try_collect()?;
 
-                    self.stream_source_core.state_cache.extend(state);
+                    self.stream_source_core
+                        .updated_splits_in_epoch
+                        .extend(state);
                 }
 
                 self.metrics
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 07903a2c7b34e..8ad653c5f8397 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -138,6 +138,7 @@
         ]
     }
 
+    /// Returns `target_states` if split changed. Otherwise `None`.
     async fn apply_split_change(
         &mut self,
         source_desc: &SourceDesc,
@@ -176,7 +177,9 @@
         Ok(None)
     }
 
-    /// Note: `update_state_if_changed` will modify `state_cache`
+    /// Returns `target_states` if split changed. Otherwise `None`.
+    ///
+    /// Note: `update_state_if_changed` will modify `updated_splits_in_epoch`
     async fn update_state_if_changed(
         &mut self,
         state: ConnectorState,
@@ -193,8 +196,9 @@
 
         let mut split_changed = false;
 
+        // Checks added splits
         for (split_id, split) in &target_splits {
-            if let Some(s) = core.state_cache.get(split_id) {
+            if let Some(s) = core.updated_splits_in_epoch.get(split_id) {
                 // existing split, no change, clone from cache
                 target_state.push(s.clone())
             } else {
@@ -211,7 +215,7 @@
                     split.clone()
                 };
 
-                core.state_cache
+                core.updated_splits_in_epoch
                     .entry(split.id())
                     .or_insert_with(|| initial_state.clone());
 
@@ -219,8 +223,8 @@
             }
         }
 
-        // state cache may be stale
-        for existing_split_id in core.stream_source_splits.keys() {
+        // Checks dropped splits
+        for existing_split_id in core.latest_split_info.keys() {
             if !target_splits.contains_key(existing_split_id) {
                 tracing::info!("split dropping detected: {}", existing_split_id);
                 split_changed = true;
@@ -235,7 +239,6 @@
         &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause,
-        split_info: &mut [SplitImpl],
         e: StreamExecutorError,
     ) -> StreamExecutorResult<()> {
         let core = self.stream_source_core.as_mut().unwrap();
@@ -252,26 +255,8 @@
             self.actor_ctx.id.to_string(),
             core.source_id.to_string(),
         ]);
-        // fetch the newest offset, either it's in cache (before barrier)
-        // or in state table (just after barrier)
-        let target_state = if core.state_cache.is_empty() {
-            for ele in &mut *split_info {
-                if let Some(recover_state) = core
-                    .split_state_store
-                    .try_recover_from_state_store(ele)
-                    .await?
-                {
-                    *ele = recover_state;
-                }
-            }
-            split_info.to_owned()
-        } else {
-            core.state_cache
-                .values()
-                .map(|split_impl| split_impl.to_owned())
-                .collect_vec()
-        };
+        let target_state = core.latest_split_info.values().cloned().collect();
         self.replace_stream_reader_with_target_state(source_desc, stream, target_state)
             .await
     }
 
@@ -301,16 +286,24 @@
     /// - `target_state`: the new split info from barrier. `None` if no split update.
     /// - `should_trim_state`: whether to trim state for dropped splits.
+    ///
+    /// For scaling, the connector splits can be migrated to other actors, but
+    /// won't be added or removed. Actors should not trim states for splits that
+    /// are moved to other actors.
+    ///
+    /// For a source split change, splits will not be migrated and we can trim
+    /// states for deleted splits.
     async fn persist_state_and_clear_cache(
         &mut self,
         epoch: EpochPair,
+        // `target_state` being `Some` means a split change (or migration) happened.
         target_state: Option<Vec<SplitImpl>>,
         should_trim_state: bool,
     ) -> StreamExecutorResult<()> {
         let core = self.stream_source_core.as_mut().unwrap();
 
         let mut cache = core
-            .state_cache
+            .updated_splits_in_epoch
             .values()
             .map(|split_impl| split_impl.to_owned())
             .collect_vec();
@@ -322,7 +315,7 @@
             cache.retain(|split| target_split_ids.contains(&split.id()));
 
             let dropped_splits = core
-                .stream_source_splits
+                .latest_split_info
                 .extract_if(|split_id, _| !target_split_ids.contains(split_id))
                 .map(|(_, split)| split)
                 .collect_vec();
@@ -332,7 +325,7 @@
                 core.split_state_store.trim_state(&dropped_splits).await?;
             }
 
-            core.stream_source_splits = target_splits
+            core.latest_split_info = target_splits
                 .into_iter()
                 .map(|split| (split.id(), split))
                 .collect();
@@ -345,7 +338,7 @@
         // commit anyway, even if no message saved
         core.split_state_store.state_store.commit(epoch).await?;
 
-        core.state_cache.clear();
+        core.updated_splits_in_epoch.clear();
 
         Ok(())
     }
@@ -410,7 +403,6 @@
                 _ => {}
             }
         }
-        let mut latest_split_info = boot_state.clone();
 
         core.split_state_store.init_epoch(barrier.epoch);
 
@@ -462,13 +454,8 @@
         while let Some(msg) = stream.next().await {
             let Ok(msg) = msg else {
                 tokio::time::sleep(Duration::from_millis(1000)).await;
-                self.rebuild_stream_reader_from_error(
-                    &source_desc,
-                    &mut stream,
-                    &mut latest_split_info,
-                    msg.unwrap_err(),
-                )
-                .await?;
+                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())
+                    .await?;
                 continue;
             };
 
@@ -513,10 +500,6 @@
                         }
                     }
 
-                    if let Some(target_state) = &target_state {
-                        latest_split_info = target_state.clone();
-                    }
-
                     self.persist_state_and_clear_cache(epoch, target_state, should_trim_state)
                         .await?;
 
@@ -572,24 +555,25 @@
                     let state: HashMap<_, _> = mapping
                         .iter()
                         .flat_map(|(split_id, offset)| {
-                            let origin_split_impl = self
-                                .stream_source_core
+                            self.stream_source_core
                                 .as_mut()
                                 .unwrap()
-                                .stream_source_splits
-                                .get_mut(split_id);
-
-                            origin_split_impl.map(|split_impl| {
-                                split_impl.update_in_place(offset.clone())?;
-                                Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone()))
-                            })
+                                .latest_split_info
+                                .get_mut(split_id)
+                                .map(|original_split_impl| {
+                                    original_split_impl.update_in_place(offset.clone())?;
+                                    Ok::<_, anyhow::Error>((
+                                        split_id.clone(),
+                                        original_split_impl.clone(),
+                                    ))
+                                })
                         })
                         .try_collect()?;
 
                     self.stream_source_core
                         .as_mut()
                         .unwrap()
-                        .state_cache
+                        .updated_splits_in_epoch
                         .extend(state);
                 }
                 metric_row_per_barrier += chunk.cardinality() as u64;
@@ -736,9 +720,9 @@ mod tests {
             source_id: table_id,
             column_ids,
             source_desc_builder: Some(source_desc_builder),
-            stream_source_splits: HashMap::new(),
+            latest_split_info: HashMap::new(),
             split_state_store,
-            state_cache: HashMap::new(),
+            updated_splits_in_epoch: HashMap::new(),
             source_name: MOCK_SOURCE_NAME.to_string(),
         };
@@ -830,9 +814,9 @@ mod tests {
             source_id: table_id,
             column_ids: column_ids.clone(),
             source_desc_builder: Some(source_desc_builder),
-            stream_source_splits: HashMap::new(),
+            latest_split_info: HashMap::new(),
             split_state_store,
-            state_cache: HashMap::new(),
+            updated_splits_in_epoch: HashMap::new(),
             source_name: MOCK_SOURCE_NAME.to_string(),
         };
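
The heart of this refactor is the renamed pair of maps: `latest_split_info` always holds the newest offset of every assigned split, while `updated_splits_in_epoch` is only a per-epoch write buffer that is flushed to the state table and cleared at each barrier. The sketch below is a minimal, self-contained model of that lifecycle; `SplitId`, `Offset`, `StateTable`, and `Core` are simplified stand-ins, not RisingWave's actual types:

```rust
use std::collections::HashMap;

type SplitId = String;
type Offset = u64;

/// Simplified stand-in for the persistent split-state table.
#[derive(Default)]
struct StateTable {
    persisted: HashMap<SplitId, Offset>,
}

#[derive(Default)]
struct Core {
    /// Newest offset of every assigned split; always up to date.
    latest_split_info: HashMap<SplitId, Offset>,
    /// Offsets updated *in the current epoch*; cleared at each barrier.
    updated_splits_in_epoch: HashMap<SplitId, Offset>,
    state_table: StateTable,
}

impl Core {
    /// On every chunk: write through to both maps.
    fn on_message(&mut self, split_id: &str, new_offset: Offset) {
        if self.latest_split_info.contains_key(split_id) {
            self.latest_split_info.insert(split_id.to_owned(), new_offset);
            self.updated_splits_in_epoch.insert(split_id.to_owned(), new_offset);
        }
    }

    /// On every barrier: persist only what changed this epoch, then clear.
    fn on_barrier(&mut self) {
        for (id, offset) in self.updated_splits_in_epoch.drain() {
            self.state_table.persisted.insert(id, offset);
        }
    }
}

fn main() {
    let mut core = Core::default();
    core.latest_split_info.insert("split-0".to_owned(), 0);

    core.on_message("split-0", 42);
    core.on_barrier();

    assert_eq!(core.state_table.persisted["split-0"], 42);
    assert!(core.updated_splits_in_epoch.is_empty());
    // `latest_split_info` still knows the newest offset even right after a
    // barrier, when the per-epoch buffer is empty.
    assert_eq!(core.latest_split_info["split-0"], 42);
}
```

That invariant, an always-current `latest_split_info`, is what lets the patch delete the cache-or-state-table fallback in `rebuild_stream_reader_from_error` and drop the separate `latest_split_info`/`split_info` locals that `into_stream` used to thread through.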
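The doc comment added to `persist_state_and_clear_cache` draws a distinction worth restating: on scaling, splits migrate between actors, so the losing actor must keep their persisted state intact for the new owner; on an actual source split change, deleted splits never return, so their state can be trimmed. A sketch of that decision, again with simplified types and a hypothetical `apply_assignment` helper standing in for the real `retain`/`extract_if`/`trim_state` sequence:

```rust
use std::collections::{HashMap, HashSet};

type SplitId = String;
type Offset = u64;

/// Remove splits no longer assigned to this actor, and report which ones may
/// have their persisted state trimmed. `should_trim_state` is false during
/// scaling (splits migrate, their state must survive) and true on a real
/// split change (deleted splits never come back).
fn apply_assignment(
    latest_split_info: &mut HashMap<SplitId, Offset>,
    target_splits: &HashSet<SplitId>,
    should_trim_state: bool,
) -> Vec<SplitId> {
    let dropped: Vec<SplitId> = latest_split_info
        .keys()
        .filter(|id| !target_splits.contains(*id))
        .cloned()
        .collect();
    for id in &dropped {
        latest_split_info.remove(id);
    }
    if should_trim_state {
        dropped // caller would pass these to `trim_state`
    } else {
        Vec::new() // migrated away: keep persisted state for the new owner
    }
}

fn main() {
    let mut assigned: HashMap<SplitId, Offset> =
        [("s0".to_owned(), 10), ("s1".to_owned(), 20)].into();
    // Scaling: "s1" moves to another actor; it leaves the local map,
    // but nothing is trimmed from the state table.
    let target: HashSet<SplitId> = ["s0".to_owned()].into();
    let to_trim = apply_assignment(&mut assigned, &target, false);
    assert!(to_trim.is_empty());
    assert!(!assigned.contains_key("s1"));
}
```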