
Commit

feat: create connector source worker before the txn write of catalog (#…
yezizp2012 authored Oct 12, 2024
1 parent 798896f commit f505359
Showing 3 changed files with 99 additions and 98 deletions.
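In short, the commit flips the ordering in the DDL path: the connector source worker, whose first metadata fetch doubles as connector validation, is now created before the catalog write, so a bad connector fails fast and no rollback or compensating cancel is needed. A minimal sketch of the new flow in Rust, with the V1/V2 metadata-manager branching collapsed; `persist_source_in_catalog` is a hypothetical stand-in for the catalog write, while the other names come from the diff below:

// Sketch only: simplified from DdlController::create_source in this diff.
async fn create_source_sketch(&self, source: Source) -> MetaResult<NotificationVersion> {
    // 1. Build and validate the worker first; if the external system is
    //    unreachable this returns early, before any catalog state exists.
    let handle = create_source_worker_handle(&source, self.source_manager.metrics.clone())
        .await
        .context("failed to create source worker")?;

    // 2. Persist the catalog entry (hypothetical helper standing in for
    //    the V1/V2 branching in the real method).
    let (source_id, version) = self.persist_source_in_catalog(source).await?;

    // 3. Hand the already-running worker to the source manager.
    self.source_manager
        .register_source_with_handle(source_id, handle)
        .await;
    Ok(version)
}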
18 changes: 5 additions & 13 deletions src/meta/src/controller/catalog.rs
@@ -76,7 +76,6 @@ use crate::controller::utils::{
use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::rpc::ddl_controller::DropMode;
use crate::stream::SourceManagerRef;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::{MetaError, MetaResult};

@@ -1161,8 +1160,7 @@ impl CatalogController {
pub async fn create_source(
&self,
mut pb_source: PbSource,
source_manager_ref: Option<SourceManagerRef>,
) -> MetaResult<NotificationVersion> {
) -> MetaResult<(SourceId, NotificationVersion)> {
let inner = self.inner.write().await;
let owner_id = pb_source.owner as _;
let txn = inner.db.begin().await?;
@@ -1185,17 +1183,11 @@
Some(pb_source.schema_id as _),
)
.await?;
pb_source.id = source_obj.oid as _;
let source_id = source_obj.oid;
pb_source.id = source_id as _;
let source: source::ActiveModel = pb_source.clone().into();
Source::insert(source).exec(&txn).await?;

if let Some(src_manager) = source_manager_ref {
let ret = src_manager.register_source(&pb_source).await;
if let Err(e) = ret {
txn.rollback().await?;
return Err(e);
}
}
txn.commit().await?;

let version = self
@@ -1204,7 +1196,7 @@
PbRelationInfo::Source(pb_source),
)
.await;
Ok(version)
Ok((source_id, version))
}

pub async fn create_function(
@@ -3614,7 +3606,7 @@ mod tests {
.to_string(),
..Default::default()
};
mgr.create_source(pb_source, None).await?;
mgr.create_source(pb_source).await?;
let source_id: SourceId = Source::find()
.select_only()
.column(source::Column::SourceId)
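The caller-visible change in catalog.rs is the `create_source` signature: it no longer takes an optional `SourceManagerRef`, so the controller performs no external I/O inside the transaction, and it now returns the generated `SourceId` along with the notification version. A before/after sketch of a call site (variable names illustrative):

// Before: the controller also registered the worker, rolling the txn
// back if registration against the external system failed.
let version = mgr.create_source(pb_source, Some(source_manager)).await?;

// After: the controller only writes the catalog; the generated id comes
// back so the caller can attach a pre-built worker handle afterwards.
let (source_id, version) = mgr.create_source(pb_source).await?;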
40 changes: 21 additions & 19 deletions src/meta/src/rpc/ddl_controller.rs
@@ -79,9 +79,9 @@ use crate::manager::{
use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::stream::{
validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
ReplaceTableContext, SourceManagerRef, StreamFragmentGraph,
create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder,
CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, StreamFragmentGraph,
};
use crate::{MetaError, MetaResult};

@@ -477,34 +477,36 @@ impl DdlController {
&self,
mut source: Source,
) -> MetaResult<NotificationVersion> {
match &self.metadata_manager {
let handle = create_source_worker_handle(&source, self.source_manager.metrics.clone())
.await
.context("failed to create source worker")?;

let (source_id, version) = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
source.id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
source.id = source_id;
// set the initialized_at_epoch to the current epoch.
source.initialized_at_epoch = Some(Epoch::now().0);
source.initialized_at_cluster_version = Some(current_cluster_version());

mgr.catalog_manager
.start_create_source_procedure(&source)
.await?;

if let Err(e) = self.source_manager.register_source(&source).await {
mgr.catalog_manager
.cancel_create_source_procedure(&source)
.await?;
return Err(e);
}

mgr.catalog_manager
let version = mgr
.catalog_manager
.finish_create_source_procedure(source, vec![])
.await
.await?;
(source_id, version)
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.create_source(source, Some(self.source_manager.clone()))
.await
let (source_id, version) = mgr.catalog_controller.create_source(source).await?;
(source_id as _, version)
}
}
};
self.source_manager
.register_source_with_handle(source_id, handle)
.await;
Ok(version)
}

async fn drop_source(
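Worth noting in ddl_controller.rs: the old V1 path needed a compensating `cancel_create_source_procedure` when `register_source` failed halfway, and the old V2 path relied on a rollback inside the controller. With the worker created up front, a plain `?` suffices, since no catalog state exists yet at the only point that can fail. The contrast, paraphrased from the removed and added lines:

// Old V1 error path (removed by this commit): undo the half-done
// catalog procedure before surfacing the error.
if let Err(e) = self.source_manager.register_source(&source).await {
    mgr.catalog_manager
        .cancel_create_source_procedure(&source)
        .await?;
    return Err(e);
}

// New ordering: fail fast before any catalog write, so there is
// nothing to undo.
let handle = create_source_worker_handle(&source, self.source_manager.metrics.clone())
    .await
    .context("failed to create source worker")?;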
139 changes: 73 additions & 66 deletions src/meta/src/stream/source_manager.rs
@@ -55,10 +55,11 @@ pub struct SourceManager {
pub paused: Mutex<()>,
barrier_scheduler: BarrierScheduler,
core: Mutex<SourceManagerCore>,
metrics: Arc<MetaMetrics>,
pub metrics: Arc<MetaMetrics>,
}

const MAX_FAIL_CNT: u32 = 10;
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);

struct SharedSplitMap {
splits: Option<BTreeMap<SplitId, SplitImpl>>,
@@ -95,6 +96,56 @@ fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProp
Ok(properties)
}

/// Used to create a new `ConnectorSourceWorkerHandle` for a new source.
///
/// It will call `ConnectorSourceWorker::tick()` to fetch split metadata once before returning.
pub async fn create_source_worker_handle(
source: &Source,
metrics: Arc<MetaMetrics>,
) -> MetaResult<ConnectorSourceWorkerHandle> {
tracing::info!("spawning new watcher for source {}", source.id);

let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
let current_splits_ref = splits.clone();

let connector_properties = extract_prop_from_new_source(source)?;
let enable_scale_in = connector_properties.enable_split_scale_in();
let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
let handle = dispatch_source_prop!(connector_properties, prop, {
let mut worker = ConnectorSourceWorker::create(
source,
*prop,
DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
current_splits_ref.clone(),
metrics,
)
.await?;

// If fetching the metadata fails, we refuse to create the source.

// TODO: make the timeout configurable; it should be longer than
// `properties.sync.call.timeout` in Kafka.
tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick())
.await
.ok()
.with_context(|| {
format!(
"failed to fetch meta info for source {}, timeout {:?}",
source.id, DEFAULT_SOURCE_TICK_TIMEOUT
)
})??;

tokio::spawn(async move { worker.run(sync_call_rx).await })
});

Ok(ConnectorSourceWorkerHandle {
handle,
sync_call_tx,
splits,
enable_scale_in,
})
}

const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);

impl<P: SourceProperties> ConnectorSourceWorker<P> {
@@ -207,7 +258,7 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
}

/// Handle for a running [`ConnectorSourceWorker`].
struct ConnectorSourceWorkerHandle {
pub struct ConnectorSourceWorkerHandle {
handle: JoinHandle<()>,
sync_call_tx: UnboundedSender<oneshot::Sender<MetaResult<()>>>,
splits: SharedSplitMapRef,
@@ -625,7 +676,6 @@ fn align_backfill_splits(

impl SourceManager {
const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);

pub async fn new(
barrier_scheduler: BarrierScheduler,
@@ -969,19 +1019,34 @@ impl SourceManager {
Ok(assigned)
}

/// register connector worker for source.
/// Create and register a connector worker for the source.
pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
let mut core = self.core.lock().await;
if core.managed_sources.contains_key(&source.get_id()) {
tracing::warn!("source {} already registered", source.get_id());
} else {
Self::create_source_worker(source, &mut core.managed_sources, self.metrics.clone())
if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id()) {
let handle = create_source_worker_handle(source, self.metrics.clone())
.await
.context("failed to create source worker")?;
e.insert(handle);
} else {
tracing::warn!("source {} already registered", source.get_id());
}
Ok(())
}

/// Register a pre-created connector worker handle for the source.
pub async fn register_source_with_handle(
&self,
source_id: SourceId,
handle: ConnectorSourceWorkerHandle,
) {
let mut core = self.core.lock().await;
if let Entry::Vacant(e) = core.managed_sources.entry(source_id) {
e.insert(handle);
} else {
tracing::warn!("source {} already registered", source_id);
}
}

/// Unregister connector worker for source.
pub async fn unregister_sources(&self, source_ids: Vec<SourceId>) {
let mut core = self.core.lock().await;
@@ -1049,64 +1114,6 @@ impl SourceManager {
Ok(())
}

/// Used when registering new sources (`Self::register_source`).
///
/// It will call `ConnectorSourceWorker::tick()` to fetch split metadata once before returning.
async fn create_source_worker(
source: &Source,
managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
metrics: Arc<MetaMetrics>,
) -> MetaResult<()> {
tracing::info!("spawning new watcher for source {}", source.id);

let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
let current_splits_ref = splits.clone();
let source_id = source.id;

let connector_properties = extract_prop_from_new_source(source)?;
let enable_scale_in = connector_properties.enable_split_scale_in();
let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
let handle = dispatch_source_prop!(connector_properties, prop, {
let mut worker = ConnectorSourceWorker::create(
source,
*prop,
DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
current_splits_ref.clone(),
metrics,
)
.await?;

// If fetching the metadata fails, we refuse to create the source.

// TODO: make the timeout configurable; it should be longer than
// `properties.sync.call.timeout` in Kafka.
tokio::time::timeout(Self::DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick())
.await
.ok()
.with_context(|| {
format!(
"failed to fetch meta info for source {}, timeout {:?}",
source.id,
Self::DEFAULT_SOURCE_TICK_TIMEOUT
)
})??;

tokio::spawn(async move { worker.run(sync_call_rx).await })
});

managed_sources.insert(
source_id,
ConnectorSourceWorkerHandle {
handle,
sync_call_tx,
splits,
enable_scale_in,
},
);

Ok(())
}

pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
let core = self.core.lock().await;
core.actor_splits.clone()
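One subtlety in the moved `create_source_worker_handle` is the first `tick()`, which doubles as connector validation. The `timeout(..).await.ok().with_context(..)??` chain folds two failure modes into a single error path; a commented sketch of the same expression (using the module-level `DEFAULT_SOURCE_TICK_TIMEOUT` of 10s introduced above):

// `timeout` yields Result<MetaResult<()>, Elapsed>:
//   outer Err => the 10s budget elapsed before tick() returned,
//   inner Err => tick() itself failed to fetch split metadata.
tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick())
    .await
    .ok() // Elapsed -> None
    .with_context(|| {
        // None (timeout) becomes an error carrying this message...
        format!(
            "failed to fetch meta info for source {}, timeout {:?}",
            source.id, DEFAULT_SOURCE_TICK_TIMEOUT
        )
    })? // ...propagated by the first `?`,
    ?; // while the second `?` propagates tick()'s own error.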
