Skip to content

Commit

Permalink
Fix initial checkpoint for broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
DogLooksGood committed Nov 6, 2024
1 parent ed19125 commit efc2781
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 99 deletions.
3 changes: 2 additions & 1 deletion core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub struct GameContext {
}

impl GameContext {
pub fn try_new_with_sub_game_spec(spec: &SubGameSpec) -> Result<Self> {
pub fn try_new_with_sub_game_spec(spec: &SubGameSpec, checkpoint: Checkpoint) -> Result<Self> {
let SubGameSpec {
game_addr,
nodes,
Expand All @@ -183,6 +183,7 @@ impl GameContext {
init_data: init_account.data.clone(),
entry_type: EntryType::Disabled,
allow_exit: false,
checkpoint,
..Default::default()
})
}
Expand Down
34 changes: 17 additions & 17 deletions transactor/src/component/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use race_api::event::Event;
use race_core::checkpoint::{Checkpoint, CheckpointOffChain};
use race_core::checkpoint::CheckpointOffChain;
use race_core::types::{BroadcastFrame, BroadcastSync, EventHistory, TxState};
use tokio::sync::{broadcast, Mutex};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::component::common::{Component, ConsumerPorts};
use crate::frame::EventFrame;
Expand All @@ -36,15 +36,13 @@ pub struct EventBackupGroup {
pub events: LinkedList<EventBackup>,
pub settle_version: u64,
pub access_version: u64,
pub checkpoint: Option<Checkpoint>,
pub checkpoint_off_chain: Option<CheckpointOffChain>,
}

pub struct BroadcasterContext {
id: String,
event_backup_groups: Arc<Mutex<LinkedList<EventBackupGroup>>>,
broadcast_tx: broadcast::Sender<BroadcastFrame>,
#[allow(unused)]
debug_mode: bool,
}

/// A component that pushes event to clients.
Expand All @@ -57,7 +55,7 @@ pub struct Broadcaster {
}

impl Broadcaster {
pub fn init(id: String, game_id: usize, debug_mode: bool) -> (Self, BroadcasterContext) {
pub fn init(id: String, game_id: usize,) -> (Self, BroadcasterContext) {
let event_backup_groups = Arc::new(Mutex::new(LinkedList::new()));
let (broadcast_tx, broadcast_rx) = broadcast::channel(10);
drop(broadcast_rx);
Expand All @@ -70,7 +68,6 @@ impl Broadcaster {
},
BroadcasterContext {
id,
debug_mode,
event_backup_groups,
broadcast_tx,
},
Expand All @@ -82,17 +79,16 @@ impl Broadcaster {
}

pub async fn get_checkpoint(&self, settle_version: u64) -> Option<CheckpointOffChain> {

let event_backup_groups = self.event_backup_groups.lock().await;
info!("Get checkpoint with settle_version = {}", settle_version);

for group in event_backup_groups.iter() {
if group.settle_version == settle_version {
info!("Found checkpoint");
return group.checkpoint.as_ref().map(|cp| cp.derive_offchain_part());
return group.checkpoint_off_chain.clone();
}
}
info!("Found checkpoint");

warn!("Missing the checkpoint for settle_version = {}, the client won't be able to join this game.", settle_version);
None
}

Expand Down Expand Up @@ -122,7 +118,7 @@ impl Broadcaster {
}
frames.push(BroadcastFrame::EventHistories {
game_addr: self.id.clone(),
checkpoint_off_chain: group.checkpoint.as_ref().map(|c| c.derive_offchain_part()),
checkpoint_off_chain: group.checkpoint_off_chain.clone(),
histories,
state_sha: group.state_sha.clone(),
settle_version: group.settle_version,
Expand Down Expand Up @@ -178,10 +174,15 @@ impl Component<ConsumerPorts, BroadcasterContext> for Broadcaster {
events: LinkedList::new(),
access_version,
settle_version,
checkpoint: Some(checkpoint),
checkpoint_off_chain: Some(checkpoint.derive_offchain_part()),
});
}
EventFrame::InitState { access_version, settle_version, .. } => {
EventFrame::InitState {
access_version,
settle_version,
checkpoint,
..
} => {
info!(
"{} Create new history group (via InitState) with access_version = {}, settle_version = {}",
env.log_prefix, access_version, settle_version
Expand All @@ -193,10 +194,9 @@ impl Component<ConsumerPorts, BroadcasterContext> for Broadcaster {
events: LinkedList::new(),
access_version,
settle_version,
checkpoint: None,
checkpoint_off_chain: Some(checkpoint.derive_offchain_part()),
state_sha: "".into(),
});

}
EventFrame::TxState { tx_state } => match tx_state {
TxState::SettleSucceed { .. } => {
Expand Down Expand Up @@ -361,7 +361,7 @@ mod tests {
access_version: 10,
verify_key: "alice".into(),
}
.into()],
.into()],
access_version: 10,
};
let event_frame = EventFrame::TxState {
Expand Down
12 changes: 6 additions & 6 deletions transactor/src/component/event_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Component<PipelinePorts, EventBridgeParentContext> for EventBridgeParent {
event,
access_version,
settle_version,
checkpoint,
checkpoint_state,
} => {
info!("{} Receives event: {}", env.log_prefix, event);
ports
Expand All @@ -117,16 +117,16 @@ impl Component<PipelinePorts, EventBridgeParentContext> for EventBridgeParent {
event,
access_version,
settle_version,
checkpoint,
checkpoint_state,
})
.await;
}
_ => (),
}
} else {
match event_frame {
EventFrame::LaunchSubGame { spec } => {
let f = SignalFrame::LaunchSubGame { spec: *spec };
EventFrame::LaunchSubGame { spec, checkpoint } => {
let f = SignalFrame::LaunchSubGame { spec: *spec, checkpoint };
if let Err(e) = ctx.signal_tx.send(f).await {
error!("{} Failed to send: {}", env.log_prefix, e);
}
Expand Down Expand Up @@ -220,7 +220,7 @@ impl Component<PipelinePorts, EventBridgeChildContext> for EventBridgeChild {
event,
access_version,
settle_version,
checkpoint,
checkpoint_state,
} if dest == ctx.game_id => {
info!("{} Receives event: {}", env.log_prefix, event);
ports
Expand All @@ -230,7 +230,7 @@ impl Component<PipelinePorts, EventBridgeChildContext> for EventBridgeChild {
event,
access_version,
settle_version,
checkpoint,
checkpoint_state,
})
.await;
}
Expand Down
28 changes: 16 additions & 12 deletions transactor/src/component/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ async fn init_state(
mut init_account,
} = sub_game;
// Use the existing checkpoint when possible.
let checkpoint = cp.clone();
init_account.checkpoint = cp.get_data(id);
let settle_version = cp.get_version(id);

Expand All @@ -102,6 +103,7 @@ async fn init_state(
settle_version,
init_account,
}),
checkpoint,
};
ports.send(ef).await;
}
Expand Down Expand Up @@ -191,14 +193,14 @@ async fn handle_event(
if game_mode == GameMode::Main {
for sub_game in effects.launch_sub_games {
info!("{} Launch sub game: {}", env.log_prefix, sub_game.id);
let cp = game_context.checkpoint_mut();
let checkpoint = game_context.checkpoint().clone();
let SubGame {
bundle_addr,
id,
mut init_account,
} = sub_game;
// Use the existing checkpoint when possible.
init_account.checkpoint = cp.get_data(id);
init_account.checkpoint = checkpoint.get_data(id);

let ef = EventFrame::LaunchSubGame {
spec: Box::new(SubGameSpec {
Expand All @@ -210,6 +212,7 @@ async fn handle_event(
settle_version,
init_account,
}),
checkpoint,
};
ports.send(ef).await;
}
Expand All @@ -232,7 +235,7 @@ async fn handle_event(
},
access_version: game_context.access_version(),
settle_version: game_context.settle_version(),
checkpoint: state.clone(),
checkpoint_state: state.clone(),
};
ports.send(ef).await;
}
Expand Down Expand Up @@ -309,6 +312,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
init_account,
access_version,
settle_version,
..
} => {
if let Some(close_reason) = init_state(
init_account,
Expand All @@ -321,7 +325,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
ctx.game_mode,
&env,
)
.await
.await
{
ports.send(EventFrame::Shutdown).await;
return close_reason;
Expand All @@ -342,7 +346,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
timestamp,
&env,
)
.await
.await
{
ports.send(EventFrame::Shutdown).await;
return close_reason;
Expand Down Expand Up @@ -402,7 +406,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
timestamp,
&env,
)
.await
.await
{
ports.send(EventFrame::Shutdown).await;
return close_reason;
Expand All @@ -424,7 +428,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
timestamp,
&env,
)
.await
.await
{
ports.send(EventFrame::Shutdown).await;
return close_reason;
Expand All @@ -440,7 +444,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
event,
dest,
from,
checkpoint,
checkpoint_state,
settle_version,
..
} => {
Expand All @@ -451,7 +455,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {

if game_context.game_id() == 0 && dest == 0 && from != 0 && settle_version > 0 {
info!("Update checkpoint for child game: {}", from);
game_context.checkpoint_mut().set_data(from, checkpoint)
game_context.checkpoint_mut().set_data(from, checkpoint_state)
}

if let Some(close_reason) = handle_event(
Expand All @@ -464,7 +468,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
timestamp,
&env,
)
.await
.await
{
ports.send(EventFrame::Shutdown).await;
return close_reason;
Expand All @@ -481,7 +485,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
timestamp,
&env,
)
.await
.await
{
ports.send(EventFrame::Shutdown).await;
return close_reason;
Expand All @@ -502,7 +506,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
timestamp,
&env,
)
.await
.await
{
ports.send(EventFrame::Shutdown).await;
return close_reason;
Expand Down
Loading

0 comments on commit efc2781

Please sign in to comment.