Skip to content

Commit

Permalink
feat: introduce puppet mode controller
Browse files Browse the repository at this point in the history
Signed-off-by: bestmike007 <i@bestmike007.com>
  • Loading branch information
bestmike007 committed Sep 19, 2024
1 parent bed29bc commit 989dd5f
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 0 deletions.
36 changes: 36 additions & 0 deletions testnet/stacks-node/conf/puppetnet-conf.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[node]
name = "helium-node"
rpc_bind = "127.0.0.1:20443"
p2p_bind = "127.0.0.1:20444"
use_test_genesis_chainstate = true
enable_puppet_mode = true
# puppet_bind = "127.0.0.1:20445"

[connection_options]
public_ip_address = "127.0.0.1:20444"

[burnchain]
chain = "bitcoin"
mode = "mocknet"
commit_anchor_block_within = 0

# These are addresses from the README.md
[[ustx_balance]]
# Private key: b8d99fd45da58038d630d9855d3ca2466e8e0f89d3894c4724f0efc9ff4b51f001
address = "ST2ZRX0K27GW0SP3GJCEMHD95TQGJMKB7G9Y0X1MH"
amount = 100000000

[[ustx_balance]]
# Private key: 3a4e84abb8abe0c1ba37cef4b604e73c82b1fe8d99015cb36b029a65099d373601
address = "ST26FVX16539KKXZKJN098Q08HRX3XBAP541MFS0P"
amount = 100000000

[[ustx_balance]]
# Private key: 052cc5b8f25b1e44a65329244066f76c8057accd5316c889f476d0ea0329632c01
address = "ST3CECAKJ4BH08JYY7W53MC81BYDT4YDA5M7S5F53"
amount = 100000000

[[ustx_balance]]
# Private key: 9aef533e754663a453984b69d36f109be817e9940519cc84979419e2be00864801
address = "ST31HHVBKYCYQQJ5AQ25ZHA6W2A548ZADDQ6S16GP"
amount = 100000000
13 changes: 13 additions & 0 deletions testnet/stacks-node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,8 @@ pub struct NodeConfig {
pub chain_liveness_poll_time_secs: u64,
/// stacker DBs we replicate
pub stacker_dbs: Vec<QualifiedContractIdentifier>,
pub enable_puppet_mode: bool,
pub puppet_bind: String,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -2060,6 +2062,7 @@ impl Default for NodeConfig {

let rpc_port = 20443;
let p2p_port = 20444;
let puppet_port = 20445;

let mut local_peer_seed = [0u8; 32];
rng.fill_bytes(&mut local_peer_seed);
Expand Down Expand Up @@ -2098,6 +2101,8 @@ impl Default for NodeConfig {
fault_injection_hide_blocks: false,
chain_liveness_poll_time_secs: 300,
stacker_dbs: vec![],
enable_puppet_mode: false,
puppet_bind: format!("0.0.0.0:{}", puppet_port),
}
}
}
Expand Down Expand Up @@ -2554,6 +2559,8 @@ pub struct NodeConfigFile {
pub chain_liveness_poll_time_secs: Option<u64>,
/// Stacker DBs we replicate
pub stacker_dbs: Option<Vec<String>>,
pub enable_puppet_mode: Option<bool>,
pub puppet_bind: Option<String>,
}

impl NodeConfigFile {
Expand Down Expand Up @@ -2632,6 +2639,12 @@ impl NodeConfigFile {
.iter()
.filter_map(|contract_id| QualifiedContractIdentifier::parse(contract_id).ok())
.collect(),
enable_puppet_mode: self
.enable_puppet_mode
.unwrap_or(default_node_config.enable_puppet_mode),
puppet_bind: self
.puppet_bind
.unwrap_or(default_node_config.puppet_bind.clone()),
};
Ok(node_config)
}
Expand Down
14 changes: 14 additions & 0 deletions testnet/stacks-node/src/run_loop/helium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use stacks_common::types::chainstate::BurnchainHeaderHash;

use super::RunLoopCallbacks;
use crate::burnchains::Error as BurnchainControllerError;
use crate::run_loop::puppet::PuppetController;
use crate::{
BitcoinRegtestController, BurnchainController, ChainTip, Config, MocknetController, Node,
};
Expand Down Expand Up @@ -144,12 +145,25 @@ impl RunLoop {
leader_tenure = self.node.initiate_new_tenure();
}

// Puppet mode
let puppet_controller = if self.config.node.enable_puppet_mode {
let mut c = PuppetController::new(&self.config.node.puppet_bind);
c.start();
Some(c)
} else {
None
};

