
Commit

Merge branch 'main' into cassandra-key-value-storage-impl
irach-ramos authored Oct 1, 2024
2 parents 7670636 + b1e83ad commit bc07cf9
Showing 13 changed files with 355 additions and 202 deletions.
2 changes: 1 addition & 1 deletion golem-common/src/model/mod.rs
@@ -2108,7 +2108,7 @@ impl WorkerEvent {
}),
WorkerEvent::StdErr { timestamp, bytes } => Some(OplogEntry::Log {
timestamp: *timestamp,
level: oplog::LogLevel::Stdout,
level: oplog::LogLevel::Stderr,
context: String::new(),
message: String::from_utf8_lossy(bytes).to_string(),
}),
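The one-line change above fixes a copy-paste bug: StdErr worker events were being written to the oplog tagged as LogLevel::Stdout. A minimal sketch of the intended mapping, using simplified stand-in types rather than the real golem-common definitions:

```rust
// Simplified stand-ins for the real golem-common types; illustration only.
#[derive(Debug, PartialEq)]
enum LogLevel {
    Stdout,
    Stderr,
}

enum WorkerEvent {
    StdOut { bytes: Vec<u8> },
    StdErr { bytes: Vec<u8> },
}

// Each output stream should map to its own log level; before this fix,
// StdErr events were recorded with LogLevel::Stdout.
fn log_level_for(event: &WorkerEvent) -> LogLevel {
    match event {
        WorkerEvent::StdOut { .. } => LogLevel::Stdout,
        WorkerEvent::StdErr { .. } => LogLevel::Stderr,
    }
}

fn main() {
    let event = WorkerEvent::StdErr { bytes: b"oops\n".to_vec() };
    assert_eq!(log_level_for(&event), LogLevel::Stderr);
}
```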
235 changes: 122 additions & 113 deletions golem-test-framework/src/dsl/mod.rs
@@ -82,15 +82,15 @@ pub trait TestDsl {
async fn get_worker_metadata(
&self,
worker_id: &WorkerId,
) -> crate::Result<Option<WorkerMetadata>>;
) -> crate::Result<Option<(WorkerMetadata, Option<String>)>>;
async fn get_workers_metadata(
&self,
component_id: &ComponentId,
filter: Option<WorkerFilter>,
cursor: ScanCursor,
count: u64,
precise: bool,
) -> crate::Result<(Option<ScanCursor>, Vec<WorkerMetadata>)>;
) -> crate::Result<(Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>)>;
async fn delete_worker(&self, worker_id: &WorkerId) -> crate::Result<()>;

async fn invoke(
@@ -283,7 +283,7 @@ impl<T: TestDependencies + Send + Sync> TestDsl for T {
async fn get_worker_metadata(
&self,
worker_id: &WorkerId,
) -> crate::Result<Option<WorkerMetadata>> {
) -> crate::Result<Option<(WorkerMetadata, Option<String>)>> {
let worker_id: golem_api_grpc::proto::golem::worker::WorkerId = worker_id.clone().into();
let response = self
.worker_service()
@@ -319,7 +319,7 @@ impl<T: TestDependencies + Send + Sync> TestDsl for T {
cursor: ScanCursor,
count: u64,
precise: bool,
) -> crate::Result<(Option<ScanCursor>, Vec<WorkerMetadata>)> {
) -> crate::Result<(Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>)> {
let component_id: golem_api_grpc::proto::golem::component::ComponentId =
component_id.clone().into();
let response = self
@@ -945,117 +945,120 @@ pub fn worker_error_message(error: &Error) -> String {

pub fn to_worker_metadata(
metadata: &golem_api_grpc::proto::golem::worker::WorkerMetadata,
) -> WorkerMetadata {
WorkerMetadata {
worker_id: metadata
.worker_id
.clone()
.expect("no worker_id")
.clone()
.try_into()
.expect("invalid worker_id"),
args: metadata.args.clone(),
env: metadata
.env
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>(),
account_id: metadata
.account_id
.clone()
.expect("no account_id")
.clone()
.into(),
created_at: metadata
.created_at
.as_ref()
.expect("no created_at")
.clone()
.into(),
last_known_status: WorkerStatusRecord {
oplog_idx: OplogIndex::default(),
status: metadata.status.try_into().expect("invalid status"),
overridden_retry_config: None, // not passed through gRPC
deleted_regions: DeletedRegions::new(),
pending_invocations: vec![],
pending_updates: metadata
.updates
) -> (WorkerMetadata, Option<String>) {
(
WorkerMetadata {
worker_id: metadata
.worker_id
.clone()
.expect("no worker_id")
.clone()
.try_into()
.expect("invalid worker_id"),
args: metadata.args.clone(),
env: metadata
.env
.iter()
.filter_map(|u| match &u.update {
Some(Update::Pending(_)) => Some(TimestampedUpdateDescription {
timestamp: u
.timestamp
.as_ref()
.expect("no timestamp on update record")
.clone()
.into(),
oplog_index: OplogIndex::from_u64(0),
description: UpdateDescription::Automatic {
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>(),
account_id: metadata
.account_id
.clone()
.expect("no account_id")
.clone()
.into(),
created_at: metadata
.created_at
.as_ref()
.expect("no created_at")
.clone()
.into(),
last_known_status: WorkerStatusRecord {
oplog_idx: OplogIndex::default(),
status: metadata.status.try_into().expect("invalid status"),
overridden_retry_config: None, // not passed through gRPC
deleted_regions: DeletedRegions::new(),
pending_invocations: vec![],
pending_updates: metadata
.updates
.iter()
.filter_map(|u| match &u.update {
Some(Update::Pending(_)) => Some(TimestampedUpdateDescription {
timestamp: u
.timestamp
.as_ref()
.expect("no timestamp on update record")
.clone()
.into(),
oplog_index: OplogIndex::from_u64(0),
description: UpdateDescription::Automatic {
target_version: u.target_version,
},
}),
_ => None,
})
.collect(),
failed_updates: metadata
.updates
.iter()
.filter_map(|u| match &u.update {
Some(Update::Failed(failed_update)) => Some(FailedUpdateRecord {
timestamp: u
.timestamp
.as_ref()
.expect("no timestamp on update record")
.clone()
.into(),
target_version: u.target_version,
},
}),
_ => None,
})
.collect(),
failed_updates: metadata
.updates
.iter()
.filter_map(|u| match &u.update {
Some(Update::Failed(failed_update)) => Some(FailedUpdateRecord {
timestamp: u
.timestamp
.as_ref()
.expect("no timestamp on update record")
.clone()
.into(),
target_version: u.target_version,
details: failed_update.details.clone(),
}),
_ => None,
})
.collect(),
successful_updates: metadata
.updates
.iter()
.filter_map(|u| match &u.update {
Some(Update::Successful(_)) => Some(SuccessfulUpdateRecord {
timestamp: u
.timestamp
.as_ref()
.expect("no timestamp on update record")
.clone()
.into(),
target_version: u.target_version,
}),
_ => None,
})
.collect(),
invocation_results: HashMap::new(),
current_idempotency_key: None,
component_version: metadata.component_version,
component_size: metadata.component_size,
total_linear_memory_size: metadata.total_linear_memory_size,
owned_resources: metadata
.owned_resources
.iter()
.map(|(k, v)| {
(
WorkerResourceId(*k),
WorkerResourceDescription {
created_at: v
.created_at
details: failed_update.details.clone(),
}),
_ => None,
})
.collect(),
successful_updates: metadata
.updates
.iter()
.filter_map(|u| match &u.update {
Some(Update::Successful(_)) => Some(SuccessfulUpdateRecord {
timestamp: u
.timestamp
.as_ref()
.expect("no timestamp on resource metadata")
.expect("no timestamp on update record")
.clone()
.into(),
indexed_resource_key: v.indexed.clone().map(|i| i.into()),
},
)
})
.collect(),
target_version: u.target_version,
}),
_ => None,
})
.collect(),
invocation_results: HashMap::new(),
current_idempotency_key: None,
component_version: metadata.component_version,
component_size: metadata.component_size,
total_linear_memory_size: metadata.total_linear_memory_size,
owned_resources: metadata
.owned_resources
.iter()
.map(|(k, v)| {
(
WorkerResourceId(*k),
WorkerResourceDescription {
created_at: v
.created_at
.as_ref()
.expect("no timestamp on resource metadata")
.clone()
.into(),
indexed_resource_key: v.indexed.clone().map(|i| i.into()),
},
)
})
.collect(),
},
parent: None,
},
parent: None,
}
metadata.last_error.clone(),
)
}

fn dump_component_info(path: &Path) -> golem_common::model::component_metadata::ComponentMetadata {
@@ -1121,15 +1124,18 @@ pub trait TestDslUnsafe {
args: Vec<String>,
env: HashMap<String, String>,
) -> Result<WorkerId, Error>;
async fn get_worker_metadata(&self, worker_id: &WorkerId) -> Option<WorkerMetadata>;
async fn get_worker_metadata(
&self,
worker_id: &WorkerId,
) -> Option<(WorkerMetadata, Option<String>)>;
async fn get_workers_metadata(
&self,
component_id: &ComponentId,
filter: Option<WorkerFilter>,
cursor: ScanCursor,
count: u64,
precise: bool,
) -> (Option<ScanCursor>, Vec<WorkerMetadata>);
) -> (Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>);
async fn delete_worker(&self, worker_id: &WorkerId) -> ();

async fn invoke(
@@ -1246,7 +1252,10 @@ impl<T: TestDsl + Sync> TestDslUnsafe for T {
.expect("Failed to start worker")
}

async fn get_worker_metadata(&self, worker_id: &WorkerId) -> Option<WorkerMetadata> {
async fn get_worker_metadata(
&self,
worker_id: &WorkerId,
) -> Option<(WorkerMetadata, Option<String>)> {
<T as TestDsl>::get_worker_metadata(self, worker_id)
.await
.expect("Failed to get worker metadata")
@@ -1259,7 +1268,7 @@ impl<T: TestDsl + Sync> TestDslUnsafe for T {
cursor: ScanCursor,
count: u64,
precise: bool,
) -> (Option<ScanCursor>, Vec<WorkerMetadata>) {
) -> (Option<ScanCursor>, Vec<(WorkerMetadata, Option<String>)>) {
<T as TestDsl>::get_workers_metadata(self, component_id, filter, cursor, count, precise)
.await
.expect("Failed to get workers metadata")
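Throughout this file the DSL now returns the worker metadata together with an optional "last error" string: to_worker_metadata wraps the converted WorkerMetadata in a tuple whose second element is metadata.last_error.clone(). A hedged sketch of how a caller might consume the new Option<(WorkerMetadata, Option<String>)> shape; the stand-in struct and stub function below are illustrative, not the real golem-test-framework API:

```rust
// Illustrative stand-in; the real WorkerMetadata lives in golem-common and
// carries many more fields.
#[derive(Debug)]
struct WorkerMetadata {
    worker_name: String,
}

// Mirrors the new Option<(WorkerMetadata, Option<String>)> return shape:
// the metadata plus the worker's last recorded error, if any.
fn get_worker_metadata_stub() -> Option<(WorkerMetadata, Option<String>)> {
    Some((
        WorkerMetadata {
            worker_name: "worker-1".to_string(),
        },
        Some("error while invoking exported function".to_string()),
    ))
}

fn main() {
    match get_worker_metadata_stub() {
        Some((metadata, Some(last_error))) => {
            println!("{} last failed with: {last_error}", metadata.worker_name)
        }
        Some((metadata, None)) => {
            println!("{} has no recorded error", metadata.worker_name)
        }
        None => println!("worker not found"),
    }
}
```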
17 changes: 13 additions & 4 deletions golem-worker-executor-base/src/durable_host/mod.rs
@@ -1394,11 +1394,13 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
None
} else {
let mut first_error = None;
let mut last_error_index = idx;
let result = loop {
let oplog_entry = this.oplog_service().read(owned_worker_id, idx, 1).await;
match oplog_entry.first_key_value() {
Some((_, OplogEntry::Error { error, .. })) => {
retry_count += 1;
last_error_index = idx;
if first_error.is_none() {
first_error = Some(error.clone());
}
@@ -1409,7 +1411,8 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
break Some(LastError {
error: first_error.unwrap(),
retry_count,
stderr: recover_stderr_logs(this, owned_worker_id, idx).await,
stderr: recover_stderr_logs(this, owned_worker_id, last_error_index)
.await,
});
}
}
@@ -1424,7 +1427,12 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
break Some(LastError {
error,
retry_count,
stderr: recover_stderr_logs(this, owned_worker_id, idx).await,
stderr: recover_stderr_logs(
this,
owned_worker_id,
last_error_index,
)
.await,
})
}
None => break None,
@@ -1436,7 +1444,8 @@ async fn last_error_and_retry_count<T: HasOplogService + HasConfig>(
break Some(LastError {
error,
retry_count,
stderr: recover_stderr_logs(this, owned_worker_id, idx).await,
stderr: recover_stderr_logs(this, owned_worker_id, last_error_index)
.await,
})
}
None => break None,
@@ -1488,7 +1497,7 @@ pub(crate) async fn recover_stderr_logs<T: HasOplogService + HasConfig>(
}
}
stderr_entries.reverse();
stderr_entries.join("\n")
stderr_entries.join("")
}

/// Indicates which step of the http request handling is responsible for closing an open
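Two related adjustments in this file: the retry loop now remembers the oplog index of the most recent Error entry (last_error_index) so that stderr recovery scans backwards from the right position, and recover_stderr_logs concatenates the recovered chunks with join("") instead of join("\n"), since each stderr chunk already carries the newlines the worker wrote. A rough sketch of that concatenation idea, with a toy chunk type standing in for the real oplog entries:

```rust
// Toy stand-in for the oplog scan; the real code reads OplogEntry::Log
// records backwards from the index of the last Error entry.
struct StderrChunk {
    message: String,
}

// Chunks are collected newest-first while scanning backwards, then reversed.
// Joining with "" (instead of "\n") avoids injecting extra newlines, because
// each chunk already ends with whatever line ending the worker emitted.
fn recover_stderr(chunks_newest_first: Vec<StderrChunk>) -> String {
    let mut messages: Vec<String> = chunks_newest_first
        .into_iter()
        .map(|chunk| chunk.message)
        .collect();
    messages.reverse();
    messages.join("")
}

fn main() {
    let chunks = vec![
        StderrChunk { message: "second line\n".to_string() },
        StderrChunk { message: "first line\n".to_string() },
    ];
    assert_eq!(recover_stderr(chunks), "first line\nsecond line\n");
}
```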
16 changes: 9 additions & 7 deletions golem-worker-executor-base/src/grpc.rs
@@ -820,13 +820,15 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
)
.await?;

let result: Vec<golem::worker::WorkerMetadata> = workers
.into_iter()
.map(|worker| {
let status = worker.last_known_status.clone();
Self::create_proto_metadata(worker, status, None)
})
.collect();
let mut result = Vec::new();

for worker in workers {
let status = worker.last_known_status.clone();
let last_error_and_retry_count =
Ctx::get_last_error_and_retry_count(self, &worker.owned_worker_id()).await;
let metadata = Self::create_proto_metadata(worker, status, last_error_and_retry_count);
result.push(metadata);
}

Ok((
new_cursor.map(|cursor| Cursor {
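The iterator-based map/collect over workers is replaced by a plain for loop because each worker now needs an awaited lookup of its last error, and a closure passed to .map() cannot contain .await. A minimal sketch of that pattern under stated assumptions: the async lookup is a placeholder for Ctx::get_last_error_and_retry_count, and the futures crate is assumed only to provide block_on in the example:

```rust
// Minimal sketch of swapping `.map(...)` for a sequential for-loop when each
// element needs an async lookup. `last_error_for` is a placeholder, not the
// real executor API.
async fn last_error_for(worker: &str) -> Option<String> {
    if worker.ends_with("-failed") {
        Some(format!("{worker} crashed"))
    } else {
        None
    }
}

async fn collect_metadata(workers: Vec<String>) -> Vec<(String, Option<String>)> {
    let mut result = Vec::new();
    for worker in workers {
        // The `await` here is why this can no longer be a plain iterator map.
        let last_error = last_error_for(&worker).await;
        result.push((worker, last_error));
    }
    result
}

fn main() {
    let out = futures::executor::block_on(collect_metadata(vec![
        "w1".to_string(),
        "w2-failed".to_string(),
    ]));
    assert_eq!(out[1].1.as_deref(), Some("w2-failed crashed"));
}
```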

0 comments on commit bc07cf9
