Skip to content

Commit

Permalink
support catchup_specific_quorum
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Nov 6, 2024
1 parent 27c67ee commit ce0d3af
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 36 deletions.
25 changes: 16 additions & 9 deletions crates/libs/core/src/runtime/stateful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
// ------------------------------------------------------------

// This module contains the Rust definitions of the stateful traits that users need to implement.

use crate::{GUID, HSTRING};
use mssf_com::FabricRuntime::IFabricStatefulServicePartition;

use crate::sync::CancellationToken;
use crate::types::{LoadMetric, LoadMetricListRef, ReplicaRole};

use super::stateful_types::{Epoch, OpenMode, ReplicaInfo, ReplicaSetConfig, ReplicaSetQuarumMode};
use super::stateful_types::{
Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuarumMode,
};

/// Represents a stateful service factory that is responsible for creating replicas
/// of a specific type of stateful service. Stateful service factories are registered with
Expand All @@ -20,10 +20,10 @@ pub trait StatefulServiceFactory {
/// Called by Service Fabric to create a stateful service replica for a particular service.
fn create_replica(
&self,
servicetypename: &HSTRING,
servicename: &HSTRING,
servicetypename: &crate::HSTRING,
servicename: &crate::HSTRING,
initializationdata: &[u8],
partitionid: &GUID,
partitionid: &crate::GUID,
replicaid: i64,
) -> crate::Result<impl StatefulServiceReplica>;
}
Expand All @@ -36,6 +36,8 @@ pub trait StatefulServiceFactory {
pub trait LocalStatefulServiceReplica: Send + Sync + 'static {
/// Opens an initialized service replica so that additional actions can be taken.
/// Returns PrimaryReplicator that is used by the stateful service.
// Note: we use async traits and cannot use dynamic dispatch to build a similar
// interface hierarchy for COM objects. The return type is subject to change in the future.
async fn open(
&self,
openmode: OpenMode,
Expand All @@ -54,7 +56,7 @@ pub trait LocalStatefulServiceReplica: Send + Sync + 'static {
&self,
newrole: ReplicaRole,
cancellation_token: CancellationToken,
) -> crate::Result<HSTRING>;
) -> crate::Result<crate::HSTRING>;

/// Closes the service replica gracefully when it is being shut down.
async fn close(&self, cancellation_token: CancellationToken) -> crate::Result<()>;
Expand Down Expand Up @@ -95,7 +97,7 @@ impl From<&IFabricStatefulServicePartition> for StatefulServicePartition {
/// TODO: replicator has no public documentation
#[trait_variant::make(Replicator: Send)]
pub trait LocalReplicator: Send + Sync + 'static {
async fn open(&self, cancellation_token: CancellationToken) -> crate::Result<HSTRING>; // replicator address
async fn open(&self, cancellation_token: CancellationToken) -> crate::Result<crate::HSTRING>; // replicator address
async fn close(&self, cancellation_token: CancellationToken) -> crate::Result<()>;
async fn change_role(
&self,
Expand Down Expand Up @@ -123,6 +125,7 @@ pub trait LocalReplicator: Send + Sync + 'static {
}

/// TODO: primary replicator has no public documentation
/// IFabricPrimaryReplicator com interface wrapper.
#[trait_variant::make(PrimaryReplicator: Send)]
pub trait LocalPrimaryReplicator: Replicator {
// SF calls this to indicate that possible data loss has occurred (write quorum loss),
Expand All @@ -145,8 +148,12 @@ pub trait LocalPrimaryReplicator: Replicator {
) -> crate::Result<()>;
async fn build_replica(
&self,
replica: &ReplicaInfo,
replica: &ReplicaInformation,
cancellation_token: CancellationToken,
) -> crate::Result<()>;
fn remove_replica(&self, replicaid: i64) -> crate::Result<()>;
}

// IFabricReplicatorCatchupSpecificQuorum
// replicator is checked to have this interface
// implemented: https://github.com/microsoft/service-fabric/blob/9d25e17d9e19ca46bbd1142bfcbae8416ba45e61/src/prod/src/Reliability/Failover/ra/ComProxyReplicator.h#L25
45 changes: 36 additions & 9 deletions crates/libs/core/src/runtime/stateful_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@

use std::sync::Arc;

use crate::{Interface, HSTRING};
use tracing::info;
use windows_core::implement;
use windows_core::{Interface, HSTRING};

use mssf_com::{
FabricCommon::IFabricStringResult,
FabricRuntime::{
IFabricPrimaryReplicator, IFabricPrimaryReplicator_Impl, IFabricReplicator,
IFabricReplicatorCatchupSpecificQuorum, IFabricReplicatorCatchupSpecificQuorum_Impl,
IFabricReplicator_Impl, IFabricStatefulServiceFactory, IFabricStatefulServiceFactory_Impl,
IFabricStatefulServicePartition, IFabricStatefulServiceReplica,
IFabricStatefulServiceReplica_Impl,
Expand All @@ -31,7 +32,7 @@ use mssf_com::{
use crate::{
runtime::{
stateful::StatefulServicePartition,
stateful_types::{Epoch, OpenMode, ReplicaInfo, ReplicaSetConfig},
stateful_types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig},
},
strings::HSTRINGWrap,
sync::BridgeContext3,
Expand All @@ -42,6 +43,9 @@ use super::{
executor::Executor,
stateful::{PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServiceReplica},
};
// bridges from rs into com

// region: StatefulServiceFactoryBridge

#[implement(IFabricStatefulServiceFactory)]
pub struct StatefulServiceFactoryBridge<E, F>
Expand Down Expand Up @@ -103,9 +107,11 @@ where
}
}

// bridges from rs into com
// endregion: StatefulServiceFactoryBridge

// region: IFabricReplicatorBridge

// bridge from safe service instance to com
/// bridge from safe service instance to com
#[implement(IFabricReplicator)]

pub struct IFabricReplicatorBridge<E, R>
Expand Down Expand Up @@ -257,8 +263,17 @@ where
}
}

