Merge pull request #21 from DeterminateSystems/eelcodolstra/fh-224-magic-nix-cache-use-post-build-hook-for-the-gha-cache-as

Use post-build hook to trigger GHA cache uploads
edolstra authored Mar 1, 2024
2 parents 14b3ed8 + 9bf26f0 commit b2a2acd
Showing 14 changed files with 4,009 additions and 1,203 deletions.
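This PR replaces the end-of-workflow bulk upload with incremental uploads driven by a Nix post-build hook. Previously, `workflow_start` snapshotted the store's path set and `workflow_finish` diffed it against the final set and pushed the difference through a local binary-cache URI; that machinery (`original_paths`, `make_store_uri`, `upload_paths`) is deleted below. Instead, the hook reports each freshly built path to the daemon's enqueue endpoint as soon as the build finishes, and background workers upload while the job is still running. Below is a minimal sketch of what such a hook client could look like; the route, port, and use of `reqwest` are assumptions, since only the `EnqueuePathsRequest` shape appears in this diff.

```rust
// Hypothetical post-build hook client. Nix invokes the hook after each
// successful build with the built paths in $OUT_PATHS (space-separated).
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store_paths: Vec<String> = env::var("OUT_PATHS")?
        .split_whitespace()
        .map(String::from)
        .collect();

    // Hand the paths to the magic-nix-cache daemon, which enqueues them for
    // background upload (URL and port are assumed for illustration).
    reqwest::Client::new()
        .post("http://127.0.0.1:37515/api/enqueue-paths")
        .json(&serde_json::json!({ "store_paths": store_paths }))
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}
```

Because uploads now overlap with the build, `workflow_finish` only has to drain the queues (`gha_cache.shutdown()` and the Attic push session) rather than doing all the upload work at the end.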
4,671 changes: 3,677 additions & 994 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -12,3 +12,7 @@ lto = true
panic = "abort"
incremental = false
codegen-units = 1

[patch.crates-io]
# See attic's Cargo.toml for why we need a clickhouse fork.
clickhouse = { git = "https://github.com/cole-h/clickhouse.rs", branch = "rustls" }
38 changes: 19 additions & 19 deletions flake.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion flake.nix
@@ -17,7 +17,7 @@

flake-compat.url = "https://flakehub.com/f/edolstra/flake-compat/1.0.1.tar.gz";

nix.url = "https://flakehub.com/f/NixOS/nix/2.19.tar.gz";
nix.url = "https://flakehub.com/f/NixOS/nix/2.20.tar.gz";
};

outputs = { self, nixpkgs, nix, ... }@inputs:
@@ -46,6 +46,7 @@
default = magic-nix-cache;
});

/*
devShells = forEachSupportedSystem ({ pkgs, cranePkgs, lib }: {
default = pkgs.mkShell {
inputsFrom = [ cranePkgs.magic-nix-cache ];
@@ -156,5 +157,6 @@
];
};
});
*/
};
}
2 changes: 1 addition & 1 deletion gha-cache/README.md
@@ -32,5 +32,5 @@ We should contribute support for the latter to [Octocrab](https://github.com/XAM
Since GHAC uses private APIs that use special tokens for authentication, we need to get them from a workflow run.

The easiest way is with the `keygen` workflow in this repo.
Generate an `age` encryption key with `age-keygen -o key.txt`, and add the Public Key as a repository secret named `AGE_PUBLIC_KEY`.
Generate an `age` encryption key with `nix shell nixpkgs#age --command age-keygen -o key.txt`, and add the Public Key as a repository secret named `AGE_PUBLIC_KEY`.
Then, trigger the `keygen` workflow which will print out a command that will let you decrypt the credentials.
6 changes: 3 additions & 3 deletions gha-cache/src/api.rs
@@ -319,8 +319,8 @@ impl Api {
Err(Error::TooManyCollisions)
}

/// Uploads a file.
pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<()>
/// Uploads a file. Returns the size of the file.
pub async fn upload_file<S>(&self, allocation: FileAllocation, mut stream: S) -> Result<usize>
where
S: AsyncRead + Unpin + Send,
{
@@ -396,7 +396,7 @@ impl Api {

self.commit_cache(allocation.0, offset).await?;

Ok(())
Ok(offset)
}

/// Downloads a file based on a list of key prefixes.
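The only behavioral change here is that `upload_file` now reports how many bytes it committed. A sketch of a caller making use of that follows; the helper itself is hypothetical, while `Api`, `FileAllocation`, and `Result` are the types defined in this file.

```rust
use tokio::io::AsyncRead;

// Hypothetical helper: upload a stream and log the committed size.
async fn upload_and_log<S>(api: &Api, allocation: FileAllocation, stream: S) -> Result<usize>
where
    S: AsyncRead + Unpin + Send,
{
    let bytes = api.upload_file(allocation, stream).await?;
    tracing::info!("committed {bytes} bytes to the GHA cache");
    Ok(bytes)
}
```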
6 changes: 5 additions & 1 deletion magic-nix-cache/Cargo.toml
@@ -17,7 +17,7 @@ serde = { version = "1.0.162", features = ["derive"] }
serde_json = { version = "1.0.96", default-features = false }
thiserror = "1.0.40"
tokio-stream = { version = "0.1.14", default-features = false }
tokio-util = { version = "0.7.8", features = ["io"] }
tokio-util = { version = "0.7.8", features = ["io", "compat"] }
daemonize = "0.5.0"
is_ci = "1.1.1"
sha2 = { version = "0.10.6", default-features = false }
@@ -27,10 +27,14 @@ attic = { git = "ssh://git@github.com/DeterminateSystems/attic-priv", branch = "
#attic = { path = "../../attic-priv/attic" }
attic-client = { git = "ssh://git@github.com/DeterminateSystems/attic-priv", branch = "main" }
#attic-client = { path = "../../attic-priv/client" }
attic-server = { git = "ssh://git@github.com/DeterminateSystems/attic-priv", branch = "main" }
#attic-server = { path = "../../attic-priv/server" }
indicatif = "0.17"
anyhow = "1.0.71"
tempfile = "3.9"
uuid = { version = "1.4.0", features = ["serde", "v7", "rand", "std"] }
futures = "0.3"
async-compression = "0.4"

[dependencies.tokio]
version = "1.28.0"
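The new `compat` feature on `tokio-util` provides adapters between tokio's `AsyncRead`/`AsyncWrite` and the `futures-io` traits; a plausible motive is bridging tokio streams into the `futures`/`async-compression` based code added elsewhere in this PR, though that motive is an inference. The adapter itself works like this:

```rust
use futures::io::AsyncReadExt; // futures-io side of the bridge
use tokio_util::compat::TokioAsyncReadCompatExt; // requires the "compat" feature

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let file = tokio::fs::File::open("/etc/hostname").await?;

    // `.compat()` wraps a tokio::io::AsyncRead so it can be driven by
    // futures-io based consumers.
    let mut reader = file.compat();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;

    println!("read {} bytes", buf.len());
    Ok(())
}
```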
69 changes: 20 additions & 49 deletions magic-nix-cache/src/api.rs
@@ -2,26 +2,19 @@
//!
//! This API is intended to be used by nix-installer-action.
use std::net::SocketAddr;

use axum::{extract::Extension, http::uri::Uri, routing::post, Json, Router};
use axum::{extract::Extension, routing::post, Json, Router};
use axum_macros::debug_handler;
use serde::{Deserialize, Serialize};

use super::State;
use crate::error::{Error, Result};
use crate::util::{get_store_paths, upload_paths};

#[derive(Debug, Clone, Serialize)]
struct WorkflowStartResponse {
num_original_paths: usize,
}
struct WorkflowStartResponse {}

#[derive(Debug, Clone, Serialize)]
struct WorkflowFinishResponse {
num_original_paths: usize,
num_final_paths: usize,
num_new_paths: usize,
//num_new_paths: usize,
}

pub fn get_router() -> Router {
@@ -33,33 +26,23 @@ pub fn get_router() -> Router {

/// Record existing paths.
#[debug_handler]
async fn workflow_start(Extension(state): Extension<State>) -> Result<Json<WorkflowStartResponse>> {
async fn workflow_start(
Extension(_state): Extension<State>,
) -> Result<Json<WorkflowStartResponse>> {
tracing::info!("Workflow started");

let mut original_paths = state.original_paths.lock().await;
*original_paths = get_store_paths(&state.store).await?;

Ok(Json(WorkflowStartResponse {
num_original_paths: original_paths.len(),
}))
Ok(Json(WorkflowStartResponse {}))
}

/// Push new paths and shut down.
async fn workflow_finish(
Extension(state): Extension<State>,
) -> Result<Json<WorkflowFinishResponse>> {
tracing::info!("Workflow finished");
let original_paths = state.original_paths.lock().await;
let final_paths = get_store_paths(&state.store).await?;
let new_paths = final_paths
.difference(&original_paths)
.cloned()
.collect::<Vec<_>>();

if state.api.is_some() {
tracing::info!("Pushing {} new paths to GHA cache", new_paths.len());
let store_uri = make_store_uri(&state.self_endpoint);
upload_paths(new_paths.clone(), &store_uri).await?;

if let Some(gha_cache) = &state.gha_cache {
tracing::info!("Waiting for GitHub action cache uploads to finish");
gha_cache.shutdown().await?;
}

if let Some(sender) = state.shutdown_sender.lock().await.take() {
@@ -69,36 +52,18 @@ async fn workflow_finish(

// Wait for the Attic push workers to finish.
if let Some(attic_state) = state.flakehub_state.write().await.take() {
tracing::info!("Waiting for FlakeHub cache uploads to finish");
attic_state.push_session.wait().await?;
}
}

let reply = WorkflowFinishResponse {
num_original_paths: original_paths.len(),
num_final_paths: final_paths.len(),
num_new_paths: new_paths.len(),
};
let reply = WorkflowFinishResponse {};

state
.metrics
.num_original_paths
.set(reply.num_original_paths);
state.metrics.num_final_paths.set(reply.num_final_paths);
state.metrics.num_new_paths.set(reply.num_new_paths);
//state.metrics.num_new_paths.set(num_new_paths);

Ok(Json(reply))
}

fn make_store_uri(self_endpoint: &SocketAddr) -> String {
Uri::builder()
.scheme("http")
.authority(self_endpoint.to_string())
.path_and_query("/?compression=zstd&parallel-compression=true")
.build()
.expect("Cannot construct URL to self")
.to_string()
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueuePathsRequest {
pub store_paths: Vec<String>,
@@ -120,6 +85,12 @@ async fn enqueue_paths(
.map(|path| state.store.follow_store_path(path).map_err(Error::Attic))
.collect::<Result<Vec<_>>>()?;

if let Some(gha_cache) = &state.gha_cache {
gha_cache
.enqueue_paths(state.store.clone(), store_paths.clone())
.await?;
}

if let Some(flakehub_state) = &*state.flakehub_state.read().await {
crate::flakehub::enqueue_paths(flakehub_state, store_paths).await?;
}
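The handlers above call `state.gha_cache.enqueue_paths(...)` during the run and `gha_cache.shutdown()` at workflow finish, but the `GhaCache` type itself is not part of this diff. A standalone sketch of the queue-and-drain pattern those two calls imply; every name and internal detail below is an assumption, not the repository's implementation.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

struct GhaCacheSketch {
    queue: mpsc::UnboundedSender<String>,
    worker: JoinHandle<()>,
}

impl GhaCacheSketch {
    fn new() -> Self {
        let (queue, mut rx) = mpsc::unbounded_channel::<String>();
        // Background worker: uploads paths as they arrive, overlapping
        // with the rest of the workflow.
        let worker = tokio::spawn(async move {
            while let Some(path) = rx.recv().await {
                println!("would upload {path} to the GHA cache");
            }
        });
        Self { queue, worker }
    }

    fn enqueue_paths(&self, paths: Vec<String>) {
        for path in paths {
            let _ = self.queue.send(path);
        }
    }

    // Dropping the sender closes the channel; awaiting the worker then
    // guarantees all queued uploads have drained before the daemon exits.
    async fn shutdown(self) {
        drop(self.queue);
        let _ = self.worker.await;
    }
}
```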
30 changes: 18 additions & 12 deletions magic-nix-cache/src/binary_cache.rs
@@ -51,7 +51,7 @@ async fn get_narinfo(
let key = format!("{}.narinfo", store_path_hash);

if state
.narinfo_nagative_cache
.narinfo_negative_cache
.read()
.await
.contains(&store_path_hash)
@@ -61,20 +61,21 @@
return pull_through(&state, &path);
}

if let Some(api) = &state.api {
if let Some(url) = api.get_file_url(&[&key]).await? {
if let Some(gha_cache) = &state.gha_cache {
if let Some(url) = gha_cache.api.get_file_url(&[&key]).await? {
state.metrics.narinfos_served.incr();
return Ok(Redirect::temporary(&url));
}
}

let mut negative_cache = state.narinfo_nagative_cache.write().await;
let mut negative_cache = state.narinfo_negative_cache.write().await;
negative_cache.insert(store_path_hash);

state.metrics.narinfos_sent_upstream.incr();
state.metrics.narinfos_negative_cache_misses.incr();
pull_through(&state, &path)
}

async fn put_narinfo(
Extension(state): Extension<State>,
Path(path): Path<String>,
@@ -90,19 +91,19 @@ async fn put_narinfo(
return Err(Error::BadRequest);
}

let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;

let store_path_hash = components[0].to_string();
let key = format!("{}.narinfo", store_path_hash);
let allocation = api.allocate_file_with_random_suffix(&key).await?;
let allocation = gha_cache.api.allocate_file_with_random_suffix(&key).await?;
let stream = StreamReader::new(
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);
api.upload_file(allocation, stream).await?;
gha_cache.api.upload_file(allocation, stream).await?;
state.metrics.narinfos_uploaded.incr();

state
.narinfo_nagative_cache
.narinfo_negative_cache
.write()
.await
.remove(&store_path_hash);
@@ -112,9 +113,10 @@

async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -> Result<Redirect> {
if let Some(url) = state
.api
.gha_cache
.as_ref()
.ok_or(Error::GHADisabled)?
.api
.get_file_url(&[&path])
.await?
{
@@ -129,18 +131,22 @@ async fn get_nar(Extension(state): Extension<State>, Path(path): Path<String>) -
Err(Error::NotFound)
}
}

async fn put_nar(
Extension(state): Extension<State>,
Path(path): Path<String>,
body: BodyStream,
) -> Result<()> {
let api = state.api.as_ref().ok_or(Error::GHADisabled)?;
let gha_cache = state.gha_cache.as_ref().ok_or(Error::GHADisabled)?;

let allocation = api.allocate_file_with_random_suffix(&path).await?;
let allocation = gha_cache
.api
.allocate_file_with_random_suffix(&path)
.await?;
let stream = StreamReader::new(
body.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);
api.upload_file(allocation, stream).await?;
gha_cache.api.upload_file(allocation, stream).await?;
state.metrics.nars_uploaded.incr();

Ok(())
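Besides threading the API through the new `gha_cache` field and fixing the `narinfo_nagative_cache` misspelling, the logic here is unchanged: hashes known not to exist are remembered so repeated `.narinfo` probes skip the GHA round-trip, and an upload erases the negative entry. A standalone sketch of that pattern (names are illustrative):

```rust
use std::collections::HashSet;
use tokio::sync::RwLock;

// Standalone sketch of the narinfo negative cache used above.
#[derive(Default)]
struct NegativeCache {
    inner: RwLock<HashSet<String>>,
}

impl NegativeCache {
    // A hit means "we already know this narinfo does not exist".
    async fn is_known_missing(&self, store_path_hash: &str) -> bool {
        self.inner.read().await.contains(store_path_hash)
    }

    // Record a confirmed miss so future lookups short-circuit.
    async fn record_miss(&self, store_path_hash: String) {
        self.inner.write().await.insert(store_path_hash);
    }

    // An upload makes the narinfo exist again, so drop the negative entry.
    async fn record_upload(&self, store_path_hash: &str) {
        self.inner.write().await.remove(store_path_hash);
    }
}
```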
3 changes: 0 additions & 3 deletions magic-nix-cache/src/error.rs
@@ -22,9 +22,6 @@ pub enum Error {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),

#[error("Failed to upload paths")]
FailedToUpload,

#[error("GHA cache is disabled")]
GHADisabled,
