refactor(source): merge all inner readers into one stream (#5611)
* merge all inner readers into one stream

Signed-off-by: waruto <wmc314@outlook.com>

* fix typo and remove needless dep

* remove needless code

Signed-off-by: waruto <wmc314@outlook.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
waruto210 and mergify[bot] authored Sep 28, 2022
1 parent e5e314e commit fea0bf9
Showing 4 changed files with 57 additions and 141 deletions.
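At a high level, the commit replaces a channel-based fan-in — one spawned task plus a stop channel per split reader — with stream merging: each inner reader becomes a stream, and all of them are merged into a single stream that the outer reader polls. Below is a minimal, self-contained sketch of that merging pattern; the string items, toy reader streams, and tokio runtime are stand-ins, not RisingWave's actual types, and the commit's exact adapter chain (`.merge().into_stream().boxed()`) may vary slightly across futures-concurrency versions.

use futures::{pin_mut, stream, StreamExt};
use futures_concurrency::prelude::*; // provides `merge()` for collections of streams

#[tokio::main]
async fn main() {
    // Stand-ins for the per-split inner readers.
    let reader_a = stream::iter(["a1", "a2"]);
    let reader_b = stream::iter(["b1", "b2"]);

    // `merge` polls every inner stream and yields items in arrival order,
    // replacing the old mpsc fan-in plus one spawned task per reader.
    let merged = vec![reader_a, reader_b].merge();
    pin_mut!(merged);

    while let Some(msg) = merged.next().await {
        println!("{msg}");
    }
}

Because the merged stream is owned by its consumer rather than fed by detached tasks, backpressure and shutdown fall out naturally from polling and dropping the stream.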
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/source/Cargo.toml
@@ -17,6 +17,8 @@ crc32fast = "1"
 enum-as-inner = "0.5"
 farmhash = "1"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
+futures-async-stream = "0.2"
+futures-concurrency = "3"
 itertools = "0.10"
 maplit = "1"
 memcomparable = { path = "../utils/memcomparable" }
193 changes: 52 additions & 141 deletions src/source/src/connector_source.rs
@@ -12,21 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::borrow::BorrowMut;
 use std::collections::HashMap;
 use std::sync::{Arc, LazyLock};
 
 use futures::future::try_join_all;
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use futures_async_stream::try_stream;
+use futures_concurrency::prelude::*;
 use itertools::Itertools;
 use risingwave_common::catalog::{ColumnId, TableId};
-use risingwave_common::error::{internal_error, Result, ToRwResult};
+use risingwave_common::error::{internal_error, Result, RwError, ToRwResult};
 use risingwave_connector::source::{
     Column, ConnectorProperties, ConnectorState, SourceMessage, SplitId, SplitMetaData,
     SplitReaderImpl,
 };
-use tokio::sync::mpsc::{Receiver, Sender};
-use tokio::sync::{mpsc, oneshot};
-use tokio::task::JoinHandle;
 
 use crate::common::SourceChunkBuilder;
 use crate::monitor::SourceMetrics;
@@ -58,11 +58,6 @@ struct InnerConnectorSourceReader {
     context: SourceContext,
 }
 
-struct InnerConnectorSourceReaderHandle {
-    stop_tx: oneshot::Sender<()>,
-    join_handle: JoinHandle<()>,
-}
-
 /// [`ConnectorSource`] serves as a bridge between external components and streaming or
 /// batch processing. [`ConnectorSource`] introduces schema at this level while
 /// [`SplitReaderImpl`] simply loads raw content from message queue or file system.
@@ -73,14 +68,9 @@ pub struct ConnectorSourceReader {
     pub parser: Arc<SourceParserImpl>,
     pub columns: Vec<SourceColumnDesc>,
 
-    handles: Option<HashMap<SplitId, InnerConnectorSourceReaderHandle>>,
-    message_rx: Receiver<Result<Vec<SourceMessage>>>,
-    // We need to keep this tx, otherwise the channel will return none with 0 inner readers, and we
-    // need to clone this tx when adding new inner readers in the future.
-    message_tx: Sender<Result<Vec<SourceMessage>>>,
-
-    metrics: Arc<SourceMetrics>,
-    context: SourceContext,
+    // merge all streams of inner reader into one
+    // TODO: make this static dispatch instead of box
+    all_reader_stream: BoxStream<'static, Result<Vec<SourceMessage>>>,
 }
 
 impl InnerConnectorSourceReader {
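The new `all_reader_stream` field stores the merged stream as a `BoxStream`, i.e. a pinned, heap-allocated trait object; the TODO notes that static dispatch would avoid this indirection. Why boxing is needed at all, in a small sketch (the `Holder` struct and `make_holder` function are hypothetical, not part of this codebase):

use futures::stream::{self, BoxStream, StreamExt};

struct Holder {
    // The merged stream's concrete type is an unnameable composition of
    // adapters, so it is erased into a trait object to live in a field.
    // `'static` means the stream owns all of its data.
    inner: BoxStream<'static, u64>,
}

fn make_holder() -> Holder {
    let s = stream::iter(0..10u64).map(|i| i * 2);
    Holder { inner: s.boxed() } // `StreamExt::boxed` pins and boxes the stream
}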
@@ -123,42 +113,16 @@ impl InnerConnectorSourceReader {
         })
     }
 
-    async fn run(
-        &mut self,
-        mut stop: oneshot::Receiver<()>,
-        output: mpsc::Sender<Result<Vec<SourceMessage>>>,
-    ) {
-        let actor_id = self.context.actor_id.to_string();
-        let source_id = self.context.source_id.to_string();
-        let id = match &self.split {
-            Some(splits) => splits[0].id(),
-            None => DEFAULT_SPLIT_ID.clone(),
-        };
-
+    async fn next(&mut self) -> Result<Option<Vec<SourceMessage>>> {
         loop {
-            let chunk: anyhow::Result<Option<Vec<SourceMessage>>>;
-            tokio::select! {
-                biased;
-                // stop chan has high priority
-                _ = stop.borrow_mut() => {
-                    tracing::debug!("connector reader {} stop signal received", id);
-                    break;
-                }
-
-                c = self.reader.next() => {
-                    chunk = c;
-                }
-            }
+            let chunk = self.reader.next().await;
 
             match chunk.map_err(|e| internal_error(e.to_string())) {
                 Err(e) => {
-                    tracing::error!("connector reader {} error happened {}", id, e.to_string());
-                    output.send(Err(e)).await.ok();
-                    break;
+                    return Err(e);
                 }
                 Ok(None) => {
-                    tracing::warn!("connector reader {} stream stopped", id);
-                    break;
+                    return Ok(None);
                 }
                 Ok(Some(msg)) => {
                     if msg.is_empty() {
@@ -168,23 +132,48 @@ impl InnerConnectorSourceReader {
                     // DataGen or Nexmark.
                     tokio::task::consume_budget().await;
 
-                    self.metrics
-                        .partition_input_count
-                        .with_label_values(&[actor_id.as_str(), source_id.as_str(), &*id])
-                        .inc_by(msg.len() as u64);
-
-                    output.send(Ok(msg)).await.ok();
+                    return Ok(Some(msg));
                 }
             }
         }
     }
 }
+
+#[try_stream(ok = Vec<SourceMessage>, error = RwError)]
+async fn inner_connector_source_reader_into_stream(mut reader: InnerConnectorSourceReader) {
+    let actor_id = reader.context.actor_id.to_string();
+    let source_id = reader.context.source_id.to_string();
+    let id = match &reader.split {
+        Some(splits) => splits[0].id(),
+        None => DEFAULT_SPLIT_ID.clone(),
+    };
+    loop {
+        match reader.next().await {
+            Ok(None) => {
+                tracing::warn!("connector reader {} stream stopped", id);
+                break;
+            }
+            Ok(Some(msg)) => {
+                reader
+                    .metrics
+                    .partition_input_count
+                    .with_label_values(&[actor_id.as_str(), source_id.as_str(), &*id])
+                    .inc_by(msg.len() as u64);
+                yield msg
+            }
+            Err(e) => {
+                tracing::error!("connector reader {} error happened {}", id, e.to_string());
+                return Err(e);
+            }
+        }
+    }
+}
 
 impl SourceChunkBuilder for ConnectorSourceReader {}
 
 impl ConnectorSourceReader {
     pub async fn next(&mut self) -> Result<StreamChunkWithState> {
-        let batch = self.message_rx.recv().await.unwrap()?;
+        let batch = self.all_reader_stream.next().await.unwrap()?;
 
         let mut split_offset_mapping: HashMap<SplitId, String> = HashMap::new();
 
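`inner_connector_source_reader_into_stream` above uses futures-async-stream's `#[try_stream]` attribute, which rewrites a generator-style async fn — `yield` emits an `Ok` item, `return Err(..)` terminates the stream with an error — into a function returning a fallible `Stream`. This is also what requires the nightly `generators` feature added to lib.rs below. A stripped-down illustration with hypothetical item and error types:

#![feature(generators)] // required by futures-async-stream's macros

use futures_async_stream::try_stream;

#[derive(Debug)]
struct MyError(String);

// Expands into a function returning `impl Stream<Item = Result<u64, MyError>>`.
#[try_stream(ok = u64, error = MyError)]
async fn countdown(mut n: u64, fail_at_zero: bool) {
    while n > 0 {
        yield n; // each `yield` emits an `Ok` item
        n -= 1;
    }
    if fail_at_zero {
        // `return Err(..)` (or `?`) ends the stream with the error.
        return Err(MyError("reached zero".into()));
    }
}

A caller pins the returned stream and consumes it with `StreamExt::next`, just as `ConnectorSourceReader::next` does with the merged, boxed stream.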
@@ -214,64 +203,6 @@
     }
 }
 
-impl Drop for ConnectorSourceReader {
-    fn drop(&mut self) {
-        let handles = self.handles.take().unwrap();
-
-        for (_, handle) in handles {
-            handle.join_handle.abort();
-        }
-    }
-}
-
-impl ConnectorSourceReader {
-    pub async fn add_split(&mut self, split: ConnectorState) -> Result<()> {
-        if let Some(append_splits) = split {
-            for split in append_splits {
-                let split_id = split.id();
-
-                let mut reader = InnerConnectorSourceReader::new(
-                    self.config.clone(),
-                    Some(vec![split]),
-                    self.columns.clone(),
-                    self.metrics.clone(),
-                    self.context.clone(),
-                )
-                .await?;
-                let (stop_tx, stop_rx) = oneshot::channel();
-                let sender = self.message_tx.clone();
-                let join_handle = tokio::spawn(async move { reader.run(stop_rx, sender).await });
-
-                if let Some(handles) = self.handles.as_mut() {
-                    handles.insert(
-                        split_id,
-                        InnerConnectorSourceReaderHandle {
-                            stop_tx,
-                            join_handle,
-                        },
-                    );
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    pub async fn drop_split(&mut self, split_id: SplitId) -> Result<()> {
-        let handle = self
-            .handles
-            .as_mut()
-            .and_then(|handles| handles.remove(&split_id))
-            .ok_or_else(|| internal_error(format!("could not find split {}", split_id)))
-            .unwrap();
-        handle.stop_tx.send(()).unwrap();
-        handle
-            .join_handle
-            .await
-            .map_err(|e| internal_error(e.to_string()))
-    }
-}
-
 #[derive(Clone, Debug)]
 pub struct ConnectorSource {
     pub config: ConnectorProperties,
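The `Drop` impl and the stop/abort plumbing can be deleted outright because the inner readers are no longer detached tasks that must be signalled: they are plain stream values owned by the merged stream, so dropping `ConnectorSourceReader` drops them and cancels any pending work. A tiny sketch of cancellation-by-drop (toy stream, not the real reader):

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // A stream with async work behind each item.
    let mut s = stream::iter(0..3).then(|i| futures::future::ready(i * 2));
    assert_eq!(s.next().await, Some(0));
    // Dropping the stream is cancellation: there is no JoinHandle to abort
    // and no stop channel to signal; the state machine is just a value.
    drop(s);
}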
@@ -306,12 +237,6 @@ impl ConnectorSource {
         metrics: Arc<SourceMetrics>,
         context: SourceContext,
     ) -> Result<ConnectorSourceReader> {
-        let (tx, rx) = mpsc::channel(self.connector_message_buffer_size);
-        let mut handles = HashMap::with_capacity(if let Some(split) = &splits {
-            split.len()
-        } else {
-            1
-        });
         let config = self.config.clone();
         let columns = self.get_target_columns(column_ids)?;
         let source_metrics = metrics.clone();
@@ -336,33 +261,19 @@
         }))
         .await?;
 
-        for mut reader in readers {
-            let split_id = match &reader.split {
-                Some(s) => s[0].id(),
-                None => DEFAULT_SPLIT_ID.clone(),
-            };
-            let (stop_tx, stop_rx) = oneshot::channel();
-            let sender = tx.clone();
-            let join_handle = tokio::spawn(async move { reader.run(stop_rx, sender).await });
-
-            handles.insert(
-                split_id,
-                InnerConnectorSourceReaderHandle {
-                    stop_tx,
-                    join_handle,
-                },
-            );
-        }
+        let streams = readers
+            .into_iter()
+            .map(inner_connector_source_reader_into_stream)
+            .collect::<Vec<_>>()
+            .merge()
+            .into_stream()
+            .boxed();
 
         Ok(ConnectorSourceReader {
             config: self.config.clone(),
-            handles: Some(handles),
-            message_rx: rx,
             parser: self.parser.clone(),
             columns,
-            message_tx: tx,
-            metrics: metrics.clone(),
-            context: context.clone(),
+            all_reader_stream: streams,
         })
     }
 }
1 change: 1 addition & 0 deletions src/source/src/lib.rs
@@ -20,6 +20,7 @@
 #![feature(lint_reasons)]
 #![feature(result_option_inspect)]
 #![feature(once_cell)]
+#![feature(generators)]
 
 use std::collections::HashMap;
 use std::fmt::Debug;
