feat: persist migration plan to support one-by-one migration of streaming job (#9641)
yezizp2012 authored May 22, 2023
1 parent f522f68 commit 7167524
Showing 7 changed files with 267 additions and 133 deletions.
7 changes: 7 additions & 0 deletions proto/meta.proto
@@ -112,6 +112,13 @@ message ActorLocation {
repeated stream_plan.StreamActor actors = 2;
}

message MigrationPlan {
// map<old_worker_id, worker_id>, the plan indicates that the actors will be migrated from old worker to the new one.
map<uint32, uint32> migration_plan = 1;
// map<parallel_unit_id, parallel_unit>, the plan indicates that the actors will be migrated from old parallel unit to the new one.
map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 2;
}

message FlushRequest {
bool checkpoint = 1;
}
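To make the shape of the plan concrete, here is a minimal, runnable Rust sketch of the two maps this message carries. The struct and field names are hypothetical stand-ins for the generated/model types in the meta crate (the recovery code below accesses them as `worker_plan` and `parallel_unit_plan`).

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the generated proto/model types, for illustration only.
#[derive(Clone, Debug)]
struct ParallelUnit {
    id: u32,
    worker_node_id: u32,
}

#[derive(Default, Debug)]
struct MigrationPlan {
    // old worker id -> new worker id
    worker_plan: HashMap<u32, u32>,
    // old parallel unit id -> new parallel unit on the target worker
    parallel_unit_plan: HashMap<u32, ParallelUnit>,
}

fn main() {
    // Suppose worker 1 (parallel units 10, 11) expired and worker 7
    // (parallel units 70, 71) has just joined the cluster.
    let mut plan = MigrationPlan::default();
    plan.worker_plan.insert(1, 7);
    for (old_pu, new_pu) in [(10u32, 70u32), (11, 71)] {
        plan.parallel_unit_plan
            .insert(old_pu, ParallelUnit { id: new_pu, worker_node_id: 7 });
    }
    println!("{plan:?}");
}
```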
159 changes: 107 additions & 52 deletions src/meta/src/barrier/recovery.rs
@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::{
@@ -34,7 +34,7 @@ use crate::barrier::command::CommandContext;
use crate::barrier::info::BarrierActorInfo;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::manager::WorkerId;
use crate::model::ActorId;
use crate::model::MigrationPlan;
use crate::storage::MetaStore;
use crate::stream::build_actor_connector_splits;
use crate::MetaResult;
@@ -220,45 +220,101 @@ where
new_epoch
}

/// map expired CNs to newly joined CNs, so we can migrate actors later
/// wait until get a sufficient amount of new CNs
/// return "map of `ActorId` in expired CN to new CN id" and "map of `WorkerId` to
/// `WorkerNode` struct in new CNs"
async fn get_migrate_map_plan(
/// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated.
async fn migrate_actors(&self, info: &BarrierActorInfo) -> MetaResult<bool> {
debug!("start migrate actors.");

// 1. get expired workers.
let expired_workers: HashSet<WorkerId> = info
.actor_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.collect();
if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
return Ok(false);
}
let migration_plan = self.generate_migration_plan(info, expired_workers).await?;
// 2. start to migrate fragment one-by-one.
self.fragment_manager
.migrate_fragment_actors(&migration_plan)
.await?;
// 3. remove the migration plan.
migration_plan.delete(self.env.meta_store()).await?;

debug!("migrate actors succeed.");
Ok(true)
}

/// This function will generate a migration plan, which includes:
/// 1. mapping for each expired and in-use worker to a new one.
/// 2. mapping for each expired and in-use parallel unit to a new one.
/// 3. cached worker parallel units.
async fn generate_migration_plan(
&self,
info: &BarrierActorInfo,
expired_workers: &[WorkerId],
) -> (HashMap<ActorId, WorkerId>, HashMap<WorkerId, WorkerNode>) {
let mut cur = 0;
let mut migrate_map = HashMap::new();
let mut node_map = HashMap::new();
mut expired_workers: HashSet<WorkerId>,
) -> MetaResult<MigrationPlan> {
let mut cached_plan = MigrationPlan::get(self.env.meta_store()).await?;

// filter out workers that are already in migration plan.
if !cached_plan.is_empty() {
// clean up expired workers that are already in migration plan and haven't been used by
// any actors.
cached_plan
.worker_plan
.retain(|_, to| expired_workers.contains(to) || info.actor_map.contains_key(to));
cached_plan.parallel_unit_plan.retain(|_, to| {
expired_workers.contains(&to.worker_node_id)
|| info.actor_map.contains_key(&to.worker_node_id)
});

expired_workers.retain(|id| !cached_plan.worker_plan.contains_key(id));
}

if expired_workers.is_empty() {
// all expired workers are already in migration plan.
debug!("all expired workers are already in migration plan.");
return Ok(cached_plan);
}
debug!("got expired workers {:#?}", expired_workers);
let mut expired_workers = expired_workers.into_iter().collect_vec();
let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await;

let start = Instant::now();
while cur < expired_workers.len() {
// if expired workers are not empty, wait for newly joined workers.
'discovery: while !expired_workers.is_empty() {
let current_nodes = self
.cluster_manager
.list_active_streaming_compute_nodes()
.await;
let new_nodes = current_nodes
.into_iter()
.filter(|node| {
!info.actor_map.contains_key(&node.id) && !node_map.contains_key(&node.id)
!info.actor_map.contains_key(&node.id)
&& !cached_plan.worker_plan.values().contains(&node.id)
})
.collect_vec();

for new_node in new_nodes {
let actors = info.actor_map.get(&expired_workers[cur]).unwrap();
for actor in actors {
migrate_map.insert(*actor, new_node.id);
}
cur += 1;
debug!(
"new worker joined: {}, migrate process ({}/{})",
new_node.id,
cur,
expired_workers.len()
);
node_map.insert(new_node.id, new_node);
if cur == expired_workers.len() {
return (migrate_map, node_map);
if let Some(from) = expired_workers.pop() {
debug!(
"new worker joined, plan to migrate from worker {} to {}",
from, new_node.id
);
cached_plan.worker_plan.insert(from, new_node.id);
assert!(all_worker_parallel_units.contains_key(&from));
let from_parallel_units = all_worker_parallel_units.get(&from).unwrap();
// todo: remove it and migrate actors only based on parallel unit mapping.
assert!(from_parallel_units.len() <= new_node.parallel_units.len());
for (i, pu) in from_parallel_units.iter().enumerate() {
cached_plan
.parallel_unit_plan
.insert(*pu, new_node.parallel_units[i].clone());
}
} else {
break 'discovery;
}
}
warn!(
@@ -268,33 +324,32 @@ where
// wait to get newly joined CN
tokio::time::sleep(Duration::from_millis(100)).await;
}
(migrate_map, node_map)
}

async fn migrate_actors(&self, info: &BarrierActorInfo) -> MetaResult<bool> {
debug!("start migrate actors.");

// 1. get expired workers
let expired_workers = info
.actor_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.collect_vec();
if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
return Ok(false);
// update the migration plan: if there is a chain in the plan, resolve it to the final target.
let mut new_plan = MigrationPlan::default();
for (from, to) in &cached_plan.worker_plan {
let mut to = *to;
while let Some(target) = cached_plan.worker_plan.get(&to) {
to = *target;
}
new_plan.worker_plan.insert(*from, to);
}
for (from, to) in &cached_plan.parallel_unit_plan {
let mut to = to.clone();
while let Some(target) = cached_plan.parallel_unit_plan.get(&to.id) {
to = target.clone();
}
new_plan.parallel_unit_plan.insert(*from, to);
}
debug!("got expired workers {:#?}", expired_workers);

let (migrate_map, node_map) = self.get_migrate_map_plan(info, &expired_workers).await;
// 2. migrate actors in fragments
self.fragment_manager
.migrate_actors(&migrate_map, &node_map)
.await?;
debug!("migrate actors succeed.");
assert!(
new_plan.worker_plan.values().all_unique(),
"target workers must be unique: {:?}",
new_plan.worker_plan
);

Ok(true)
new_plan.insert(self.env.meta_store()).await?;
Ok(new_plan)
}

/// Update all actors in compute nodes.
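The chain-resolution step near the end of `generate_migration_plan` (following `worker_plan` entries until the target is no longer itself a source) is what keeps repeated recoveries from producing stale targets. A standalone sketch of that idea, assuming the plan contains no cycles (target workers are live and never reappear as sources):

```rust
use std::collections::HashMap;

/// Collapse migration chains such as 1 -> 2 -> 3 into 1 -> 3 and 2 -> 3,
/// so every expired worker maps directly to a live target.
fn compact_worker_plan(plan: &HashMap<u32, u32>) -> HashMap<u32, u32> {
    let mut compacted = HashMap::new();
    for (&from, &to) in plan {
        let mut target = to;
        // Follow the chain until the target is no longer a source itself.
        // Assumes no cycles, i.e. a target never points back into the plan
        // indefinitely.
        while let Some(&next) = plan.get(&target) {
            target = next;
        }
        compacted.insert(from, target);
    }
    compacted
}

fn main() {
    // Worker 1 was first planned to migrate to 2; later 2 expired as well
    // and was planned to migrate to 3.
    let plan = HashMap::from([(1u32, 2u32), (2, 3)]);
    let compacted = compact_worker_plan(&plan);
    assert_eq!(compacted[&1], 3);
    assert_eq!(compacted[&2], 3);
}
```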
122 changes: 43 additions & 79 deletions src/meta/src/manager/catalog/fragment.rs
@@ -12,19 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitMapping};
use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::{bail, try_match_expand};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{ParallelUnit, WorkerNode};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -40,7 +38,8 @@ use crate::barrier::Reschedule;
use crate::manager::cluster::WorkerId;
use crate::manager::{commit_meta, MetaSrvEnv};
use crate::model::{
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, TableFragments, ValTransaction,
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments,
ValTransaction,
};
use crate::storage::{MetaStore, Transaction};
use crate::stream::SplitAssignment;
@@ -123,32 +122,6 @@ where
!self.core.read().await.table_fragments.is_empty()
}

pub async fn batch_update_table_fragments(
&self,
table_fragments: &[TableFragments],
) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
if table_fragments
.iter()
.any(|tf| !map.contains_key(&tf.table_id()))
{
bail!("update table fragments fail, table not found");
}

let mut table_fragments_txn = BTreeMapTransaction::new(map);
table_fragments.iter().for_each(|tf| {
table_fragments_txn.insert(tf.table_id(), tf.clone());
});
commit_meta!(self, table_fragments_txn)?;

for table_fragment in table_fragments {
self.notify_fragment_mapping(table_fragment, Operation::Update)
.await;
}

Ok(())
}

async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
for fragment in table_fragment.fragments.values() {
if !fragment.state_table_ids.is_empty() {
@@ -466,59 +439,50 @@ where
}

/// Used in [`crate::barrier::GlobalBarrierManager`]
/// migrate actors and update fragments, generate migrate info
pub async fn migrate_actors(
&self,
migrate_map: &HashMap<ActorId, WorkerId>,
node_map: &HashMap<WorkerId, WorkerNode>,
) -> MetaResult<()> {
let mut parallel_unit_migrate_map = HashMap::new();
let mut pu_map: HashMap<WorkerId, Vec<&ParallelUnit>> = node_map
.iter()
.map(|(&worker_id, worker)| (worker_id, worker.parallel_units.iter().collect_vec()))
.collect();

// update actor status and generate pu to pu migrate info
let mut table_fragments = self.list_table_fragments().await?;
let mut new_fragments = Vec::new();
table_fragments.iter_mut().for_each(|fragment| {
let mut flag = false;
fragment
.actor_status
.iter_mut()
.for_each(|(actor_id, status)| {
if let Some(new_node_id) = migrate_map.get(actor_id) {
if let Some(ref old_parallel_unit) = status.parallel_unit {
flag = true;
if let Entry::Vacant(e) =
parallel_unit_migrate_map.entry(old_parallel_unit.id)
{
let new_parallel_unit =
pu_map.get_mut(new_node_id).unwrap().pop().unwrap();
e.insert(new_parallel_unit.clone());
status.parallel_unit = Some(new_parallel_unit.clone());
} else {
status.parallel_unit = Some(
parallel_unit_migrate_map
.get(&old_parallel_unit.id)
.unwrap()
.clone(),
);
}
}
};
});
if flag {
// update vnode mapping of updated fragments
fragment.update_vnode_mapping(&parallel_unit_migrate_map);
new_fragments.push(fragment.clone());
/// migrate actors and update fragments one by one according to the migration plan.
pub async fn migrate_fragment_actors(&self, migration_plan: &MigrationPlan) -> MetaResult<()> {
let table_fragments = self.list_table_fragments().await?;
for mut table_fragment in table_fragments {
let mut updated = false;
for status in table_fragment.actor_status.values_mut() {
if let Some(pu) = &status.parallel_unit && migration_plan.parallel_unit_plan.contains_key(&pu.id) {
updated = true;
status.parallel_unit = Some(migration_plan.parallel_unit_plan[&pu.id].clone());
}
}
if updated {
table_fragment.update_vnode_mapping(&migration_plan.parallel_unit_plan);
let map = &mut self.core.write().await.table_fragments;
if map.contains_key(&table_fragment.table_id()) {
let mut txn = BTreeMapTransaction::new(map);
txn.insert(table_fragment.table_id(), table_fragment.clone());
commit_meta!(self, txn)?;
self.notify_fragment_mapping(&table_fragment, Operation::Update)
.await;
}
}
});
// update fragments
self.batch_update_table_fragments(&new_fragments).await?;
}

Ok(())
}

pub async fn all_worker_parallel_units(&self) -> HashMap<WorkerId, HashSet<ParallelUnitId>> {
let mut all_worker_parallel_units = HashMap::new();
let map = &self.core.read().await.table_fragments;
for table_fragment in map.values() {
table_fragment.worker_parallel_units().into_iter().for_each(
|(worker_id, parallel_units)| {
all_worker_parallel_units
.entry(worker_id)
.or_insert_with(HashSet::new)
.extend(parallel_units);
},
);
}

all_worker_parallel_units
}

pub async fn all_node_actors(
&self,
include_inactive: bool,
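For reference, a trimmed-down sketch of the per-fragment rewrite that `migrate_fragment_actors` applies: every actor whose parallel unit appears in `parallel_unit_plan` is moved to the mapped unit, and an `updated` flag decides whether the fragment is written back. The types here are simplified, hypothetical stand-ins, not the real meta-store models.

```rust
use std::collections::HashMap;

// Simplified, hypothetical stand-ins for the real meta models.
#[derive(Clone, Debug)]
struct ParallelUnit {
    id: u32,
    worker_node_id: u32,
}

struct ActorStatus {
    parallel_unit: Option<ParallelUnit>,
}

/// Rewrite the actor statuses of one table fragment according to the
/// parallel-unit plan; returns true if anything changed, mirroring the
/// `updated` flag that gates the write-back and notification above.
fn apply_plan(
    actor_status: &mut HashMap<u32, ActorStatus>,
    parallel_unit_plan: &HashMap<u32, ParallelUnit>,
) -> bool {
    let mut updated = false;
    for status in actor_status.values_mut() {
        if let Some(pu) = &status.parallel_unit {
            if let Some(new_pu) = parallel_unit_plan.get(&pu.id) {
                status.parallel_unit = Some(new_pu.clone());
                updated = true;
            }
        }
    }
    updated
}

fn main() {
    // Parallel unit 10 (on an expired worker) is mapped to unit 70 on worker 7.
    let plan = HashMap::from([(10u32, ParallelUnit { id: 70, worker_node_id: 7 })]);
    let mut statuses = HashMap::from([
        (100u32, ActorStatus { parallel_unit: Some(ParallelUnit { id: 10, worker_node_id: 1 }) }),
        (101, ActorStatus { parallel_unit: Some(ParallelUnit { id: 20, worker_node_id: 2 }) }),
    ]);
    assert!(apply_plan(&mut statuses, &plan));
    // Actor 100 moved to the new parallel unit; actor 101 is untouched.
    assert_eq!(statuses[&100].parallel_unit.as_ref().unwrap().id, 70);
    assert_eq!(statuses[&101].parallel_unit.as_ref().unwrap().id, 20);
}
```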
3 changes: 1 addition & 2 deletions src/meta/src/manager/cluster.rs
@@ -71,8 +71,7 @@
S: MetaStore,
{
pub async fn new(env: MetaSrvEnv<S>, max_heartbeat_interval: Duration) -> MetaResult<Self> {
let meta_store = env.meta_store_ref();
let core = ClusterManagerCore::new(meta_store.clone()).await?;
let core = ClusterManagerCore::new(env.meta_store_ref()).await?;

Ok(Self {
env,