// primary replicator bridge
#[implement(IFabricPrimaryReplicator)]
// endregion: IFabricReplicatorBridge

// region: IFabricPrimaryReplicatorBridge

/// Primary replicator bridge.
/// mssf_core only supports primary replicator with IFabricReplicatorCatchupSpecificQuorum enabled,
/// which allows an IReplicator to indicate that it supports catching up specific quorums with the
/// use of the MustCatchup flag in ReplicaInformation.
// Nearly all replicators in cpp and csharp enable CatchupSpecificQuorum; not enabling it
// is a rare case.
#[implement(IFabricPrimaryReplicator, IFabricReplicatorCatchupSpecificQuorum)]
pub struct IFabricPrimaryReplicatorBridge<E, P>
where
E: Executor,
Expand Down Expand Up @@ -458,7 +473,7 @@ where
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
) -> crate::Result<super::IFabricAsyncOperationContext> {
let inner = self.inner.clone();
let r = ReplicaInfo::from(unsafe { replica.as_ref().unwrap() });
let r = ReplicaInformation::from(unsafe { replica.as_ref().unwrap() });
info!("IFabricPrimaryReplicatorBridge::BeginBuildReplica: {:?}", r);
let (ctx, token) = BridgeContext3::make(callback);
ctx.spawn(
Expand All @@ -481,8 +496,18 @@ where
}
}

// bridge for replica
// bridge from safe service instance to com
// Intentionally empty impl: no methods are implemented here. Together with listing
// IFabricReplicatorCatchupSpecificQuorum in the #[implement(...)] attribute on
// IFabricPrimaryReplicatorBridge, it lets the bridge be cast to that COM interface,
// which callers use to check that catching up specific quorums is supported.
impl<E, P> IFabricReplicatorCatchupSpecificQuorum_Impl for IFabricPrimaryReplicatorBridge<E, P>
where
E: Executor,
P: PrimaryReplicator,
{
}
// endregion: IFabricPrimaryReplicatorBridge

// region: IFabricStatefulServiceReplicaBridge

// Bridge for stateful service replica
// Bridge from safe service instance to com
#[implement(IFabricStatefulServiceReplica)]

pub struct IFabricStatefulServiceReplicaBridge<E, R>
Expand Down Expand Up @@ -594,3 +619,5 @@ where
self.inner.as_ref().abort();
}
}

