feat: persist migration plan to support one-by-one migration of streaming job #9641

Merged 5 commits on May 22, 2023
7 changes: 7 additions & 0 deletions proto/meta.proto
@@ -112,6 +112,13 @@ message ActorLocation {
repeated stream_plan.StreamActor actors = 2;
}

message MigrationPlan {
// map<old_worker_id, new_worker_id>: actors will be migrated from the old worker to the new one.
map<uint32, uint32> migration_plan = 1;
// map<old_parallel_unit_id, new_parallel_unit>: actors will be migrated from the old parallel unit to the new one.
map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 2;
}

message FlushRequest {
bool checkpoint = 1;
}
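For orientation, the sketch below mirrors the shape of this message on the Rust side with plain structs. It is illustrative only: the actual Rust type is generated from the proto by prost, and the meta model in this PR exposes the two maps as `worker_plan` and `parallel_unit_plan`.

```rust
use std::collections::HashMap;

// Illustrative mirror of the `ParallelUnit` fields used by the plan;
// the real message lives in common.proto.
#[derive(Clone, Debug)]
struct ParallelUnit {
    id: u32,
    worker_node_id: u32,
}

// Illustrative mirror of the `MigrationPlan` message above; not the
// generated prost type.
#[derive(Default, Debug)]
struct MigrationPlan {
    // old worker id -> new worker id
    worker_plan: HashMap<u32, u32>,
    // old parallel unit id -> new parallel unit on the target worker
    parallel_unit_plan: HashMap<u32, ParallelUnit>,
}

fn main() {
    // Example: worker 1 expired; its actors move to worker 4 and its two
    // parallel units are remapped onto worker 4's parallel units.
    let mut plan = MigrationPlan::default();
    plan.worker_plan.insert(1, 4);
    plan.parallel_unit_plan
        .insert(100, ParallelUnit { id: 400, worker_node_id: 4 });
    plan.parallel_unit_plan
        .insert(101, ParallelUnit { id: 401, worker_node_id: 4 });
    println!("{plan:?}");
}
```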
159 changes: 107 additions & 52 deletions src/meta/src/barrier/recovery.rs
@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::{
@@ -34,7 +34,7 @@ use crate::barrier::command::CommandContext;
use crate::barrier::info::BarrierActorInfo;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::manager::WorkerId;
use crate::model::ActorId;
use crate::model::MigrationPlan;
use crate::storage::MetaStore;
use crate::stream::build_actor_connector_splits;
use crate::MetaResult;
@@ -220,45 +220,101 @@ where
new_epoch
}

/// map expired CNs to newly joined CNs, so we can migrate actors later
/// wait until get a sufficient amount of new CNs
/// return "map of `ActorId` in expired CN to new CN id" and "map of `WorkerId` to
/// `WorkerNode` struct in new CNs"
async fn get_migrate_map_plan(
/// Migrate actors in expired CNs to newly joined ones; returns `true` if any actor is migrated.
async fn migrate_actors(&self, info: &BarrierActorInfo) -> MetaResult<bool> {
debug!("start migrate actors.");

// 1. get expired workers.
let expired_workers: HashSet<WorkerId> = info
.actor_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.collect();
if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
return Ok(false);
}
let migration_plan = self.generate_migration_plan(info, expired_workers).await?;
// 2. start to migrate fragment one-by-one.
self.fragment_manager
.migrate_fragment_actors(&migration_plan)
.await?;
// 3. remove the migration plan.
migration_plan.delete(self.env.meta_store()).await?;

debug!("migrate actors succeed.");
Ok(true)
}
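The point of persisting the plan before step 2 is that a meta-node crash between the migration of two fragments can be resumed with the same targets. The toy below illustrates that property; `FakeMetaStore` and `recover` are hypothetical stand-ins, not the PR's API, which uses `MigrationPlan::get/insert/delete` against the meta store as shown above.

```rust
use std::collections::HashMap;

// Hypothetical in-memory stand-in for the meta store.
#[derive(Default)]
struct FakeMetaStore {
    // old worker id -> new worker id
    migration_plan: Option<HashMap<u32, u32>>,
}

// One recovery attempt: reuse a persisted plan if there is one, otherwise
// persist a freshly generated plan, apply it, and remove it at the end.
fn recover(store: &mut FakeMetaStore, generated: &[(u32, u32)], crash_before_cleanup: bool) {
    let plan = store
        .migration_plan
        .get_or_insert_with(|| generated.iter().copied().collect())
        .clone();
    println!("applying plan: {plan:?}"); // fragments would be migrated one by one here
    if crash_before_cleanup {
        return; // simulated crash: the plan stays persisted
    }
    store.migration_plan = None; // step 3: drop the plan once migration finished
}

fn main() {
    let mut store = FakeMetaStore::default();
    // the first attempt persists {1 -> 4, 2 -> 5} but "crashes" before cleanup
    recover(&mut store, &[(1, 4), (2, 5)], true);
    // the next attempt reuses the persisted plan instead of the new candidates
    recover(&mut store, &[(1, 9), (2, 9)], false);
    assert!(store.migration_plan.is_none());
}
```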

/// Generate a migration plan, which includes:
/// 1. a mapping from each expired, in-use worker to a new one.
/// 2. a mapping from each expired, in-use parallel unit to a new one.
/// 3. cached worker parallel units.
async fn generate_migration_plan(
&self,
info: &BarrierActorInfo,
expired_workers: &[WorkerId],
) -> (HashMap<ActorId, WorkerId>, HashMap<WorkerId, WorkerNode>) {
let mut cur = 0;
let mut migrate_map = HashMap::new();
let mut node_map = HashMap::new();
mut expired_workers: HashSet<WorkerId>,
) -> MetaResult<MigrationPlan> {
let mut cached_plan = MigrationPlan::get(self.env.meta_store()).await?;

// filter out expired workers that are already covered by the cached migration plan.
if !cached_plan.is_empty() {
// drop cached plan entries whose target worker is neither expired nor in use by
// any actors.
cached_plan
.worker_plan
.retain(|_, to| expired_workers.contains(to) || info.actor_map.contains_key(to));
cached_plan.parallel_unit_plan.retain(|_, to| {
expired_workers.contains(&to.worker_node_id)
|| info.actor_map.contains_key(&to.worker_node_id)
});

expired_workers.retain(|id| !cached_plan.worker_plan.contains_key(id));
}

if expired_workers.is_empty() {
// all expired workers are already in migration plan.
debug!("all expired workers are already in migration plan.");
return Ok(cached_plan);
}
debug!("got expired workers {:#?}", expired_workers);
let mut expired_workers = expired_workers.into_iter().collect_vec();
let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await;

let start = Instant::now();
while cur < expired_workers.len() {
// if there are still expired workers to map, wait for newly joined workers.
'discovery: while !expired_workers.is_empty() {
let current_nodes = self
.cluster_manager
.list_active_streaming_compute_nodes()
.await;
let new_nodes = current_nodes
.into_iter()
.filter(|node| {
!info.actor_map.contains_key(&node.id) && !node_map.contains_key(&node.id)
!info.actor_map.contains_key(&node.id)
&& !cached_plan.worker_plan.values().contains(&node.id)
})
.collect_vec();

for new_node in new_nodes {
let actors = info.actor_map.get(&expired_workers[cur]).unwrap();
for actor in actors {
migrate_map.insert(*actor, new_node.id);
}
cur += 1;
debug!(
"new worker joined: {}, migrate process ({}/{})",
new_node.id,
cur,
expired_workers.len()
);
node_map.insert(new_node.id, new_node);
if cur == expired_workers.len() {
return (migrate_map, node_map);
if let Some(from) = expired_workers.pop() {
debug!(
"new worker joined, plan to migrate from worker {} to {}",
from, new_node.id
);
cached_plan.worker_plan.insert(from, new_node.id);
assert!(all_worker_parallel_units.contains_key(&from));
let from_parallel_units = all_worker_parallel_units.get(&from).unwrap();
// todo: remove it and migrate actors only based on parallel unit mapping.
assert!(from_parallel_units.len() <= new_node.parallel_units.len());
for (i, pu) in from_parallel_units.iter().enumerate() {
cached_plan
.parallel_unit_plan
.insert(*pu, new_node.parallel_units[i].clone());
}
} else {
break 'discovery;
}
}
warn!(
Expand All @@ -268,33 +324,32 @@ where
// wait to get newly joined CN
tokio::time::sleep(Duration::from_millis(100)).await;
}
(migrate_map, node_map)
}

async fn migrate_actors(&self, info: &BarrierActorInfo) -> MetaResult<bool> {
debug!("start migrate actors.");

// 1. get expired workers
let expired_workers = info
.actor_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.collect_vec();
if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
return Ok(false);
// flatten chains in the migration plan so that every entry maps directly to its final target.
let mut new_plan = MigrationPlan::default();
for (from, to) in &cached_plan.worker_plan {
let mut to = *to;
while let Some(target) = cached_plan.worker_plan.get(&to) {
to = *target;
}
new_plan.worker_plan.insert(*from, to);
}
for (from, to) in &cached_plan.parallel_unit_plan {
let mut to = to.clone();
while let Some(target) = cached_plan.parallel_unit_plan.get(&to.id) {
to = target.clone();
}
new_plan.parallel_unit_plan.insert(*from, to);
}
debug!("got expired workers {:#?}", expired_workers);

let (migrate_map, node_map) = self.get_migrate_map_plan(info, &expired_workers).await;
// 2. migrate actors in fragments
self.fragment_manager
.migrate_actors(&migrate_map, &node_map)
.await?;
debug!("migrate actors succeed.");
assert!(
new_plan.worker_plan.values().all_unique(),
"target workers must be unique: {:?}",
new_plan.worker_plan
);

Ok(true)
new_plan.insert(self.env.meta_store()).await?;
Ok(new_plan)
}

/// Update all actors in compute nodes.
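The chain-resolution step near the end of `generate_migration_plan` (building `new_plan` from `cached_plan`) can be read as a small pure function. The sketch below is an illustrative reformulation over plain worker ids, not the actual implementation:

```rust
use std::collections::HashMap;

// Resolve chains in a worker plan so that every source maps directly to
// its final target, mirroring the `new_plan.worker_plan` loop above.
// It assumes the plan has no cycles, which holds because targets are
// always newly joined workers.
fn flatten_worker_plan(plan: &HashMap<u32, u32>) -> HashMap<u32, u32> {
    let mut flattened = HashMap::new();
    for (&from, &to) in plan {
        let mut to = to;
        // follow the chain until the target is not itself a source
        while let Some(&next) = plan.get(&to) {
            to = next;
        }
        flattened.insert(from, to);
    }
    flattened
}

fn main() {
    // worker 1 was previously planned to move to 2, but 2 has since expired
    // and is planned to move to 4, so 1 should now go straight to 4.
    let cached = HashMap::from([(1u32, 2u32), (2, 4)]);
    let flattened = flatten_worker_plan(&cached);
    assert_eq!(flattened[&1], 4);
    assert_eq!(flattened[&2], 4);
}
```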
122 changes: 43 additions & 79 deletions src/meta/src/manager/catalog/fragment.rs
@@ -12,19 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitMapping};
use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::{bail, try_match_expand};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{ParallelUnit, WorkerNode};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -40,7 +38,8 @@ use crate::barrier::Reschedule;
use crate::manager::cluster::WorkerId;
use crate::manager::{commit_meta, MetaSrvEnv};
use crate::model::{
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, TableFragments, ValTransaction,
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments,
ValTransaction,
};
use crate::storage::{MetaStore, Transaction};
use crate::stream::SplitAssignment;
@@ -123,32 +122,6 @@ where
!self.core.read().await.table_fragments.is_empty()
}

pub async fn batch_update_table_fragments(
&self,
table_fragments: &[TableFragments],
) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
if table_fragments
.iter()
.any(|tf| !map.contains_key(&tf.table_id()))
{
bail!("update table fragments fail, table not found");
}

let mut table_fragments_txn = BTreeMapTransaction::new(map);
table_fragments.iter().for_each(|tf| {
table_fragments_txn.insert(tf.table_id(), tf.clone());
});
commit_meta!(self, table_fragments_txn)?;

for table_fragment in table_fragments {
self.notify_fragment_mapping(table_fragment, Operation::Update)
.await;
}

Ok(())
}

async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
for fragment in table_fragment.fragments.values() {
if !fragment.state_table_ids.is_empty() {
@@ -466,59 +439,50 @@ where
}

/// Used in [`crate::barrier::GlobalBarrierManager`]
/// migrate actors and update fragments, generate migrate info
pub async fn migrate_actors(
&self,
migrate_map: &HashMap<ActorId, WorkerId>,
node_map: &HashMap<WorkerId, WorkerNode>,
) -> MetaResult<()> {
let mut parallel_unit_migrate_map = HashMap::new();
let mut pu_map: HashMap<WorkerId, Vec<&ParallelUnit>> = node_map
.iter()
.map(|(&worker_id, worker)| (worker_id, worker.parallel_units.iter().collect_vec()))
.collect();

// update actor status and generate pu to pu migrate info
let mut table_fragments = self.list_table_fragments().await?;
let mut new_fragments = Vec::new();
table_fragments.iter_mut().for_each(|fragment| {
let mut flag = false;
fragment
.actor_status
.iter_mut()
.for_each(|(actor_id, status)| {
if let Some(new_node_id) = migrate_map.get(actor_id) {
if let Some(ref old_parallel_unit) = status.parallel_unit {
flag = true;
if let Entry::Vacant(e) =
parallel_unit_migrate_map.entry(old_parallel_unit.id)
{
let new_parallel_unit =
pu_map.get_mut(new_node_id).unwrap().pop().unwrap();
e.insert(new_parallel_unit.clone());
status.parallel_unit = Some(new_parallel_unit.clone());
} else {
status.parallel_unit = Some(
parallel_unit_migrate_map
.get(&old_parallel_unit.id)
.unwrap()
.clone(),
);
}
}
};
});
if flag {
// update vnode mapping of updated fragments
fragment.update_vnode_mapping(&parallel_unit_migrate_map);
new_fragments.push(fragment.clone());
/// migrate actors and update fragments one by one according to the migration plan.
pub async fn migrate_fragment_actors(&self, migration_plan: &MigrationPlan) -> MetaResult<()> {
let table_fragments = self.list_table_fragments().await?;
for mut table_fragment in table_fragments {
let mut updated = false;
for status in table_fragment.actor_status.values_mut() {
if let Some(pu) = &status.parallel_unit && migration_plan.parallel_unit_plan.contains_key(&pu.id) {
updated = true;
status.parallel_unit = Some(migration_plan.parallel_unit_plan[&pu.id].clone());
}
}
if updated {
table_fragment.update_vnode_mapping(&migration_plan.parallel_unit_plan);
let map = &mut self.core.write().await.table_fragments;
if map.contains_key(&table_fragment.table_id()) {
let mut txn = BTreeMapTransaction::new(map);
txn.insert(table_fragment.table_id(), table_fragment.clone());
commit_meta!(self, txn)?;
self.notify_fragment_mapping(&table_fragment, Operation::Update)
.await;
}
}
});
// update fragments
self.batch_update_table_fragments(&new_fragments).await?;
}

Ok(())
}

pub async fn all_worker_parallel_units(&self) -> HashMap<WorkerId, HashSet<ParallelUnitId>> {
let mut all_worker_parallel_units = HashMap::new();
let map = &self.core.read().await.table_fragments;
for table_fragment in map.values() {
table_fragment.worker_parallel_units().into_iter().for_each(
|(worker_id, parallel_units)| {
all_worker_parallel_units
.entry(worker_id)
.or_insert_with(HashSet::new)
.extend(parallel_units);
},
);
}

all_worker_parallel_units
}

pub async fn all_node_actors(
&self,
include_inactive: bool,
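`migrate_fragment_actors` commits each table's update in its own transaction, so an interrupted run leaves already-migrated tables consistent while the persisted plan still covers the rest. The sketch below isolates the per-actor remapping step with simplified, illustrative types (the real code works on `TableFragments` actor statuses and stores whole `ParallelUnit` messages in the plan):

```rust
use std::collections::HashMap;

// Illustrative types, not the fragment manager's.
type ParallelUnitPlan = HashMap<u32, u32>; // old parallel unit id -> new parallel unit id
type ActorStatus = HashMap<u32, u32>; // actor id -> parallel unit id it runs on

// Remap every actor whose parallel unit appears in the plan; returns
// whether anything changed, mirroring the `updated` flag above.
fn remap_actors(status: &mut ActorStatus, plan: &ParallelUnitPlan) -> bool {
    let mut updated = false;
    for pu in status.values_mut() {
        if let Some(&new_pu) = plan.get(pu) {
            *pu = new_pu;
            updated = true;
        }
    }
    updated
}

fn main() {
    // actors 7 and 8 run on parallel units 100 and 200; unit 100 is being
    // migrated to unit 400 on a newly joined worker.
    let mut status: ActorStatus = HashMap::from([(7, 100), (8, 200)]);
    let plan: ParallelUnitPlan = HashMap::from([(100, 400)]);
    assert!(remap_actors(&mut status, &plan));
    assert_eq!(status[&7], 400);
    assert_eq!(status[&8], 200);
}
```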
3 changes: 1 addition & 2 deletions src/meta/src/manager/cluster.rs
@@ -71,8 +71,7 @@
S: MetaStore,
{
pub async fn new(env: MetaSrvEnv<S>, max_heartbeat_interval: Duration) -> MetaResult<Self> {
let meta_store = env.meta_store_ref();
let core = ClusterManagerCore::new(meta_store.clone()).await?;
let core = ClusterManagerCore::new(env.meta_store_ref()).await?;

Ok(Self {
env,