Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize deferred credits fetch #4622

Merged
merged 8 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions massa-api/src/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,10 +845,10 @@ impl MassaRpcServer for API<Public> {
);

// get execution info
let execution_infos = self
.0
.execution_controller
.get_addresses_infos(&addresses, Some(deferred_credit_max_slot));
let execution_infos = self.0.execution_controller.get_addresses_infos(
&addresses,
std::ops::Bound::Excluded(deferred_credit_max_slot),
);

// get future draws from selector
let selection_draws = {
Expand Down
4 changes: 2 additions & 2 deletions massa-api/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{collections::HashMap, net::SocketAddr};
use massa_api_exports::config::APIConfig;
use massa_consensus_exports::{ConsensusBroadcasts, MockConsensusController};
use massa_execution_exports::{GasCosts, MockExecutionController};
use massa_models::config::{CHAINID, DEFERRED_CREDITS_DELTA};
use massa_models::config::CHAINID;
use massa_models::{
config::{
BASE_OPERATION_GAS_COST, ENDORSEMENT_COUNT, GENESIS_TIMESTAMP, MAX_DATASTORE_VALUE_LENGTH,
Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) fn get_apiv2_server(addr: &SocketAddr) -> (API<ApiV2>, APIConfig) {
periods_per_cycle: PERIODS_PER_CYCLE,
last_start_period: 0,
chain_id: *CHAINID,
deferred_credits_delta: DEFERRED_CREDITS_DELTA,
deferred_credits_delta: MassaTime::from_millis(24 * 3600 * 2),
};

// let shared_storage: massa_storage::Storage = massa_storage::Storage::create_root();
Expand Down
2 changes: 1 addition & 1 deletion massa-execution-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub trait ExecutionController: Send + Sync {
fn get_addresses_infos(
&self,
addresses: &[Address],
deferred_credits_max_slot: Option<Slot>,
deferred_credits_max_slot: std::ops::Bound<Slot>,
) -> Vec<ExecutionAddressInfo>;

/// Get execution statistics
Expand Down
5 changes: 3 additions & 2 deletions massa-execution-worker/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,18 +1028,19 @@ impl ExecutionContext {
}

/// Get future deferred credits of an address
/// With optionally a limit slot (excluded)
pub fn get_address_future_deferred_credits(
&self,
address: &Address,
thread_count: u8,
max_slot: Option<Slot>,
max_slot: std::ops::Bound<Slot>,
) -> BTreeMap<Slot, Amount> {
let min_slot = self
.slot
.get_next_slot(thread_count)
.expect("unexpected slot overflow in context.get_addresses_deferred_credits");
self.speculative_roll_state
.get_address_deferred_credits(address, min_slot, max_slot)
.get_address_deferred_credits(address, (std::ops::Bound::Included(min_slot), max_slot))
}

/// in case of
Expand Down
2 changes: 1 addition & 1 deletion massa-execution-worker/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl ExecutionController for ExecutionControllerImpl {
fn get_addresses_infos(
&self,
addresses: &[Address],
deferred_credits_max_slot: Option<Slot>,
deferred_credits_max_slot: std::ops::Bound<Slot>,
) -> Vec<ExecutionAddressInfo> {
let mut res = Vec::with_capacity(addresses.len());
let exec_state = self.execution_state.read();
Expand Down
17 changes: 13 additions & 4 deletions massa-execution-worker/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ impl ExecutionState {
pub fn get_address_future_deferred_credits(
&self,
address: &Address,
max_slot: Option<Slot>,
max_slot: std::ops::Bound<Slot>,
) -> BTreeMap<Slot, Amount> {
context_guard!(self).get_address_future_deferred_credits(
address,
Expand All @@ -1840,7 +1840,14 @@ impl ExecutionState {
.final_state
.read()
.get_pos_state()
.get_address_deferred_credits(address);
.get_deferred_credits_range(.., Some(address))
.credits;
let res_final: BTreeMap<Slot, Amount> = res_final
.into_iter()
.filter_map(|(slot, addr_amount)| {
addr_amount.get(address).map(|amount| (slot, *amount))
})
.collect();

// get values from active history, backwards
let mut res_speculative: BTreeMap<Slot, Amount> = BTreeMap::default();
Expand All @@ -1852,10 +1859,12 @@ impl ExecutionState {
};
}
}

// fill missing speculative entries with final entries
for (s, v) in res_final.iter() {
res_speculative.entry(*s).or_insert(*v);
for (slot, amount) in &res_final {
res_speculative.entry(*slot).or_insert(*amount);
}

// remove zero entries from speculative
res_speculative.retain(|_s, a| !a.is_zero());

Expand Down
121 changes: 40 additions & 81 deletions massa-execution-worker/src/speculative_roll_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl SpeculativeRollState {
addr: &Address,
amount: &Amount,
) -> Amount {
let credits = self.get_address_deferred_credits(addr, *slot, None);
let credits = self.get_address_deferred_credits(addr, slot..);

let mut remaining_to_slash = *amount;
for (credit_slot, credit_amount) in credits.iter() {
Expand Down Expand Up @@ -294,100 +294,59 @@ impl SpeculativeRollState {
}

/// Get deferred credits of an address starting from a given slot
pub fn get_address_deferred_credits(
/// Get deferred credits of an address starting from a given slot
damip marked this conversation as resolved.
Show resolved Hide resolved
pub fn get_address_deferred_credits<R>(
&self,
address: &Address,
min_slot: Slot,
max_slot_: Option<Slot>,
) -> BTreeMap<Slot, Amount> {
slot_range: R,
) -> BTreeMap<Slot, Amount>
where
R: std::ops::RangeBounds<Slot> + Clone,
{
let mut res: HashMap<Slot, Amount> = HashMap::default();

if let Some(max_slot) = max_slot_ {
// get added values
for (slot, addr_amount) in self
.added_changes
.deferred_credits
.credits
.range(min_slot..max_slot)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}

// get values from active history, backwards
{
let hist = self.active_history.read();
for hist_item in hist.0.iter().rev() {
for (slot, addr_amount) in hist_item
.state_changes
.pos_changes
.deferred_credits
.credits
.range(min_slot..max_slot)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}
}
}
// get added values
for (slot, addr_amount) in self
.added_changes
.deferred_credits
.credits
.range(slot_range.clone())
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}

// get values from final state
{
let final_state = self.final_state.read();
let deferred_credits = final_state
.get_pos_state()
.get_deferred_credits_range(min_slot..max_slot, Some(*address));
for (slot, addr_amount) in deferred_credits.credits {
// get values from active history, backwards
{
let hist = self.active_history.read();
for hist_item in hist.0.iter().rev() {
for (slot, addr_amount) in hist_item
.state_changes
.pos_changes
.deferred_credits
.credits
.range(slot_range.clone())
{
if let Some(amount) = addr_amount.get(address) {
res.entry(slot).or_insert(*amount);
res.entry(*slot).or_insert(*amount);
};
}
}
} else {
// get added values
for (slot, addr_amount) in self
.added_changes
.deferred_credits
}

// get values from final state
{
let final_state = self.final_state.read();
for (slot, addr_amount) in final_state
.get_pos_state()
.get_deferred_credits_range(slot_range, Some(address))
.credits
.range(min_slot..)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
res.entry(slot).or_insert(*amount);
};
}

// get values from active history, backwards
{
let hist = self.active_history.read();
for hist_item in hist.0.iter().rev() {
for (slot, addr_amount) in hist_item
.state_changes
.pos_changes
.deferred_credits
.credits
.range(min_slot..)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}
}
}

// get values from final state
{
let final_state = self.final_state.read();
let deferred_credits = final_state
.get_pos_state()
.get_deferred_credits_range(min_slot.., Some(*address));
for (slot, addr_amount) in deferred_credits.credits {
if let Some(amount) = addr_amount.get(address) {
res.entry(slot).or_insert(*amount);
};
}
}
}

res.into_iter().filter(|(_s, v)| !v.is_zero()).collect()
Expand Down
2 changes: 1 addition & 1 deletion massa-execution-worker/src/tests/scenarios_mandatories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2067,7 +2067,7 @@ fn datastore_manipulations() {
// Just checking that is works no asserts for now
universe
.module_controller
.get_addresses_infos(&[addr], None);
.get_addresses_infos(&[addr], std::ops::Bound::Unbounded);
}

/// This test checks causes a history rewrite in slot sequencing and ensures that emitted events match
Expand Down
52 changes: 3 additions & 49 deletions massa-pos-exports/src/pos_final_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ impl PoSFinalState {
pub fn get_deferred_credits_range<R>(
&self,
range: R,
filter_by: Option<Address>,
addr_filter: Option<&Address>,
) -> DeferredCredits
where
R: RangeBounds<Slot>,
Expand Down Expand Up @@ -747,8 +747,8 @@ impl PoSFinalState {
.deserialize::<DeserializeError>(rest_key)
.expect(DEFERRED_CREDITS_DESER_ERROR);

if let Some(addr) = filter_by {
if addr != address {
if let Some(addr_filter_value) = &addr_filter {
if &&address != addr_filter_value {
continue;
}
}
Expand All @@ -766,52 +766,6 @@ impl PoSFinalState {
deferred_credits
}

/// Gets the deferred credits for an address
pub fn get_address_deferred_credits(&self, address: &Address) -> BTreeMap<Slot, Amount> {
let db = self.db.read();

let mut deferred_credits = BTreeMap::new();

let start_key_buffer = DEFERRED_CREDITS_PREFIX.as_bytes().to_vec();

for (serialized_key, serialized_value) in db.iterator_cf(
STATE_CF,
MassaIteratorMode::From(&start_key_buffer, MassaDirection::Forward),
) {
if !serialized_key.starts_with(DEFERRED_CREDITS_PREFIX.as_bytes()) {
break;
}

// deserialize the slot
let rest_key = &serialized_key[DEFERRED_CREDITS_PREFIX.len()..];
let (rest_key, slot) = buf_to_array_ctr(rest_key, Slot::from_bytes_key)
.expect(DEFERRED_CREDITS_DESER_ERROR);

let (_, addr): (_, Address) = self
.deferred_credits_deserializer
.credit_deserializer
.address_deserializer
.deserialize::<DeserializeError>(rest_key)
.expect(DEFERRED_CREDITS_DESER_ERROR);

if &addr != address {
// TODO improve performance
continue;
}

let (_, amount) = self
.deferred_credits_deserializer
.credit_deserializer
.amount_deserializer
.deserialize::<DeserializeError>(&serialized_value)
.expect(DEFERRED_CREDITS_DESER_ERROR);

deferred_credits.insert(slot, amount);
}

deferred_credits
}

/// Gets the index of a cycle in history
pub fn get_cycle_index(&self, cycle: u64) -> Option<usize> {
let first_cycle = match self.cycle_history_cache.front() {
Expand Down