Skip to content

Commit

Permalink
Remove the requirement of using reference for action_messages
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Apr 27, 2022
1 parent f7b565f commit 5daa4ff
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 115 deletions.
3 changes: 2 additions & 1 deletion cas/grpc_service/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ impl ExecutionServer {
.await
.err_tip(|| "Failed to schedule task")?;

let receiver_stream = Box::pin(WatchStream::new(rx).map(|action_update| Ok(action_update.as_ref().into())));
let receiver_stream =
Box::pin(WatchStream::new(rx).map(|action_update| Ok(action_update.as_ref().clone().into())));
Ok(tonic::Response::new(receiver_stream))
}
}
Expand Down
18 changes: 5 additions & 13 deletions cas/grpc_service/tests/worker_api_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use config::cas_server::{SchedulerConfig, WorkerApiConfig};
use error::{Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::{
ActionResult as ProtoActionResult, ExecuteResponse, ExecutedActionMetadata, LogFile, NodeProperties,
OutputDirectory, OutputFile, OutputSymlink,
ActionResult as ProtoActionResult, ExecuteResponse, ExecutedActionMetadata, LogFile, OutputDirectory, OutputFile,
OutputSymlink,
};
use proto::com::github::allada::turbo_cache::remote_execution::{
execute_result, update_for_worker, worker_api_server::WorkerApi, ExecuteFinishedResult, ExecuteResult,
Expand Down Expand Up @@ -298,21 +298,13 @@ pub mod execution_response_tests {
digest: Some(DigestInfo::new([8u8; 32], 124).into()),
is_executable: true,
contents: Default::default(), // We don't implement this.
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(99).into()),
unix_mode: Some(12),
}),
node_properties: None,
}],
output_file_symlinks: Default::default(), // Bazel deprecated this.
output_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: Some(NodeProperties {
properties: Default::default(), // We don't implement this.
mtime: Some(make_system_time(97).into()),
unix_mode: Some(10),
}),
node_properties: None,
}],
output_directories: vec![OutputDirectory {
path: "some path4".to_string(),
Expand Down Expand Up @@ -376,7 +368,7 @@ pub mod execution_response_tests {

// We just checked if conversion from ExecuteResponse into ActionStage was an exact mach.
// Now check if we cast the ActionStage into an ExecuteResponse we get the exact same struct.
assert_eq!(execute_response, (&client_given_state.stage).into());
assert_eq!(execute_response, client_given_state.stage.clone().into());
}
Ok(())
}
Expand Down
139 changes: 44 additions & 95 deletions cas/scheduler/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use prost::Message;
use prost_types::Any;
use proto::build::bazel::remote::execution::v2::{
execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, NodeProperties, OutputDirectory, OutputFile,
OutputSymlink, SymlinkNode,
ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, OutputSymlink,
SymlinkNode,
};
use proto::google::longrunning::{operation::Result as LongRunningResult, Operation};

Expand Down Expand Up @@ -135,11 +135,12 @@ impl ActionInfo {
}
}

