Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable step duration transitions #124

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 121 additions & 58 deletions ethcore/src/engines/authority_round/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::{cmp, fmt};
use std::iter::FromIterator;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicU16, AtomicU64, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, SystemTime, Duration};
use std::u64;

use block::*;
use bytes::Bytes;
Expand All @@ -31,7 +32,7 @@ use engines::{Engine, Seal, EngineError, ConstructedVerifier};
use engines::block_reward;
use engines::block_reward::{BlockRewardContract, RewardKind};
use error::{Error, ErrorKind, BlockError};
use ethjson;
use ethjson::{spec::StepDuration};
use machine::{AuxiliaryData, Call, EthereumMachine};
use hash::keccak;
use super::signer::EngineSigner;
Expand Down Expand Up @@ -61,12 +62,13 @@ pub type RandomnessPhaseError = randomness::PhaseError;

/// `AuthorityRound` params.
pub struct AuthorityRoundParams {
/// Time to wait before next block or authority switching,
/// in seconds.
/// A map defining intervals of blocks with the given times (in seconds) to wait before next
/// block or authority switching. The keys in the map are numbers of starting blocks of those
/// periods. The entry at `0` should be defined.
///
/// Deliberately typed as u16 as too high of a value leads
/// to slow block issuance.
pub step_duration: u16,
/// Wait times (durations) are deliberately typed as `u16` since larger values lead to slow
/// block issuance.
pub step_durations: BTreeMap<u64, u16>,
/// Starting step,
pub start_step: Option<u64>,
/// Valid validators.
Expand Down Expand Up @@ -101,13 +103,29 @@ const U16_MAX: usize = ::std::u16::MAX as usize;

impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
fn from(p: ethjson::spec::AuthorityRoundParams) -> Self {
let mut step_duration_usize: usize = p.step_duration.into();
if step_duration_usize > U16_MAX {
step_duration_usize = U16_MAX;
warn!(target: "engine", "step_duration is too high ({}), setting it to {}", step_duration_usize, U16_MAX);
}
// Converts a step duration from the JSON spec into the engine's `u16` representation.
//
// Panics if the configured duration is zero. Values above `U16_MAX` are clamped to
// `U16_MAX` and a warning is logged.
let map_step_duration = |u: ethjson::uint::Uint| {
	let step_duration_usize: usize = u.into();
	if step_duration_usize == 0 {
		panic!("AuthorityRoundParams: step duration cannot be 0");
	}
	if step_duration_usize > U16_MAX {
		// Log the configured value *before* clamping so the warning shows what was
		// actually rejected rather than printing the limit twice.
		warn!(target: "engine", "step duration is too high ({}), setting it to {}", step_duration_usize, U16_MAX);
		U16_MAX as u16
	} else {
		step_duration_usize as u16
	}
};
let step_durations: BTreeMap<u64, u16> = match p.step_duration {
StepDuration::Single(u) => {
let mut durs: BTreeMap<u64, u16> = BTreeMap::new();
durs.insert(0, map_step_duration(u));
durs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe simpler:

iter::once((0, map_step_duration(u))).collect()

}
StepDuration::Transitions(tr) => {
tr.into_iter().map(|(blknum, u)| (blknum.into(), map_step_duration(u))).collect()
}
};
AuthorityRoundParams {
step_duration: step_duration_usize as u16,
step_durations,
validators: new_validator_set(p.validators),
start_step: p.start_step.map(Into::into),
validate_score_transition: p.validate_score_transition.map_or(0, Into::into),
Expand All @@ -130,51 +148,80 @@ impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
}
}

// Helper for managing the step.
/// Helper for managing the step.
#[derive(Debug)]
struct Step {
calibrate: bool, // whether calibration is enabled.
inner: AtomicUsize,
duration: u16,
inner: AtomicU64,
/// Duration of the current step.
current_duration: AtomicU16,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation is off. (Spaces/tabs…)

/// Planned durations of steps.
durations: BTreeMap<u64, u16>,
/// The time of the start of the first step after the last change of step duration, in seconds.
starting_sec: AtomicU64,
/// The number of the first step after the last change of step duration.
starting_step: AtomicU64,
}

impl Step {
fn load(&self) -> u64 { self.inner.load(AtomicOrdering::SeqCst) as u64 }
fn load(&self) -> u64 { self.inner.load(AtomicOrdering::SeqCst) }
fn duration_remaining(&self) -> Duration {
let now = unix_now();
let expected_seconds = self.load()
.checked_add(1)
.and_then(|ctr| ctr.checked_mul(self.duration as u64))
.checked_sub(self.starting_step.load(AtomicOrdering::SeqCst) as u64)
.and_then(|x| x.checked_add(1))
.and_then(|x| x.checked_mul(self.current_duration.load(AtomicOrdering::SeqCst) as u64))
.and_then(|x| x.checked_add(self.starting_sec.load(AtomicOrdering::SeqCst) as u64))
.map(Duration::from_secs);

match expected_seconds {
Some(step_end) if step_end > now => step_end - now,
Some(_) => Duration::from_secs(0),
None => {
let ctr = self.load();
error!(target: "engine", "Step counter is too high: {}, aborting", ctr);
panic!("step counter is too high: {}", ctr)
error!(target: "engine", "Step counter under- or overflow: {}, aborting", ctr);
panic!("step counter under- or overflow: {}", ctr)
},
}

}

/// Increments the step number.
///
/// Panics if the new step number is `usize::MAX`.
fn increment(&self) {
use std::usize;
// fetch_add won't panic on overflow but will rather wrap
// around, leading to zero as the step counter, which might
// lead to unexpected situations, so it's better to shut down.
if self.inner.fetch_add(1, AtomicOrdering::SeqCst) == usize::MAX {
let prev_step = self.inner.fetch_add(1, AtomicOrdering::SeqCst);
if prev_step == u64::MAX {
error!(target: "engine", "Step counter is too high: {}, aborting", usize::MAX);
panic!("step counter is too high: {}", usize::MAX);
}

let next_step = prev_step + 1;
if let Some(&next_dur) = self.durations.get(&next_step) {
let prev_dur = *self.durations.range(0 .. next_step).last().expect("step duration map is empty").1;
let prev_starting_sec = self.starting_sec.load(AtomicOrdering::SeqCst);
let prev_starting_step = self.starting_step.load(AtomicOrdering::SeqCst);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we need to put the whole Step in a Mutex instead of making its fields atomic individually: Things probably go wrong if some of those fields are mutated while increment is running?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only place where these fields are modified apart from the constructor. I agree though that if we start mutating them elsewhere we need an atomic lock on them.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I'm not sure it's even guaranteed that no two instances of increment run at the same time. 😬
And even if it is, it would be safer in a single Mutex (or RwLock?), in case future modifications change this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the Atomic stuff was only supposed to help mutating fields of a Step that's passed by an immutable reference. Synchronisation is simulated by calibrate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but if the Step itself were passed as an Arc<Mutex<Step>>, that interior mutability wouldn't be necessary.
(I feel like this pattern—interior mutability of fields—is overused in Parity: It creates the illusion of thread-safety.)

let steps_elapsed = prev_step - prev_starting_step;
let starting_sec = prev_starting_sec + (steps_elapsed * prev_dur as u64);
self.current_duration.store(next_dur, AtomicOrdering::SeqCst);
self.starting_sec.store(starting_sec, AtomicOrdering::SeqCst);
self.starting_step.store(next_step, AtomicOrdering::SeqCst);
self.inner.store(next_step, AtomicOrdering::SeqCst);
trace!(target: "engine", "Step duration updated to {} at step {}", next_dur, next_step);
}
}

fn calibrate(&self) {
if self.calibrate {
let new_step = unix_now().as_secs() / (self.duration as u64);
self.inner.store(new_step as usize, AtomicOrdering::SeqCst);
let starting_sec = self.starting_sec.load(AtomicOrdering::SeqCst);
let starting_step = self.starting_step.load(AtomicOrdering::SeqCst);
let step = (
(unix_now().as_secs() - starting_sec) /
(self.current_duration.load(AtomicOrdering::SeqCst) as u64)
) + starting_step + 1;
trace!(target: "engine", "calibrating step {}", step);
self.inner.store(step, AtomicOrdering::SeqCst);
}
}

Expand All @@ -195,7 +242,7 @@ impl Step {
Err(None)
// wait a bit for blocks in near future
} else if given > current {
let d = self.duration as u64;
let d = self.current_duration.load(AtomicOrdering::SeqCst) as u64;
Err(Some(OutOfBounds {
min: None,
max: Some(d * current),
Expand Down Expand Up @@ -669,20 +716,25 @@ impl<'a, A: ?Sized, B> Deref for CowLike<'a, A, B> where B: AsRef<A> {
impl AuthorityRound {
/// Create a new instance of AuthorityRound engine.
pub fn new(our_params: AuthorityRoundParams, machine: EthereumMachine) -> Result<Arc<Self>, Error> {
if our_params.step_duration == 0 {
error!(target: "engine", "Authority Round step duration can't be zero, aborting");
panic!("authority_round: step duration can't be zero")
let duration = *our_params.step_durations.get(&0).unwrap_or_else(|| {
error!(target: "engine", "Authority Round step 0 duration is undefined, aborting");
panic!("authority_round: step 0 duration is undefined")
});
if our_params.step_durations.values().any(|v| *v == 0) {
panic!("authority_round: step duration cannot be 0");
}
let should_timeout = our_params.start_step.is_none();
let initial_step = our_params.start_step.unwrap_or_else(|| (unix_now().as_secs() / (our_params.step_duration as u64)));
let engine = Arc::new(
AuthorityRound {
transition_service: IoService::<()>::start()?,
step: Arc::new(PermissionedStep {
inner: Step {
inner: AtomicUsize::new(initial_step as usize),
inner: AtomicU64::new(0),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we compute the actual current step number on startup?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually another point is true: there is a configurable start_step which got thrown out with the bathwater. I have to take it into account in order not to break the spec.

calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
current_duration: AtomicU16::new(duration),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Also indentation.)

durations: our_params.step_durations.clone(),
starting_sec: AtomicU64::new(unix_now().as_secs()),
starting_step: AtomicU64::new(0),
},
can_propose: AtomicBool::new(true),
}),
Expand Down Expand Up @@ -924,8 +976,10 @@ impl IoHandler<()> for TransitionHandler {
}
}

let next_run_at = AsMillis::as_millis(&self.step.inner.duration_remaining()) >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
let next_run_at = Duration::from_millis(
AsMillis::as_millis(&self.step.inner.duration_remaining()) >> 2
);
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, next_run_at)
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
}
Expand Down Expand Up @@ -1179,7 +1233,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
self.validators.on_epoch_begin(first, &header, &mut call)
}

/// Apply the block reward on finalisation of the block.
/// Applies the block reward on finalisation of the block.
fn on_close_block(&self, block: &mut ExecutedBlock) -> Result<(), Error> {
let mut beneficiaries = Vec::new();
if block.header().number() >= self.empty_steps_transition {
Expand Down Expand Up @@ -1234,7 +1288,12 @@ impl Engine<EthereumMachine> for AuthorityRound {
// Genesis is never a new block, but might as well check.
let header = block.header().clone();
let first = header.number() == 0;

let opt_signer = self.signer.read();
let signer = match opt_signer.as_ref() {
Some(signer) => signer,
None => return Ok(Vec::new()), // We are not a validator, so we shouldn't call the contracts.
};
let our_addr = signer.address();
let client = self.client.read().as_ref().and_then(|weak| weak.upgrade()).ok_or_else(|| {
debug!(target: "engine", "Unable to prepare block: missing client ref.");
EngineError::RequiresClient
Expand All @@ -1247,14 +1306,8 @@ impl Engine<EthereumMachine> for AuthorityRound {
full_client.call_contract(BlockId::Latest, to, data).map_err(|e| format!("{}", e))
};

let opt_signer = self.signer.read();
let signer = match opt_signer.as_ref() {
Some(signer) => signer,
None => return Ok(Vec::new()), // We are not a validator, so we shouldn't call the contracts.
};

// Our current account nonce. The transactions must have consecutive nonces, starting with this one.
let mut tx_nonce = block.state.nonce(&signer.address())?;
let mut tx_nonce = block.state.nonce(&our_addr)?;
let mut transactions = Vec::new();

// Creates and signs a transaction with the given contract call.
Expand All @@ -1268,7 +1321,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
if let Some(contract_addr) = self.randomness_contract_address {
let mut contract = util::BoundContract::bind(&*client, BlockId::Latest, contract_addr);
// TODO: How should these errors be handled?
let phase = randomness::RandomnessPhase::load(&contract, signer.address())
let phase = randomness::RandomnessPhase::load(&contract, our_addr)
.map_err(EngineError::RandomnessLoadError)?;
let mut rng = ::rand::OsRng::new()?;
if let Some(data) = phase.advance(&contract, &mut rng, signer.as_ref())
Expand Down Expand Up @@ -1618,7 +1671,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
use hash::keccak;
use accounts::AccountProvider;
use ethereum_types::{Address, H520, H256, U256};
Expand All @@ -1641,7 +1694,7 @@ mod tests {
F: FnOnce(&mut AuthorityRoundParams),
{
let mut params = AuthorityRoundParams {
step_duration: 1,
step_durations: [(0, 1)].to_vec().into_iter().collect(),
start_step: Some(1),
validators: Box::new(TestSet::default()),
validate_score_transition: 0,
Expand Down Expand Up @@ -1941,31 +1994,41 @@ mod tests {
#[should_panic(expected="counter is too high")]
fn test_counter_increment_too_high() {
use super::Step;
use std::sync::atomic::AtomicU16;

let step = Step {
calibrate: false,
inner: AtomicUsize::new(::std::usize::MAX),
duration: 1,
inner: AtomicU64::new(::std::u64::MAX),
current_duration: AtomicU16::new(1),
durations: [(0, 1)].to_vec().into_iter().collect(),
starting_sec: AtomicU64::new(::std::u64::MAX),
starting_step: AtomicU64::new(::std::u64::MAX),
};
step.increment();
}

#[test]
#[should_panic(expected="counter is too high")]
#[should_panic(expected="step counter under- or overflow")]
fn test_counter_duration_remaining_too_high() {
use super::Step;
use std::sync::atomic::AtomicU16;

let step = Step {
calibrate: false,
inner: AtomicUsize::new(::std::usize::MAX),
duration: 1,
inner: AtomicU64::new(::std::u64::MAX),
current_duration: AtomicU16::new(1),
durations: [(0, 1)].to_vec().into_iter().collect(),
starting_sec: AtomicU64::new(::std::u64::MAX),
starting_step: AtomicU64::new(::std::u64::MAX),
};
step.duration_remaining();
}

#[test]
#[should_panic(expected="authority_round: step duration can't be zero")]
#[should_panic(expected="authority_round: step duration cannot be 0")]
fn test_step_duration_zero() {
aura(|params| {
params.step_duration = 0;
params.step_durations = [(0, 0)].to_vec().into_iter().collect();;
});
}

Expand Down Expand Up @@ -2357,7 +2420,7 @@ mod tests {
#[test]
fn test_empty_steps() {
let engine = aura(|p| {
p.step_duration = 4;
p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0;
p.maximum_empty_steps = 0;
});
Expand Down Expand Up @@ -2391,7 +2454,7 @@ mod tests {
let (_spec, tap, accounts) = setup_empty_steps();
let engine = aura(|p| {
p.validators = Box::new(SimpleList::new(accounts.clone()));
p.step_duration = 4;
p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0;
p.maximum_empty_steps = 0;
});
Expand Down Expand Up @@ -2428,7 +2491,7 @@ mod tests {
let (_spec, tap, accounts) = setup_empty_steps();
let engine = aura(|p| {
p.validators = Box::new(SimpleList::new(accounts.clone()));
p.step_duration = 4;
p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0;
p.maximum_empty_steps = 0;
});
Expand Down
Loading