Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize deferred credits fetch #4622

Merged
merged 8 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion massa-api/src/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,25 @@ impl MassaRpcServer for API<Public> {
.collect()
};

// Compute a limit (as a slot) for deferred credits as it can be quite huge
let bound_ts = MassaTime::now()
.checked_add(MassaTime::from_millis(
60 * 24 * 60 * 60 * 1000, // 60 Days, 24 hours, 60 minutes, 60 seconds, 1000 milliseconds
damip marked this conversation as resolved.
Show resolved Hide resolved
))
.map_err(|e| ApiError::InternalServerError(e.to_string()))?;

let deferred_credit_max_slot = timeslots::get_closest_slot_to_timestamp(
self.0.api_settings.thread_count,
self.0.api_settings.t0,
self.0.api_settings.genesis_timestamp,
bound_ts,
);

// get execution info
let execution_infos = self.0.execution_controller.get_addresses_infos(&addresses);
let execution_infos = self
.0
.execution_controller
.get_addresses_infos(&addresses, Some(deferred_credit_max_slot));

// get future draws from selector
let selection_draws = {
Expand Down
6 changes: 5 additions & 1 deletion massa-execution-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ pub trait ExecutionController: Send + Sync {
) -> (bool, bool);

/// Gets information about a batch of addresses
fn get_addresses_infos(&self, addresses: &[Address]) -> Vec<ExecutionAddressInfo>;
fn get_addresses_infos(
&self,
addresses: &[Address],
deferred_credits_max_slot: Option<Slot>,
) -> Vec<ExecutionAddressInfo>;

/// Get execution statistics
fn get_stats(&self) -> ExecutionStats;
Expand Down
3 changes: 2 additions & 1 deletion massa-execution-worker/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,13 +1032,14 @@ impl ExecutionContext {
&self,
address: &Address,
thread_count: u8,
max_slot: Option<Slot>,
) -> BTreeMap<Slot, Amount> {
let min_slot = self
.slot
.get_next_slot(thread_count)
.expect("unexpected slot overflow in context.get_addresses_deferred_credits");
self.speculative_roll_state
.get_address_deferred_credits(address, min_slot)
.get_address_deferred_credits(address, min_slot, max_slot)
}

/// in case of
Expand Down
10 changes: 8 additions & 2 deletions massa-execution-worker/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,11 @@ impl ExecutionController for ExecutionControllerImpl {
}

/// Gets information about a batch of addresses
fn get_addresses_infos(&self, addresses: &[Address]) -> Vec<ExecutionAddressInfo> {
fn get_addresses_infos(
&self,
addresses: &[Address],
deferred_credits_max_slot: Option<Slot>,
) -> Vec<ExecutionAddressInfo> {
let mut res = Vec::with_capacity(addresses.len());
let exec_state = self.execution_state.read();
for addr in addresses {
Expand All @@ -441,14 +445,16 @@ impl ExecutionController for ExecutionControllerImpl {
exec_state.get_final_and_candidate_balance(addr);
let (final_roll_count, candidate_roll_count) =
exec_state.get_final_and_candidate_rolls(addr);
let future_deferred_credits =
exec_state.get_address_future_deferred_credits(addr, deferred_credits_max_slot);
res.push(ExecutionAddressInfo {
final_datastore_keys: final_datastore_keys.unwrap_or_default(),
candidate_datastore_keys: candidate_datastore_keys.unwrap_or_default(),
final_balance: final_balance.unwrap_or_default(),
candidate_balance: candidate_balance.unwrap_or_default(),
final_roll_count,
candidate_roll_count,
future_deferred_credits: exec_state.get_address_future_deferred_credits(addr),
future_deferred_credits,
cycle_infos: exec_state.get_address_cycle_infos(addr),
});
}
Expand Down
12 changes: 10 additions & 2 deletions massa-execution-worker/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1817,8 +1817,16 @@ impl ExecutionState {
}

/// Get future deferred credits of an address
pub fn get_address_future_deferred_credits(&self, address: &Address) -> BTreeMap<Slot, Amount> {
context_guard!(self).get_address_future_deferred_credits(address, self.config.thread_count)
pub fn get_address_future_deferred_credits(
&self,
address: &Address,
max_slot: Option<Slot>,
) -> BTreeMap<Slot, Amount> {
context_guard!(self).get_address_future_deferred_credits(
address,
self.config.thread_count,
max_slot,
)
}

/// Get future deferred credits of an address
Expand Down
121 changes: 87 additions & 34 deletions massa-execution-worker/src/speculative_roll_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl SpeculativeRollState {
addr: &Address,
amount: &Amount,
) -> Amount {
let credits = self.get_address_deferred_credits(addr, *slot);
let credits = self.get_address_deferred_credits(addr, *slot, None);

let mut remaining_to_slash = *amount;
for (credit_slot, credit_amount) in credits.iter() {
Expand Down Expand Up @@ -298,51 +298,104 @@ impl SpeculativeRollState {
&self,
address: &Address,
min_slot: Slot,
max_slot_: Option<Slot>,
) -> BTreeMap<Slot, Amount> {
let mut res: HashMap<Slot, Amount> = HashMap::default();

// get added values
for (slot, addr_amount) in self
.added_changes
.deferred_credits
.credits
.range(min_slot..)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}
if let Some(max_slot) = max_slot_ {
// get added values
for (slot, addr_amount) in self
.added_changes
.deferred_credits
.credits
.range(min_slot..max_slot)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}

// get values from active history, backwards
{
let hist = self.active_history.read();
for hist_item in hist.0.iter().rev() {
for (slot, addr_amount) in hist_item
.state_changes
.pos_changes
.deferred_credits
.credits
.range(min_slot..)
{
// get values from active history, backwards
{
let hist = self.active_history.read();
for hist_item in hist.0.iter().rev() {
for (slot, addr_amount) in hist_item
.state_changes
.pos_changes
.deferred_credits
.credits
.range(min_slot..max_slot)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}
}
}

// get values from final state
{
let final_state = self.final_state.read();
let deferred_credits = final_state
.get_pos_state()
.get_deferred_credits_range(min_slot..max_slot, Some(*address));
println!("def creds len: {}", deferred_credits.credits.len());
let mut inserted = 0;
for (slot, addr_amount) in deferred_credits.credits {
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
res.entry(slot).or_insert(*amount);
inserted += 1;
};
}
println!("inserted: {}", inserted);
damip marked this conversation as resolved.
Show resolved Hide resolved
}
}

// get values from final state
{
let final_state = self.final_state.read();
for (slot, addr_amount) in final_state
.get_pos_state()
.get_deferred_credits_range(min_slot..)
} else {
// get added values
for (slot, addr_amount) in self
.added_changes
.deferred_credits
.credits
.range(min_slot..)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(slot).or_insert(*amount);
res.entry(*slot).or_insert(*amount);
};
}

// get values from active history, backwards
{
let hist = self.active_history.read();
for hist_item in hist.0.iter().rev() {
for (slot, addr_amount) in hist_item
.state_changes
.pos_changes
.deferred_credits
.credits
.range(min_slot..)
{
if let Some(amount) = addr_amount.get(address) {
res.entry(*slot).or_insert(*amount);
};
}
}
}

// get values from final state
{
let final_state = self.final_state.read();
let deferred_credits = final_state
.get_pos_state()
.get_deferred_credits_range(min_slot.., Some(*address));
println!("def creds len: {}", deferred_credits.credits.len());
damip marked this conversation as resolved.
Show resolved Hide resolved
let mut inserted = 0;
for (slot, addr_amount) in deferred_credits.credits {
if let Some(amount) = addr_amount.get(address) {
res.entry(slot).or_insert(*amount);
inserted += 1;
};
}
println!("inserted: {}", inserted);
damip marked this conversation as resolved.
Show resolved Hide resolved
}
Leo-Besancon marked this conversation as resolved.
Show resolved Hide resolved
}

res.into_iter().filter(|(_s, v)| !v.is_zero()).collect()
Expand Down Expand Up @@ -565,7 +618,7 @@ impl SpeculativeRollState {
.final_state
.read()
.get_pos_state()
.get_deferred_credits_range(..=slot);
.get_deferred_credits_range(..=slot, None);

// fetch active history deferred credits
credits.extend(
Expand Down
4 changes: 3 additions & 1 deletion massa-execution-worker/src/tests/scenarios_mandatories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2065,7 +2065,9 @@ fn datastore_manipulations() {
],
});
// Just checking that is works no asserts for now
universe.module_controller.get_addresses_infos(&[addr]);
universe
.module_controller
.get_addresses_infos(&[addr], None);
}

/// This test checks causes a history rewrite in slot sequencing and ensures that emitted events match
Expand Down
20 changes: 15 additions & 5 deletions massa-pos-exports/src/pos_final_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,11 @@ impl PoSFinalState {
}

/// Retrieves every deferred credit in a slot range
pub fn get_deferred_credits_range<R>(&self, range: R) -> DeferredCredits
pub fn get_deferred_credits_range<R>(
&self,
range: R,
filter_by: Option<Address>,
) -> DeferredCredits
where
R: RangeBounds<Slot>,
{
Expand Down Expand Up @@ -743,6 +747,12 @@ impl PoSFinalState {
.deserialize::<DeserializeError>(rest_key)
.expect(DEFERRED_CREDITS_DESER_ERROR);

if let Some(addr) = filter_by {
if addr != address {
continue;
}
}

let (_, amount) = self
.deferred_credits_deserializer
.credit_deserializer
Expand Down Expand Up @@ -1733,10 +1743,10 @@ mod tests {
"deferred credits not loaded correctly"
);
let credits_range_1 =
pos_state.get_deferred_credits_range(Slot::new(4, 0)..Slot::new(4, 1));
pos_state.get_deferred_credits_range(Slot::new(4, 0)..Slot::new(4, 1), None);
assert!(credits_range_1.is_empty());
let credits_range_2 =
pos_state.get_deferred_credits_range(Slot::new(2, 0)..Slot::new(3, 1));
pos_state.get_deferred_credits_range(Slot::new(2, 0)..Slot::new(3, 1), None);
let expected_credits_range_2 = vec![(
Slot::new(3, 0),
vec![(addr1, a_a1_s3), (addr2, a_a2_s3)]
Expand All @@ -1747,10 +1757,10 @@ mod tests {
.collect();
assert_eq!(credits_range_2.credits, expected_credits_range_2);
let credits_range_3 =
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(9, 5));
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(9, 5), None);
assert!(credits_range_3.is_empty());
let credits_range_4 =
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(255, 1));
pos_state.get_deferred_credits_range(Slot::new(7, 0)..Slot::new(255, 1), None);

let a_a1_s255 = Amount::from_str("5.01").unwrap();
let expected_credits_range_4 = vec![(
Expand Down