From 98c4e08e25f1baa0134c61147ee04f736917ef28 Mon Sep 17 00:00:00 2001 From: allada Date: Sat, 16 Apr 2022 11:46:28 -0500 Subject: [PATCH] Add worker config definitions and rename Metadata to Priority * Adds the initial config for workers to use use * Rename PropertyType::Metadata to PropertyType::Priority --- cas/scheduler/platform_property_manager.rs | 16 ++-- config/cas_server.rs | 74 ++++++++++++++++++- config/examples/basic_cas.json | 39 +++++++++- config/examples/filesystem_cas.json | 16 ++-- .../s3_backend_with_local_fast_cas.json | 14 ++-- 5 files changed, 135 insertions(+), 24 deletions(-) diff --git a/cas/scheduler/platform_property_manager.rs b/cas/scheduler/platform_property_manager.rs index 90843ca5a..8f0f848e9 100644 --- a/cas/scheduler/platform_property_manager.rs +++ b/cas/scheduler/platform_property_manager.rs @@ -43,14 +43,17 @@ impl PlatformProperties { /// Minimum - Means that workers must have at least this number available. When /// a worker executes a task that has this value, the worker will have /// this value subtracted from the available resources of the worker. -/// Metadata - Means the worker is given this information, but does not restrict +/// Priority - Means the worker is given this information, but does not restrict /// what workers can take this value. However, the worker must have the /// associated key present to be matched. +/// TODO(allada) In the future this will be used by the scheduler and +/// worker to cause the scheduler to prefer certain workers over others, +/// but not restrict them based on these values. #[derive(Eq, PartialEq, Hash, Clone, Ord, PartialOrd, Debug)] pub enum PlatformPropertyValue { Exact(String), Minimum(u64), - Metadata(String), + Priority(String), } impl PlatformPropertyValue { @@ -66,9 +69,10 @@ impl PlatformPropertyValue { } false } - // Metadata is used to pass info to the worker and not restrict which - // workers can be selected. - PlatformPropertyValue::Metadata(_) => true, + // Priority is used to pass info to the worker and not restrict which + // workers can be selected, but might be used to prefer certain workers + // over others. + PlatformPropertyValue::Priority(_) => true, // Success exact case is handled above. PlatformPropertyValue::Exact(_) => false, } @@ -105,7 +109,7 @@ impl PlatformPropertyManager { })?, )), PropertyType::Exact => Ok(PlatformPropertyValue::Exact(value.to_string())), - PropertyType::Metadata => Ok(PlatformPropertyValue::Metadata(value.to_string())), + PropertyType::Priority => Ok(PlatformPropertyValue::Priority(value.to_string())), }; } Err(make_input_err!("Unknown platform property '{}'", key)) diff --git a/config/cas_server.rs b/config/cas_server.rs index b068fcd0f..b46b32032 100644 --- a/config/cas_server.rs +++ b/config/cas_server.rs @@ -82,7 +82,7 @@ pub struct CapabilitiesConfig { /// When the scheduler matches tasks to workers that are capable of running /// the task, this value will be used to determine how the property is treated. -#[derive(Deserialize, Debug, Clone, Copy)] +#[derive(Deserialize, Debug, Clone, Copy, Hash, Eq, PartialEq)] pub enum PropertyType { /// Requires the platform property to be a u64 and when the scheduler looks /// for appropriate worker nodes that are capable of executing the task, @@ -97,7 +97,10 @@ pub enum PropertyType { /// Does not restrict on this value and instead will be passed to the worker /// as an informational piece. - Metadata, + /// TODO(allada) In the future this will be used by the scheduler and worker + /// to cause the scheduler to prefer certain workers over others, but not + /// restrict them based on these values. + Priority, } #[derive(Deserialize, Debug)] @@ -173,12 +176,79 @@ pub struct ServerConfig { pub services: Option, } +#[allow(non_camel_case_types)] +#[derive(Deserialize, Debug)] +pub enum WrokerProperty { + /// List of static values. + /// Note: Generally there should only ever be 1 value, but if the platform + /// property key is PropertyType::Priority it may have more than one value. + values(Vec), + + /// A dynamic configuration. The string will be executed as a command + /// (not sell) and will be split by "\n" (new line character). + query_cmd(String), +} + +#[derive(Deserialize, Debug)] +pub struct LocalWorker { + /// Endpoint which the worker will connect to the scheduler's WorkerApiService. + pub worker_api_endpoint: String, + + /// The command to execute on every execution request. This will be parsed as + /// a command + arguments (not shell). + /// '$@' has a special meaning in that all the arguments will expand into this + /// location. + /// Example: "run.sh $@" and a job with command: "sleep 5" will result in a + /// command like: "run.sh sleep 5". + pub entrypoint_cmd: String, + + /// Reference to a filesystem store (runtime enforced). This store will be used + /// to store a local cache of files for optimization purposes. + /// Must be a reference to a store implementing backends::FilesystemStore. + pub local_filesystem_store_ref: StoreRefName, + + /// Underlying CAS store that the worker will use to download CAS artifacts. + /// This store must have the same objects that the scheduler/client-cas uses. + /// The scheduler will send job requests that will reference objects stored + /// in this store. If the objects referenced in the job request don't exist + /// in this store an error may be returned. + pub cas_store: StoreRefName, + + /// Underlying AC store that the worker will use to publish execution results + /// into. Objects placed in this store should be reachable from the + /// scheduler/client-cas after they have finished updating. + pub ac_store: StoreRefName, + + /// The directory work jobs will be executed from. This directory will be fully + /// managed by the worker service and will be purged on startup. + /// This directory and the directory referenced in local_filesystem_store_ref's + /// backends::FilesystemStore::content_path must be on the same filesystem. + /// Hardlinks will be used when placing files that are accessible to the jobs + /// that are sourced from local_filesystem_store_ref's content_path. + pub work_directory: String, + + /// Properties of this worker. This configuration will be sent to the scheduler + /// and used to tell the scheduler to restrict what should be executed on this + /// worker. + pub platform_properties: HashMap, +} + +#[allow(non_camel_case_types)] +#[derive(Deserialize, Debug)] +pub enum WorkerConfig { + /// A worker type that executes jobs locally on this machine. + local(LocalWorker), +} + #[derive(Deserialize, Debug)] pub struct CasConfig { /// List of stores available to use in this config. /// The keys can be used in other configs when needing to reference a store. pub stores: HashMap, + /// Worker configurations used to execute jobs. + pub workers: Option>, + /// List of schedulers available to use in this config. /// The keys can be used in other configs when needing to reference a /// scheduler. diff --git a/config/examples/basic_cas.json b/config/examples/basic_cas.json index cf4702a1c..14923a134 100644 --- a/config/examples/basic_cas.json +++ b/config/examples/basic_cas.json @@ -15,6 +15,16 @@ "max_bytes": 10000000, } } + }, + "WORKER_FILESYSTEM_STORE": { + "filesystem": { + "content_path": "/tmp/turbo_cache/data/content_path-cas", + "temp_path": "/tmp/turbo_cache/data/tmp_path-cas", + "eviction_policy": { + // 2gb. + "max_bytes": 2000000000, + } + } } }, "schedulers": { @@ -34,10 +44,37 @@ "cpu_arch": "Exact", "cpu_model": "Exact", "kernel_version": "Exact", - "docker_image": "Metadata", + "docker_image": "Priority", } } }, + "workers": [{ + "local": { + "worker_api_endpoint": "127.0.0.1:50061", + "entrypoint_cmd": "./examples/worker/local_entrypoint.sh $@", + "local_filesystem_store_ref": "WORKER_FILESYSTEM_STORE", + "cas_store": "CAS_MAIN_STORE", + "ac_store": "AC_MAIN_STORE", + "work_directory": "/tmp/turbo_cache/work", + "platform_properties": { + "cpu_count": { + "values": ["1"], + }, + "memory_kb": { + "values": ["500000"], + }, + "network_kbps": { + "values": ["100000"], + }, + "cpu_arch": { + "values": ["x86_64"], + }, + "docker_image": { + "query_cmd": "docker images --format {{.Repository}}:{{.Tag}}", + } + } + } + }], "servers": [{ "listen_address": "0.0.0.0:50051", "services": { diff --git a/config/examples/filesystem_cas.json b/config/examples/filesystem_cas.json index 0bb2b4a46..75601f266 100644 --- a/config/examples/filesystem_cas.json +++ b/config/examples/filesystem_cas.json @@ -1,5 +1,5 @@ // This configuration will place objects in various folders in -// `/tmp/turbo_cache_data`. It will store all data on disk and +// `/tmp/turbo_cache/data`. It will store all data on disk and // allows for restarts of the underlying service. It is optimized // so objects are compressed, deduplicated and uses some in-memory // optimizations for certain hot paths. @@ -12,8 +12,8 @@ }, "backend": { "filesystem": { - "content_path": "/tmp/turbo_cache_data/content_path-cas", - "temp_path": "/tmp/turbo_cache_data/tmp_path-cas", + "content_path": "/tmp/turbo_cache/data/content_path-cas", + "temp_path": "/tmp/turbo_cache/data/tmp_path-cas", "eviction_policy": { // 2gb. "max_bytes": 2000000000, @@ -56,8 +56,8 @@ }, "slow": { "filesystem": { - "content_path": "/tmp/turbo_cache_data/content_path-index", - "temp_path": "/tmp/turbo_cache_data/tmp_path-index", + "content_path": "/tmp/turbo_cache/data/content_path-index", + "temp_path": "/tmp/turbo_cache/data/tmp_path-index", "eviction_policy": { // 500mb. "max_bytes": 500000000, @@ -81,8 +81,8 @@ }, "AC_MAIN_STORE": { "filesystem": { - "content_path": "/tmp/turbo_cache_data/content_path-ac", - "temp_path": "/tmp/turbo_cache_data/tmp_path-ac", + "content_path": "/tmp/turbo_cache/data/content_path-ac", + "temp_path": "/tmp/turbo_cache/data/tmp_path-ac", "eviction_policy": { // 500mb. "max_bytes": 500000000, @@ -107,7 +107,7 @@ "cpu_arch": "Exact", "cpu_model": "Exact", "kernel_version": "Exact", - "docker_image": "Metadata", + "docker_image": "Priority", } } }, diff --git a/config/examples/s3_backend_with_local_fast_cas.json b/config/examples/s3_backend_with_local_fast_cas.json index 0d019203c..2e4110002 100644 --- a/config/examples/s3_backend_with_local_fast_cas.json +++ b/config/examples/s3_backend_with_local_fast_cas.json @@ -8,8 +8,8 @@ "fast_slow": { "fast": { "filesystem": { - "content_path": "/tmp/turbo_cache_data/content_path-index", - "temp_path": "/tmp/turbo_cache_data/tmp_path-index", + "content_path": "/tmp/turbo_cache/data/content_path-index", + "temp_path": "/tmp/turbo_cache/data/tmp_path-index", "eviction_policy": { // 500mb. "max_bytes": 500000000, @@ -40,8 +40,8 @@ "fast_slow": { "fast": { "filesystem": { - "content_path": "/tmp/turbo_cache_data/content_path-content", - "temp_path": "/tmp/turbo_cache_data/tmp_path-content", + "content_path": "/tmp/turbo_cache/data/content_path-content", + "temp_path": "/tmp/turbo_cache/data/tmp_path-content", "eviction_policy": { // 2gb. "max_bytes": 2000000000, @@ -81,8 +81,8 @@ } }, "filesystem": { - "content_path": "/tmp/turbo_cache_data/content_path-ac", - "temp_path": "/tmp/turbo_cache_data/tmp_path-ac", + "content_path": "/tmp/turbo_cache/data/content_path-ac", + "temp_path": "/tmp/turbo_cache/data/tmp_path-ac", "eviction_policy": { // 500mb. "max_bytes": 500000000, @@ -122,7 +122,7 @@ "cpu_arch": "Exact", "cpu_model": "Exact", "kernel_version": "Exact", - "docker_image": "Metadata", + "docker_image": "Priority", } } },