Include timeSaved metric when skipping cache check (#4952)
After this change, the `timeSaved` metric will be included in the
`--summarize` output file when the daemon does not need to restore outputs from cache,
and will also be sent to Spaces.

Co-authored-by: Nicholas Yang <nicholas.yang@vercel.com>
Co-authored-by: Chris Olszewski <chris.olszewski@vercel.com>
3 people authored May 31, 2023
1 parent 3097876 commit d5897fd
Showing 6 changed files with 36 additions and 19 deletions.
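For orientation, the sketch below shows the round trip this commit implements: the output watcher records a time-saved value when a task's outputs are cached, and a later run that sees no changed outputs can skip the cache check and still credit that value to its summary. This is plain Go, not turborepo source; the simplified OutputWatcher mirror, stubWatcher, the millisecond unit, and the printed line are illustrative assumptions.

package main

import (
    "context"
    "fmt"
)

// OutputWatcher mirrors the updated interface in cli/internal/runcache/output_watcher.go,
// simplified to plain string globs: GetChangedOutputs now also returns the recorded
// time saved for the hash, and NotifyOutputsWritten accepts it when outputs are cached.
type OutputWatcher interface {
    GetChangedOutputs(ctx context.Context, hash string, globs []string) ([]string, int, error)
    NotifyOutputsWritten(ctx context.Context, hash string, globs []string, timeSaved int) error
}

// stubWatcher is a stand-in for the daemon client that remembers time saved per hash.
type stubWatcher struct {
    timesSaved map[string]int
}

func (s *stubWatcher) NotifyOutputsWritten(_ context.Context, hash string, _ []string, timeSaved int) error {
    s.timesSaved[hash] = timeSaved
    return nil
}

func (s *stubWatcher) GetChangedOutputs(_ context.Context, hash string, _ []string) ([]string, int, error) {
    // Pretend nothing changed since the last run and report the recorded time saved.
    return nil, s.timesSaved[hash], nil
}

func main() {
    ctx := context.Background()
    var w OutputWatcher = &stubWatcher{timesSaved: map[string]int{}}

    // First run: the task took 1200ms and its outputs were cached.
    _ = w.NotifyOutputsWritten(ctx, "hash-abc", []string{"dist/**"}, 1200)

    // Later run: no outputs changed, so the cache check is skipped and the
    // previously recorded value is still available for the run summary.
    changed, timeSaved, err := w.GetChangedOutputs(ctx, "hash-abc", []string{"dist/**"})
    if err == nil && len(changed) == 0 {
        fmt.Printf("skipping cache check; crediting %dms of time saved\n", timeSaved)
    }
}
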
cli/internal/daemonclient/daemonclient.go (10 changes: 5 additions & 5 deletions)

@@ -32,24 +32,24 @@ func New(client *connector.Client) *DaemonClient {
 }
 
 // GetChangedOutputs implements runcache.OutputWatcher.GetChangedOutputs
-func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) {
+func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error) {
     resp, err := d.client.GetChangedOutputs(ctx, &turbodprotocol.GetChangedOutputsRequest{
         Hash:        hash,
         OutputGlobs: repoRelativeOutputGlobs,
     })
     if err != nil {
-        return nil, err
+        return nil, 0, err
     }
 
-    return resp.ChangedOutputGlobs, nil
+    return resp.ChangedOutputGlobs, int(resp.TimeSaved), nil
 }
 
 // NotifyOutputsWritten implements runcache.OutputWatcher.NotifyOutputsWritten
-func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error {
+func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error {
     _, err := d.client.NotifyOutputsWritten(ctx, &turbodprotocol.NotifyOutputsWrittenRequest{
         Hash:                 hash,
         OutputGlobs:          repoRelativeOutputGlobs.Inclusions,
         OutputExclusionGlobs: repoRelativeOutputGlobs.Exclusions,
+        TimeSaved:            uint64(timeSaved),
     })
     return err
 }

cli/internal/runcache/output_watcher.go (10 changes: 5 additions & 5 deletions)

@@ -9,9 +9,9 @@ import (
 // OutputWatcher instances are responsible for tracking changes to task outputs
 type OutputWatcher interface {
     // GetChangedOutputs returns which of the given globs have changed since the specified hash was last run
-    GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error)
+    GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error)
     // NotifyOutputsWritten tells the watcher that the given globs have been cached with the specified hash
-    NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error
+    NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error
 }
 
 // NoOpOutputWatcher implements OutputWatcher, but always considers every glob to have changed
@@ -21,12 +21,12 @@ var _ OutputWatcher = (*NoOpOutputWatcher)(nil)
 
 // GetChangedOutputs implements OutputWatcher.GetChangedOutputs.
 // Since this is a no-op watcher, no tracking is done.
-func (NoOpOutputWatcher) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) {
-    return repoRelativeOutputGlobs, nil
+func (NoOpOutputWatcher) GetChangedOutputs(_ context.Context, _ string, repoRelativeOutputGlobs []string) ([]string, int, error) {
+    return repoRelativeOutputGlobs, 0, nil
 }
 
 // NotifyOutputsWritten implements OutputWatcher.NotifyOutputsWritten.
 // Since this is a no-op watcher, consider all globs to have changed
-func (NoOpOutputWatcher) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs) error {
+func (NoOpOutputWatcher) NotifyOutputsWritten(_ context.Context, _ string, _ fs.TaskOutputs, _ int) error {
     return nil
 }

cli/internal/runcache/runcache.go (8 changes: 5 additions & 3 deletions)

@@ -118,7 +118,8 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe
         return cache.ItemStatus{Local: false, Remote: false}, 0, nil
     }
 
-    changedOutputGlobs, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions)
+    changedOutputGlobs, timeSavedFromDaemon, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions)
+
     if err != nil {
         progressLogger.Warn(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err))
         prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err)))
@@ -149,13 +150,14 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe
             return cache.ItemStatus{Local: false, Remote: false}, 0, nil
         }
 
-        if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs); err != nil {
+        if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs, timeSavedFromDaemon); err != nil {
             // Don't fail the whole operation just because we failed to watch the outputs
             prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err)))
         }
     } else {
         // If no outputs have changed, that means we have a local cache hit.
         cacheStatus.Local = true
+        timeSaved = timeSavedFromDaemon
         prefixedUI.Warn(fmt.Sprintf("Skipping cache check for %v, outputs have not changed since previous run.", tc.pt.TaskID))
     }
 
@@ -279,7 +281,7 @@ func (tc *TaskCache) SaveOutputs(ctx context.Context, logger hclog.Logger, termi
     if err = tc.rc.cache.Put(tc.rc.repoRoot, tc.hash, duration, relativePaths); err != nil {
         return err
     }
-    err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs)
+    err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs, duration)
    if err != nil {
         // Don't fail the cache write because we also failed to record it, we will just do
         // extra I/O in the future restoring files that haven't changed from cache

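A note on where the recorded value comes from: in SaveOutputs above, the same duration handed to cache.Put is passed to NotifyOutputsWritten, so the "time saved" reported on a later skipped check is simply how long the task took on the run that produced the cached outputs. Below is a minimal sketch of that measurement; recordTimeSaved and runTask are hypothetical stand-ins, not turborepo functions.

package main

import (
    "context"
    "fmt"
    "time"
)

// recordTimeSaved is an illustrative stand-in for outputWatcher.NotifyOutputsWritten:
// the duration measured here is what the daemon later echoes back as timeSaved.
func recordTimeSaved(_ context.Context, hash string, durationMS int) {
    fmt.Printf("recorded %dms for %s\n", durationMS, hash)
}

func runTask() {
    time.Sleep(50 * time.Millisecond) // pretend work
}

func main() {
    start := time.Now()
    runTask() // stand-in for executing the task whose outputs get cached
    durationMS := int(time.Since(start).Milliseconds())

    // Mirrors SaveOutputs: the duration passed to the cache is also handed to the watcher.
    recordTimeSaved(context.Background(), "hash-abc", durationMS)
}
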
cli/internal/turbodprotocol/turbod.proto (2 changes: 2 additions & 0 deletions)

@@ -34,6 +34,7 @@ message NotifyOutputsWrittenRequest {
   repeated string output_globs = 1;
   string hash = 2;
   repeated string output_exclusion_globs = 3;
+  uint64 time_saved = 4;
 }
 
 message NotifyOutputsWrittenResponse {}
@@ -45,6 +46,7 @@ message GetChangedOutputsRequest {
 
 message GetChangedOutputsResponse {
   repeated string changed_output_globs = 1;
+  uint64 time_saved = 2;
 }
 
 message DaemonStatus {

crates/turborepo-lib/src/daemon/client.rs (2 changes: 2 additions & 0 deletions)

@@ -90,12 +90,14 @@ impl DaemonClient<DaemonConnector> {
         hash: String,
         output_globs: Vec<String>,
         output_exclusion_globs: Vec<String>,
+        time_saved: u64,
     ) -> Result<(), DaemonError> {
         self.client
             .notify_outputs_written(proto::NotifyOutputsWrittenRequest {
                 hash,
                 output_globs,
                 output_exclusion_globs,
+                time_saved,
             })
             .await?;
 

crates/turborepo-lib/src/daemon/server.rs (23 changes: 17 additions & 6 deletions)

@@ -13,10 +13,10 @@
 //! globs, and to query for changes for those globs.
 
 use std::{
-    collections::HashSet,
+    collections::{HashMap, HashSet},
     sync::{
         atomic::{AtomicBool, Ordering},
-        Arc,
+        Arc, Mutex as StdMutux,
     },
     time::{Duration, Instant},
 };
@@ -58,6 +58,8 @@ pub struct DaemonServer<T: Watcher> {
     shutdown_rx: Option<Receiver<()>>,
 
     running: Arc<AtomicBool>,
+
+    times_saved: Arc<std::sync::Mutex<HashMap<String, u64>>>,
 }
 
 #[derive(Debug)]
@@ -98,6 +100,7 @@ impl DaemonServer<notify::RecommendedWatcher> {
             shutdown_rx: Some(recv_shutdown),
 
             running: Arc::new(AtomicBool::new(true)),
+            times_saved: Arc::new(StdMutux::new(HashMap::new())),
         })
     }
 }
@@ -255,6 +258,10 @@ impl<T: Watcher + Send + 'static> proto::turbod_server::Turbod for DaemonServer<
    ) -> Result<tonic::Response<proto::NotifyOutputsWrittenResponse>, tonic::Status> {
        let inner = request.into_inner();
 
+       {
+           let mut times_saved = self.times_saved.lock().expect("times saved lock poisoned");
+           times_saved.insert(inner.hash.clone(), inner.time_saved);
+       }
        match self
            .watcher
            .watch_globs(
@@ -277,17 +284,21 @@ impl<T: Watcher + Send + 'static> proto::turbod_server::Turbod for DaemonServer<
        request: tonic::Request<proto::GetChangedOutputsRequest>,
    ) -> Result<tonic::Response<proto::GetChangedOutputsResponse>, tonic::Status> {
        let inner = request.into_inner();
+       let hash = Arc::new(inner.hash);
        let changed = self
            .watcher
-           .changed_globs(
-               &Arc::new(inner.hash),
-               HashSet::from_iter(inner.output_globs),
-           )
+           .changed_globs(&hash, HashSet::from_iter(inner.output_globs))
            .await;
 
+       let time_saved = {
+           let times_saved = self.times_saved.lock().expect("times saved lock poisoned");
+           times_saved.get(hash.as_str()).copied().unwrap_or_default()
+       };
+
        match changed {
            Ok(changed) => Ok(tonic::Response::new(proto::GetChangedOutputsResponse {
                changed_output_globs: changed.into_iter().collect(),
+               time_saved: time_saved,
            })),
            Err(e) => {
                error!("flush directory operation failed: {:?}", e);
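The Rust daemon above keeps the reported values in a mutex-guarded map keyed by task hash, and a lookup for a hash that was never recorded falls back to zero (unwrap_or_default). For readers following the Go half of the change, here is a hedged Go restatement of that bookkeeping; timesSaved, record, and lookup are illustrative names, not turborepo API.

package main

import (
    "fmt"
    "sync"
)

// timesSaved mirrors the daemon's times_saved map (hash -> reported time saved),
// guarded by a mutex because request handlers run concurrently.
type timesSaved struct {
    mu sync.Mutex
    m  map[string]uint64
}

// record corresponds to the NotifyOutputsWritten handler storing inner.time_saved.
func (t *timesSaved) record(hash string, saved uint64) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.m[hash] = saved
}

// lookup corresponds to the GetChangedOutputs handler: a missing hash yields the
// zero value, matching unwrap_or_default() in the Rust code above.
func (t *timesSaved) lookup(hash string) uint64 {
    t.mu.Lock()
    defer t.mu.Unlock()
    return t.m[hash]
}

func main() {
    ts := &timesSaved{m: map[string]uint64{}}
    ts.record("hash-abc", 1200)
    fmt.Println(ts.lookup("hash-abc")) // 1200
    fmt.Println(ts.lookup("hash-new")) // 0 (never recorded)
}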
