Skip to content

Commit

Permalink
Optimize deferred credits fetch (#4622)
Browse files Browse the repository at this point in the history
* Optimize deferred credits fetch

* Fix get_addresses test compilation

* Remove test print statements

* Use config for deferred credits delta

* Move to Settings

* minor fixes

* minor fixes 2

* double comment

---------

Co-authored-by: sydhds <sydhds@gmail.com>
Co-authored-by: Damir Vodenicarevic <damipator@gmail.com>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent b16d0e1 commit d1c6258
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 70 deletions.
2 changes: 2 additions & 0 deletions massa-api-exports/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,6 @@ pub struct APIConfig {
pub last_start_period: u64,
/// chain id
pub chain_id: u64,
/// Delta to compute upper bounds when fetching deferred credits
pub deferred_credits_delta: MassaTime,
}
15 changes: 14 additions & 1 deletion massa-api/src/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,21 @@ impl MassaRpcServer for API<Public> {
.collect()
};

// Compute a limit (as a slot) for deferred credits as it can be quite huge
let bound_ts = MassaTime::now().saturating_add(self.0.api_settings.deferred_credits_delta);

let deferred_credit_max_slot = timeslots::get_closest_slot_to_timestamp(
self.0.api_settings.thread_count,
self.0.api_settings.t0,
self.0.api_settings.genesis_timestamp,
bound_ts,
);

// get execution info
let execution_infos = self.0.execution_controller.get_addresses_infos(&addresses);
let execution_infos = self.0.execution_controller.get_addresses_infos(
&addresses,
std::ops::Bound::Included(deferred_credit_max_slot),
);

// get future draws from selector
let selection_draws = {
Expand Down
2 changes: 2 additions & 0 deletions massa-api/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub(crate) fn get_apiv2_server(addr: &SocketAddr) -> (API<ApiV2>, APIConfig) {
periods_per_cycle: PERIODS_PER_CYCLE,
last_start_period: 0,
chain_id: *CHAINID,
deferred_credits_delta: MassaTime::from_millis(24 * 3600 * 2),
};

// let shared_storage: massa_storage::Storage = massa_storage::Storage::create_root();
Expand Down Expand Up @@ -141,6 +142,7 @@ pub(crate) fn start_public_api(addr: SocketAddr) -> (API<Public>, APIConfig) {
periods_per_cycle: PERIODS_PER_CYCLE,
last_start_period: 0,
chain_id: *CHAINID,
deferred_credits_delta: MassaTime::from_millis(24 * 3600 * 2),
};

let shared_storage: massa_storage::Storage = massa_storage::Storage::create_root();
Expand Down
2 changes: 1 addition & 1 deletion massa-api/src/tests/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ async fn get_addresses() {
let (mut api_public, config) = start_public_api(addr);

let mut exec_ctrl = MockExecutionController::new();
exec_ctrl.expect_get_addresses_infos().returning(|a| {
exec_ctrl.expect_get_addresses_infos().returning(|a, _s| {
a.iter()
.map(|_addr| ExecutionAddressInfo {
candidate_balance: Amount::from_str("100000").unwrap(),
Expand Down
6 changes: 5 additions & 1 deletion massa-execution-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ pub trait ExecutionController: Send + Sync {
) -> (bool, bool);

/// Gets information about a batch of addresses
fn get_addresses_infos(&self, addresses: &[Address]) -> Vec<ExecutionAddressInfo>;
fn get_addresses_infos(
&self,
addresses: &[Address],
deferred_credits_max_slot: std::ops::Bound<Slot>,
) -> Vec<ExecutionAddressInfo>;

/// Get execution statistics
fn get_stats(&self) -> ExecutionStats;
Expand Down
4 changes: 3 additions & 1 deletion massa-execution-worker/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,17 +1028,19 @@ impl ExecutionContext {
}

/// Get future deferred credits of an address
/// With optionally a limit slot (excluded)
pub fn get_address_future_deferred_credits(
&self,
address: &Address,
thread_count: u8,
max_slot: std::ops::Bound<Slot>,
) -> BTreeMap<Slot, Amount> {
let min_slot = self
.slot
.get_next_slot(thread_count)
.expect("unexpected slot overflow in context.get_addresses_deferred_credits");
self.speculative_roll_state
.get_address_deferred_credits(address, min_slot)
.get_address_deferred_credits(address, (std::ops::Bound::Included(min_slot), max_slot))
}

/// in case of
Expand Down
10 changes: 8 additions & 2 deletions massa-execution-worker/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,11 @@ impl ExecutionController for ExecutionControllerImpl {
}

/// Gets information about a batch of addresses
fn get_addresses_infos(&self, addresses: &[Address]) -> Vec<ExecutionAddressInfo> {
fn get_addresses_infos(
&self,
addresses: &[Address],
deferred_credits_max_slot: std::ops::Bound<Slot>,
) -> Vec<ExecutionAddressInfo> {
let mut res = Vec::with_capacity(addresses.len());
let exec_state = self.execution_state.read();
for addr in addresses {
Expand All @@ -441,14 +445,16 @@ impl ExecutionController for ExecutionControllerImpl {
exec_state.get_final_and_candidate_balance(addr);
let (final_roll_count, candidate_roll_count) =
exec_state.get_final_and_candidate_rolls(addr);
let future_deferred_credits =
exec_state.get_address_future_deferred_credits(addr, deferred_credits_max_slot);
res.push(ExecutionAddressInfo {
final_datastore_keys: final_datastore_keys.unwrap_or_default(),
candidate_datastore_keys: candidate_datastore_keys.unwrap_or_default(),
final_balance: final_balance.unwrap_or_default(),
candidate_balance: candidate_balance.unwrap_or_default(),
final_roll_count,
candidate_roll_count,
future_deferred_credits: exec_state.get_address_future_deferred_credits(addr),
future_deferred_credits,
cycle_infos: exec_state.get_address_cycle_infos(addr),
});
}
Expand Down
27 changes: 22 additions & 5 deletions massa-execution-worker/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1817,8 +1817,16 @@ impl ExecutionState {
}

/// Get future deferred credits of an address
pub fn get_address_future_deferred_credits(&self, address: &Address) -> BTreeMap<Slot, Amount> {
context_guard!(self).get_address_future_deferred_credits(address, self.config.thread_count)
pub fn get_address_future_deferred_credits(
&self,
address: &Address,
max_slot: std::ops::Bound<Slot>,
) -> BTreeMap<Slot, Amount> {
context_guard!(self).get_address_future_deferred_credits(
address,
self.config.thread_count,
max_slot,
)
}

/// Get future deferred credits of an address
Expand All @@ -1832,7 +1840,14 @@ impl ExecutionState {
.final_state
.read()
.get_pos_state()
.get_address_deferred_credits(address);
.get_deferred_credits_range(.., Some(address))
.credits;
let res_final: BTreeMap<Slot, Amount> = res_final
.into_iter()
.filter_map(|(slot, addr_amount)| {
addr_amount.get(address).map(|amount| (slot, *amount))
})
.collect();

// get values from active history, backwards
let mut res_speculative: BTreeMap<Slot, Amount> = BTreeMap::default();
Expand All @@ -1844,10 +1859,12 @@ impl ExecutionState {
};
}
}

// fill missing speculative entries with final entries
for (s, v) in res_final.iter() {
res_speculative.entry(*s).or_insert(*v);
for (slot, amount) in &res_final {
res_speculative.entry(*slot).or_insert(*amount);
}

// remove zero entries from speculative
res_speculative.retain(|_s, a| !a.is_zero());

Expand Down
19 changes: 11 additions & 8 deletions massa-execution-worker/src/speculative_roll_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl SpeculativeRollState {
addr: &Address,
amount: &Amount,
) -> Amount {
let credits = self.get_address_deferred_credits(addr, *slot);
let credits = self.get_address_deferred_credits(addr, slot..);

let mut remaining_to_slash = *amount;
for (credit_slot, credit_amount) in credits.iter() {
Expand Down Expand Up @@ -294,19 +294,22 @@ impl SpeculativeRollState {
}

/// Get deferred credits of an address starting from a given slot
pub fn get_address_deferred_credits(
pub fn get_address_deferred_credits<R>(
&self,
address: &Address,
min_slot: Slot,
) -> BTreeMap<Slot, Amount> {
slot_range: R,
) -> BTreeMap<Slot, Amount>
where
R: std::ops::RangeBounds<Slot> + Clone,
{
let mut res: HashMap<Slot, Amount> = HashMap::default();

// get added values
for (slot, addr_amount) in self
.added_changes
.deferred_credits
.credits
.range(min_slot..)
.range(slot_range.clone())
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
Expand All @@ -322,7 +325,7 @@ impl SpeculativeRollState {
.pos_changes
.deferred_credits
.credits
.range(min_slot..)
.range(slot_range.clone())
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
Expand All @@ -336,7 +339,7 @@ impl SpeculativeRollState {
let final_state = self.final_state.read();
for (slot, addr_amount) in final_state
.get_pos_state()
.get_deferred_credits_range(min_slot..)
.get_deferred_credits_range(slot_range, Some(address))
.credits
{
if let Some(amount) = addr_amount.get(address) {
Expand Down Expand Up @@ -565,7 +568,7 @@ impl SpeculativeRollState {
.final_state
.read()
.get_pos_state()
.get_deferred_credits_range(..=slot);
.get_deferred_credits_range(..=slot, None);

// fetch active history deferred credits
credits.extend(
Expand Down
4 changes: 3 additions & 1 deletion massa-execution-worker/src/tests/scenarios_mandatories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2065,7 +2065,9 @@ fn datastore_manipulations() {
],
});
// Just checking that is works no asserts for now
universe.module_controller.get_addresses_infos(&[addr]);
universe
.module_controller
.get_addresses_infos(&[addr], std::ops::Bound::Unbounded);
}

/// This test checks causes a history rewrite in slot sequencing and ensures that emitted events match
Expand Down
2 changes: 2 additions & 0 deletions massa-node/base_config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
enable_ws = false
# whether to broadcast for blocks, endorsements and operations
enable_broadcast = false
# deferred credits delta (in milliseconds)
deferred_credits_delta = 5184000000 # ~ 2 months

[grpc]
[grpc.public]
Expand Down
1 change: 1 addition & 0 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ async fn launch(
periods_per_cycle: PERIODS_PER_CYCLE,
last_start_period: final_state.read().get_last_start_period(),
chain_id: *CHAINID,
deferred_credits_delta: SETTINGS.api.deferred_credits_delta,
};

// spawn Massa API
Expand Down
1 change: 1 addition & 0 deletions massa-node/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub struct APISettings {
pub enable_ws: bool,
// whether to broadcast for blocks, endorsement and operations
pub enable_broadcast: bool,
pub deferred_credits_delta: MassaTime,
}

#[derive(Debug, Deserialize, Clone)]
Expand Down
64 changes: 14 additions & 50 deletions massa-pos-exports/src/pos_final_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,11 @@ impl PoSFinalState {
}

/// Retrieves every deferred credit in a slot range
pub fn get_deferred_credits_range<R>(&self, range: R) -> DeferredCredits
pub fn get_deferred_credits_range<R>(
&self,
range: R,
addr_filter: Option<&Address>,
) -> DeferredCredits
where
R: RangeBounds<Slot>,
{
Expand Down Expand Up @@ -743,50 +747,10 @@ impl PoSFinalState {
.deserialize::<DeserializeError>(rest_key)
.expect(DEFERRED_CREDITS_DESER_ERROR);

let (_, amount) = self
.deferred_credits_deserializer
.credit_deserializer
.amount_deserializer
.deserialize::<DeserializeError>(&serialized_value)
.expect(DEFERRED_CREDITS_DESER_ERROR);

deferred_credits.insert(slot, address, amount);
}

deferred_credits
}

/// Gets the deferred credits for an address
pub fn get_address_deferred_credits(&self, address: &Address) -> BTreeMap<Slot, Amount> {
let db = self.db.read();

let mut deferred_credits = BTreeMap::new();

let start_key_buffer = DEFERRED_CREDITS_PREFIX.as_bytes().to_vec();

for (serialized_key, serialized_value) in db.iterator_cf(
STATE_CF,
MassaIteratorMode::From(&start_key_buffer, MassaDirection::Forward),
) {
if !serialized_key.starts_with(DEFERRED_CREDITS_PREFIX.as_bytes()) {
break;
}

// deserialize the slot
let rest_key = &serialized_key[DEFERRED_CREDITS_PREFIX.len()..];
let (rest_key, slot) = buf_to_array_ctr(rest_key, Slot::from_bytes_key)
.expect(DEFERRED_CREDITS_DESER_ERROR);

let (_, addr): (_, Address) = self
.deferred_credits_deserializer
.credit_deserializer
.address_deserializer
.deserialize::<DeserializeError>(rest_key)
.expect(DEFERRED_CREDITS_DESER_ERROR);

if &addr != address {
// TODO improve performance
continue;
if let Some(addr_filter_value) = &addr_filter {
if &&address != addr_filter_value {
continue;
}
}

let (_, amount) = self
Expand All @@ -796,7 +760,7 @@ impl PoSFinalState {
.deserialize::<DeserializeError>(&serialized_value)
.expect(DEFERRED_CREDITS_DESER_ERROR);

deferred_credits.insert(slot, amount);
deferred_credits.insert(slot, address, amount);
}

deferred_credits
Expand Down Expand Up @@ -1733,10 +1697,10 @@ mod tests {
"deferred credits not loaded correctly"
);
let credits_range_1 =
pos_state.get_deferred_credits_range(Slot::new(4, 0)..Slot::new(4, 1));
pos_state.get_deferred_credits_range(Slot::new(4, 0)..Slot::new(4, 1), None);
assert!(credits_range_1.is_empty());
let credits_range_2 =
pos_state.get_deferred_credits_range(Slot::new(2, 0)..Slot::new(3, 1));
pos_state.get_deferred_credits_range(Slot::new(2, 0)..Slot::new(3, 1), None);
let expected_credits_range_2 = vec![(
Slot::new(3, 0),
vec![(addr1, a_a1_s3), (addr2, a_a2_s3)]
Expand All @@ -1747,10 +1711,10 @@ mod tests {
.collect();
assert_eq!(credits_range_2.credits, expected_credits_range_2);
let credits_range_3 =
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(9, 5));
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(9, 5), None);
assert!(credits_range_3.is_empty());
let credits_range_4 =
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(255, 1));
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(255, 1), None);

let a_a1_s255 = Amount::from_str("5.01").unwrap();
let expected_credits_range_4 = vec![(
Expand Down

0 comments on commit d1c6258

Please sign in to comment.