From 621394879eaeadac9d9b39a3d328baeeff3f315f Mon Sep 17 00:00:00 2001
From: Alexander Lyon <arlyon@me.com>
Date: Fri, 3 May 2024 15:26:21 +0100
Subject: [PATCH] add support for upload speed / remaining in the cache upload
 step

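Track upload progress for remote cache writes so the "Finishing writing
to cache" spinner can show how much data is left to upload and the
current upload speed.

The HTTP cache now streams each artifact body through an UploadProgress
adapter that records bytes sent into a ring buffer of timed buckets (10
buckets of 100ms, i.e. a one-second window) and hands out an
UploadProgressQuery for reading the artifact size, the bytes uploaded so
far, and a rolling bytes-per-second average. AsyncCache::start_shutdown
returns the map of in-flight uploads together with a channel that
resolves once the last worker finishes, and the run shutdown path polls
that map once a second to update the spinner. The
TURBO_REMOTE_CACHE_UPLOAD_TIMEOUT environment variable now maps to the
upload_timeout config key, replacing the previous turbo_api_timeout
mapping.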
---
 Cargo.lock                                    |  85 +++++++---
 crates/turborepo-api-client/Cargo.toml        |   5 +-
 crates/turborepo-api-client/src/error.rs      |   2 +
 crates/turborepo-api-client/src/lib.rs        |  13 +-
 crates/turborepo-auth/src/auth/login.rs       |   6 +-
 crates/turborepo-auth/src/auth/sso.rs         |   6 +-
 crates/turborepo-auth/src/lib.rs              |   6 +-
 crates/turborepo-cache/Cargo.toml             |   4 +
 crates/turborepo-cache/src/async_cache.rs     |  53 ++++--
 crates/turborepo-cache/src/http.rs            |  36 +++-
 crates/turborepo-cache/src/lib.rs             |   1 +
 crates/turborepo-cache/src/multiplexer.rs     |  15 +-
 crates/turborepo-cache/src/upload_progress.rs | 160 ++++++++++++++++++
 crates/turborepo-lib/Cargo.toml               |   1 +
 crates/turborepo-lib/src/config.rs            |   5 +-
 crates/turborepo-lib/src/run/cache.rs         |  15 +-
 crates/turborepo-lib/src/run/mod.rs           |  65 ++++++-
 17 files changed, 418 insertions(+), 60 deletions(-)
 create mode 100644 crates/turborepo-cache/src/upload_progress.rs

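Note: a minimal sketch (assuming the crate layout in this patch) of how the
map returned by AsyncCache::start_shutdown can be read. It mirrors the
spinner loop added to crates/turborepo-lib/src/run/mod.rs below; the
`summarize` helper is illustrative only and not part of the patch.

    use std::sync::{Arc, Mutex};

    use turborepo_cache::http::UploadMap;

    /// Sums progress across all uploads: (bytes/s, bytes sent, total bytes
    /// of uploads that are still in flight).
    fn summarize(uploads: &Arc<Mutex<UploadMap>>) -> (f64, usize, usize) {
        let uploads = uploads.lock().unwrap();
        // `average_bps` and `bytes` return None once an upload's
        // `UploadProgress` stream has been dropped (i.e. the upload finished).
        let bps: f64 = uploads.values().filter_map(|q| q.average_bps()).sum();
        let sent: usize = uploads.values().filter_map(|q| q.bytes()).sum();
        let total: usize = uploads
            .values()
            .filter(|q| !q.done())
            .filter_map(|q| q.size())
            .sum();
        (bps, sent, total)
    }
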
diff --git a/Cargo.lock b/Cargo.lock
index 905bfbec2558d..df1515a29cb89 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2166,24 +2166,24 @@ checksum = "b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35"
 
 [[package]]
 name = "curl"
-version = "0.4.44"
+version = "0.4.46"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "509bd11746c7ac09ebd19f0b17782eae80aadee26237658a6b4808afb5c11a22"
+checksum = "1e2161dd6eba090ff1594084e95fd67aeccf04382ffea77999ea94ed42ec67b6"
 dependencies = [
  "curl-sys",
  "libc",
  "openssl-probe",
  "openssl-sys",
  "schannel",
- "socket2 0.4.9",
- "winapi",
+ "socket2 0.5.7",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
 name = "curl-sys"
-version = "0.4.60+curl-7.88.1"
+version = "0.4.72+curl-8.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "717abe2cb465a5da6ce06617388a3980c9a2844196734bec8ccb8e575250f13f"
+checksum = "29cbdc8314c447d11e8fd156dcdd031d9e02a7a976163e396b548c03153bc9ea"
 dependencies = [
  "cc",
  "libc",
@@ -2192,7 +2192,7 @@ dependencies = [
  "openssl-sys",
  "pkg-config",
  "vcpkg",
- "winapi",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -3471,6 +3471,12 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "human_format"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c3b1f728c459d27b12448862017b96ad4767b1ec2ec5e6434e99f1577f085b8"
+
 [[package]]
 name = "humantime"
 version = "2.1.0"
@@ -3494,7 +3500,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
- "socket2 0.5.4",
+ "socket2 0.5.7",
  "tokio",
  "tower-service",
  "tracing",
@@ -4220,9 +4226,9 @@ dependencies = [
 
 [[package]]
 name = "libnghttp2-sys"
-version = "0.1.7+1.45.0"
+version = "0.1.10+1.61.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57ed28aba195b38d5ff02b9170cbff627e336a20925e43b4945390401c5dc93f"
+checksum = "959c25552127d2e1fa72f0e52548ec04fc386e827ba71a7bd01db46a447dc135"
 dependencies = [
  "cc",
  "libc",
@@ -4657,9 +4663,9 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.8.8"
+version = "0.8.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
+checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
 dependencies = [
  "libc",
  "log",
@@ -5516,18 +5522,18 @@ dependencies = [
 
 [[package]]
 name = "pin-project"
-version = "1.1.0"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead"
+checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
 dependencies = [
  "pin-project-internal",
 ]
 
 [[package]]
 name = "pin-project-internal"
-version = "1.1.0"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
+checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -6402,10 +6408,12 @@ dependencies = [
  "tokio",
  "tokio-native-tls",
  "tokio-rustls",
+ "tokio-util",
  "tower-service",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams",
  "web-sys",
  "webpki-roots 0.22.6",
  "winreg",
@@ -7231,9 +7239,9 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
 
 [[package]]
 name = "similar"
-version = "2.2.1"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf"
+checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640"
 dependencies = [
  "bstr 0.2.17",
  "unicode-segmentation",
@@ -7309,12 +7317,12 @@ dependencies = [
 
 [[package]]
 name = "socket2"
-version = "0.5.4"
+version = "0.5.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
+checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
 dependencies = [
  "libc",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -9468,9 +9476,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.32.0"
+version = "1.37.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
+checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
 dependencies = [
  "backtrace",
  "bytes",
@@ -9480,7 +9488,7 @@ dependencies = [
  "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
- "socket2 0.5.4",
+ "socket2 0.5.7",
  "tokio-macros",
  "tracing",
  "windows-sys 0.48.0",
@@ -9498,9 +9506,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-macros"
-version = "2.1.0"
+version = "2.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
+checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -9551,9 +9559,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-stream"
-version = "0.1.14"
+version = "0.1.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
+checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
 dependencies = [
  "futures-core",
  "pin-project-lite",
@@ -10937,6 +10945,7 @@ name = "turborepo-api-client"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bytes",
  "chrono",
  "http 0.2.11",
  "httpmock",
@@ -10950,6 +10959,8 @@ dependencies = [
  "test-case",
  "thiserror",
  "tokio",
+ "tokio-stream",
+ "tokio-util",
  "tracing",
  "turbopath",
  "turborepo-ci",
@@ -10993,6 +11004,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "base64 0.21.4",
+ "bytes",
  "camino",
  "futures",
  "hmac",
@@ -11000,6 +11012,7 @@ dependencies = [
  "os_str_bytes",
  "path-clean 1.0.1",
  "petgraph",
+ "pin-project",
  "port_scanner",
  "reqwest",
  "serde",
@@ -11010,6 +11023,8 @@ dependencies = [
  "test-case",
  "thiserror",
  "tokio",
+ "tokio-stream",
+ "tokio-util",
  "tracing",
  "turbopath",
  "turborepo-analytics",
@@ -11145,6 +11160,7 @@ dependencies = [
  "globwatch",
  "go-parse-duration",
  "hex",
+ "human_format",
  "humantime",
  "ignore",
  "itertools 0.10.5",
@@ -12054,6 +12070,19 @@ dependencies = [
  "leb128",
 ]
 
+[[package]]
+name = "wasm-streams"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "wasmer"
 version = "4.2.5"
diff --git a/crates/turborepo-api-client/Cargo.toml b/crates/turborepo-api-client/Cargo.toml
index c900b52ae2411..306ecb9656ede 100644
--- a/crates/turborepo-api-client/Cargo.toml
+++ b/crates/turborepo-api-client/Cargo.toml
@@ -21,15 +21,18 @@ workspace = true
 
 [dependencies]
 anyhow = { workspace = true }
+bytes.workspace = true
 chrono = { workspace = true, features = ["serde"] }
 lazy_static = { workspace = true }
 regex = { workspace = true }
-reqwest = { workspace = true, features = ["json"] }
+reqwest = { workspace = true, features = ["json", "stream"] }
 rustc_version_runtime = "0.2.1"
 serde = { workspace = true }
 serde_json = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
+tokio-stream = "0.1.15"
+tokio-util = { version = "0.7.10", features = ["codec"] }
 tracing = { workspace = true }
 turbopath = { workspace = true }
 turborepo-ci = { workspace = true }
diff --git a/crates/turborepo-api-client/src/error.rs b/crates/turborepo-api-client/src/error.rs
index 5bd378db7b938..422d8d5762039 100644
--- a/crates/turborepo-api-client/src/error.rs
+++ b/crates/turborepo-api-client/src/error.rs
@@ -7,6 +7,8 @@ use crate::CachingStatus;
 
 #[derive(Debug, Error)]
 pub enum Error {
+    #[error("Error reading from disk: {0}")]
+    ReadError(#[from] std::io::Error),
     #[error("Error making HTTP request: {0}")]
     ReqwestError(#[from] reqwest::Error),
     #[error("skipping HTTP Request, too many failures have occurred.\nLast error: {0}")]
diff --git a/crates/turborepo-api-client/src/lib.rs b/crates/turborepo-api-client/src/lib.rs
index 1a659ec471c0d..e541fca9e7ed1 100644
--- a/crates/turborepo-api-client/src/lib.rs
+++ b/crates/turborepo-api-client/src/lib.rs
@@ -8,7 +8,7 @@ use std::{backtrace::Backtrace, env, future::Future, time::Duration};
 use lazy_static::lazy_static;
 use regex::Regex;
 pub use reqwest::Response;
-use reqwest::{Method, RequestBuilder, StatusCode};
+use reqwest::{Body, Method, RequestBuilder, StatusCode};
 use serde::Deserialize;
 use turborepo_ci::{is_ci, Vendor};
 use turborepo_vercel_api::{
@@ -26,6 +26,9 @@ mod retry;
 pub mod spaces;
 pub mod telemetry;
 
+pub use bytes::Bytes;
+pub use tokio_stream::Stream;
+
 lazy_static! {
     static ref AUTHORIZATION_REGEX: Regex =
         Regex::new(r"(?i)(?:^|,) *authorization *(?:,|$)").unwrap();
@@ -74,7 +77,7 @@ pub trait CacheClient {
     fn put_artifact(
         &self,
         hash: &str,
-        artifact_body: &[u8],
+        artifact_body: impl tokio_stream::Stream<Item = Result<bytes::Bytes>> + Send + Sync + 'static,
         duration: u64,
         tag: Option<&str>,
         token: &str,
@@ -358,7 +361,7 @@ impl CacheClient for APIClient {
     async fn put_artifact(
         &self,
         hash: &str,
-        artifact_body: &[u8],
+        artifact_body: impl tokio_stream::Stream<Item = Result<bytes::Bytes>> + Send + Sync + 'static,
         duration: u64,
         tag: Option<&str>,
         token: &str,
@@ -382,13 +385,15 @@ impl CacheClient for APIClient {
             request_url = preflight_response.location.clone();
         }
 
+        let stream = Body::wrap_stream(artifact_body);
+
         let mut request_builder = self
             .cache_client
             .put(request_url)
             .header("Content-Type", "application/octet-stream")
             .header("x-artifact-duration", duration.to_string())
             .header("User-Agent", self.user_agent.clone())
-            .body(artifact_body.to_vec());
+            .body(stream);
 
         if allow_auth {
             request_builder = request_builder.header("Authorization", format!("Bearer {}", token));
diff --git a/crates/turborepo-auth/src/auth/login.rs b/crates/turborepo-auth/src/auth/login.rs
index b27cfcb534e12..8ba766cdd6f10 100644
--- a/crates/turborepo-auth/src/auth/login.rs
+++ b/crates/turborepo-auth/src/auth/login.rs
@@ -313,7 +313,11 @@ mod tests {
         async fn put_artifact(
             &self,
             _hash: &str,
-            _artifact_body: &[u8],
+            _artifact_body: impl turborepo_api_client::Stream<
+                    Item = Result<turborepo_api_client::Bytes, turborepo_api_client::Error>,
+                > + Send
+                + Sync
+                + 'static,
             _duration: u64,
             _tag: Option<&str>,
             _token: &str,
diff --git a/crates/turborepo-auth/src/auth/sso.rs b/crates/turborepo-auth/src/auth/sso.rs
index 7f626fe7fc9e8..ee9d707d04cd4 100644
--- a/crates/turborepo-auth/src/auth/sso.rs
+++ b/crates/turborepo-auth/src/auth/sso.rs
@@ -310,7 +310,11 @@ mod tests {
         async fn put_artifact(
             &self,
             _hash: &str,
-            _artifact_body: &[u8],
+            _artifact_body: impl turborepo_api_client::Stream<
+                    Item = Result<turborepo_api_client::Bytes, turborepo_api_client::Error>,
+                > + Send
+                + Sync
+                + 'static,
             _duration: u64,
             _tag: Option<&str>,
             _token: &str,
diff --git a/crates/turborepo-auth/src/lib.rs b/crates/turborepo-auth/src/lib.rs
index 6a157e9ae16b4..1959e68ce2d3c 100644
--- a/crates/turborepo-auth/src/lib.rs
+++ b/crates/turborepo-auth/src/lib.rs
@@ -420,7 +420,11 @@ mod tests {
         async fn put_artifact(
             &self,
             _hash: &str,
-            _artifact_body: &[u8],
+            _artifact_body: impl turborepo_api_client::Stream<
+                    Item = Result<turborepo_api_client::Bytes, turborepo_api_client::Error>,
+                > + Send
+                + Sync
+                + 'static,
             _duration: u64,
             _tag: Option<&str>,
             _token: &str,
diff --git a/crates/turborepo-cache/Cargo.toml b/crates/turborepo-cache/Cargo.toml
index 39db432bcf79d..0a79d1bd10f11 100644
--- a/crates/turborepo-cache/Cargo.toml
+++ b/crates/turborepo-cache/Cargo.toml
@@ -24,12 +24,14 @@ workspace = true
 
 [dependencies]
 base64 = "0.21.0"
+bytes.workspace = true
 camino = { workspace = true }
 futures = { workspace = true }
 hmac = "0.12.1"
 os_str_bytes = "6.5.0"
 path-clean = { workspace = true }
 petgraph = "0.6.3"
+pin-project = "1.1.5"
 reqwest = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
@@ -37,6 +39,8 @@ sha2 = { workspace = true }
 tar = "0.4.38"
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
+tokio-stream = "0.1.15"
+tokio-util = { version = "0.7.10", features = ["codec"] }
 tracing = { workspace = true }
 turbopath = { workspace = true }
 turborepo-analytics = { workspace = true }
diff --git a/crates/turborepo-cache/src/async_cache.rs b/crates/turborepo-cache/src/async_cache.rs
index 94c32a0ad8da6..f117d21aebc37 100644
--- a/crates/turborepo-cache/src/async_cache.rs
+++ b/crates/turborepo-cache/src/async_cache.rs
@@ -1,13 +1,15 @@
-use std::sync::{atomic::AtomicU8, Arc};
+use std::sync::{atomic::AtomicU8, Arc, Mutex};
 
 use futures::{stream::FuturesUnordered, StreamExt};
-use tokio::sync::{mpsc, Semaphore};
+use tokio::sync::{mpsc, oneshot, Semaphore};
 use tracing::{warn, Instrument, Level};
 use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf};
 use turborepo_analytics::AnalyticsSender;
 use turborepo_api_client::{APIAuth, APIClient};
 
-use crate::{multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts};
+use crate::{
+    http::UploadMap, multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts,
+};
 
 const WARNING_CUTOFF: u8 = 4;
 
@@ -24,8 +26,11 @@ enum WorkerRequest {
         duration: u64,
         files: Vec<AnchoredSystemPathBuf>,
     },
-    Flush(tokio::sync::oneshot::Sender<()>),
-    Shutdown(tokio::sync::oneshot::Sender<()>),
+    Flush(oneshot::Sender<()>),
+    /// Shut down the cache. The first oneshot fires when the shutdown starts
+    /// and lets the caller inspect the status of in-flight uploads. The
+    /// second oneshot fires when the shutdown is complete.
+    Shutdown(oneshot::Sender<Arc<Mutex<UploadMap>>>, oneshot::Sender<()>),
 }
 
 impl AsyncCache {
@@ -95,8 +100,8 @@ impl AsyncCache {
                         }
                         drop(callback);
                     }
-                    WorkerRequest::Shutdown(callback) => {
-                        shutdown_callback = Some(callback);
+                    WorkerRequest::Shutdown(closing, done) => {
+                        shutdown_callback = Some((closing, done));
                         break;
                     }
                 };
@@ -104,10 +109,18 @@ impl AsyncCache {
             // Drop write consumer to immediately notify callers that cache is shutting down
             drop(write_consumer);
 
+            let shutdown_callback = if let Some((closing, done)) = shutdown_callback {
+                closing.send(real_cache.requests().unwrap_or_default()).ok();
+                Some(done)
+            } else {
+                None
+            };
+
             // wait for all writers to finish
             while let Some(worker) = workers.next().await {
                 let _ = worker;
             }
+
             if let Some(callback) = shutdown_callback {
                 callback.send(()).ok();
             }
@@ -162,7 +175,7 @@ impl AsyncCache {
     // before checking the cache.
     #[tracing::instrument(skip_all)]
     pub async fn wait(&self) -> Result<(), CacheError> {
-        let (tx, rx) = tokio::sync::oneshot::channel();
+        let (tx, rx) = oneshot::channel();
         self.writer_sender
             .send(WorkerRequest::Flush(tx))
             .await
@@ -172,14 +185,30 @@ impl AsyncCache {
         Ok(())
     }
 
+    /// Shut down the cache, waiting for all workers to finish writing.
+    /// This function returns as soon as the shutdown has started, yielding
+    /// the map of in-flight uploads along with a channel that resolves
+    /// once the last worker has finished.
     #[tracing::instrument(skip_all)]
-    pub async fn shutdown(&self) -> Result<(), CacheError> {
-        let (tx, rx) = tokio::sync::oneshot::channel();
+    pub async fn start_shutdown(
+        &self,
+    ) -> Result<(Arc<Mutex<UploadMap>>, oneshot::Receiver<()>), CacheError> {
+        let (closing_tx, closing_rx) = oneshot::channel::<Arc<Mutex<UploadMap>>>();
+        let (closed_tx, closed_rx) = oneshot::channel::<()>();
         self.writer_sender
-            .send(WorkerRequest::Shutdown(tx))
+            .send(WorkerRequest::Shutdown(closing_tx, closed_tx))
             .await
             .map_err(|_| CacheError::CacheShuttingDown)?;
-        rx.await.ok();
+        Ok((closing_rx.await.map_err(|_| CacheError::CacheShuttingDown)?, closed_rx))
+    }
+
+    /// Shut down the cache, waiting for all workers to finish writing.
+    /// This function returns only when the last worker is complete.
+    /// It is a convenience wrapper around `start_shutdown`.
+    #[tracing::instrument(skip_all)]
+    pub async fn shutdown(&self) -> Result<(), CacheError> {
+        let (_, closed_rx) = self.start_shutdown().await?;
+        closed_rx.await.ok();
         Ok(())
     }
 }
diff --git a/crates/turborepo-cache/src/http.rs b/crates/turborepo-cache/src/http.rs
index 510fa1e8ee795..fb208567f8fb7 100644
--- a/crates/turborepo-cache/src/http.rs
+++ b/crates/turborepo-cache/src/http.rs
@@ -1,5 +1,11 @@
-use std::{backtrace::Backtrace, io::Write};
+use std::{
+    backtrace::Backtrace,
+    collections::HashMap,
+    io::{Cursor, Write},
+    sync::{Arc, Mutex},
+};
 
+use tokio_stream::StreamExt;
 use tracing::debug;
 use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf};
 use turborepo_analytics::AnalyticsSender;
@@ -11,15 +17,19 @@ use turborepo_api_client::{
 use crate::{
     cache_archive::{CacheReader, CacheWriter},
     signature_authentication::ArtifactSignatureAuthenticator,
+    upload_progress::{UploadProgress, UploadProgressQuery},
     CacheError, CacheHitMetadata, CacheOpts, CacheSource,
 };
 
+pub type UploadMap = HashMap<String, UploadProgressQuery<10, 100>>;
+
 pub struct HTTPCache {
     client: APIClient,
     signer_verifier: Option<ArtifactSignatureAuthenticator>,
     repo_root: AbsoluteSystemPathBuf,
     api_auth: APIAuth,
     analytics_recorder: Option<AnalyticsSender>,
+    uploads: Arc<Mutex<UploadMap>>,
 }
 
 impl HTTPCache {
@@ -53,6 +63,7 @@ impl HTTPCache {
             client,
             signer_verifier,
             repo_root,
+            uploads: Arc::new(Mutex::new(HashMap::new())),
             api_auth,
             analytics_recorder,
         }
@@ -68,6 +79,7 @@ impl HTTPCache {
     ) -> Result<(), CacheError> {
         let mut artifact_body = Vec::new();
         self.write(&mut artifact_body, anchor, files).await?;
+        let bytes = artifact_body.len();
 
         let tag = self
             .signer_verifier
@@ -75,13 +87,29 @@ impl HTTPCache {
             .map(|signer| signer.generate_tag(hash.as_bytes(), &artifact_body))
             .transpose()?;
 
+        let stream = tokio_util::codec::FramedRead::new(
+            Cursor::new(artifact_body),
+            tokio_util::codec::BytesCodec::new(),
+        )
+        .map(|res| {
+            res.map(|bytes| bytes.freeze())
+                .map_err(turborepo_api_client::Error::from)
+        });
+
+        let (progress, query) = UploadProgress::<10, 100, _>::new(stream, Some(bytes));
+
+        {
+            let mut uploads = self.uploads.lock().unwrap();
+            uploads.insert(hash.to_string(), query);
+        }
+
         tracing::debug!("uploading {}", hash);
 
         match self
             .client
             .put_artifact(
                 hash,
-                &artifact_body,
+                progress,
                 duration,
                 tag.as_deref(),
                 &self.api_auth.token,
@@ -237,6 +265,10 @@ impl HTTPCache {
         )))
     }
 
+    pub fn requests(&self) -> Arc<Mutex<UploadMap>> {
+        self.uploads.clone()
+    }
+
     #[tracing::instrument(skip_all)]
     pub(crate) fn restore_tar(
         root: &AbsoluteSystemPath,
diff --git a/crates/turborepo-cache/src/lib.rs b/crates/turborepo-cache/src/lib.rs
index 0f9fbd00de00d..efcafe5124f9b 100644
--- a/crates/turborepo-cache/src/lib.rs
+++ b/crates/turborepo-cache/src/lib.rs
@@ -19,6 +19,7 @@ mod multiplexer;
 pub mod signature_authentication;
 #[cfg(test)]
 mod test_cases;
+mod upload_progress;
 
 use std::{backtrace, backtrace::Backtrace};
 
diff --git a/crates/turborepo-cache/src/multiplexer.rs b/crates/turborepo-cache/src/multiplexer.rs
index 33a8bccb359e2..4a02954a5b437 100644
--- a/crates/turborepo-cache/src/multiplexer.rs
+++ b/crates/turborepo-cache/src/multiplexer.rs
@@ -1,11 +1,18 @@
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    Arc, Mutex,
+};
 
 use tracing::{debug, warn};
 use turbopath::{AbsoluteSystemPath, AnchoredSystemPathBuf};
 use turborepo_analytics::AnalyticsSender;
 use turborepo_api_client::{APIAuth, APIClient};
 
-use crate::{fs::FSCache, http::HTTPCache, CacheError, CacheHitMetadata, CacheOpts};
+use crate::{
+    fs::FSCache,
+    http::{HTTPCache, UploadMap},
+    CacheError, CacheHitMetadata, CacheOpts,
+};
 
 pub struct CacheMultiplexer {
     // We use an `AtomicBool` instead of removing the cache because that would require
@@ -82,6 +89,10 @@ impl CacheMultiplexer {
         }
     }
 
+    pub fn requests(&self) -> Option<Arc<Mutex<UploadMap>>> {
+        self.http.as_ref().map(|http| http.requests())
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn put(
         &self,
diff --git a/crates/turborepo-cache/src/upload_progress.rs b/crates/turborepo-cache/src/upload_progress.rs
new file mode 100644
index 0000000000000..8d8979698ad37
--- /dev/null
+++ b/crates/turborepo-cache/src/upload_progress.rs
@@ -0,0 +1,160 @@
+use std::{
+    pin::Pin,
+    sync::{Arc, Mutex, Weak},
+    task::{Context, Poll},
+    time::Instant,
+};
+
+use futures::Stream;
+use pin_project::pin_project;
+
+type State<const BUCKETS: usize> = Mutex<(usize, [(usize, usize); BUCKETS])>;
+
+/// Wraps a stream, tracking the total bytes sent and a ring buffer of bytes
+/// sent per time bucket so that upload throughput can be estimated.
+#[pin_project]
+pub struct UploadProgress<const BUCKETS: usize, const INTERVAL: usize, S: Stream> {
+    /// The total bytes uploaded, plus a ring buffer of (bucket generation,
+    /// bytes uploaded in that bucket) pairs. We store the generation to
+    /// ensure that we don't accidentally read from an expired bucket if
+    /// there is a gap in writing.
+    state: Arc<State<BUCKETS>>,
+    start: Instant,
+    #[pin]
+    inner: S,
+}
+
+impl<const BUCKETS: usize, const INTERVAL: usize, S: Stream> UploadProgress<BUCKETS, INTERVAL, S> {
+    /// Create a new `UploadProgress` wrapping the given stream and optional total size.
+    pub fn new(inner: S, size: Option<usize>) -> (Self, UploadProgressQuery<BUCKETS, INTERVAL>) {
+        let state = Arc::new(Mutex::new((0, [(0, 0); BUCKETS])));
+        let now = Instant::now();
+        let query = UploadProgressQuery::new(now, Arc::downgrade(&state), size);
+
+        (
+            Self {
+                state,
+                start: now,
+                inner,
+            },
+            query,
+        )
+    }
+}
+
+impl<const BUCKETS: usize, const INTERVAL: usize, S: Stream> Stream
+    for UploadProgress<BUCKETS, INTERVAL, S>
+where
+    S::Item: ProgressLen,
+{
+    type Item = S::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.as_mut().project();
+        match this.inner.poll_next(cx) {
+            Poll::Ready(Some(item)) => {
+                // current generation and bucket index, cf. `UploadProgressQuery::curr_gen`
+                let (curr_gen, index) = {
+                    // usize fits ~584.5 million years of milliseconds since start on 64 bit
+                    let gen = (this.start.elapsed().as_millis() as usize) / INTERVAL;
+                    (gen, gen % BUCKETS)
+                };
+                let mut state = this.state.lock().unwrap();
+                let (gen, value) = &mut state.1[index];
+                if *gen != curr_gen {
+                    *gen = curr_gen;
+                    *value = item.len();
+                } else {
+                    *value += item.len();
+                }
+
+                state.0 += item.len();
+
+                Poll::Ready(Some(item))
+            }
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+trait ProgressLen {
+    fn len(&self) -> usize;
+}
+
+impl ProgressLen for bytes::Bytes {
+    fn len(&self) -> usize {
+        self.len()
+    }
+}
+
+impl<T: ProgressLen, E> ProgressLen for Result<T, E> {
+    fn len(&self) -> usize {
+        match self {
+            Ok(t) => t.len(),
+            Err(_) => 0,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct UploadProgressQuery<const BUCKETS: usize, const INTERVAL: usize> {
+    start: Instant,
+    state: Weak<State<BUCKETS>>,
+    size: Option<usize>,
+}
+
+impl<const BUCKETS: usize, const INTERVAL: usize> UploadProgressQuery<BUCKETS, INTERVAL> {
+    fn new(start: Instant, state: Weak<State<BUCKETS>>, size: Option<usize>) -> Self {
+        Self { start, state, size }
+    }
+
+    // Note: the elapsed milliseconds are measured since the upload started, so
+    // a usize holds ~584.5 million years on 64-bit; the downcast is safe in practice.
+    fn curr_gen(&self) -> usize {
+        let since = self.start.elapsed().as_millis() as usize;
+        since / self.interval_ms()
+    }
+
+    pub const fn interval_ms(&self) -> usize {
+        INTERVAL
+    }
+
+    /// Get the total number of bytes uploaded.
+    ///
+    /// Returns `None` if the `UploadProgress` has been dropped.
+    pub fn bytes(&self) -> Option<usize> {
+        self.state.upgrade().map(|s| s.lock().unwrap().0)
+    }
+
+    pub fn size(&self) -> Option<usize> {
+        self.size
+    }
+
+    pub fn done(&self) -> bool {
+        self.state.strong_count() == 0
+    }
+
+    /// Get the average bytes per second over the last `BUCKETS` intervals.
+    ///
+    /// Returns `None` if the `UploadProgress` has been dropped.
+    pub fn average_bps(&self) -> Option<f64> {
+        let curr_gen = self.curr_gen();
+        let min_gen = curr_gen.saturating_sub(BUCKETS);
+        self.state.upgrade().map(|s| {
+            let s = s.lock().unwrap();
+            let total_bytes =
+                s.1.iter()
+                    .filter(|(gen, _)| *gen >= min_gen)
+                    .map(|(_, bytes)| *bytes)
+                    .sum::<usize>();
+
+            // BUCKETS * INTERVAL is the window in milliseconds; multiply by 1000 for bytes/s
+            (total_bytes as f64 / (BUCKETS * INTERVAL) as f64) * 1000.0
+        })
+    }
+}
diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml
index 041a1fd31e9a5..27eb0ca243c0b 100644
--- a/crates/turborepo-lib/Cargo.toml
+++ b/crates/turborepo-lib/Cargo.toml
@@ -61,6 +61,7 @@ globwalk = { version = "0.1.0", path = "../turborepo-globwalk" }
 globwatch = { path = "../turborepo-globwatch" }
 go-parse-duration = "0.1.1"
 hex = "0.4.3"
+human_format = "1.1.0"
 humantime = "2.1.0"
 ignore = "0.4.22"
 itertools = { workspace = true }
diff --git a/crates/turborepo-lib/src/config.rs b/crates/turborepo-lib/src/config.rs
index cffd2428577c8..1ab014ac18639 100644
--- a/crates/turborepo-lib/src/config.rs
+++ b/crates/turborepo-lib/src/config.rs
@@ -322,7 +322,10 @@ fn get_env_var_config(
     turbo_mapping.insert(OsString::from("turbo_teamid"), "team_id");
     turbo_mapping.insert(OsString::from("turbo_token"), "token");
     turbo_mapping.insert(OsString::from("turbo_remote_cache_timeout"), "timeout");
-    turbo_mapping.insert(OsString::from("turbo_api_timeout"), "api_timeout");
+    turbo_mapping.insert(
+        OsString::from("turbo_remote_cache_upload_timeout"),
+        "upload_timeout",
+    );
     turbo_mapping.insert(OsString::from("turbo_experimental_ui"), "experimental_ui");
     turbo_mapping.insert(OsString::from("turbo_preflight"), "preflight");
 
diff --git a/crates/turborepo-lib/src/run/cache.rs b/crates/turborepo-lib/src/run/cache.rs
index d4e50b9074783..4f49e9da3789d 100644
--- a/crates/turborepo-lib/src/run/cache.rs
+++ b/crates/turborepo-lib/src/run/cache.rs
@@ -1,10 +1,15 @@
-use std::{io::Write, sync::Arc, time::Duration};
+use std::{
+    io::Write,
+    sync::{Arc, Mutex},
+    time::Duration,
+};
 
+use tokio::sync::oneshot;
 use tracing::{debug, error};
 use turbopath::{
     AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf,
 };
-use turborepo_cache::{AsyncCache, CacheError, CacheHitMetadata, CacheSource};
+use turborepo_cache::{http::UploadMap, AsyncCache, CacheError, CacheHitMetadata, CacheSource};
 use turborepo_repository::package_graph::PackageInfo;
 use turborepo_scm::SCM;
 use turborepo_telemetry::events::{task::PackageTaskEventBuilder, TrackedErrors};
@@ -120,9 +125,11 @@ impl RunCache {
         }
     }
 
-    pub async fn shutdown_cache(&self) {
+    pub async fn shutdown_cache(
+        &self,
+    ) -> Result<(Arc<Mutex<UploadMap>>, oneshot::Receiver<()>), CacheError> {
         // Ignore errors coming from cache already shutting down
-        self.cache.shutdown().await.ok();
+        self.cache.start_shutdown().await
     }
 }
 
diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs
index 2e73d7a48cffd..a2a291b4e40cc 100644
--- a/crates/turborepo-lib/src/run/mod.rs
+++ b/crates/turborepo-lib/src/run/mod.rs
@@ -12,12 +12,12 @@ pub mod task_access;
 pub mod task_id;
 pub mod watch;
 
-use std::{collections::HashSet, io::Write, sync::Arc};
+use std::{collections::HashSet, io::Write, sync::Arc, time::Duration};
 
 pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache};
 use chrono::{DateTime, Local};
 use rayon::iter::ParallelBridge;
-use tokio::task::JoinHandle;
+use tokio::{select, task::JoinHandle};
 use tracing::debug;
 use turbopath::AbsoluteSystemPathBuf;
 use turborepo_api_client::{APIAuth, APIClient};
@@ -161,7 +161,66 @@ impl Run {
             tokio::spawn(async move {
                 let _guard = subscriber.listen().await;
                 let spinner = turborepo_ui::start_spinner("...Finishing writing to cache...");
-                run_cache.shutdown_cache().await;
+                if let Ok((status, closed)) = run_cache.shutdown_cache().await {
+                    let fut = async {
+                        loop {
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                            // loop through the hashmap, extract the uploads that
+                            // are still running, and sum up bytes per second
+                            let (bytes_per_second, bytes_uploaded, bytes_total) = {
+                                let status = status.lock().unwrap();
+                                let total_bps: f64 = status
+                                    .iter()
+                                    .filter_map(|(_hash, task)| task.average_bps())
+                                    .sum();
+                                let bytes_uploaded: usize =
+                                    status.iter().filter_map(|(_hash, task)| task.bytes()).sum();
+                                let bytes_total: usize = status
+                                    .iter()
+                                    .filter(|(_hash, task)| !task.done())
+                                    .filter_map(|(_hash, task)| task.size())
+                                    .sum();
+                                (total_bps, bytes_uploaded, bytes_total)
+                            };
+
+                            if bytes_total == 0 {
+                                continue;
+                            }
+
+                            // convert to human-readable units
+                            let mut formatter = human_format::Formatter::new();
+                            let formatter = formatter.with_decimals(2).with_separator("");
+                            let bytes_per_second =
+                                formatter.with_units("B/s").format(bytes_per_second);
+                            let bytes_remaining = formatter
+                                .with_units("B")
+                                .format(bytes_total.saturating_sub(bytes_uploaded) as f64);
+
+                            spinner.set_message(format!(
+                                "...Finishing writing to cache... ({} remaining, {})",
+                                bytes_remaining, bytes_per_second
+                            ));
+                        }
+                    };
+
+                    let interrupt = async {
+                        if let Ok(fut) = crate::commands::run::get_signal() {
+                            fut.await;
+                        } else {
+                            tracing::warn!("could not register ctrl-c handler");
+                            // wait forever
+                            tokio::time::sleep(Duration::MAX).await;
+                        }
+                    };
+
+                    select! {
+                        _ = closed => {}
+                        _ = fut => {}
+                        _ = interrupt => {tracing::debug!("received interrupt, exiting");}
+                    }
+                } else {
+                    tracing::warn!("could not start shutdown, exiting");
+                }
                 spinner.finish_and_clear();
             });
         }