Skip to content

Commit

Permalink
feat: using sqlite as index cache backend
Browse files Browse the repository at this point in the history
  • Loading branch information
weihanglo committed May 30, 2021
1 parent b1684e2 commit 4e27712
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 86 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ clap = "2.31.2"
unicode-width = "0.1.5"
openssl = { version = '0.10.11', optional = true }
im-rc = "15.0.0"
rusqlite = { version = "0.25.3", features = ["bundled"] }

once_cell = "1.7.2"

# A noop dependency that changes in the Rust repository, it's a bit of a hack.
# See the `src/tools/rustc-workspace-hack/README.md` file in `rust-lang/rust`
Expand Down
58 changes: 58 additions & 0 deletions src/cargo/sources/registry/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::CargoResult;
use once_cell::sync::OnceCell;
use rusqlite::{params, Connection};
use std::path::Path;
use std::sync::Mutex;

pub(crate) struct Db(Connection);

const TABLE_SUMMARIES: &'static str = "\
CREATE TABLE IF NOT EXISTS summaries (
name TEXT PRIMARY KEY NOT NULL,
contents BLOB NOT NULL
)";

const INSERT_SUMMERIES: &'static str = "\
INSERT OR REPLACE INTO summaries (name, contents) VALUES (?, ?)";

impl Db {
pub fn open<P>(path: P) -> CargoResult<&'static Mutex<Self>>
where
P: AsRef<Path>,
{
static DB: OnceCell<Mutex<Db>> = OnceCell::new();
DB.get_or_try_init(|| {
let conn = Connection::open(path.as_ref())?;
conn.pragma_update(None, "locking_mode", &"EXCLUSIVE")?;
conn.pragma_update(None, "cache_size", &2048)?;
conn.execute(TABLE_SUMMARIES, [])?;
Ok(Mutex::new(Self(conn)))
})
}

pub fn get<K>(&self, key: K) -> CargoResult<Vec<u8>>
where
K: AsRef<[u8]>,
{
let key = key.as_ref();
Ok(self.0.query_row(
"SELECT contents FROM summaries WHERE name = ? LIMIT 1",
[key],
|row| row.get(0),
)?)
}

pub fn insert<K>(&self, key: K, value: &[u8]) -> CargoResult<()>
where
K: AsRef<[u8]>,
{
let key = key.as_ref();
let modified = self.0.execute(INSERT_SUMMERIES, params![key, value])?;
log::debug!(
"insert {} record for {}",
modified,
String::from_utf8_lossy(key)
);
Ok(())
}
}
140 changes: 54 additions & 86 deletions src/cargo/sources/registry/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,14 @@
use crate::core::dependency::Dependency;
use crate::core::{PackageId, SourceId, Summary};
use crate::sources::registry::{RegistryData, RegistryPackage, INDEX_V_MAX};
use crate::sources::registry::{db::Db, RegistryData, RegistryPackage, INDEX_V_MAX};
use crate::util::interning::InternedString;
use crate::util::{internal, CargoResult, Config, Filesystem, OptVersionReq, ToSemver};
use anyhow::bail;
use cargo_util::paths;
use log::{debug, info};
use semver::Version;
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::fs;
use std::path::Path;
use std::str;
use std::sync::Mutex;

