Implement caching of records returned from registry clients
NOTE: Nothing is actually cached yet, because no registry client uses the hooks provided by this code. This will come in subsequent PRs.

commit-id:97fc578f
mkaput committed Oct 27, 2023
1 parent 661603a commit d293563
Showing 10 changed files with 320 additions and 17 deletions.
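
For context, the sketch below (not part of this commit) illustrates how a registry client might eventually plug into the hooks mentioned in the note above: it receives the cache key that `RegistryClientCache` stored on a previous call and answers with one of the `RegistryResource` variants used in this diff. Everything besides those variant names is hypothetical (`fetch_index`, `IndexResponse`, the ETag-style key), the real trait method also takes a before-network callback that is omitted here, and `RegistryResource` is assumed to be generic over its payload, as its use with `IndexRecords` in this diff suggests.

// Hypothetical sketch, not part of this commit.
async fn get_records_sketch(
    package: String,
    cache_key: Option<&str>, // last key stored by RegistryClientCache, if any
) -> anyhow::Result<RegistryResource<IndexRecords>> {
    // `fetch_index` and `IndexResponse` are made-up stand-ins for a real transport,
    // e.g. an HTTP request that sends `cache_key` as an `If-None-Match` header.
    Ok(match fetch_index(&package, cache_key).await? {
        // The server reports the data has not changed: reuse the cached records.
        IndexResponse::NotModified => RegistryResource::InCache,
        // The package does not exist: the cache layer will prune its stale entries.
        IndexResponse::Missing => RegistryResource::NotFound,
        // Fresh data: return it together with the new key to store in the cache.
        IndexResponse::Fresh { records, etag } => RegistryResource::Download {
            resource: records,
            cache_key: etag, // Option<String>, e.g. the response ETag
        },
    })
}
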
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -74,6 +74,7 @@ petgraph = "0.6"
predicates = "3"
proc-macro2 = "1"
quote = "1"
redb = "1.3"
reqwest = { version = "0.11.22", features = ["gzip", "brotli", "deflate", "json", "stream"] }
semver = { version = "1", features = ["serde"] }
serde = { version = "1", features = ["serde_derive"] }
3 changes: 2 additions & 1 deletion scarb/Cargo.toml
@@ -46,6 +46,7 @@ itertools.workspace = true
once_cell.workspace = true
pathdiff.workspace = true
petgraph.workspace = true
redb.workspace = true
reqwest.workspace = true
scarb-build-metadata = { path = "../utils/scarb-build-metadata" }
scarb-metadata = { path = "../scarb-metadata", default-features = false, features = ["builder"] }
@@ -55,8 +56,8 @@ serde-untagged.workspace = true
serde-value.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
serde_repr.workspace = true
sha2.workspace = true
smallvec.workspace = true
smol_str.workspace = true
tar.workspace = true
281 changes: 266 additions & 15 deletions scarb/src/core/registry/client/cache.rs
@@ -1,22 +1,86 @@
use std::path::PathBuf;

use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use camino::Utf8Path;
use redb::{MultimapTableDefinition, ReadableMultimapTable, ReadableTable, TableDefinition};
use semver::Version;
use tokio::sync::OnceCell;
use tokio::task::block_in_place;
use tracing::trace;

use scarb_ui::Ui;

use crate::core::registry::client::{BeforeNetworkCallback, RegistryClient, RegistryResource};
use crate::core::registry::index::IndexRecords;
use crate::core::{Config, ManifestDependency, PackageId};
use crate::core::registry::index::{IndexRecord, IndexRecords};
use crate::core::{Config, ManifestDependency, PackageId, SourceId};
use crate::internal::fsx;

// TODO(mkaput): Implement cache downloading.
// FIXME(mkaput): Avoid creating database if inner client does not trigger cache writes.
// FIXME(mkaput): We probably have to call db.compact() after all write txs we run during a Scarb run.

/// Multimap: `package name -> (version, index records)`.
const RECORDS: MultimapTableDefinition<'_, &str, (&str, &[u8])> =
MultimapTableDefinition::new("records");

/// Map: `package name -> index records cache key`.
///
/// Cache key as returned by wrapped [`RegistryClient`].
const RECORDS_CACHE_KEYS: TableDefinition<'_, &str, &str> =
TableDefinition::new("records_cache_keys");

/// A caching layer on top of a [`RegistryClient`].
///
/// ## Database
///
/// It uses [`redb`] as a local key-value database, where this object stores the following:
/// 1. Multimap table `records`: maps _package name_ to all index records received from
/// the registry to date.
///
/// On disk, each record is stored as a pair of _package version_ and the record itself
/// serialized as minified JSON. This allows the cache to filter out records that do not match
/// the requested dependency specification before deserializing the record itself, saving some
/// execution time (exact numbers are unknown, but Cargo suffered from the same problem and
/// implemented identical measures).
/// 2. Table `records_cache_keys`: maps _package name_ to the last known _cache key_ returned
/// from the [`RegistryClient::get_records`] method call.
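///
/// For example, for a package `foo`, the `records` multimap could hold entries such as
/// `("1.0.0", b"{...}")` and `("1.1.0", b"{...}")`, while `records_cache_keys` would map
/// `foo` to whatever opaque cache key the wrapped client last returned (for instance, an
/// HTTP `ETag` value).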
///
/// Database files are stored in the `$SCARB_GLOBAL_CACHE/registry/cache` directory. For each
/// `SourceId` a separate database file is maintained, named `{source_id.ident()}.v1.redb`.
/// If a new database format is ever introduced, it should be stored in a `*.v2.redb` file, and
/// so on. Old versions should simply be deleted, without any sophisticated migration logic
/// (remember, this is just a cache!). Likewise, if the database file appears to be corrupted, it
/// is simply deleted and recreated from scratch.
///
/// ## Workflow
///
/// Each wrapper method of this struct performs more or less the same flow of steps:
/// 1. Get the existing cache key from the database, if one exists.
/// 2. Call the actual [`RegistryClient`] method with the found cache key (or `None`).
/// 3. If the method returned [`RegistryResource::NotFound`], then everything related to the
/// queried resource is removed from the cache.
/// 4. Or, if the method returned [`RegistryResource::InCache`], then the cached value is
/// deserialized and returned.
/// 5. Or, if the method returned [`RegistryResource::Download`], then the new resource data is
/// saved in the cache (replacing existing items) along with the new cache key, and returned
/// to the caller.
pub struct RegistryClientCache<'c> {
source_id: SourceId,
client: Box<dyn RegistryClient + 'c>,
_config: &'c Config,
db_cell: OnceCell<CacheDatabase>,
config: &'c Config,
}

impl<'c> RegistryClientCache<'c> {
pub fn new(client: Box<dyn RegistryClient + 'c>, config: &'c Config) -> Result<Self> {
pub fn new(
source_id: SourceId,
client: Box<dyn RegistryClient + 'c>,
config: &'c Config,
) -> Result<Self> {
Ok(Self {
source_id,
client,
_config: config,
db_cell: OnceCell::new(),
config,
})
}

@@ -30,22 +94,35 @@ impl<'c> RegistryClientCache<'c> {
dependency: &ManifestDependency,
before_network: BeforeNetworkCallback,
) -> Result<IndexRecords> {
let package_name = dependency.name.as_str();
let db = self.db().await?;

let cache_key = db.get_records_cache_key(package_name).await?;

match self
.client
.get_records(dependency.name.clone(), before_network)
.get_records(
dependency.name.clone(),
cache_key.as_deref(),
before_network,
)
.await?
{
RegistryResource::NotFound => {
trace!("package not found in registry, pruning cache");
db.prune_records(package_name).await?;
bail!("package not found in registry: {dependency}")
}
RegistryResource::InCache => {
trace!("getting records from cache");
todo!()
}
RegistryResource::Download { resource, .. } => {
trace!("got new records, invalidating cache");
Ok(resource)

RegistryResource::InCache => db.get_records(dependency).await,

RegistryResource::Download {
resource: records,
cache_key,
} => {
if let Some(cache_key) = cache_key {
db.upsert_records(package_name, cache_key, &records).await?;
}
Ok(records)
}
}
}
@@ -72,4 +149,178 @@ impl<'c> RegistryClientCache<'c> {
}
}
}

async fn db(&self) -> Result<&CacheDatabase> {
self.db_cell
.get_or_try_init(|| async {
let ui = self.config.ui();
let fs = self.config.dirs().registry_dir().into_child("cache");
let db_path = fs
.path_existent()?
.join(format!("{}.v1.redb", self.source_id.ident()));

CacheDatabase::create(&db_path, ui)
})
.await
}
}

struct CacheDatabase {
db: redb::Database,
ui: Ui,
}

impl CacheDatabase {
#[tracing::instrument(level = "trace", skip(ui))]
fn create(path: &Utf8Path, ui: Ui) -> Result<Self> {
fn create(path: &Utf8Path, ui: &Ui) -> Result<redb::Database> {
redb::Database::create(path)
.context("failed to open local registry cache, trying to recreate it")
.or_else(|error| {
ui.warn_anyhow(&error);
fsx::remove_file(path).context("failed to remove local registry cache")?;
redb::Database::create(path)
.with_context(|| db_fatal("failed to open local registry cache"))
})
}

fn init_tables(db: &redb::Database) -> Result<()> {
let tx = db.begin_write()?;
{
tx.open_multimap_table(RECORDS)?;
tx.open_table(RECORDS_CACHE_KEYS)?;
}
tx.commit()?;
Ok(())
}

trace!("opening local registry cache: {path}");
let db = block_in_place(|| -> Result<_> {
let db = create(path, &ui)?;
trace!("database opened/created successfully");
init_tables(&db).context("failed to initialize local registry cache database")?;
trace!("created all tables in local registry cache database");
Ok(db)
})?;

Ok(Self { db, ui })
}

#[tracing::instrument(level = "trace", skip_all)]
async fn get_records_cache_key(&self, package_name: &str) -> Result<Option<String>> {
trace!("looking up cache key");
block_in_place(|| -> Result<_> {
let tx = self.db.begin_read()?;
let table = tx.open_table(RECORDS_CACHE_KEYS)?;
let cache_key = table.get(package_name)?.map(|g| g.value().to_owned());
trace!(?cache_key);
Ok(cache_key)
})
.with_context(|| db_error("failed to lookup cache key in registry cache"))
.or_else(|err| -> Result<_> {
self.ui.warn_anyhow(&err);
Ok(None)
})
}

#[tracing::instrument(level = "trace", skip_all)]
async fn get_records(&self, dependency: &ManifestDependency) -> Result<IndexRecords> {
trace!("getting records from cache");
block_in_place(|| -> Result<_> {
let tx = self.db.begin_read()?;
let table = tx.open_multimap_table(RECORDS)?;

let mut records = IndexRecords::new();
for g in table.get(dependency.name.as_str())? {
let g = g?;
let (raw_version, raw_record) = g.value();

let version = Version::parse(raw_version)
.with_context(|| db_fatal("failed to parse version from cache"))?;
if !dependency.matches_name_and_version(&dependency.name, &version) {
continue;
}

let record = serde_json::from_slice::<IndexRecord>(raw_record)
.with_context(|| db_fatal("failed to deserialize index record from cache"))?;

records.push(record);
}
trace!("records read successfully");
Ok(records)
})
}

#[tracing::instrument(level = "trace", skip_all)]
async fn prune_records(&self, package_name: &str) -> Result<()> {
trace!("package not found in registry, pruning cache");
block_in_place(|| -> Result<_> {
let tx = self.db.begin_write()?;
{
let mut table = tx.open_multimap_table(RECORDS)?;
table.remove_all(package_name)?;
}
tx.commit()?;
trace!("cache pruned successfully");
Ok(())
})
.with_context(|| db_error("failed to purge cache from now non-existent entries"))
.or_else(|err| -> Result<_> {
self.ui.warn_anyhow(&err);
Ok(())
})?;
Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
async fn upsert_records(
&self,
package_name: &str,
cache_key: String,
records: &IndexRecords,
) -> Result<()> {
trace!("got new records, invalidating cache");
trace!(?cache_key);
block_in_place(|| -> Result<_> {
let tx = self.db.begin_write()?;
{
let mut table = tx.open_table(RECORDS_CACHE_KEYS)?;
table.insert(package_name, cache_key.as_str())?;
}
{
let mut table = tx.open_multimap_table(RECORDS)?;
table.remove_all(package_name)?;

for record in records {
let raw_version = record.version.to_string();
let raw_record = serde_json::to_vec(&record)?;
table.insert(package_name, (raw_version.as_str(), raw_record.as_slice()))?;
}
}
tx.commit()?;
trace!("cache updated successfully");
Ok(())
})
.with_context(|| db_error("failed to cache registry index records"))
.or_else(|err| -> Result<_> {
self.ui.warn_anyhow(&err);
Ok(())
})
}
}

fn db_error(message: &str) -> String {
format!(
"{message}\n\
note: perhaps cache is corrupted\n\
help: try restarting scarb to recreate it"
)
}

fn db_fatal(message: &str) -> String {
format!(
"{message}\n\
note: cache is corrupted and is in unrecoverable state\n\
help: run the following to wipe entire cache: scarb cache clean"
)
}
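
As an aside, the storage scheme described in the module docs above can be distilled into a small, self-contained redb sketch: records are keyed by package name in a multimap, each value is a (version, minified JSON) pair, and the version string is checked against the requested requirement before any JSON is deserialized. The `Record` struct, table name, and file path below are illustrative rather than Scarb's actual types, and the sketch assumes the redb, semver, serde (with derive), serde_json, and anyhow crates from this commit's dependency set.

use anyhow::Result;
use redb::{Database, MultimapTableDefinition, ReadableMultimapTable};
use semver::{Version, VersionReq};
use serde::{Deserialize, Serialize};

/// Illustrative stand-in for Scarb's `IndexRecord`.
#[derive(Serialize, Deserialize, Debug)]
struct Record {
    version: String,
    cksum: String,
}

/// Multimap: package name -> (version, record serialized as minified JSON).
const RECORDS: MultimapTableDefinition<'_, &str, (&str, &[u8])> =
    MultimapTableDefinition::new("records");

fn main() -> Result<()> {
    let db = Database::create("example-cache.redb")?;

    // Store two versions of the same package under one key.
    let tx = db.begin_write()?;
    {
        let mut table = tx.open_multimap_table(RECORDS)?;
        for version in ["1.0.0", "2.0.0"] {
            let record = Record {
                version: version.into(),
                cksum: "sha256:0000".into(),
            };
            let raw = serde_json::to_vec(&record)?;
            table.insert("foo", (version, raw.as_slice()))?;
        }
    }
    tx.commit()?;

    // Read back only the records matching a version requirement; JSON is parsed
    // only for versions that pass the cheap semver check.
    let req = VersionReq::parse("^1")?;
    let tx = db.begin_read()?;
    let table = tx.open_multimap_table(RECORDS)?;
    for entry in table.get("foo")? {
        let entry = entry?;
        let (raw_version, raw_record) = entry.value();
        if !req.matches(&Version::parse(raw_version)?) {
            continue;
        }
        let record: Record = serde_json::from_slice(raw_record)?;
        println!("matched {record:?}");
    }
    Ok(())
}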