// endregion: IFabricStatefulServiceReplicaBridge
22 changes: 19 additions & 3 deletions crates/libs/core/src/runtime/stateful_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
use std::ffi::c_void;

use mssf_com::FabricRuntime::{
IFabricPrimaryReplicator, IFabricReplicator, IFabricStatefulServiceReplica,
IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
IFabricStatefulServiceReplica,
};
use tracing::info;
use windows_core::{Interface, HSTRING};
Expand All @@ -22,7 +23,7 @@ use crate::{

use super::{
stateful::{PrimaryReplicator, Replicator, StatefulServicePartition, StatefulServiceReplica},
stateful_types::{Epoch, OpenMode, ReplicaInfo, ReplicaSetConfig, ReplicaSetQuarumMode},
stateful_types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuarumMode},
};

pub struct StatefulServiceReplicaProxy {
Expand Down Expand Up @@ -53,9 +54,20 @@ impl StatefulServiceReplica for StatefulServiceReplicaProxy {
Some(cancellation_token),
);
let rplctr = rx.await??;

// Check COM interface is implemented.
let catchup_specific_quorum = rplctr
.cast::<IFabricReplicatorCatchupSpecificQuorum>()
.is_ok();
assert!(
catchup_specific_quorum,
"mssf does not support replicator without catchup_specific_quorum interface"
);

// TODO: cast without clone will cause access violation on AddRef in SF runtime.
let p_rplctr: IFabricPrimaryReplicator = rplctr.clone().cast().unwrap(); // must work
// Replicator must impl primary replicator as well.

let res = PrimaryReplicatorProxy::new(p_rplctr);
Ok(res)
}
Expand Down Expand Up @@ -183,6 +195,10 @@ impl PrimaryReplicatorProxy {
let parent = ReplicatorProxy::new(com_impl.clone().cast().unwrap());
PrimaryReplicatorProxy { com_impl, parent }
}

/// Returns a reference to the `IFabricPrimaryReplicator` COM object wrapped by this proxy.
pub fn get_com(&self) -> &IFabricPrimaryReplicator {
&self.com_impl
}
}

impl Replicator for PrimaryReplicatorProxy {
Expand Down Expand Up @@ -272,7 +288,7 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
}
async fn build_replica(
&self,
replica: &ReplicaInfo,
replica: &ReplicaInformation,
cancellation_token: CancellationToken,
) -> crate::Result<()> {
info!("PrimaryReplicatorProxy::build_replica");
Expand Down
23 changes: 12 additions & 11 deletions crates/libs/core/src/runtime/stateful_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl From<FABRIC_REPLICA_STATUS> for ReplicaStatus {
// Safe wrapping for FABRIC_REPLICA_SET_CONFIGURATION
#[derive(Debug)]
pub struct ReplicaSetConfig {
pub replicas: Vec<ReplicaInfo>,
pub replicas: Vec<ReplicaInformation>,
pub write_quorum: u32,
}

Expand All @@ -142,7 +142,7 @@ impl From<&FABRIC_REPLICA_SET_CONFIGURATION> for ReplicaSetConfig {
for i in 0..r.ReplicaCount {
let replica = unsafe { r.Replicas.offset(i as isize) };
let replica_ref = unsafe { replica.as_ref().unwrap() };
res.replicas.push(ReplicaInfo::from(replica_ref))
res.replicas.push(ReplicaInformation::from(replica_ref))
}
res
}
Expand Down Expand Up @@ -202,19 +202,20 @@ impl ReplicaSetConfigView<'_> {
}
}

