Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track time spent in datastore methods; use this to charge energy #1957

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions crates/client-api-messages/src/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ pub struct EnergyQuanta {
pub quanta: u128,
}

#[derive(Copy, Clone)]
#[repr(transparent)]
#[must_use]
/// Time taken while executing datastore operations, for which energy should be charged.
///
/// A transparent newtype around [`Duration`], to enable a `must_use`
/// annotation so that we don't forget to charge energy.
pub struct DatastoreComputeDuration(pub Duration);

impl DatastoreComputeDuration {
pub fn from_micros(micros: u64) -> Self {
Self(Duration::from_micros(micros))
}
}

impl EnergyQuanta {
pub const ZERO: Self = EnergyQuanta { quanta: 0 };

Expand Down Expand Up @@ -43,6 +58,15 @@ impl EnergyQuanta {
let byte_seconds = Self::from_disk_usage(bytes_stored, storage_period).get();
Self::new(byte_seconds * Self::ENERGY_PER_MEM_BYTE_SEC)
}

// TODO(energy): This should probably be dynamically specified by the server owner at startup,
// as the price/value per time to operate a machine varies wildly depending on the specific hardware.
const ENERGY_PER_DATASTORE_MICROSECOND: u128 = 100;

pub fn from_datastore_compute_duration(compute_time: DatastoreComputeDuration) -> Self {
let micros = compute_time.0.as_micros();
Self::new(micros * Self::ENERGY_PER_DATASTORE_MICROSECOND)
}
}

impl fmt::Display for EnergyQuanta {
Expand Down
52 changes: 37 additions & 15 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::execution_context::Workload;
use crate::{
db::{
datastore::{
Expand All @@ -25,6 +24,7 @@ use crate::{
error::{DBError, TableError},
execution_context::ExecutionContext,
};
use crate::{energy::DatastoreComputeDuration, execution_context::Workload};
use anyhow::{anyhow, Context};
use core::{cell::RefCell, ops::RangeBounds};
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -314,11 +314,12 @@ impl Tx for Locking {
lock_wait_time,
timer,
ctx,
datastore_compute_time_microseconds: Default::default(),
}
}

fn release_tx(&self, tx: Self::Tx) {
tx.release();
fn release_tx(&self, tx: Self::Tx) -> DatastoreComputeDuration {
tx.release()
}
}

Expand Down Expand Up @@ -603,6 +604,7 @@ impl MutTxDatastore for Locking {
}

/// This utility is responsible for recording all transaction metrics.
// TODO(metrics): Add a metric for datastore compute time?
pub(super) fn record_metrics(
ctx: &ExecutionContext,
tx_timer: Instant,
Expand Down Expand Up @@ -693,14 +695,15 @@ impl MutTx for Locking {
lock_wait_time,
timer,
ctx,
datastore_compute_time_microseconds: Default::default(),
}
}

fn rollback_mut_tx(&self, tx: Self::MutTx) {
tx.rollback();
fn rollback_mut_tx(&self, tx: Self::MutTx) -> DatastoreComputeDuration {
tx.rollback()
}

fn commit_mut_tx(&self, tx: Self::MutTx) -> Result<Option<TxData>> {
fn commit_mut_tx(&self, tx: Self::MutTx) -> Result<Option<(TxData, DatastoreComputeDuration)>> {
Ok(Some(tx.commit()))
}
}
Expand Down Expand Up @@ -1516,7 +1519,8 @@ mod tests {
);
}

datastore.rollback_mut_tx(tx);
// Ignore the returned datastore compute time; we don't care.
let _ = datastore.rollback_mut_tx(tx);
Ok(())
}

Expand Down Expand Up @@ -1558,7 +1562,10 @@ mod tests {
#[test]
fn test_create_table_post_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.rollback_mut_tx(tx);

// Ignore the returned datastore compute time; we don't care.
let _ = datastore.rollback_mut_tx(tx);

let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
assert!(
!datastore.table_id_exists_mut_tx(&tx, &table_id),
Expand Down Expand Up @@ -1673,7 +1680,8 @@ mod tests {
#[test]
fn test_schema_for_table_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.rollback_mut_tx(tx);
// Ignore the returned datastore compute time; we don't care.
let _ = datastore.rollback_mut_tx(tx);
let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let schema = datastore.schema_for_table_mut_tx(&tx, table_id);
assert!(schema.is_err());
Expand Down Expand Up @@ -1716,10 +1724,16 @@ mod tests {
fn test_insert_post_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
let row = u32_str_u32(15, "Foo", 18); // 15 is ignored.
datastore.commit_mut_tx(tx)?;

// Ignore the returned datastore compute time; we don't care.
let _ = datastore.commit_mut_tx(tx)?;

let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
datastore.insert_mut_tx(&mut tx, table_id, row)?;
datastore.rollback_mut_tx(tx);

// Ignore the returned datastore compute time; we don't care.
let _ = datastore.rollback_mut_tx(tx);

let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![]);
Expand Down Expand Up @@ -1824,7 +1838,10 @@ mod tests {
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
datastore.rollback_mut_tx(tx);

// Ignore the returned datastore compute time; we don't care.
let _ = datastore.rollback_mut_tx(tx);

let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
datastore.insert_mut_tx(&mut tx, table_id, row)?;
#[rustfmt::skip]
Expand Down Expand Up @@ -1958,7 +1975,9 @@ mod tests {
};
datastore.create_index_mut_tx(&mut tx, index_def, true)?;

datastore.rollback_mut_tx(tx);
// Ignore the returned datastore compute time; we don't care.
let _ = datastore.rollback_mut_tx(tx);

let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let query = query_st_tables(&tx);

Expand Down Expand Up @@ -2056,8 +2075,11 @@ mod tests {
let rows = &[row1, row2];
assert_eq!(&all_rows_tx(&read_tx_2, table_id), rows);
assert_eq!(&all_rows_tx(&read_tx_1, table_id), rows);
read_tx_2.release();
read_tx_1.release();

// Ignore the returned datastore compute time; we don't care.
let _ = read_tx_2.release();
let _ = read_tx_1.release();

Ok(())
}

Expand Down
Loading
Loading