From 1f0ce7a19600fd9790d432b12783da825d14079f Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 14 Oct 2024 18:49:54 +0200 Subject: [PATCH] Fixes and tests for oplog enumeration --- Cargo.toml | 2 +- golem-cli/Cargo.toml | 2 +- golem-cli/tests/worker.rs | 34 +- golem-common/src/model/exports.rs | 5 +- golem-common/src/model/public_oplog.rs | 410 ++++++++++++++++++ .../src/model/public_oplog/mod.rs | 5 +- golem-worker-executor-base/tests/api.rs | 3 + .../tests/observability.rs | 4 +- integration-tests/tests/worker.rs | 4 +- 9 files changed, 459 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 64b2f4432..b17b0b33b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,7 +95,7 @@ futures-core = "0.3.29" futures-util = "0.3.29" git-version = "0.3.9" golem-wasm-ast = "1.0.1" -golem-wasm-rpc = { version = "1.0.5", default-features = false, features = [ +golem-wasm-rpc = { version = "1.0.6", default-features = false, features = [ "host", ] } diff --git a/golem-cli/Cargo.toml b/golem-cli/Cargo.toml index 76630eb46..b0d40de91 100644 --- a/golem-cli/Cargo.toml +++ b/golem-cli/Cargo.toml @@ -43,7 +43,7 @@ futures-util = { workspace = true } golem-examples = "1.0.6" golem-wasm-ast = { workspace = true } golem-wasm-rpc = { workspace = true } -golem-wasm-rpc-stubgen = { version = "1.0.5", optional = true } +golem-wasm-rpc-stubgen = { version = "1.0.6", optional = true } h2 = "0.3.24" http = { workspace = true } humansize = { workspace = true } diff --git a/golem-cli/tests/worker.rs b/golem-cli/tests/worker.rs index c51e220c6..7ef54f04e 100644 --- a/golem-cli/tests/worker.rs +++ b/golem-cli/tests/worker.rs @@ -19,6 +19,8 @@ use crate::{RefKind, Tracing}; use anyhow::anyhow; use assert2::assert; use golem_cli::model::component::ComponentView; +use golem_cli::model::text::fmt::TextFormat; +use golem_cli::model::text::worker::WorkerGetView; use golem_cli::model::{Format, IdempotencyKey, WorkersMetadataResponseView}; use golem_client::model::{PublicOplogEntry, UpdateRecord}; use golem_common::model::TargetWorkerId; @@ -29,8 +31,10 @@ use indoc::formatdoc; use serde_json::{json, Value}; use std::io::{BufRead, BufReader}; use std::sync::Arc; +use std::thread::sleep; use std::time::Duration; use test_r::core::{DynamicTestRegistration, TestType}; +use tracing::debug; inherit_test_dep!(EnvBasedTestDependencies); inherit_test_dep!(Tracing); @@ -1037,6 +1041,25 @@ fn worker_update( UpdateRecord::FailedUpdate(_) => panic!("Update failed"), }; + loop { + let mut cli_args = vec!["worker".to_owned(), "get".to_owned()]; + cli_args.append(&mut worker_ref(cfg, ref_kind, &component, &worker_name)); + let result: WorkerGetView = cli.run(&cli_args)?; + + if result.0.component_version == target_version { + break; + } else { + debug!("Waiting for worker to update..."); + sleep(Duration::from_secs(2)); + } + } + + let mut cli_args = vec!["worker".to_owned(), "oplog".to_owned()]; + cli_args.append(&mut worker_ref(cfg, ref_kind, &component, &worker_name)); + let result: Vec<(u64, PublicOplogEntry)> = cli.run(&cli_args)?; + result.print(); + + assert_eq!(result.len(), 3); // create, enqueue update, successful update assert_eq!(original_updates, 0); assert_eq!(target_version, 1); Ok(()) @@ -1106,6 +1129,11 @@ fn worker_invoke_indexed_resource( json!({"typ":{"items":[{"type":"U64"}],"type":"Tuple"},"value":[3]}) ); + let mut cli_args = vec!["worker".to_owned(), "oplog".to_owned()]; + cli_args.append(&mut worker_ref(cfg, ref_kind, &component, &worker_name)); + let result: Vec<(u64, PublicOplogEntry)> = cli.run(&cli_args)?; + result.print(); + Ok(()) } @@ -1231,7 +1259,11 @@ fn worker_get_oplog( let result: Vec<(u64, PublicOplogEntry)> = cli.run(&["worker", "oplog", &cfg.arg('W', "worker"), &url.to_string()])?; - assert_eq!(result.len(), 14); + result.print(); + + // Whether there is an "enqueued invocation" entry or just directly started invocation + // depends on timing + assert!(result.len() >= 12 && result.len() <= 14); Ok(()) } diff --git a/golem-common/src/model/exports.rs b/golem-common/src/model/exports.rs index 7f7648254..5c701ff1b 100644 --- a/golem-common/src/model/exports.rs +++ b/golem-common/src/model/exports.rs @@ -108,7 +108,10 @@ pub fn find_resource_site( resource: resource_name.to_string(), }, ); - if functions.iter().any(|f| f.name == constructor.to_string()) { + if functions + .iter() + .any(|f| f.name == constructor.function().function_name()) + { Some(site) } else { None diff --git a/golem-common/src/model/public_oplog.rs b/golem-common/src/model/public_oplog.rs index c458beda7..e5ec13fc5 100644 --- a/golem-common/src/model/public_oplog.rs +++ b/golem-common/src/model/public_oplog.rs @@ -94,7 +94,9 @@ pub struct DetailsParameter { #[derive(Clone, Debug, Serialize, PartialEq, Deserialize, Object)] pub struct PublicRetryConfig { pub max_attempts: u32, + #[serde(with = "humantime_serde")] pub min_delay: Duration, + #[serde(with = "humantime_serde")] pub max_delay: Duration, pub multiplier: f64, pub max_jitter_factor: Option, @@ -1172,3 +1174,411 @@ impl From for golem_api_grpc::proto::golem::worker::OplogCursor { } } } + +#[cfg(test)] +mod tests { + + use super::{ + ChangeRetryPolicyParameters, CreateParameters, DescribeResourceParameters, Empty, + EndRegionParameters, ErrorParameters, ExportedFunctionCompletedParameters, + ExportedFunctionInvokedParameters, ExportedFunctionParameters, FailedUpdateParameters, + GrowMemoryParameters, ImportedFunctionInvokedParameters, JumpParameters, LogParameters, + PendingUpdateParameters, PendingWorkerInvocationParameters, PublicOplogEntry, + PublicRetryConfig, PublicUpdateDescription, PublicWorkerInvocation, + PublicWrappedFunctionType, ResourceParameters, SnapshotBasedUpdateParameters, + SuccessfulUpdateParameters, TimestampParameter, + }; + use crate::model::oplog::{LogLevel, OplogIndex, WorkerResourceId}; + use crate::model::regions::OplogRegion; + use crate::model::{AccountId, ComponentId, IdempotencyKey, Timestamp, WorkerId}; + use golem_wasm_ast::analysis::analysed_type::{field, list, r#enum, record, s16, str, u64}; + use golem_wasm_rpc::{Value, ValueAndType}; + use poem_openapi::types::ToJSON; + use test_r::test; + use uuid::Uuid; + + fn rounded_ts(ts: Timestamp) -> Timestamp { + Timestamp::from(ts.to_millis()) + } + + #[test] + fn create_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Create(CreateParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + worker_id: WorkerId { + component_id: ComponentId( + Uuid::parse_str("13A5C8D4-F05E-4E23-B982-F4D413E181CB").unwrap(), + ), + worker_name: "test1".to_string(), + }, + component_version: 1, + args: vec!["a".to_string(), "b".to_string()], + env: vec![("x".to_string(), "y".to_string())] + .into_iter() + .collect(), + account_id: AccountId { + value: "account_id".to_string(), + }, + parent: Some(WorkerId { + component_id: ComponentId( + Uuid::parse_str("13A5C8D4-F05E-4E23-B982-F4D413E181CB").unwrap(), + ), + worker_name: "test2".to_string(), + }), + component_size: 100_000_000, + initial_total_linear_memory_size: 200_000_000, + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn imported_function_invoked_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::ImportedFunctionInvoked(ImportedFunctionInvokedParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + function_name: "test".to_string(), + request: ValueAndType { + value: Value::String("test".to_string()), + typ: str(), + }, + response: ValueAndType { + value: Value::List(vec![Value::U64(1)]), + typ: list(u64()), + }, + wrapped_function_type: PublicWrappedFunctionType::ReadRemote(Empty), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn exported_function_invoked_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::ExportedFunctionInvoked(ExportedFunctionInvokedParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + function_name: "test".to_string(), + request: vec![ + ValueAndType { + value: Value::String("test".to_string()), + typ: str(), + }, + ValueAndType { + value: Value::Record(vec![Value::S16(1), Value::S16(-1)]), + typ: record(vec![field("x", s16()), field("y", s16())]), + }, + ], + idempotency_key: IdempotencyKey::new("idempotency_key".to_string()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn exported_function_completed_serialization_poem_serde_equivalence() { + let entry = + PublicOplogEntry::ExportedFunctionCompleted(ExportedFunctionCompletedParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + response: ValueAndType { + value: Value::Enum(1), + typ: r#enum(&["red", "green", "blue"]), + }, + consumed_fuel: 100, + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn suspend_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Suspend(TimestampParameter { + timestamp: rounded_ts(Timestamp::now_utc()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn error_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Error(ErrorParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + error: "test".to_string(), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn no_op_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::NoOp(TimestampParameter { + timestamp: rounded_ts(Timestamp::now_utc()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn jump_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Jump(JumpParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + jump: OplogRegion { + start: OplogIndex::from_u64(1), + end: OplogIndex::from_u64(2), + }, + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn interrupted_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Interrupted(TimestampParameter { + timestamp: rounded_ts(Timestamp::now_utc()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn exited_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Exited(TimestampParameter { + timestamp: rounded_ts(Timestamp::now_utc()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn change_retry_policy_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::ChangeRetryPolicy(ChangeRetryPolicyParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + new_policy: PublicRetryConfig { + max_attempts: 10, + min_delay: std::time::Duration::from_secs(1), + max_delay: std::time::Duration::from_secs(10), + multiplier: 2.0, + max_jitter_factor: Some(0.1), + }, + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn begin_atomic_region_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::BeginAtomicRegion(TimestampParameter { + timestamp: rounded_ts(Timestamp::now_utc()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn end_atomic_region_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::EndAtomicRegion(EndRegionParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + begin_index: OplogIndex::from_u64(1), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn begin_remote_write_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::BeginRemoteWrite(TimestampParameter { + timestamp: rounded_ts(Timestamp::now_utc()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn end_remote_write_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::EndRemoteWrite(EndRegionParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + begin_index: OplogIndex::from_u64(1), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn pending_worker_invocation_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::PendingWorkerInvocation(PendingWorkerInvocationParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + invocation: PublicWorkerInvocation::ExportedFunction(ExportedFunctionParameters { + idempotency_key: IdempotencyKey::new("idempotency_key".to_string()), + full_function_name: "test".to_string(), + function_input: Some(vec![ + ValueAndType { + value: Value::String("test".to_string()), + typ: str(), + }, + ValueAndType { + value: Value::Record(vec![Value::S16(1), Value::S16(-1)]), + typ: record(vec![field("x", s16()), field("y", s16())]), + }, + ]), + }), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn pending_update_serialization_poem_serde_equivalence_1() { + let entry = PublicOplogEntry::PendingUpdate(PendingUpdateParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + target_version: 1, + description: PublicUpdateDescription::SnapshotBased(SnapshotBasedUpdateParameters { + payload: "test".as_bytes().to_vec(), + }), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn pending_update_serialization_poem_serde_equivalence_2() { + let entry = PublicOplogEntry::PendingUpdate(PendingUpdateParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + target_version: 1, + description: PublicUpdateDescription::Automatic(Empty), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn successful_update_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::SuccessfulUpdate(SuccessfulUpdateParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + target_version: 1, + new_component_size: 100_000_000, + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn failed_update_serialization_poem_serde_equivalence_1() { + let entry = PublicOplogEntry::FailedUpdate(FailedUpdateParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + target_version: 1, + details: Some("test".to_string()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn failed_update_serialization_poem_serde_equivalence_2() { + let entry = PublicOplogEntry::FailedUpdate(FailedUpdateParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + target_version: 1, + details: None, + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn grow_memory_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::GrowMemory(GrowMemoryParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + delta: 100_000_000, + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn create_resource_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::CreateResource(ResourceParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + id: WorkerResourceId(100), + }); + + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn drop_resource_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::DropResource(ResourceParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + id: WorkerResourceId(100), + }); + + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn describe_resource_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::DescribeResource(DescribeResourceParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + id: WorkerResourceId(100), + resource_name: "test".to_string(), + resource_params: vec![ + ValueAndType { + value: Value::String("test".to_string()), + typ: str(), + }, + ValueAndType { + value: Value::Record(vec![Value::S16(1), Value::S16(-1)]), + typ: record(vec![field("x", s16()), field("y", s16())]), + }, + ], + }); + + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn log_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Log(LogParameters { + timestamp: rounded_ts(Timestamp::now_utc()), + level: LogLevel::Stderr, + context: "test".to_string(), + message: "test".to_string(), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + fn restart_serialization_poem_serde_equivalence() { + let entry = PublicOplogEntry::Restart(TimestampParameter { + timestamp: rounded_ts(Timestamp::now_utc()), + }); + let serialized = entry.to_json_string(); + let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } +} diff --git a/golem-worker-executor-base/src/model/public_oplog/mod.rs b/golem-worker-executor-base/src/model/public_oplog/mod.rs index 2b9b0a457..d8128510e 100644 --- a/golem-worker-executor-base/src/model/public_oplog/mod.rs +++ b/golem-worker-executor-base/src/model/public_oplog/mod.rs @@ -807,10 +807,7 @@ fn encode_host_function_request_as_value( Ok(payload.into_value_and_type()) } "golem::rpc::wasm-rpc::generate_unique_local_worker_id" => no_payload(), - "cli::preopens::get_directories" => { - let payload: Result, SerializableError> = try_deserialize(bytes)?; - Ok(payload.into_value_and_type()) - } + "cli::preopens::get_directories" => no_payload(), "filesystem::types::descriptor::stat" => { let payload: String = try_deserialize(bytes)?; Ok(payload.into_value_and_type()) diff --git a/golem-worker-executor-base/tests/api.rs b/golem-worker-executor-base/tests/api.rs index 167350d42..82234df4b 100644 --- a/golem-worker-executor-base/tests/api.rs +++ b/golem-worker-executor-base/tests/api.rs @@ -2393,6 +2393,8 @@ async fn counter_resource_test_2( let (metadata2, _) = executor.get_worker_metadata(&worker_id).await.unwrap(); + let _oplog = executor.get_oplog(&worker_id, OplogIndex::INITIAL).await; + drop(executor); check!(result1 == Ok(vec![Value::U64(5)])); @@ -2461,6 +2463,7 @@ async fn counter_resource_test_2( ) }) .collect::>(); + check!(resources2 == vec![]); } diff --git a/golem-worker-executor-base/tests/observability.rs b/golem-worker-executor-base/tests/observability.rs index 25855729c..714cd4e48 100644 --- a/golem-worker-executor-base/tests/observability.rs +++ b/golem-worker-executor-base/tests/observability.rs @@ -76,7 +76,9 @@ async fn get_oplog_1( drop(executor); - assert_eq!(oplog.len(), 14); + // Whether there is an "enqueued invocation" entry or just directly started invocation + // depends on timing + assert!(oplog.len() >= 12 && oplog.len() <= 14); assert!(matches!(oplog[0], PublicOplogEntry::Create(_))); assert_eq!( oplog diff --git a/integration-tests/tests/worker.rs b/integration-tests/tests/worker.rs index 31778847a..a18c94a55 100644 --- a/integration-tests/tests/worker.rs +++ b/integration-tests/tests/worker.rs @@ -1205,7 +1205,9 @@ async fn get_oplog_1(deps: &EnvBasedTestDependencies, _tracing: &Tracing) { let oplog = deps.get_oplog(&worker_id, OplogIndex::INITIAL).await; - assert_eq!(oplog.len(), 14); + // Whether there is an "enqueued invocation" entry or just directly started invocation + // depends on oplog + assert!(oplog.len() >= 12 && oplog.len() <= 14); assert!(matches!(oplog[0], PublicOplogEntry::Create(_))); assert_eq!( oplog