/// Crates.io treats hyphen and underscores as interchangeable, but the index and old Cargo do not.
/// Therefore, the index must store uncanonicalized version of the name so old Cargo's can find it.
Expand Down Expand Up @@ -322,14 +318,14 @@ impl<'cfg> RegistryIndex<'cfg> {
move |maybe| match maybe.parse(config, raw_data, source_id) {
Ok(summary) => Some(summary),
Err(e) => {
info!("failed to parse `{}` registry package: {}", name, e);
log::info!("failed to parse `{}` registry package: {}", name, e);
None
}
},
)
.filter(move |is| {
if is.v > max_version {
debug!(
log::debug!(
"unsupported schema version {} ({} {})",
is.v,
is.summary.name(),
Expand Down Expand Up @@ -367,34 +363,29 @@ impl<'cfg> RegistryIndex<'cfg> {
let cache_root = root.join(".cache");
let index_version = load.current_version();

let db = index_version.and_then(|v| {
let name = |v| format!("{}-{}-{}.sqlite", CURRENT_CACHE_VERSION, INDEX_V_MAX, v);
let path = cache_root.join(&name(v));
Db::open(path)
.map_err(|e| log::debug!("failed to open registry db from {:?}: {}", name(v), e))
.ok()
});

// See module comment in `registry/mod.rs` for why this is structured
// the way it is.
let fs_name = name
let pkg_name = name
.chars()
.flat_map(|c| c.to_lowercase())
.collect::<String>();
let raw_path = match fs_name.len() {
1 => format!("1/{}", fs_name),
2 => format!("2/{}", fs_name),
3 => format!("3/{}/{}", &fs_name[..1], fs_name),
_ => format!("{}/{}/{}", &fs_name[0..2], &fs_name[2..4], fs_name),
};

// Attempt to handle misspellings by searching for a chain of related
// names to the original `raw_path` name. Only return summaries
// associated with the first hit, however. The resolver will later
// reject any candidates that have the wrong name, and with this it'll
// along the way produce helpful "did you mean?" suggestions.
for path in UncanonicalizedIter::new(&raw_path).take(1024) {
let summaries = Summaries::parse(
index_version.as_deref(),
root,
&cache_root,
path.as_ref(),
self.source_id,
load,
self.config,
)?;
for pkg_name in UncanonicalizedIter::new(&pkg_name).take(1024) {
let summaries =
Summaries::parse(root, db, &pkg_name, self.source_id, load, self.config)?;
if let Some(summaries) = summaries {
self.summaries_cache.insert(name, summaries);
return Ok(self.summaries_cache.get_mut(&name).unwrap());
Expand Down Expand Up @@ -520,46 +511,61 @@ impl Summaries {
/// * `load` - the actual index implementation which may be very slow to
/// call. We avoid this if we can.
pub fn parse(
index_version: Option<&str>,
root: &Path,
cache_root: &Path,
relative: &Path,
db: Option<&Mutex<Db>>,
pkg_name: &str,
source_id: SourceId,
load: &mut dyn RegistryData,
config: &Config,
) -> CargoResult<Option<Summaries>> {
// First up, attempt to load the cache. This could fail for all manner
// of reasons, but consider all of them non-fatal and just log their
// occurrence in case anyone is debugging anything.
let cache_path = cache_root.join(relative);
let mut cache_contents = None;
if let Some(index_version) = index_version {
match fs::read(&cache_path) {
Ok(contents) => match Summaries::parse_cache(contents, index_version) {

let db = db.and_then(|db| {
db.lock()
.map_err(|e| log::debug!("db mutex poisoned: {}", e))
.ok()
});

if let Some(db) = &db {
match db.get(pkg_name) {
Err(e) => log::debug!("cache missing for {:?} error: {}", pkg_name, e),
Ok(contents) => match Summaries::parse_cache(contents) {
Ok(s) => {
log::debug!("fast path for registry cache of {:?}", relative);
log::debug!("fast path for registry cache of {:?}", pkg_name);
if cfg!(debug_assertions) {
cache_contents = Some(s.raw_data);
} else {
return Ok(Some(s));
}
}
Err(e) => {
log::debug!("failed to parse {:?} cache: {}", relative, e);
log::debug!("failed to parse {:?} cache: {}", pkg_name, e);
}
},
Err(e) => log::debug!("cache missing for {:?} error: {}", relative, e),
}
}

// This is the fallback path where we actually talk to libgit2 to load
// information. Here we parse every single line in the index (as we need
// to find the versions)
log::debug!("slow path for {:?}", relative);
log::debug!("slow path for {:?}", pkg_name);
let mut ret = Summaries::default();
let mut hit_closure = false;
let mut cache_bytes = None;
let err = load.load(root, relative, &mut |contents| {

// See module comment in `registry/mod.rs` for why this is structured
// the way it is.
let relative = match pkg_name.len() {
1 => format!("1/{}", pkg_name),
2 => format!("2/{}", pkg_name),
3 => format!("3/{}/{}", &pkg_name[..1], pkg_name),
_ => format!("{}/{}/{}", &pkg_name[0..2], &pkg_name[2..4], pkg_name),
};

let err = load.load(root, relative.as_ref(), &mut |contents| {
ret.raw_data = contents.to_vec();
let mut cache = SummariesCache::default();
hit_closure = true;
Expand Down Expand Up @@ -588,8 +594,8 @@ impl Summaries {
cache.versions.push((version.clone(), line));
ret.versions.insert(version, summary.into());
}
if let Some(index_version) = index_version {
cache_bytes = Some(cache.serialize(index_version));
if db.is_some() {
cache_bytes = Some(cache.serialize());
}
Ok(())
});
Expand Down Expand Up @@ -624,13 +630,9 @@ impl Summaries {
//
// This is opportunistic so we ignore failure here but are sure to log
// something in case of error.
if let Some(cache_bytes) = cache_bytes {
if paths::create_dir_all(cache_path.parent().unwrap()).is_ok() {
let path = Filesystem::new(cache_path.clone());
config.assert_package_cache_locked(&path);
if let Err(e) = fs::write(cache_path, cache_bytes) {
log::info!("failed to write cache: {}", e);
}
if let (Some(cache_bytes), Some(db)) = (cache_bytes, db) {
if let Err(e) = db.insert(pkg_name, cache_bytes.as_ref()) {
log::info!("failed to write cache for {:?}: {}", pkg_name, e);
}
}

Expand All @@ -639,8 +641,8 @@ impl Summaries {

/// Parses an open `File` which represents information previously cached by
/// Cargo.
pub fn parse_cache(contents: Vec<u8>, last_index_update: &str) -> CargoResult<Summaries> {
let cache = SummariesCache::parse(&contents, last_index_update)?;
pub fn parse_cache(contents: Vec<u8>) -> CargoResult<Summaries> {
let cache = SummariesCache::parse(&contents)?;
let mut ret = Summaries::default();
for (version, summary) in cache.versions {
let (start, end) = subslice_bounds(&contents, summary);
Expand Down Expand Up @@ -704,40 +706,10 @@ impl Summaries {
const CURRENT_CACHE_VERSION: u8 = 3;

impl<'a> SummariesCache<'a> {
fn parse(data: &'a [u8], last_index_update: &str) -> CargoResult<SummariesCache<'a>> {
fn parse(data: &'a [u8]) -> CargoResult<SummariesCache<'a>> {
// NB: keep this method in sync with `serialize` below
let (first_byte, rest) = data
.split_first()
.ok_or_else(|| anyhow::format_err!("malformed cache"))?;
if *first_byte != CURRENT_CACHE_VERSION {
bail!("looks like a different Cargo's cache, bailing out");
}
let index_v_bytes = rest
.get(..4)
.ok_or_else(|| anyhow::anyhow!("cache expected 4 bytes for index version"))?;
let index_v = u32::from_le_bytes(index_v_bytes.try_into().unwrap());
if index_v != INDEX_V_MAX {
bail!(
"index format version {} doesn't match the version I know ({})",
index_v,
INDEX_V_MAX
);
}
let rest = &rest[4..];

let mut iter = split(rest, 0);
if let Some(update) = iter.next() {
if update != last_index_update.as_bytes() {
bail!(
"cache out of date: current index ({}) != cache ({})",
last_index_update,
str::from_utf8(update)?,
)
}
} else {
bail!("malformed file");
}
let mut ret = SummariesCache::default();
let mut iter = split(data, 0);
while let Some(version) = iter.next() {
let version = str::from_utf8(version)?;
let version = Version::parse(version)?;
Expand All @@ -747,18 +719,14 @@ impl<'a> SummariesCache<'a> {
Ok(ret)
}

fn serialize(&self, index_version: &str) -> Vec<u8> {
fn serialize(&self) -> Vec<u8> {
// NB: keep this method in sync with `parse` above
let size = self
.versions
.iter()
.map(|(_version, data)| (10 + data.len()))
.map(|(_version, data)| 10 + data.len())
.sum();
let mut contents = Vec::with_capacity(size);
contents.push(CURRENT_CACHE_VERSION);
contents.extend(&u32::to_le_bytes(INDEX_V_MAX));
contents.extend_from_slice(index_version.as_bytes());
contents.push(0);
for (version, data) in self.versions.iter() {
contents.extend_from_slice(version.to_string().as_bytes());
contents.push(0);
Expand Down
1 change: 1 addition & 0 deletions src/cargo/sources/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ pub enum MaybeLock {
Download { url: String, descriptor: String },
}

mod db;
mod index;
mod local;
mod remote;
Expand Down

0 comments on commit 4e27712

Please sign in to comment.