// Safe wrapping for FABRIC_REPLICA_INFORMATION
/// Safe wrapping for FABRIC_REPLICA_INFORMATION
#[derive(Debug, PartialEq, Clone)]
pub struct ReplicaInfo {
pub struct ReplicaInformation {
pub id: i64,
pub role: ReplicaRole,
pub status: ReplicaStatus,
pub replicator_address: HSTRING,
pub current_progress: i64,
pub catch_up_capability: i64,
/// Indicates whether the replica must be caught up as part of a WaitForQuorumCatchup.
pub must_catch_up: bool,
}

impl From<&FABRIC_REPLICA_INFORMATION> for ReplicaInfo {
impl From<&FABRIC_REPLICA_INFORMATION> for ReplicaInformation {
fn from(r: &FABRIC_REPLICA_INFORMATION) -> Self {
let ex1 = r.Reserved as *const FABRIC_REPLICA_INFORMATION_EX1;
let mut must_catchup = false;
Expand All @@ -223,7 +224,7 @@ impl From<&FABRIC_REPLICA_INFORMATION> for ReplicaInfo {
must_catchup = ex1ref.MustCatchup.as_bool();
}
}
ReplicaInfo {
ReplicaInformation {
id: r.Id,
role: (&r.Role).into(),
status: r.Status.into(),
Expand All @@ -235,7 +236,7 @@ impl From<&FABRIC_REPLICA_INFORMATION> for ReplicaInfo {
}
}

impl ReplicaInfo {
impl ReplicaInformation {
// The parts have the same lifetime as self.
// Caller needs to stitch the parts together, i.e.
// FABRIC_REPLICA_INFORMATION::Reserved needs to point at FABRIC_REPLICA_INFORMATION_EX1
Expand Down Expand Up @@ -295,7 +296,7 @@ mod test {
FABRIC_REPLICA_STATUS_UP,
};

use super::{Epoch, ReplicaInfo, ReplicaSetConfig};
use super::{Epoch, ReplicaInformation, ReplicaSetConfig};

// caller needs to stitch the reserved ptr.
fn create_test_data(id: i64) -> (FABRIC_REPLICA_INFORMATION, FABRIC_REPLICA_INFORMATION_EX1) {
Expand All @@ -321,7 +322,7 @@ mod test {
info.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;

// test raw -> wrap
let wrap = ReplicaInfo::from(&info);
let wrap = ReplicaInformation::from(&info);
assert_eq!(wrap.id, 123);
assert!(wrap.must_catch_up);

Expand All @@ -333,7 +334,7 @@ mod test {

#[test]
fn test_replica_set_config_conv() {
let replica1 = ReplicaInfo {
let replica1 = ReplicaInformation {
id: 1,
role: super::ReplicaRole::Primary,
status: super::ReplicaStatus::Up,
Expand All @@ -343,7 +344,7 @@ mod test {
must_catch_up: true,
};

let replica2 = ReplicaInfo {
let replica2 = ReplicaInformation {
id: 2,
role: super::ReplicaRole::ActiveSecondary,
status: super::ReplicaStatus::Up,
Expand Down
8 changes: 5 additions & 3 deletions crates/samples/echomain-stateful2/src/statefulstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use mssf_core::{
PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServicePartition,
StatefulServiceReplica,
},
stateful_types::{Epoch, OpenMode, ReplicaInfo, ReplicaSetConfig, ReplicaSetQuarumMode},
stateful_types::{
Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuarumMode,
},
},
types::ReplicaRole,
};
Expand Down Expand Up @@ -138,7 +140,7 @@ impl PrimaryReplicator for AppFabricReplicator {

async fn build_replica(
&self,
_replica: &ReplicaInfo,
_replica: &ReplicaInformation,
_: CancellationToken,
) -> mssf_core::Result<()> {
info!("AppFabricReplicator2::PrimaryReplicator::build_replica");
Expand Down Expand Up @@ -241,7 +243,7 @@ impl StatefulServiceReplica for Replica {
openmode: OpenMode,
partition: &StatefulServicePartition,
_: CancellationToken,
) -> mssf_core::Result<impl PrimaryReplicator + 'static> {
) -> mssf_core::Result<impl PrimaryReplicator> {
// should be primary replicator
info!("Replica::open {:?}", openmode);
self.svc.start_loop_in_background(partition);
Expand Down
2 changes: 1 addition & 1 deletion crates/samples/kvstore/manifests/ApplicationManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</ServiceManifestImport>
<DefaultServices>
<Service Name="KvStoreService">
<StatefulService ServiceTypeName="KvStoreService" TargetReplicaSetSize="1" MinReplicaSetSize="1">
<StatefulService ServiceTypeName="KvStoreService" TargetReplicaSetSize="3" MinReplicaSetSize="3">
<SingletonPartition />
</StatefulService>
</Service>
Expand Down

0 comments on commit ce0d3af

Please sign in to comment.