refactor: refactor source executor (part 1) #15103

Merged · 1 commit · Feb 19, 2024
13 changes: 7 additions & 6 deletions src/stream/src/executor/source/executor_core.rs
@@ -35,16 +35,17 @@ pub struct StreamSourceCore<S: StateStore> {

/// Split info for stream source. A source executor might read data from several splits of
/// external connector.
pub(crate) stream_source_splits: HashMap<SplitId, SplitImpl>,
pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,

/// Stores information of the splits.
pub(crate) split_state_store: SourceStateTableHandler<S>,

/// In-memory cache for the splits.
/// Contains the latest offsets for the splits that are updated *in the current epoch*.
/// It is cleared after each barrier.
///
/// Source messages will only write the cache.
/// It is read on split change and when rebuilding the stream reader on error.
pub(crate) state_cache: HashMap<SplitId, SplitImpl>,
pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
}

impl<S> StreamSourceCore<S>
@@ -63,14 +64,14 @@ where
source_name,
column_ids,
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
latest_split_info: HashMap::new(),
split_state_store,
state_cache: HashMap::new(),
updated_splits_in_epoch: HashMap::new(),
}
}

pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
self.stream_source_splits = splits
self.latest_split_info = splits
.into_iter()
.map(|split| (split.id(), split))
.collect();
29 changes: 15 additions & 14 deletions src/stream/src/executor/source/fs_source_executor.rs
@@ -149,7 +149,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
let mut target_state: Vec<SplitImpl> = Vec::new();
let mut no_change_flag = true;
for sc in rhs {
if let Some(s) = core.state_cache.get(&sc.id()) {
if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) {
let fs = s
.as_fs()
.unwrap_or_else(|| panic!("split {:?} is not fs", s));
@@ -173,7 +173,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
sc
};

core.state_cache
core.updated_splits_in_epoch
.entry(state.id())
.or_insert_with(|| state.clone());
target_state.push(state);
@@ -201,7 +201,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
.map_err(StreamExecutorError::connector_error);
stream.replace_data_stream(reader);

self.stream_source_core.stream_source_splits = target_state
self.stream_source_core.latest_split_info = target_state
.into_iter()
.map(|split| (split.id(), split))
.collect();
@@ -215,7 +215,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
) -> StreamExecutorResult<()> {
let core = &mut self.stream_source_core;
let incompleted = core
.state_cache
.updated_splits_in_epoch
.values()
.filter(|split| {
let fs = split
@@ -227,7 +227,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
.collect_vec();

let completed = core
.state_cache
.updated_splits_in_epoch
.values()
.filter(|split| {
let fs = split
@@ -250,7 +250,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
// commit anyway, even if no message saved
core.split_state_store.state_store.commit(epoch).await?;

core.state_cache.clear();
core.updated_splits_in_epoch.clear();
Ok(())
}

@@ -439,17 +439,18 @@ impl<S: StateStore> FsSourceExecutor<S> {
let state: Vec<(SplitId, SplitImpl)> = mapping
.iter()
.flat_map(|(id, offset)| {
let origin_split =
self.stream_source_core.stream_source_splits.get_mut(id);

origin_split.map(|split| {
split.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((id.clone(), split.clone()))
})
self.stream_source_core.latest_split_info.get_mut(id).map(
|origin_split| {
origin_split.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((id.clone(), origin_split.clone()))
},
)
})
.try_collect()?;

self.stream_source_core.state_cache.extend(state);
self.stream_source_core
.updated_splits_in_epoch
.extend(state);
Comment on lines +442 to +453

Member Author:
We can see they are updated in the same way on each data chunk. Plan to unify them later.
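As a rough sketch of what that unification could look like, a single helper on `StreamSourceCore` might apply per-chunk offset updates to both maps in one place. The helper below is hypothetical and not part of this PR; the offset-mapping type is assumed from the call sites above.

```rust
// Hypothetical helper (not in this PR): update both maps from one code path,
// so fs_source_executor.rs and source_executor.rs can share it. Types come
// from the crate; the `SplitId -> offset` mapping type is an assumption.
impl<S: StateStore> StreamSourceCore<S> {
    pub(crate) fn apply_chunk_offsets(
        &mut self,
        mapping: &HashMap<SplitId, String>,
    ) -> anyhow::Result<()> {
        for (split_id, offset) in mapping {
            if let Some(split) = self.latest_split_info.get_mut(split_id) {
                // Advance the ground-truth offset in place...
                split.update_in_place(offset.clone())?;
                // ...and record that this split changed in the current epoch.
                self.updated_splits_in_epoch
                    .insert(split_id.clone(), split.clone());
            }
        }
        Ok(())
    }
}
```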

}

self.metrics
94 changes: 39 additions & 55 deletions src/stream/src/executor/source/source_executor.rs
@@ -138,6 +138,7 @@ impl<S: StateStore> SourceExecutor<S> {
]
}

/// Returns `target_states` if split changed. Otherwise `None`.
async fn apply_split_change<const BIASED: bool>(
&mut self,
source_desc: &SourceDesc,
@@ -176,7 +177,9 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(None)
}

/// Note: `update_state_if_changed` will modify `state_cache`
/// Returns `target_states` if split changed. Otherwise `None`.
///
/// Note: `update_state_if_changed` will modify `updated_splits_in_epoch`
async fn update_state_if_changed(
&mut self,
state: ConnectorState,
@@ -193,8 +196,9 @@ impl<S: StateStore> SourceExecutor<S> {

let mut split_changed = false;

// Checks added splits
for (split_id, split) in &target_splits {
if let Some(s) = core.state_cache.get(split_id) {
if let Some(s) = core.updated_splits_in_epoch.get(split_id) {
// existing split, no change, clone from cache
target_state.push(s.clone())
} else {
@@ -211,16 +215,16 @@ impl<S: StateStore> SourceExecutor<S> {
split.clone()
};

core.state_cache
core.updated_splits_in_epoch
.entry(split.id())
.or_insert_with(|| initial_state.clone());

target_state.push(initial_state);
}
}

// state cache may be stale
for existing_split_id in core.stream_source_splits.keys() {
// Checks dropped splits
for existing_split_id in core.latest_split_info.keys() {
Contributor:
Also delete removed items in latest_split_info here, because it is seen as the ground truth of split assignment when doing recovery internally.

Member Author:
Actually it is updated later in persist_state_and_clear_cache using target_state. This is how it worked previously, and this PR doesn't want to change it; #15104 is the one that changes this.

if !target_splits.contains_key(existing_split_id) {
tracing::info!("split dropping detected: {}", existing_split_id);
split_changed = true;
@@ -235,7 +239,6 @@ impl<S: StateStore> SourceExecutor<S> {
&mut self,
source_desc: &SourceDesc,
stream: &mut StreamReaderWithPause<BIASED, StreamChunk>,
split_info: &mut [SplitImpl],
e: StreamExecutorError,
) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();
@@ -252,26 +255,8 @@ impl<S: StateStore> SourceExecutor<S> {
self.actor_ctx.id.to_string(),
core.source_id.to_string(),
]);
// fetch the newest offset, either it's in cache (before barrier)
// or in state table (just after barrier)
let target_state = if core.state_cache.is_empty() {
for ele in &mut *split_info {
if let Some(recover_state) = core
.split_state_store
.try_recover_from_state_store(ele)
.await?
{
*ele = recover_state;
}
}
split_info.to_owned()
} else {
core.state_cache
.values()
.map(|split_impl| split_impl.to_owned())
.collect_vec()
};

Comment on lines -255 to 274

Member Author:
Actually this is a bug: when state_cache is some, we shouldn't use it to rebuild the stream reader, since it only contains splits updated in this epoch.

Member Author:
However, this is hard to trigger. In my local testing I found that rebuild_stream_reader_from_error isn't triggered even when the external Kafka/PG is killed...

Contributor:
> Actually this is a bug: When state_cache is some, we shouldn't use it to rebuild stream reader, since it only contains splits updated in this epoch.

The source attempts to recover from the latest successful offset, so we always recover from the cache. The case you mentioned above occurs when some partitions have no new data in one epoch, which means partition imbalance, and that can hardly happen in MQs.

Contributor:
> In my local testing I found that rebuild_stream_reader_from_error isn't triggered even when external Kafka/PG is killed...

The Kafka SDK handles broker timeouts internally and will keep trying to reconnect. For Kinesis, the logic is triggered when a network issue happens. 😇

Member Author:
> when some partitions have no new data in one epoch, which means partitioning imbalance and this can hardly happen in MQs.

I think it's not that impossible. Imagine the source is idle for a while, and then only one message is produced. It can also happen if the user's key is imbalanced according to their business logic.

let target_state = core.latest_split_info.values().cloned().collect();
Contributor:
I have some correctness concerns here. It seems you are resetting the process to the offset at which the last successful barrier came, taking that time as T0. But later an error occurs at time T1 and requires rebuilding the source internally. The logic here may lead to reading the data from T0 to T1 twice.

Contributor:
I think the expected logic here is taking a union of both state_cache and split_info to make sure every assigned split is reset to its latest offset.

Member Author:
> Seems you are resetting the process to the offset where the last successful barrier came

No. latest_split_info is also updated on every message chunk, so its offset is always up to date.

Member Author:
This is indeed very confusing, and also why I wanted to use just one.
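To summarize the distinction being debated, here is an illustrative sketch using the field names from this PR; the two free functions below are made up for explanation and do not exist in the codebase.

```rust
// Illustration only. `latest_split_info` holds every assigned split and its
// offset is advanced on every data chunk, so it is safe to rebuild from.
// `updated_splits_in_epoch` is cleared at each barrier, so on its own it can
// miss splits that saw no data in the current epoch (the bug discussed above).
fn rebuild_state_after_this_pr(core: &StreamSourceCore<impl StateStore>) -> Vec<SplitImpl> {
    core.latest_split_info.values().cloned().collect()
}

// The union suggested in the review would look roughly like this; in this PR
// it is redundant, because both maps are advanced on every data chunk anyway.
fn rebuild_state_union(core: &StreamSourceCore<impl StateStore>) -> Vec<SplitImpl> {
    let mut merged = core.latest_split_info.clone();
    merged.extend(core.updated_splits_in_epoch.clone()); // per-epoch entries win
    merged.into_values().collect()
}
```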

self.replace_stream_reader_with_target_state(source_desc, stream, target_state)
.await
}
@@ -301,16 +286,24 @@ impl<S: StateStore> SourceExecutor<S> {

/// - `target_state`: the new split info from barrier. `None` if no split update.
/// - `should_trim_state`: whether to trim state for dropped splits.
///
/// For scaling, the connector splits can be migrated to other actors, but
/// won't be added or removed. Actors should not trim states for splits that
/// are moved to other actors.
///
/// For source split change, split will not be migrated and we can trim states
/// for deleted splits.
async fn persist_state_and_clear_cache(
&mut self,
epoch: EpochPair,
// target_state is Some means split change (or migration) happened.
target_state: Option<Vec<SplitImpl>>,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();

let mut cache = core
.state_cache
.updated_splits_in_epoch
.values()
.map(|split_impl| split_impl.to_owned())
.collect_vec();
Expand All @@ -322,7 +315,7 @@ impl<S: StateStore> SourceExecutor<S> {
cache.retain(|split| target_split_ids.contains(&split.id()));

let dropped_splits = core
.stream_source_splits
.latest_split_info
.extract_if(|split_id, _| !target_split_ids.contains(split_id))
.map(|(_, split)| split)
.collect_vec();
Expand All @@ -332,7 +325,7 @@ impl<S: StateStore> SourceExecutor<S> {
core.split_state_store.trim_state(&dropped_splits).await?;
}

core.stream_source_splits = target_splits
core.latest_split_info = target_splits
.into_iter()
.map(|split| (split.id(), split))
.collect();
Expand All @@ -345,7 +338,7 @@ impl<S: StateStore> SourceExecutor<S> {

// commit anyway, even if no message saved
core.split_state_store.state_store.commit(epoch).await?;
core.state_cache.clear();
core.updated_splits_in_epoch.clear();

Ok(())
}
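To make the trimming contract above concrete, the following is an assumed caller-side sketch, not code shown in this diff; `split_change_from_barrier` and `scaling_assignment_from_barrier` are hypothetical locals standing in for whatever the barrier mutation carries.

```rust
// Assumed usage of persist_state_and_clear_cache, for illustration only.
if let Some(new_assignment) = split_change_from_barrier {
    // Genuine split change: dropped splits may be trimmed from the state table.
    self.persist_state_and_clear_cache(epoch, Some(new_assignment), true)
        .await?;
} else if let Some(migrated_assignment) = scaling_assignment_from_barrier {
    // Scaling: splits only migrate between actors, so never trim their state.
    self.persist_state_and_clear_cache(epoch, Some(migrated_assignment), false)
        .await?;
} else {
    // No assignment change: still commit the epoch and clear the per-epoch cache.
    self.persist_state_and_clear_cache(epoch, None, false).await?;
}
```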
@@ -410,7 +403,6 @@ impl<S: StateStore> SourceExecutor<S> {
_ => {}
}
}
let mut latest_split_info = boot_state.clone();

core.split_state_store.init_epoch(barrier.epoch);

@@ -462,13 +454,8 @@ impl<S: StateStore> SourceExecutor<S> {
while let Some(msg) = stream.next().await {
let Ok(msg) = msg else {
tokio::time::sleep(Duration::from_millis(1000)).await;
self.rebuild_stream_reader_from_error(
&source_desc,
&mut stream,
&mut latest_split_info,
msg.unwrap_err(),
)
.await?;
self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())
.await?;
continue;
};

Expand Down Expand Up @@ -513,10 +500,6 @@ impl<S: StateStore> SourceExecutor<S> {
}
}

if let Some(target_state) = &target_state {
latest_split_info = target_state.clone();
}

Comment on lines -516 to -519

Member Author:
I think the local variable latest_split_info can be replaced by the field.

self.persist_state_and_clear_cache(epoch, target_state, should_trim_state)
.await?;

@@ -572,24 +555,25 @@ impl<S: StateStore> SourceExecutor<S> {
let state: HashMap<_, _> = mapping
.iter()
.flat_map(|(split_id, offset)| {
let origin_split_impl = self
.stream_source_core
self.stream_source_core
.as_mut()
.unwrap()
.stream_source_splits
.get_mut(split_id);

origin_split_impl.map(|split_impl| {
split_impl.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone()))
})
.latest_split_info
.get_mut(split_id)
.map(|original_split_impl| {
original_split_impl.update_in_place(offset.clone())?;
Ok::<_, anyhow::Error>((
split_id.clone(),
original_split_impl.clone(),
))
})
})
.try_collect()?;

self.stream_source_core
.as_mut()
.unwrap()
.state_cache
.updated_splits_in_epoch
.extend(state);
}
metric_row_per_barrier += chunk.cardinality() as u64;
@@ -736,9 +720,9 @@ mod tests {
source_id: table_id,
column_ids,
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
latest_split_info: HashMap::new(),
split_state_store,
state_cache: HashMap::new(),
updated_splits_in_epoch: HashMap::new(),
source_name: MOCK_SOURCE_NAME.to_string(),
};

@@ -830,9 +814,9 @@ mod tests {
source_id: table_id,
column_ids: column_ids.clone(),
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
latest_split_info: HashMap::new(),
split_state_store,
state_cache: HashMap::new(),
updated_splits_in_epoch: HashMap::new(),
source_name: MOCK_SOURCE_NAME.to_string(),
};
