Skip to content

Commit

Permalink
refactor: refactor source executor (part 1) (#15103)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Feb 19, 2024
1 parent 8323b17 commit 3ce0996
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 75 deletions.
13 changes: 7 additions & 6 deletions src/stream/src/executor/source/executor_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ pub struct StreamSourceCore<S: StateStore> {

/// Split info for stream source. A source executor might read data from several splits of
/// external connector.
pub(crate) stream_source_splits: HashMap<SplitId, SplitImpl>,
pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,

/// Stores information of the splits.
pub(crate) split_state_store: SourceStateTableHandler<S>,

/// In-memory cache for the splits.
/// Contains the latests offsets for the splits that are updated *in the current epoch*.
/// It is cleared after each barrier.
///
/// Source messages will only write the cache.
/// It is read on split change and rebuild stream reader on error.
pub(crate) state_cache: HashMap<SplitId, SplitImpl>,
pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
}

impl<S> StreamSourceCore<S>
Expand All @@ -63,14 +64,14 @@ where
source_name,
column_ids,
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
latest_split_info: HashMap::new(),
split_state_store,
state_cache: HashMap::new(),
updated_splits_in_epoch: HashMap::new(),
}
}

pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
self.stream_source_splits = splits
self.latest_split_info = splits
.into_iter()
.map(|split| (split.id(), split))
.collect();
Expand Down
29 changes: 15 additions & 14 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
let mut target_state: Vec<SplitImpl> = Vec::new();
let mut no_change_flag = true;
for sc in rhs {
if let Some(s) = core.state_cache.get(&sc.id()) {
if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) {
let fs = s
.as_fs()
.unwrap_or_else(|| panic!("split {:?} is not fs", s));
Expand All @@ -173,7 +173,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
sc
};

core.state_cache
core.updated_splits_in_epoch
.entry(state.id())
.or_insert_with(|| state.clone());
target_state.push(state);
Expand Down Expand Up @@ -201,7 +201,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
.map_err(StreamExecutorError::connector_error);
stream.replace_data_stream(reader);

self.stream_source_core.stream_source_splits = target_state
self.stream_source_core.latest_split_info = target_state
.into_iter()
.map(|split| (split.id(), split))
.collect();
Expand All @@ -215,7 +215,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
) -> StreamExecutorResult<()> {
let core = &mut self.stream_source_core;
let incompleted = core
.state_cache
.updated_splits_in_epoch
.values()
.filter(|split| {
let fs = split
Expand All @@ -227,7 +227,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
.collect_vec();

let completed = core
.state_cache
.updated_splits_in_epoch
.values()
.filter(|split| {
let fs = split
Expand All @@ -250,7 +250,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
// commit anyway, even if no message saved
core.split_state_store.state_store.commit(epoch).await?;

core.state_cache.clear();
core.updated_splits_in_epoch.clear();
Ok(())
}

Expand Down Expand Up @@ -439,17 +439,18 @@ impl<S: StateStore> FsSourceExecutor<S> {
let state: Vec<(SplitId, SplitImpl)> = mapping
.iter()
.flat_map(|(id, offset)| {
let origin_split =
self.stream_source_core.stream_source_splits.get_mut(id);

origin_split.map(|split| {
split.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((id.clone(), split.clone()))
})
self.stream_source_core.latest_split_info.get_mut(id).map(
|origin_split| {
origin_split.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((id.clone(), origin_split.clone()))
},
)
})
.try_collect()?;

self.stream_source_core.state_cache.extend(state);
self.stream_source_core
.updated_splits_in_epoch
.extend(state);
}

self.metrics
Expand Down
94 changes: 39 additions & 55 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl<S: StateStore> SourceExecutor<S> {
]
}

/// Returns `target_states` if split changed. Otherwise `None`.
async fn apply_split_change<const BIASED: bool>(
&mut self,
source_desc: &SourceDesc,
Expand Down Expand Up @@ -176,7 +177,9 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(None)
}

/// Note: `update_state_if_changed` will modify `state_cache`
/// Returns `target_states` if split changed. Otherwise `None`.
///
/// Note: `update_state_if_changed` will modify `updated_splits_in_epoch`
async fn update_state_if_changed(
&mut self,
state: ConnectorState,
Expand All @@ -193,8 +196,9 @@ impl<S: StateStore> SourceExecutor<S> {

let mut split_changed = false;

// Checks added splits
for (split_id, split) in &target_splits {
if let Some(s) = core.state_cache.get(split_id) {
if let Some(s) = core.updated_splits_in_epoch.get(split_id) {
// existing split, no change, clone from cache
target_state.push(s.clone())
} else {
Expand All @@ -211,16 +215,16 @@ impl<S: StateStore> SourceExecutor<S> {
split.clone()
};

core.state_cache
core.updated_splits_in_epoch
.entry(split.id())
.or_insert_with(|| initial_state.clone());

target_state.push(initial_state);
}
}

// state cache may be stale
for existing_split_id in core.stream_source_splits.keys() {
// Checks dropped splits
for existing_split_id in core.latest_split_info.keys() {
if !target_splits.contains_key(existing_split_id) {
tracing::info!("split dropping detected: {}", existing_split_id);
split_changed = true;
Expand All @@ -235,7 +239,6 @@ impl<S: StateStore> SourceExecutor<S> {
&mut self,
source_desc: &SourceDesc,
stream: &mut StreamReaderWithPause<BIASED, StreamChunk>,
split_info: &mut [SplitImpl],
e: StreamExecutorError,
) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();
Expand All @@ -252,26 +255,8 @@ impl<S: StateStore> SourceExecutor<S> {
self.actor_ctx.id.to_string(),
core.source_id.to_string(),
]);
// fetch the newest offset, either it's in cache (before barrier)
// or in state table (just after barrier)
let target_state = if core.state_cache.is_empty() {
for ele in &mut *split_info {
if let Some(recover_state) = core
.split_state_store
.try_recover_from_state_store(ele)
.await?
{
*ele = recover_state;
}
}
split_info.to_owned()
} else {
core.state_cache
.values()
.map(|split_impl| split_impl.to_owned())
.collect_vec()
};

let target_state = core.latest_split_info.values().cloned().collect();
self.replace_stream_reader_with_target_state(source_desc, stream, target_state)
.await
}
Expand Down Expand Up @@ -301,16 +286,24 @@ impl<S: StateStore> SourceExecutor<S> {

/// - `target_state`: the new split info from barrier. `None` if no split update.
/// - `should_trim_state`: whether to trim state for dropped splits.
///
/// For scaling, the connector splits can be migrated to other actors, but
/// won't be added or removed. Actors should not trim states for splits that
/// are moved to other actors.
///
/// For source split change, split will not be migrated and we can trim states
/// for deleted splits.
async fn persist_state_and_clear_cache(
&mut self,
epoch: EpochPair,
// target_state is Some means split change (or migration) happened.
target_state: Option<Vec<SplitImpl>>,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();

let mut cache = core
.state_cache
.updated_splits_in_epoch
.values()
.map(|split_impl| split_impl.to_owned())
.collect_vec();
Expand All @@ -322,7 +315,7 @@ impl<S: StateStore> SourceExecutor<S> {
cache.retain(|split| target_split_ids.contains(&split.id()));

let dropped_splits = core
.stream_source_splits
.latest_split_info
.extract_if(|split_id, _| !target_split_ids.contains(split_id))
.map(|(_, split)| split)
.collect_vec();
Expand All @@ -332,7 +325,7 @@ impl<S: StateStore> SourceExecutor<S> {
core.split_state_store.trim_state(&dropped_splits).await?;
}

core.stream_source_splits = target_splits
core.latest_split_info = target_splits
.into_iter()
.map(|split| (split.id(), split))
.collect();
Expand All @@ -345,7 +338,7 @@ impl<S: StateStore> SourceExecutor<S> {

// commit anyway, even if no message saved
core.split_state_store.state_store.commit(epoch).await?;
core.state_cache.clear();
core.updated_splits_in_epoch.clear();

Ok(())
}
Expand Down Expand Up @@ -410,7 +403,6 @@ impl<S: StateStore> SourceExecutor<S> {
_ => {}
}
}
let mut latest_split_info = boot_state.clone();

core.split_state_store.init_epoch(barrier.epoch);

Expand Down Expand Up @@ -462,13 +454,8 @@ impl<S: StateStore> SourceExecutor<S> {
while let Some(msg) = stream.next().await {
let Ok(msg) = msg else {
tokio::time::sleep(Duration::from_millis(1000)).await;
self.rebuild_stream_reader_from_error(
&source_desc,
&mut stream,
&mut latest_split_info,
msg.unwrap_err(),
)
.await?;
self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())
.await?;
continue;
};

Expand Down Expand Up @@ -513,10 +500,6 @@ impl<S: StateStore> SourceExecutor<S> {
}
}

if let Some(target_state) = &target_state {
latest_split_info = target_state.clone();
}

self.persist_state_and_clear_cache(epoch, target_state, should_trim_state)
.await?;

Expand Down Expand Up @@ -572,24 +555,25 @@ impl<S: StateStore> SourceExecutor<S> {
let state: HashMap<_, _> = mapping
.iter()
.flat_map(|(split_id, offset)| {
let origin_split_impl = self
.stream_source_core
self.stream_source_core
.as_mut()
.unwrap()
.stream_source_splits
.get_mut(split_id);

origin_split_impl.map(|split_impl| {
split_impl.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone()))
})
.latest_split_info
.get_mut(split_id)
.map(|original_split_impl| {
original_split_impl.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((
split_id.clone(),
original_split_impl.clone(),
))
})
})
.try_collect()?;

self.stream_source_core
.as_mut()
.unwrap()
.state_cache
.updated_splits_in_epoch
.extend(state);
}
metric_row_per_barrier += chunk.cardinality() as u64;
Expand Down Expand Up @@ -736,9 +720,9 @@ mod tests {
source_id: table_id,
column_ids,
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
latest_split_info: HashMap::new(),
split_state_store,
state_cache: HashMap::new(),
updated_splits_in_epoch: HashMap::new(),
source_name: MOCK_SOURCE_NAME.to_string(),
};

Expand Down Expand Up @@ -830,9 +814,9 @@ mod tests {
source_id: table_id,
column_ids: column_ids.clone(),
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
latest_split_info: HashMap::new(),
split_state_store,
state_cache: HashMap::new(),
updated_splits_in_epoch: HashMap::new(),
source_name: MOCK_SOURCE_NAME.to_string(),
};

Expand Down

0 comments on commit 3ce0996

Please sign in to comment.