Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement records caching on HttpRegistryClient level #819

Merged
merged 1 commit into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 112 additions & 15 deletions scarb/src/core/registry/client/http.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
use std::path::PathBuf;
use std::str::FromStr;

use anyhow::{Context, Result};
use anyhow::{ensure, Context, Result};
use async_trait::async_trait;
use fs4::tokio::AsyncFileExt;
use futures::StreamExt;
use reqwest::StatusCode;
use reqwest::header::{
HeaderMap, HeaderName, HeaderValue, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED,
};
use reqwest::{Response, StatusCode};
use scarb_ui::components::Status;
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::BufWriter;
use tokio::sync::OnceCell;
use tracing::{debug, error, trace};
use tracing::{debug, error, trace, warn};

use crate::core::registry::client::{RegistryClient, RegistryResource};
use crate::core::registry::index::{IndexConfig, IndexRecords};
use crate::core::{Config, Package, PackageId, PackageName, SourceId};
use crate::flock::{FileLockGuard, Filesystem};

// TODO(mkaput): Honour ETag and Last-Modified headers.
// TODO(mkaput): Progressbar.
// TODO(mkaput): Request timeout.

Expand All @@ -29,6 +32,12 @@ pub struct HttpRegistryClient<'c> {
dl_fs: Filesystem<'c>,
}

/// HTTP validator remembered for a downloaded resource, used to make conditional requests.
enum HttpCacheKey {
/// Value of the `ETag` response header; sent back as `If-None-Match`.
ETag(HeaderValue),
/// Value of the `Last-Modified` response header; sent back as `If-Modified-Since`.
LastModified(HeaderValue),
/// No validator is known, so requests are sent unconditionally.
None,
}

