diff --git a/crates/application/src/application_function_runner/metrics.rs b/crates/application/src/application_function_runner/metrics.rs index c516fd8f..f8b9318f 100644 --- a/crates/application/src/application_function_runner/metrics.rs +++ b/crates/application/src/application_function_runner/metrics.rs @@ -1,4 +1,7 @@ -use common::types::UdfType; +use common::types::{ + ModuleEnvironment, + UdfType, +}; use metrics::{ log_counter, log_counter_with_tags, @@ -9,6 +12,7 @@ use metrics::{ register_convex_counter, register_convex_gauge, register_convex_histogram, + MetricTag, StatusTimer, STATUS_LABEL, }; @@ -71,9 +75,14 @@ register_convex_gauge!( APPLICATION_FUNCTION_RUNNER_OUTSTANDING_TOTAL, "The number of currently outstanding functions of a given type. Includes both running and \ waiting functions", - &["udf_type", "state"] + &["udf_type", "state", "env_type"] ); -pub fn log_outstanding_functions(total: usize, udf_type: UdfType, state: OutstandingFunctionState) { +pub fn log_outstanding_functions( + total: usize, + env: ModuleEnvironment, + udf_type: UdfType, + state: OutstandingFunctionState, +) { let state_tag = metric_tag_const(match state { OutstandingFunctionState::Running => "state:running", OutstandingFunctionState::Waiting => "state:waiting", @@ -81,7 +90,7 @@ pub fn log_outstanding_functions(total: usize, udf_type: UdfType, state: Outstan log_gauge_with_tags( &APPLICATION_FUNCTION_RUNNER_OUTSTANDING_TOTAL, total as f64, - vec![udf_type.metric_tag(), state_tag], + vec![udf_type.metric_tag(), state_tag, env.metric_tag()], ) } @@ -89,24 +98,40 @@ register_convex_histogram!( APPLICATION_FUNCTION_RUNNER_TOTAL_SECONDS, "The total time it took to execute a function. This includes wait time and run time. The \ metric is also logged for isolate client code path so we can compare apples to apples.", - &[STATUS_LABEL[0], "udf_type"] + &[STATUS_LABEL[0], "udf_type", "env_type"] ); -pub fn function_total_timer(udf_type: UdfType) -> StatusTimer { +pub fn function_total_timer(env: ModuleEnvironment, udf_type: UdfType) -> StatusTimer { let mut timer = StatusTimer::new(&APPLICATION_FUNCTION_RUNNER_TOTAL_SECONDS); timer.add_tag(udf_type.metric_tag()); + timer.add_tag(env.metric_tag()); timer } +trait ModuleEnvironmentExt { + fn metric_tag(&self) -> MetricTag; +} + +impl ModuleEnvironmentExt for ModuleEnvironment { + fn metric_tag(&self) -> MetricTag { + let value = match self { + ModuleEnvironment::Isolate => "env_type:isolate", + ModuleEnvironment::Node => "env_type:node", + ModuleEnvironment::Invalid => "env_type:invalid", + }; + metric_tag_const(value) + } +} + register_convex_counter!( APPLICATION_FUNCTION_RUNNER_WAIT_TIMEOUT_TOTAL, "Total number with running a function has timed out due to instance concurrency limits.", - &["udf_type"], + &["udf_type", "env_type"], ); -pub fn log_function_wait_timeout(udf_type: UdfType) { +pub fn log_function_wait_timeout(env: ModuleEnvironment, udf_type: UdfType) { log_counter_with_tags( &APPLICATION_FUNCTION_RUNNER_WAIT_TIMEOUT_TOTAL, 1, - vec![udf_type.metric_tag()], + vec![udf_type.metric_tag(), env.metric_tag()], ); } diff --git a/crates/application/src/application_function_runner/mod.rs b/crates/application/src/application_function_runner/mod.rs index bc893838..e351a52b 100644 --- a/crates/application/src/application_function_runner/mod.rs +++ b/crates/application/src/application_function_runner/mod.rs @@ -19,9 +19,10 @@ use common::{ identity::InertIdentity, knobs::{ APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT, - APPLICATION_MAX_CONCURRENT_ACTIONS, APPLICATION_MAX_CONCURRENT_MUTATIONS, + APPLICATION_MAX_CONCURRENT_NODE_ACTIONS, APPLICATION_MAX_CONCURRENT_QUERIES, + APPLICATION_MAX_CONCURRENT_V8_ACTIONS, ISOLATE_MAX_USER_HEAP_SIZE, UDF_EXECUTOR_OCC_INITIAL_BACKOFF, UDF_EXECUTOR_OCC_MAX_BACKOFF, @@ -225,16 +226,19 @@ impl FunctionRouter { database, system_env_vars, query_limiter: Arc::new(Limiter::new( + ModuleEnvironment::Isolate, UdfType::Query, *APPLICATION_MAX_CONCURRENT_QUERIES, )), mutation_limiter: Arc::new(Limiter::new( + ModuleEnvironment::Isolate, UdfType::Mutation, *APPLICATION_MAX_CONCURRENT_MUTATIONS, )), action_limiter: Arc::new(Limiter::new( + ModuleEnvironment::Isolate, UdfType::Action, - *APPLICATION_MAX_CONCURRENT_ACTIONS, + *APPLICATION_MAX_CONCURRENT_V8_ACTIONS, )), } } @@ -255,7 +259,8 @@ impl FunctionRouter { context: ExecutionContext, ) -> anyhow::Result<(Transaction, FunctionOutcome)> { anyhow::ensure!(udf_type == UdfType::Query || udf_type == UdfType::Mutation); - let timer = function_total_timer(udf_type); + // All queries and mutations are run in the isolate environment. + let timer = function_total_timer(ModuleEnvironment::Isolate, udf_type); let (tx, outcome) = self .function_runner_execute(tx, path_and_args, udf_type, journal, context, None) .await?; @@ -271,7 +276,6 @@ impl FunctionRouter { log_line_sender: mpsc::UnboundedSender, context: ExecutionContext, ) -> anyhow::Result { - let timer = function_total_timer(UdfType::Action); let (_, outcome) = self .function_runner_execute( tx, @@ -289,11 +293,10 @@ impl FunctionRouter { outcome ) }; - timer.finish(); Ok(outcome) } - // Execute using the function runner. Can be used for all Udf types including + // Execute using the function runner. Can be used for v8 udfs other than http // actions. async fn function_runner_execute( &self, @@ -316,17 +319,9 @@ impl FunctionRouter { UdfType::Action => &self.action_limiter, UdfType::HttpAction => anyhow::bail!("Function runner does not support http actions"), }; - let mut request_guard = limiter.start(); - select_biased! { - _ = request_guard.acquire_permit().fuse() => {}, - _ = self.rt.wait(*APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT) => { - log_function_wait_timeout(udf_type); - anyhow::bail!(ErrorMetadata::overloaded( - "TooManyConcurrentRequests", - "Too many concurrent requests, backoff and try again.", - )); - }, - } + + let request_guard = limiter.acquire_permit_with_timeout(&self.rt).await?; + let timer = function_run_timer(udf_type); let (function_tx, outcome, usage_stats) = self .function_runner @@ -384,6 +379,7 @@ impl FunctionRouter { // and log gauges for the number of waiting and currently running functions. struct Limiter { udf_type: UdfType, + env: ModuleEnvironment, // Used to limit running functions. semaphore: Semaphore, @@ -394,9 +390,10 @@ struct Limiter { } impl Limiter { - fn new(udf_type: UdfType, total_permits: usize) -> Self { + fn new(env: ModuleEnvironment, udf_type: UdfType, total_permits: usize) -> Self { let limiter = Self { udf_type, + env, semaphore: Semaphore::new(total_permits), total_permits, total_outstanding: AtomicUsize::new(0), @@ -406,6 +403,24 @@ impl Limiter { limiter } + async fn acquire_permit_with_timeout<'a, RT: Runtime>( + &'a self, + rt: &'a RT, + ) -> anyhow::Result> { + let mut request_guard = self.start(); + select_biased! { + _ = request_guard.acquire_permit().fuse() => {}, + _ = rt.wait(*APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT) => { + log_function_wait_timeout(self.env, self.udf_type); + anyhow::bail!(ErrorMetadata::overloaded( + "TooManyConcurrentRequests", + "Too many concurrent requests, backoff and try again.", + )); + }, + } + Ok(request_guard) + } + fn start(&self) -> RequestGuard { self.total_outstanding.fetch_add(1, Ordering::SeqCst); // Update the gauge to account for the newly waiting request. @@ -423,8 +438,18 @@ impl Limiter { .total_outstanding .load(Ordering::SeqCst) .saturating_sub(running); - log_outstanding_functions(running, self.udf_type, OutstandingFunctionState::Running); - log_outstanding_functions(waiting, self.udf_type, OutstandingFunctionState::Waiting); + log_outstanding_functions( + running, + self.env, + self.udf_type, + OutstandingFunctionState::Running, + ); + log_outstanding_functions( + waiting, + self.env, + self.udf_type, + OutstandingFunctionState::Waiting, + ); } } @@ -463,6 +488,11 @@ impl<'a> Drop for RequestGuard<'a> { } } +/// Executes UDFs for backends. +/// +/// This struct directly executes http and node actions. Queries, Mutations and +/// v8 Actions are instead routed through the FunctionRouter and its +/// FunctionRunner implementation. pub struct ApplicationFunctionRunner { runtime: RT, pub(crate) database: Database, @@ -480,6 +510,7 @@ pub struct ApplicationFunctionRunner { cache_manager: CacheManager, system_env_vars: BTreeMap, + node_action_limiter: Limiter, } impl HeapSize for ApplicationFunctionRunner { @@ -529,6 +560,11 @@ impl ApplicationFunctionRunner { function_log, cache_manager, system_env_vars, + node_action_limiter: Limiter::new( + ModuleEnvironment::Node, + UdfType::Action, + *APPLICATION_MAX_CONCURRENT_NODE_ACTIONS, + ), } } @@ -1072,6 +1108,8 @@ impl ApplicationFunctionRunner { .await? .context("Missing a valid module_version")?; let (log_line_sender, log_line_receiver) = mpsc::unbounded(); + + let timer = function_total_timer(module_version.environment, UdfType::Action); match module_version.environment { ModuleEnvironment::Isolate => { // TODO: This is the only use case of clone. We should get rid of clone, @@ -1098,6 +1136,7 @@ impl ApplicationFunctionRunner { let memory_in_mb: u64 = (*ISOLATE_MAX_USER_HEAP_SIZE / (1 << 20)) .try_into() .unwrap(); + timer.finish(); Ok(ActionCompletion { outcome, execution_time: start.elapsed(), @@ -1110,6 +1149,10 @@ impl ApplicationFunctionRunner { }) }, ModuleEnvironment::Node => { + let _request_guard = self + .node_action_limiter + .acquire_permit_with_timeout(&self.runtime) + .await?; let mut source_maps = BTreeMap::new(); if let Some(source_map) = module_version.source_map.clone() { source_maps.insert(name.module().clone(), source_map); @@ -1204,6 +1247,7 @@ impl ApplicationFunctionRunner { syscall_trace: node_outcome.syscall_trace, udf_server_version, }; + timer.finish(); let memory_in_mb = node_outcome.memory_used_in_mb; Ok(ActionCompletion { outcome, diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index b3abeb8f..4765604b 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -48,11 +48,11 @@ use common::{ execution_context::ExecutionContext, http::fetch::FetchClient, knobs::{ + APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS, BACKEND_ISOLATE_ACTIVE_THREADS_PERCENT, MAX_JOBS_CANCEL_BATCH, SNAPSHOT_LIST_LIMIT, UDF_ISOLATE_MAX_EXEC_THREADS, - V8_ACTION_MAX_ISOLATE_EXEC_THREADS, }, log_lines::run_function_and_collect_log_lines, log_streaming::LogSender, @@ -510,7 +510,7 @@ impl Application { let actions_isolate = IsolateClient::new( runtime.clone(), actions_isolate_worker, - *V8_ACTION_MAX_ISOLATE_EXEC_THREADS, + *APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS, true, instance_name.clone(), instance_secret, diff --git a/crates/common/src/knobs.rs b/crates/common/src/knobs.rs index 7169d92e..f2831b3e 100644 --- a/crates/common/src/knobs.rs +++ b/crates/common/src/knobs.rs @@ -570,18 +570,6 @@ pub static ISOLATE_IDLE_TIMEOUT: LazyLock = pub static V8_ACTION_SYSTEM_TIMEOUT: LazyLock = LazyLock::new(|| Duration::from_secs(env_config("V8_ACTION_SYSTEM_TIMEOUT_SECONDS", 15))); -/// Number of threads to execute V8 actions. -pub static V8_ACTION_MAX_ISOLATE_EXEC_THREADS: LazyLock = LazyLock::new(|| { - env_config( - "V8_ACTION_MAX_ISOLATE_EXEC_THREADS", - if cfg!(any(test, feature = "testing")) { - 2 - } else { - 16 - }, - ) -}); - /// The maximum amount of time pub static APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT: LazyLock = LazyLock::new(|| { @@ -591,18 +579,89 @@ pub static APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT: LazyLock = )) }); -/// The maximum number of concurrent queries that can be ran by application. +/// The maximum number of queries that can be run concurrently by an +/// application. +/// +/// This is a per backend limit applied before FunctionRunner implementations. +/// +/// The value here may be overridden by big brain. pub static APPLICATION_MAX_CONCURRENT_QUERIES: LazyLock = LazyLock::new(|| env_config("APPLICATION_MAX_CONCURRENT_QUERIES", 16)); -/// The maximum number of concurrent mutations that can be ran by application. +/// The maximum number of mutations that can be run concurrently by an +/// application. +/// +/// This is a per backend limit applied before FunctionRunner implementations. +/// +/// The value here may be overridden by big brain. pub static APPLICATION_MAX_CONCURRENT_MUTATIONS: LazyLock = LazyLock::new(|| env_config("APPLICATION_MAX_CONCURRENT_MUTATIONS", 16)); -/// The maximum number of concurrent actions that can be ran by application. -pub static APPLICATION_MAX_CONCURRENT_ACTIONS: LazyLock = +/// The maximum number of v8 actions that can be run concurrently by an +/// application. +/// +/// This is a higher level limit applied before FunctionRunner implementations. +/// +/// This does NOT apply to: +/// 1. Http actions +/// 2. Node actions +/// +/// Node actions are limited by the APPLICATION_MAX_CONCURRENT_NODE_ACTIONS +/// knob. Http actions are limited by APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS +/// knob. +/// +/// The value here may be overridden by big brain. +pub static APPLICATION_MAX_CONCURRENT_V8_ACTIONS: LazyLock = LazyLock::new(|| { + env_config( + "APPLICATION_MAX_CONCURRENT_V8_ACTIONS", + *APPLICATION_MAX_CONCURRENT_ACTIONS, + ) +}); +// TODO(CX-6067): Remove APPLICATION_MAX_CONCURRENT_ACTIONS and merge the +// default into *_V8_ACTIONS +static APPLICATION_MAX_CONCURRENT_ACTIONS: LazyLock = LazyLock::new(|| env_config("APPLICATION_MAX_CONCURRENT_ACTIONS", 16)); +/// The maximum number of node actions that can be run concurrently by an +/// application +/// +/// Node actions are not sent through FunctionRunner implementations, so this is +/// a limit on the number of actions sent to AWS. AWS also has a global maximum +/// number of total concurrent actions across all backends. If we hit the AWS +/// limit, we'll see 429 error responses for node actions. +/// +/// The value here may be overridden by big brain. +// TODO(CX-6067): Reduce this back down to 16. +pub static APPLICATION_MAX_CONCURRENT_NODE_ACTIONS: LazyLock = + LazyLock::new(|| env_config("APPLICATION_MAX_CONCURRENT_NODE_ACTIONS", 1000)); + +/// Number of threads to execute V8 actions. +/// +/// Http actions are not sent through FunctionRunner implementations. This is a +/// maximum on the number of http actions that will be executed in process in a +/// particular backend. +/// +/// The value here may be overridden by big brain. +pub static APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS: LazyLock = LazyLock::new(|| { + env_config( + "APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS", + *V8_ACTION_MAX_ISOLATE_EXEC_THREADS, + ) +}); + +// TODO(CX-6067): Remove V8_ACTION_MAX_ISOLATE_EXEC_THREADS and merge the +// default into *_HTTP_ACTIONS +static V8_ACTION_MAX_ISOLATE_EXEC_THREADS: LazyLock = LazyLock::new(|| { + env_config( + "V8_ACTION_MAX_ISOLATE_EXEC_THREADS", + if cfg!(any(test, feature = "testing")) { + 2 + } else { + 16 + }, + ) +}); + /// Set a 64MB limit on the heap size. pub static ISOLATE_MAX_USER_HEAP_SIZE: LazyLock = LazyLock::new(|| env_config("ISOLATE_MAX_USER_HEAP_SIZE", 1 << 26));