Skip to content

Commit

Permalink
add counter metrics for remote execution (#11155)
Browse files Browse the repository at this point in the history
Add counters to the remote execution code to provide greater visibility into its operation.
  • Loading branch information
Tom Dyas authored Nov 14, 2020
1 parent 43bb218 commit f084d4f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 9 deletions.
57 changes: 50 additions & 7 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ impl CommandRunner {
loop {
// If we are currently retrying a request, then delay using an exponential backoff.
if num_retries > 0 {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRPCRetries, 1);

let multiplier = thread_rng().gen_range(0, 2_u32.pow(num_retries) + 1);
let sleep_time = self.retry_interval_duration * multiplier;
let sleep_time = sleep_time.min(MAX_BACKOFF_DURATION);
Expand All @@ -554,6 +558,9 @@ impl CommandRunner {
"no current operation: submitting execute request: build_id={}; execute_request={:?}",
context.build_id, execute_request
);
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRPCExecute, 1);
self
.execution_client
.execute_opt(&execute_request, call_opt)
Expand All @@ -566,6 +573,9 @@ impl CommandRunner {
"existing operation: reconnecting to operation stream: build_id={}; operation_name={}",
context.build_id, operation_name
);
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRPCWaitExecution, 1);
let mut wait_execution_request = WaitExecutionRequest::new();
wait_execution_request.set_name(operation_name.to_owned());
self
Expand Down Expand Up @@ -601,6 +611,9 @@ impl CommandRunner {
// of retries allowed since the last successful connection. (There is no point in
// continually submitting a request if it is ultimately futile.)
if num_retries >= MAX_RETRIES {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRPCErrors, 1);
return Err(
"Too many failures from server. The last event was the server disconnecting with no error given.".to_owned(),
);
Expand All @@ -622,6 +635,9 @@ impl CommandRunner {
OperationOrStatus::Status(status)
}
_ => {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRPCErrors, 1);
return Err(format!("gRPC error: {}", err));
}
},
Expand All @@ -630,13 +646,21 @@ impl CommandRunner {
match self.extract_execute_response(actionable_result).await {
Ok(result) => return Ok(result),
Err(err) => match err {
ExecutionError::Fatal(e) => return Err(e),
ExecutionError::Fatal(e) => {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRPCErrors, 1);
return Err(e);
}
ExecutionError::Retryable(e) => {
// Check if the number of request attempts sent thus far has exceeded the number
// of retries allowed since the last successful connection. (There is no point in
// continually submitting a request if it is ultimately futile.)
trace!("retryable error: {}", e);
if num_retries >= MAX_RETRIES {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRPCErrors, 1);
return Err(format!(
"Too many failures from server. The last error was: {}",
e
Expand All @@ -659,6 +683,9 @@ impl CommandRunner {
.await?;
}
ExecutionError::Timeout => {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionTimeouts, 1);
return populate_fallible_execution_result_for_timeout(
&self.store,
&process.description,
Expand All @@ -667,7 +694,7 @@ impl CommandRunner {
Vec::new(),
self.platform,
)
.await
.await;
}
},
}
Expand All @@ -683,10 +710,6 @@ impl crate::CommandRunner for CommandRunner {
request: MultiPlatformProcess,
context: Context,
) -> Result<FallibleProcessResultWithPlatform, String> {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRequests, 1);

// Retrieve capabilities for this server.
let capabilities = self.get_capabilities().await?;
trace!("RE capabilities: {:?}", &capabilities);
Expand Down Expand Up @@ -754,6 +777,9 @@ impl crate::CommandRunner for CommandRunner {
.await?;

// Submit the execution request to the RE server for execution.
context
.workunit_store
.increment_counter(Metric::RemoteExecutionRequests, 1);
let result_fut = self.run_execute_request(execute_request, request, &context);
let timeout_fut = tokio::time::timeout(deadline_duration, result_fut);
let response = with_workunit(
Expand All @@ -773,13 +799,30 @@ impl crate::CommandRunner for CommandRunner {
},
)
.await;

// Detect whether the operation ran or hit the deadline timeout.
match response {
Ok(r) => r,
Ok(result) => {
if result.is_ok() {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionSuccess, 1);
} else {
context
.workunit_store
.increment_counter(Metric::RemoteExecutionErrors, 1);
}
result
}
Err(_) => {
// The Err in this match arm originates from the timeout future.
debug!(
"remote execution for build_id={} timed out after {:?}",
&build_id, deadline_duration
);
context
.workunit_store
.increment_counter(Metric::RemoteExecutionTimeouts, 1);
Err(format!(
"remote execution timed out after {:?}",
deadline_duration
Expand Down
18 changes: 16 additions & 2 deletions src/rust/engine/workunit_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@ pub enum Metric {
LocalCacheReadErrors,
LocalCacheWriteErrors,
LocalExecutionRequests,
RemoteExecutionRequests,
RemoteCacheRequests,
RemoteCacheRequestsCached,
RemoteCacheRequestsUncached,
RemoteCacheReadErrors,
RemoteCacheWriteErrors,
RemoteExecutionErrors,
RemoteExecutionRequests,
RemoteExecutionRPCErrors,
RemoteExecutionRPCExecute,
RemoteExecutionRPCRetries,
RemoteExecutionRPCWaitExecution,
RemoteExecutionSuccess,
RemoteExecutionTimeouts,
}

impl Metric {
Expand All @@ -54,12 +61,19 @@ impl Metric {
LocalCacheReadErrors => "local_cache_read_errors",
LocalCacheWriteErrors => "local_cache_write_errors",
LocalExecutionRequests => "local_execution_requests",
RemoteExecutionRequests => "remote_execution_requests",
RemoteCacheRequests => "remote_cache_requests",
RemoteCacheRequestsCached => "remote_cache_requests_cached",
RemoteCacheRequestsUncached => "remote_cache_requests_uncached",
RemoteCacheReadErrors => "remote_cache_read_errors",
RemoteCacheWriteErrors => "remote_cache_write_errors",
RemoteExecutionErrors => "remote_execution_errors",
RemoteExecutionRequests => "remote_execution_requests",
RemoteExecutionRPCRetries => "remote_execution_rpc_retries",
RemoteExecutionRPCErrors => "remote_execution_rpc_errors",
RemoteExecutionRPCExecute => "remote_execution_rpc_execute",
RemoteExecutionRPCWaitExecution => "remote_execution_rpc_wait_execution",
RemoteExecutionSuccess => "remote_execution_success",
RemoteExecutionTimeouts => "remote_execution_timeouts",
}
}
}

0 comments on commit f084d4f

Please sign in to comment.