impl<'c> HttpRegistryClient<'c> {
pub fn new(source_id: SourceId, config: &'c Config) -> Result<Self> {
let dl_fs = config
Expand Down Expand Up @@ -81,35 +90,50 @@ impl<'c> RegistryClient for HttpRegistryClient<'c> {
/// Fetches the index records of `package`, revalidating a cached copy when `cache_key` is given.
///
/// `cache_key` is expected in the textual format produced by `HttpCacheKey::serialize`;
/// anything else is treated as "no cache".
///
/// Returns `RegistryResource::InCache` when a cache key exists and either the network is not
/// allowed or the server answers 304, `RegistryResource::NotFound` on 404, and
/// `RegistryResource::Download` carrying the parsed records and a new cache key otherwise.
async fn get_records(
&self,
package: PackageName,
_cache_key: Option<&str>,
cache_key: Option<&str>,
) -> Result<RegistryResource<IndexRecords>> {
let cache_key = HttpCacheKey::deserialize(cache_key);

// With a cached copy and no network access, answer from the cache without issuing a request.
if cache_key.is_some() && !self.config.network_allowed() {
debug!("network is not allowed, while cached record exists, using cache");
return Ok(RegistryResource::InCache);
}

let index_config = self.index_config().await?;
let records_url = index_config.index.expand(package.into())?;

let response = self
.config
.online_http()?
.get(records_url)
.headers(cache_key.to_headers_for_request())
.send()
.await?
.error_for_status();
.await?;

if let Err(err) = &response {
if let Some(status) = err.status() {
if status == StatusCode::NOT_FOUND {
return Ok(RegistryResource::NotFound);
}
// 304 and 404 map to dedicated results; any other error status is propagated as an error.
let response = match response.status() {
StatusCode::NOT_MODIFIED => {
// A 304 can only be honoured if there is a cached copy to fall back on.
ensure!(
cache_key.is_some(),
"server said not modified (HTTP 304) when no local cache exists"
);
return Ok(RegistryResource::InCache);
}
}
StatusCode::NOT_FOUND => {
return Ok(RegistryResource::NotFound);
}
_ => response.error_for_status()?,
};

let records = response?
// Read the cache validators from the response headers before the body is parsed.
let cache_key = HttpCacheKey::extract(&response).serialize();

let records = response
.json()
.await
.context("failed to deserialize index records")?;

Ok(RegistryResource::Download {
resource: records,
cache_key: None,
cache_key,
})
}

Expand Down Expand Up @@ -177,3 +201,76 @@ impl<'c> RegistryClient for HttpRegistryClient<'c> {
todo!("Publishing to HTTP registries is not implemented yet.")
}
}

impl HttpCacheKey {
    /// Builds a cache key from the validators carried by `response`.
    ///
    /// An `ETag` header wins over `Last-Modified`; with neither present the key is `None`.
    fn extract(response: &Response) -> Self {
        let headers = response.headers();
        match (headers.get(ETAG), headers.get(LAST_MODIFIED)) {
            (Some(etag), _) => Self::ETag(etag.clone()),
            (None, Some(last_modified)) => Self::LastModified(last_modified.clone()),
            (None, None) => Self::None,
        }
    }

    /// Returns the conditional request headers matching this key.
    ///
    /// `ETag` becomes `If-None-Match`, `LastModified` becomes `If-Modified-Since`, and `None`
    /// yields an empty map.
    fn to_headers_for_request(&self) -> HeaderMap {
        let conditional = match self {
            Self::ETag(etag) => Some((IF_NONE_MATCH, etag)),
            Self::LastModified(timestamp) => Some((IF_MODIFIED_SINCE, timestamp)),
            Self::None => None,
        };

        let mut headers = HeaderMap::new();
        if let Some((name, value)) = conditional {
            headers.insert(name, value.clone());
        }
        headers
    }

    /// Renders the key as `"<header-name>: <value>"`, or `None` when there is no validator.
    ///
    /// Bytes of the header value that are not valid UTF-8 are replaced lossily.
    fn serialize(&self) -> Option<String> {
        let (key, raw) = match self {
            Self::None => return None,
            Self::ETag(raw) => (ETAG, raw),
            Self::LastModified(raw) => (LAST_MODIFIED, raw),
        };

        let val = String::from_utf8_lossy(raw.as_bytes());
        Some(format!("{key}: {val}"))
    }

    /// Parses a key previously produced by [`Self::serialize`].
    ///
    /// A missing key yields `None` silently; a present but malformed key (no `:` separator,
    /// invalid header name or value, or a header other than `ETag` / `Last-Modified`) logs a
    /// warning and also yields `None`.
    fn deserialize(cache_key: Option<&str>) -> Self {
        let Some(cache_key) = cache_key else {
            return Self::None;
        };

        // Only the first `:` separates name from value, so values containing colons
        // (e.g. timestamps) survive the round trip.
        let parsed = cache_key.split_once(':').and_then(|(name, value)| {
            let name = HeaderName::from_str(name).ok()?;
            let value = HeaderValue::from_str(value.trim()).ok()?;
            if name == ETAG {
                Some(Self::ETag(value))
            } else if name == LAST_MODIFIED {
                Some(Self::LastModified(value))
            } else {
                None
            }
        });

        parsed.unwrap_or_else(|| {
            warn!("invalid cache key: {cache_key}");
            Self::None
        })
    }

    /// Returns `true` when a validator (`ETag` or `LastModified`) is held.
    fn is_some(&self) -> bool {
        !matches!(self, Self::None)
    }

    /// Returns `true` when no validator is held.
    fn is_none(&self) -> bool {
        !self.is_some()
    }
}
1 change: 0 additions & 1 deletion scarb/src/core/registry/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub enum RegistryResource<T> {
/// The requested resource was not found.
NotFound,
/// The cache is valid and the cached data should be used.
#[allow(dead_code)]
InCache,
/// The cache is out of date, new data was downloaded and should be used from now on.
Download {
Expand Down
135 changes: 135 additions & 0 deletions scarb/tests/http_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,141 @@ fn missing_config_json() {
expected.assert_eq(&registry.logs());
}

/// Runs `scarb fetch` twice against the same `SCARB_CACHE` directory and checks, via the
/// registry's request log, that the second run revalidates the index record with
/// `If-None-Match` and receives `304 Not Modified`.
///
/// Per the expected log, `config.json` and the package tarball are still requested
/// unconditionally on the second run; only the index record is revalidated.
#[test]
fn caching() {
// Shared between both `fetch` invocations so the second one can see the first one's cache.
let cache_dir = TempDir::new().unwrap();

// Registry serving a single package, `bar v1.0.0`.
let mut registry = HttpRegistry::serve();
registry.publish(|t| {
ProjectBuilder::start()
.name("bar")
.version("1.0.0")
.lib_cairo(r#"fn f() -> felt252 { 0 }"#)
.build(t);
});

// Consumer project `foo` depending on `bar` from that registry.
let t = TempDir::new().unwrap();
ProjectBuilder::start()
.name("foo")
.version("0.1.0")
.dep("bar", Dep.version("1").registry(&registry))
.lib_cairo(r#"fn f() -> felt252 { bar::f() }"#)
.build(&t);

// First fetch: nothing is cached yet.
// FIXME(mkaput): Why are verbose statuses not appearing here?
Scarb::quick_snapbox()
.arg("fetch")
.env("SCARB_CACHE", cache_dir.path())
.current_dir(&t)
.timeout(Duration::from_secs(10))
.assert()
.success()
.stdout_matches(indoc! {r#"
[..] Downloading bar v1.0.0 ([..])
"#});

// Second fetch: same cache directory, same expected stdout.
// FIXME(mkaput): Why are verbose statuses not appearing here?
Scarb::quick_snapbox()
.arg("fetch")
.env("SCARB_CACHE", cache_dir.path())
.current_dir(&t)
.timeout(Duration::from_secs(10))
.assert()
.success()
.stdout_matches(indoc! {r#"
[..] Downloading bar v1.0.0 ([..])
"#});

// Request/response log of both runs; the fifth exchange is the conditional request.
let expected = expect![["
GET /config.json
accept: */*
accept-encoding: gzip, br, deflate
host: ...
user-agent: ...

200 OK
accept-ranges: bytes
content-length: ...
content-type: application/json
etag: ...
last-modified: ...

###

GET /index/3/b/bar.json
accept: */*
accept-encoding: gzip, br, deflate
host: ...
user-agent: ...

200 OK
accept-ranges: bytes
content-length: ...
content-type: application/json
etag: ...
last-modified: ...

###

GET /bar-1.0.0.tar.zst
accept: */*
accept-encoding: gzip, br, deflate
host: ...
user-agent: ...

200 OK
accept-ranges: bytes
content-length: ...
content-type: application/octet-stream
etag: ...
last-modified: ...

###

GET /config.json
accept: */*
accept-encoding: gzip, br, deflate
host: ...
user-agent: ...

200 OK
accept-ranges: bytes
content-length: ...
content-type: application/json
etag: ...
last-modified: ...

###

GET /index/3/b/bar.json
accept: */*
accept-encoding: gzip, br, deflate
host: ...
if-none-match: ...
user-agent: ...

304 Not Modified
content-length: 0
etag: ...

###

GET /bar-1.0.0.tar.zst
accept: */*
accept-encoding: gzip, br, deflate
host: ...
user-agent: ...

200 OK
accept-ranges: bytes
content-length: ...
content-type: application/octet-stream
etag: ...
last-modified: ...
"]];
expected.assert_eq(&registry.logs());
}

// TODO(mkaput): Test errors properly when package is in index, but tarball is missing.
// TODO(mkaput): Test interdependencies.
// TODO(mkaput): Test offline mode, including with some cache prepopulated.
26 changes: 21 additions & 5 deletions utils/scarb-test-support/src/simple_http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::sync::Arc;

use axum::body::Body;
use axum::extract::State;
use axum::http::header::ETAG;
use axum::http::HeaderMap;
use axum::http::header::{ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH};
use axum::http::Method;
use axum::http::Request;
use axum::http::StatusCode;
use axum::http::{HeaderMap, HeaderValue};
use axum::middleware;
use axum::middleware::Next;
use axum::response::Response;
Expand Down Expand Up @@ -53,7 +53,7 @@ impl SimpleHttpServer {

let app = Router::new()
.fallback_service(ServeDir::new(dir))
.layer(middleware::map_response(set_etag))
.layer(middleware::from_fn(set_etag))
.layer(middleware::from_fn_with_state(
(logs.clone(), print_logs.clone()),
logger,
Expand Down Expand Up @@ -159,17 +159,33 @@ async fn logger<B>(
response
}

/// Test-server middleware that stamps every response with an `ETag` derived from its body and
/// answers `304 Not Modified` when the request's `If-None-Match` equals that tag.
///
/// Panics (via `todo!`) on a request that carries `If-Modified-Since` without `If-None-Match`,
/// and on failures to buffer the body or build the header value (`unwrap`).
async fn set_etag(res: Response) -> Response<Body> {
async fn set_etag<B>(request: Request<B>, next: Next<B>) -> Response<Body> {
// Remember the client's validator before the request is handed to the inner service.
let if_none_match = request.headers().get(IF_NONE_MATCH).cloned();

// Only ETag-based revalidation is implemented by this server.
if if_none_match.is_none() && request.headers().contains_key(IF_MODIFIED_SINCE) {
todo!("This server does not support If-Modified-Since header.")
}

let res = next.run(request).await;

// Buffer the whole body so it can be hashed and, if needed, re-sent.
let (mut parts, body) = res.into_parts();
let bytes = hyper::body::to_bytes(body).await.unwrap();

// The tag is the lowercase hex SHA-256 digest of the body.
// NOTE(review): the value is emitted without surrounding quotes, whereas RFC 9110 defines
// entity-tags as quoted strings — presumably acceptable for a test server; confirm.
let mut digest = sha2::Sha256::new();
digest.update(&bytes);
let digest: [u8; 32] = digest.finalize_fixed().into();
let digest = HEXLOWER.encode(&digest);
let digest = HeaderValue::from_str(&digest).unwrap();

parts.headers.insert(ETAG, digest.parse().unwrap());
// Exact-equality match: reply 304 with only the ETag header and an empty body.
if let Some(if_none_match) = if_none_match {
if digest == if_none_match {
parts.status = StatusCode::NOT_MODIFIED;
parts.headers = HeaderMap::from_iter([(ETAG, digest)]);
return Response::from_parts(parts, Body::empty());
}
}

parts.headers.insert(ETAG, digest);
Response::from_parts(parts, Body::from(bytes))
}

Expand Down
Loading