// Start the runloop
round_index = 1;
loop {
if expected_num_rounds == round_index {
return Ok(());
}
match &puppet_controller {
Some(c) => c.block_on(&chain_tip),
_ => (),
}

// Run the last initialized tenure
let artifacts_from_tenure = match leader_tenure {
Expand Down
1 change: 1 addition & 0 deletions testnet/stacks-node/src/run_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod boot_nakamoto;
pub mod helium;
pub mod nakamoto;
pub mod neon;
pub mod puppet;

use clarity::vm::costs::ExecutionCost;
use clarity::vm::database::BurnStateDB;
Expand Down
202 changes: 202 additions & 0 deletions testnet/stacks-node/src/run_loop/puppet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use crate::ChainTip;
use async_std::net::TcpListener;
use async_std::stream::StreamExt;
use async_std::task::block_on;
use http_types::mime::JSON;
use http_types::{Method, Response, StatusCode};
use std::io;
use std::io::{Error, Write};
use std::ops::{Add, DerefMut};
use std::sync::{Arc, Mutex};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

struct PuppetControl {
pub current_block: u64,
pub target_block: u64,
pub next_block_time: SystemTime,
pub block_interval: Duration,
}

impl Default for PuppetControl {
fn default() -> Self {
let default_interval = Duration::from_secs(600);
Self {
current_block: 0,
target_block: 1,
next_block_time: SystemTime::now().add(default_interval),
block_interval: default_interval,
}
}
}

pub struct PuppetController {
inner: Arc<Mutex<PuppetControl>>,
bind_addr: String,
join_handle: Option<JoinHandle<Result<(), Error>>>,
}

impl PuppetController {
pub fn new(bind_addr: &str) -> Self {
Self {
inner: Arc::new(Mutex::new(PuppetControl::default())),
bind_addr: bind_addr.to_string(),
join_handle: None,
}
}

pub fn start(&mut self) {
if self.join_handle.is_some() {
warn!("Puppet mode control server is already started");
return;
}
info!("Starting puppet mode control server..");
let puppet_control = Arc::clone(&self.inner);
let puppet_bind = self.bind_addr.to_string();
self.join_handle = Some(
Builder::new()
.name("puppet".into())
.spawn(move || {
block_on(async {
let listener = TcpListener::bind(puppet_bind).await?;
info!(
"Start puppet mode control server on: {}",
listener.local_addr()?
);

// For each incoming TCP connection, spawn a task and call `accept`.
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
if stream.is_err() {
return Err(stream.unwrap_err());
}
let stream = stream?;
let puppet_control = puppet_control.clone();
async_std::task::spawn(async move {
async_h1::accept(stream.clone(), |req| async {
let mut req = req;
match (req.method(), req.url().path()) {
(Method::Get, "/") => Ok(Response::new(StatusCode::Ok)),
(Method::Post, "/puppet/v1/kick") => {
let mut puppet_control = puppet_control.lock().unwrap();
if puppet_control.target_block <= puppet_control.current_block {
puppet_control.target_block =
puppet_control.current_block + 1;
}
Ok(Response::new(StatusCode::Ok))
}
(Method::Put, "/puppet/v1/duration") => {
let body = req.body_string().await;
match body {
Ok(x) => {
let v = x.parse::<u64>().unwrap_or(0);
if v > 0 {
println!("Setting duration to {}", v);
io::stdout().flush().unwrap();
let mut puppet_control =
puppet_control.lock().unwrap();
puppet_control.block_interval =
Duration::from_secs(v);
puppet_control.next_block_time =
SystemTime::now()
.add(puppet_control.block_interval);
}
}
_ => (),
}
Ok(Response::new(StatusCode::Ok))
}
(Method::Put, "/puppet/v1/until") => {
let body = req.body_string().await;
match body {
Ok(x) => {
let v = x.parse::<u64>().unwrap_or(0);
if v > 0 {
let mut puppet_control =
puppet_control.lock().unwrap();
puppet_control.target_block = if puppet_control.current_block >= v {
puppet_control.current_block
} else {
v
};
println!("Setting target block to {}", puppet_control.target_block);
io::stdout().flush().unwrap();
}
}
_ => (),
}
Ok(Response::new(StatusCode::Ok))
}
(Method::Get, "/puppet/v1/status") => {
let mut response = Response::new(StatusCode::Ok);
let puppet_control = puppet_control.lock().unwrap();
response.set_content_type(JSON);
response.set_body(
format!(
"{{\"current_block\":{},\"target_block\":{},\"duration\":{},\"next_block_time\":{}}}",
puppet_control.current_block,
puppet_control.target_block,
puppet_control.block_interval.as_secs(),
puppet_control.next_block_time.duration_since(UNIX_EPOCH).unwrap().as_secs()));
Ok(response)
}
_ => {
let mut rs = Response::new(StatusCode::BadRequest);
rs.set_body(format!(
"[{}] {}",
req.method(),
req.url().path()
));
Ok(rs)
}
}
})
.await
.unwrap_or(())
});
}
Ok(())
})
})
.unwrap(),
)
}

fn with_lock<F, R>(&self, func: F) -> R
where
F: FnOnce(&mut PuppetControl) -> R,
{
let mut puppet_control = self.inner.lock().unwrap();
func(puppet_control.deref_mut())
}

pub fn block_on(&self, chain_tip: &ChainTip) {
if self.join_handle.is_none() {
return;
}
info!(
"Waiting on block height {}",
chain_tip.metadata.stacks_block_height
);

self.with_lock(|puppet_control| {
puppet_control.current_block = chain_tip.metadata.stacks_block_height;
});
loop {
let should_break = self.with_lock(|puppet_control| {
if puppet_control.target_block > puppet_control.current_block
|| puppet_control.next_block_time.le(&SystemTime::now())
{
puppet_control.next_block_time =
SystemTime::now().add(puppet_control.block_interval);
return true;
}
false
});
if should_break {
break;
}
sleep(Duration::from_millis(100));
}
}
}

0 comments on commit 989dd5f

Please sign in to comment.