impl Into<ExecuteRequest> for &ActionInfo {
impl Into<ExecuteRequest> for ActionInfo {
fn into(self) -> ExecuteRequest {
let digest = self.digest().into();
ExecuteRequest {
instance_name: self.instance_name.clone(),
action_digest: Some(self.digest().into()),
instance_name: self.instance_name,
action_digest: Some(digest),
skip_cache_lookup: true, // The worker should never cache lookup.
execution_policy: None, // Not used in the worker.
results_cache_policy: None, // Not used in the worker.
Expand Down Expand Up @@ -236,26 +237,20 @@ pub struct FileInfo {
pub name_or_path: NameOrPath,
pub digest: DigestInfo,
pub is_executable: bool,
pub mtime: SystemTime,
pub permissions: u32,
}

impl Into<FileNode> for &FileInfo {
impl Into<FileNode> for FileInfo {
fn into(self) -> FileNode {
let name = if let NameOrPath::Name(name) = &self.name_or_path {
name.clone()
let name = if let NameOrPath::Name(name) = self.name_or_path {
name
} else {
panic!("Cannot return a FileInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
};
FileNode {
name,
digest: Some((&self.digest).into()),
is_executable: self.is_executable,
node_properties: Some(NodeProperties {
properties: Default::default(),
mtime: Some(self.mtime.into()),
unix_mode: Some(self.permissions),
}),
node_properties: Default::default(), // Not supported.
}
}
}
Expand All @@ -264,32 +259,21 @@ impl TryFrom<OutputFile> for FileInfo {
type Error = Error;

fn try_from(output_file: OutputFile) -> Result<Self, Error> {
let node_properties = output_file
.node_properties
.err_tip(|| "Expected node_properties to exist on OutputFile")?;
Ok(FileInfo {
name_or_path: NameOrPath::Path(output_file.path),
digest: output_file
.digest
.err_tip(|| "Expected digest to exist on OutputFile")?
.try_into()?,
is_executable: output_file.is_executable,
mtime: node_properties
.mtime
.err_tip(|| "Expected mtime to exist in OutputFile")?
.try_into()?,
permissions: node_properties
.unix_mode
.err_tip(|| "Expected unix_mode to exist in OutputFile")?
.try_into()?,
})
}
}

impl Into<OutputFile> for &FileInfo {
impl Into<OutputFile> for FileInfo {
fn into(self) -> OutputFile {
let path = if let NameOrPath::Path(path) = &self.name_or_path {
path.clone()
let path = if let NameOrPath::Path(path) = self.name_or_path {
path
} else {
panic!("Cannot return a FileInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()");
};
Expand All @@ -298,11 +282,7 @@ impl Into<OutputFile> for &FileInfo {
digest: Some((&self.digest).into()),
is_executable: self.is_executable,
contents: Default::default(),
node_properties: Some(NodeProperties {
properties: Default::default(),
mtime: Some(self.mtime.into()),
unix_mode: Some(self.permissions),
}),
node_properties: Default::default(), // Not supported.
}
}
}
Expand All @@ -313,47 +293,30 @@ impl Into<OutputFile> for &FileInfo {
pub struct SymlinkInfo {
pub name_or_path: NameOrPath,
pub target: String,
pub mtime: SystemTime,
pub permissions: u32,
}

impl TryFrom<SymlinkNode> for SymlinkInfo {
type Error = Error;

fn try_from(symlink_node: SymlinkNode) -> Result<Self, Error> {
let node_properties = symlink_node
.node_properties
.err_tip(|| "Expected node_properties to exist on SymlinkNode")?;
Ok(SymlinkInfo {
name_or_path: NameOrPath::Name(symlink_node.name),
target: symlink_node.target,
mtime: node_properties
.mtime
.err_tip(|| "Expected mtime to exist in SymlinkNode")?
.try_into()?,
permissions: node_properties
.unix_mode
.err_tip(|| "Expected unix_mode to exist in SymlinkNode")?
.try_into()?,
})
}
}

impl Into<SymlinkNode> for &SymlinkInfo {
impl Into<SymlinkNode> for SymlinkInfo {
fn into(self) -> SymlinkNode {
let name = if let NameOrPath::Name(name) = &self.name_or_path {
name.clone()
let name = if let NameOrPath::Name(name) = self.name_or_path {
name
} else {
panic!("Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
};
SymlinkNode {
name,
target: self.target.clone(),
node_properties: Some(NodeProperties {
properties: Default::default(),
mtime: Some(self.mtime.into()),
unix_mode: Some(self.permissions),
}),
target: self.target,
node_properties: Default::default(), // Not supported.
}
}
}
Expand All @@ -362,39 +325,24 @@ impl TryFrom<OutputSymlink> for SymlinkInfo {
type Error = Error;

fn try_from(output_symlink: OutputSymlink) -> Result<Self, Error> {
let node_properties = output_symlink
.node_properties
.err_tip(|| "Expected node_properties to exist on OutputSymlink")?;
Ok(SymlinkInfo {
name_or_path: NameOrPath::Path(output_symlink.path),
target: output_symlink.target,
mtime: node_properties
.mtime
.err_tip(|| "Expected mtime to exist in OutputSymlink")?
.try_into()?,
permissions: node_properties
.unix_mode
.err_tip(|| "Expected unix_mode to exist in OutputSymlink")?
.try_into()?,
})
}
}

impl Into<OutputSymlink> for &SymlinkInfo {
impl Into<OutputSymlink> for SymlinkInfo {
fn into(self) -> OutputSymlink {
let path = if let NameOrPath::Path(path) = &self.name_or_path {
path.clone()
let path = if let NameOrPath::Path(path) = self.name_or_path {
path
} else {
panic!("Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
};
OutputSymlink {
path,
target: self.target.clone(),
node_properties: Some(NodeProperties {
properties: Default::default(),
mtime: Some(self.mtime.into()),
unix_mode: Some(self.permissions),
}),
target: self.target,
node_properties: Default::default(), // Not supported.
}
}
}
Expand All @@ -421,11 +369,11 @@ impl TryFrom<OutputDirectory> for DirectoryInfo {
}
}

impl Into<OutputDirectory> for &DirectoryInfo {
impl Into<OutputDirectory> for DirectoryInfo {
fn into(self) -> OutputDirectory {
OutputDirectory {
path: self.path.clone(),
tree_digest: Some((&self.tree_digest).into()),
path: self.path,
tree_digest: Some(self.tree_digest.into()),
}
}
}
Expand All @@ -446,10 +394,10 @@ pub struct ExecutionMetadata {
pub output_upload_completed_timestamp: SystemTime,
}

impl Into<ExecutedActionMetadata> for &ExecutionMetadata {
impl Into<ExecutedActionMetadata> for ExecutionMetadata {
fn into(self) -> ExecutedActionMetadata {
ExecutedActionMetadata {
worker: self.worker.clone(),
worker: self.worker,
queued_timestamp: Some(self.queued_timestamp.into()),
worker_start_timestamp: Some(self.worker_start_timestamp.into()),
worker_completed_timestamp: Some(self.worker_completed_timestamp.into()),
Expand Down Expand Up @@ -576,7 +524,7 @@ impl Into<execution_stage::Value> for &ActionStage {
}
}

impl Into<ExecuteResponse> for &ActionStage {
impl Into<ExecuteResponse> for ActionStage {
fn into(self) -> ExecuteResponse {
let (error, action_result, was_from_cache) = match self {
// We don't have an execute response if we don't have the results. It is defined
Expand All @@ -588,12 +536,12 @@ impl Into<ExecuteResponse> for &ActionStage {

ActionStage::Completed(action_result) => (None, action_result, false),
ActionStage::CompletedFromCache(action_result) => (None, action_result, true),
ActionStage::Error((error, action_result)) => (Some(error.clone()), action_result, false),
ActionStage::Error((error, action_result)) => (Some(error), action_result, false),
};
let mut server_logs = HashMap::with_capacity(action_result.server_logs.len());
for (k, v) in &action_result.server_logs {
for (k, v) in action_result.server_logs {
server_logs.insert(
k.clone(),
k,
LogFile {
digest: Some(v.into()),
human_readable: false,
Expand All @@ -603,13 +551,13 @@ impl Into<ExecuteResponse> for &ActionStage {

ExecuteResponse {
result: Some(ProtoActionResult {
output_files: action_result.output_files.iter().map(|v| v.into()).collect(),
output_symlinks: action_result.output_symlinks.iter().map(|v| v.into()).collect(),
output_directories: action_result.output_folders.iter().map(|v| v.into()).collect(),
output_files: action_result.output_files.into_iter().map(|v| v.into()).collect(),
output_symlinks: action_result.output_symlinks.into_iter().map(|v| v.into()).collect(),
output_directories: action_result.output_folders.into_iter().map(|v| v.into()).collect(),
exit_code: action_result.exit_code,
stdout_digest: Some((&action_result.stdout_digest).into()),
stderr_digest: Some((&action_result.stderr_digest).into()),
execution_metadata: Some((&action_result.execution_metadata).into()),
stdout_digest: Some(action_result.stdout_digest.into()),
stderr_digest: Some(action_result.stderr_digest.into()),
execution_metadata: Some(action_result.execution_metadata.into()),
output_directory_symlinks: Default::default(),
output_file_symlinks: Default::default(),
stdout_raw: Default::default(),
Expand Down Expand Up @@ -678,10 +626,11 @@ pub struct ActionState {
pub stage: ActionStage,
}

impl Into<Operation> for &ActionState {
impl Into<Operation> for ActionState {
fn into(self) -> Operation {
let has_action_result = self.stage.has_action_result();
let execute_response: ExecuteResponse = (&self.stage).into();
let stage = Into::<execution_stage::Value>::into(&self.stage) as i32;
let execute_response: ExecuteResponse = self.stage.into();

let serialized_response = if has_action_result {
execute_response.encode_to_vec()
Expand All @@ -690,15 +639,15 @@ impl Into<Operation> for &ActionState {
};

let metadata = ExecuteOperationMetadata {
stage: Into::<execution_stage::Value>::into(&self.stage) as i32,
stage,
action_digest: Some((&self.action_digest).into()),
// TODO(blaise.bruer) We should support stderr/stdout streaming.
stdout_stream_name: Default::default(),
stderr_stream_name: Default::default(),
};

Operation {
name: self.name.clone(),
name: self.name,
metadata: Some(Any {
type_url: "build.bazel.remote.execution.v2.ExecuteOperationMetadata".to_string(),
value: metadata.encode_to_vec(),
Expand Down
4 changes: 0 additions & 4 deletions cas/scheduler/tests/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,6 @@ mod scheduler_tests {
name_or_path: NameOrPath::Name("hello".to_string()),
digest: DigestInfo::new([5u8; 32], 18),
is_executable: true,
mtime: make_system_time(111),
permissions: 55,
}],
output_folders: vec![DirectoryInfo {
path: "123".to_string(),
Expand All @@ -482,8 +480,6 @@ mod scheduler_tests {
output_symlinks: vec![SymlinkInfo {
name_or_path: NameOrPath::Name("foo".to_string()),
target: "bar".to_string(),
mtime: make_system_time(99),
permissions: 445,
}],
exit_code: 0,
stdout_digest: DigestInfo::new([6u8; 32], 19),
Expand Down
3 changes: 2 additions & 1 deletion cas/scheduler/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ impl Worker {
}

fn run_action(&mut self, action_info: Arc<ActionInfo>) -> Result<(), Error> {
let action_info_clone = action_info.as_ref().clone();
self.running_action_info = Some(action_info.clone());
self.reduce_platform_properties(&action_info.platform_properties);
self.send_msg_to_worker(update_for_worker::Update::StartAction(StartExecute {
execute_request: Some(self.running_action_info.as_ref().unwrap().as_ref().into()),
execute_request: Some(action_info_clone.into()),
salt: *action_info.salt(),
}))
}
Expand Down
Loading

0 comments on commit 5daa4ff

Please sign in to comment.