Skip to content

Commit

Permalink
Rollforward add a rate limiter for node actions (#23906)
Browse files Browse the repository at this point in the history
Rollforward add a rate limiter for node actions

Now with the correct nomad env variables - adds the missing max_concurrent_node_actions var to backend.nomad

This reverts commit 3c9da1c7caa47fb3476a42b31dc004a6d7e71378.

GitOrigin-RevId: 2872d0b1a8b90db1cd63b537438872a03eebb4e9
  • Loading branch information
sjudd authored and Convex, Inc. committed Mar 25, 2024
1 parent d082908 commit b0e9358
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 47 deletions.
43 changes: 34 additions & 9 deletions crates/application/src/application_function_runner/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use common::types::UdfType;
use common::types::{
ModuleEnvironment,
UdfType,
};
use metrics::{
log_counter,
log_counter_with_tags,
Expand All @@ -9,6 +12,7 @@ use metrics::{
register_convex_counter,
register_convex_gauge,
register_convex_histogram,
MetricTag,
StatusTimer,
STATUS_LABEL,
};
Expand Down Expand Up @@ -71,42 +75,63 @@ register_convex_gauge!(
APPLICATION_FUNCTION_RUNNER_OUTSTANDING_TOTAL,
"The number of currently outstanding functions of a given type. Includes both running and \
waiting functions",
&["udf_type", "state"]
&["udf_type", "state", "env_type"]
);
pub fn log_outstanding_functions(total: usize, udf_type: UdfType, state: OutstandingFunctionState) {
pub fn log_outstanding_functions(
total: usize,
env: ModuleEnvironment,
udf_type: UdfType,
state: OutstandingFunctionState,
) {
let state_tag = metric_tag_const(match state {
OutstandingFunctionState::Running => "state:running",
OutstandingFunctionState::Waiting => "state:waiting",
});
log_gauge_with_tags(
&APPLICATION_FUNCTION_RUNNER_OUTSTANDING_TOTAL,
total as f64,
vec![udf_type.metric_tag(), state_tag],
vec![udf_type.metric_tag(), state_tag, env.metric_tag()],
)
}

register_convex_histogram!(
APPLICATION_FUNCTION_RUNNER_TOTAL_SECONDS,
"The total time it took to execute a function. This includes wait time and run time. The \
metric is also logged for isolate client code path so we can compare apples to apples.",
&[STATUS_LABEL[0], "udf_type"]
&[STATUS_LABEL[0], "udf_type", "env_type"]
);
pub fn function_total_timer(udf_type: UdfType) -> StatusTimer {
pub fn function_total_timer(env: ModuleEnvironment, udf_type: UdfType) -> StatusTimer {
let mut timer = StatusTimer::new(&APPLICATION_FUNCTION_RUNNER_TOTAL_SECONDS);
timer.add_tag(udf_type.metric_tag());
timer.add_tag(env.metric_tag());
timer
}

/// Extension trait for producing a `MetricTag` from a value. In this module it
/// is implemented for `ModuleEnvironment` so that metrics can carry an
/// `env_type` tag.
trait ModuleEnvironmentExt {
/// Returns the metric tag that describes `self`.
fn metric_tag(&self) -> MetricTag;
}

impl ModuleEnvironmentExt for ModuleEnvironment {
    /// Maps each module environment variant to its constant `env_type:*`
    /// metric tag.
    fn metric_tag(&self) -> MetricTag {
        metric_tag_const(match self {
            ModuleEnvironment::Isolate => "env_type:isolate",
            ModuleEnvironment::Node => "env_type:node",
            ModuleEnvironment::Invalid => "env_type:invalid",
        })
    }
}

register_convex_counter!(
APPLICATION_FUNCTION_RUNNER_WAIT_TIMEOUT_TOTAL,
"Total number with running a function has timed out due to instance concurrency limits.",
&["udf_type"],
&["udf_type", "env_type"],
);
pub fn log_function_wait_timeout(udf_type: UdfType) {
pub fn log_function_wait_timeout(env: ModuleEnvironment, udf_type: UdfType) {
log_counter_with_tags(
&APPLICATION_FUNCTION_RUNNER_WAIT_TIMEOUT_TOTAL,
1,
vec![udf_type.metric_tag()],
vec![udf_type.metric_tag(), env.metric_tag()],
);
}

Expand Down
84 changes: 64 additions & 20 deletions crates/application/src/application_function_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use common::{
identity::InertIdentity,
knobs::{
APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT,
APPLICATION_MAX_CONCURRENT_ACTIONS,
APPLICATION_MAX_CONCURRENT_MUTATIONS,
APPLICATION_MAX_CONCURRENT_NODE_ACTIONS,
APPLICATION_MAX_CONCURRENT_QUERIES,
APPLICATION_MAX_CONCURRENT_V8_ACTIONS,
ISOLATE_MAX_USER_HEAP_SIZE,
UDF_EXECUTOR_OCC_INITIAL_BACKOFF,
UDF_EXECUTOR_OCC_MAX_BACKOFF,
Expand Down Expand Up @@ -225,16 +226,19 @@ impl<RT: Runtime> FunctionRouter<RT> {
database,
system_env_vars,
query_limiter: Arc::new(Limiter::new(
ModuleEnvironment::Isolate,
UdfType::Query,
*APPLICATION_MAX_CONCURRENT_QUERIES,
)),
mutation_limiter: Arc::new(Limiter::new(
ModuleEnvironment::Isolate,
UdfType::Mutation,
*APPLICATION_MAX_CONCURRENT_MUTATIONS,
)),
action_limiter: Arc::new(Limiter::new(
ModuleEnvironment::Isolate,
UdfType::Action,
*APPLICATION_MAX_CONCURRENT_ACTIONS,
*APPLICATION_MAX_CONCURRENT_V8_ACTIONS,
)),
}
}
Expand All @@ -255,7 +259,8 @@ impl<RT: Runtime> FunctionRouter<RT> {
context: ExecutionContext,
) -> anyhow::Result<(Transaction<RT>, FunctionOutcome)> {
anyhow::ensure!(udf_type == UdfType::Query || udf_type == UdfType::Mutation);
let timer = function_total_timer(udf_type);
// All queries and mutations are run in the isolate environment.
let timer = function_total_timer(ModuleEnvironment::Isolate, udf_type);
let (tx, outcome) = self
.function_runner_execute(tx, path_and_args, udf_type, journal, context, None)
.await?;
Expand All @@ -271,7 +276,6 @@ impl<RT: Runtime> FunctionRouter<RT> {
log_line_sender: mpsc::UnboundedSender<LogLine>,
context: ExecutionContext,
) -> anyhow::Result<ActionOutcome> {
let timer = function_total_timer(UdfType::Action);
let (_, outcome) = self
.function_runner_execute(
tx,
Expand All @@ -289,11 +293,10 @@ impl<RT: Runtime> FunctionRouter<RT> {
outcome
)
};
timer.finish();
Ok(outcome)
}

// Execute using the function runner. Can be used for all Udf types including
// Execute using the function runner. Can be used for v8 udfs other than http
// actions.
async fn function_runner_execute(
&self,
Expand All @@ -316,17 +319,9 @@ impl<RT: Runtime> FunctionRouter<RT> {
UdfType::Action => &self.action_limiter,
UdfType::HttpAction => anyhow::bail!("Function runner does not support http actions"),
};
let mut request_guard = limiter.start();
select_biased! {
_ = request_guard.acquire_permit().fuse() => {},
_ = self.rt.wait(*APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT) => {
log_function_wait_timeout(udf_type);
anyhow::bail!(ErrorMetadata::overloaded(
"TooManyConcurrentRequests",
"Too many concurrent requests, backoff and try again.",
));
},
}

let request_guard = limiter.acquire_permit_with_timeout(&self.rt).await?;

let timer = function_run_timer(udf_type);
let (function_tx, outcome, usage_stats) = self
.function_runner
Expand Down Expand Up @@ -384,6 +379,7 @@ impl<RT: Runtime> FunctionRouter<RT> {
// and log gauges for the number of waiting and currently running functions.
struct Limiter {
udf_type: UdfType,
env: ModuleEnvironment,

// Used to limit running functions.
semaphore: Semaphore,
Expand All @@ -394,9 +390,10 @@ struct Limiter {
}

impl Limiter {
fn new(udf_type: UdfType, total_permits: usize) -> Self {
fn new(env: ModuleEnvironment, udf_type: UdfType, total_permits: usize) -> Self {
let limiter = Self {
udf_type,
env,
semaphore: Semaphore::new(total_permits),
total_permits,
total_outstanding: AtomicUsize::new(0),
Expand All @@ -406,6 +403,24 @@ impl Limiter {
limiter
}

/// Registers a new request with this limiter and waits for a permit, giving
/// up once `APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT` has elapsed on
/// `rt`.
///
/// Returns the `RequestGuard` for the request once its permit has been
/// acquired.
///
/// # Errors
///
/// If the timeout fires first, a wait-timeout metric is logged with this
/// limiter's environment and UDF type, and an `ErrorMetadata::overloaded`
/// error with the short message `TooManyConcurrentRequests` is returned.
async fn acquire_permit_with_timeout<'a, RT: Runtime>(
&'a self,
rt: &'a RT,
) -> anyhow::Result<RequestGuard<'a>> {
// `start` counts this request as outstanding before it begins waiting.
let mut request_guard = self.start();
// The permit branch is listed first, so with `select_biased!` it takes
// priority over the timeout when both are ready.
select_biased! {
_ = request_guard.acquire_permit().fuse() => {},
_ = rt.wait(*APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT) => {
log_function_wait_timeout(self.env, self.udf_type);
// Bailing here drops `request_guard`.
// NOTE(review): presumably its `Drop` impl undoes the bookkeeping
// done by `start` — confirm against `RequestGuard`.
anyhow::bail!(ErrorMetadata::overloaded(
"TooManyConcurrentRequests",
"Too many concurrent requests, backoff and try again.",
));
},
}
Ok(request_guard)
}

fn start(&self) -> RequestGuard {
self.total_outstanding.fetch_add(1, Ordering::SeqCst);
// Update the gauge to account for the newly waiting request.
Expand All @@ -423,8 +438,18 @@ impl Limiter {
.total_outstanding
.load(Ordering::SeqCst)
.saturating_sub(running);
log_outstanding_functions(running, self.udf_type, OutstandingFunctionState::Running);
log_outstanding_functions(waiting, self.udf_type, OutstandingFunctionState::Waiting);
log_outstanding_functions(
running,
self.env,
self.udf_type,
OutstandingFunctionState::Running,
);
log_outstanding_functions(
waiting,
self.env,
self.udf_type,
OutstandingFunctionState::Waiting,
);
}
}

Expand Down Expand Up @@ -463,6 +488,11 @@ impl<'a> Drop for RequestGuard<'a> {
}
}

/// Executes UDFs for backends.
///
/// This struct directly executes http and node actions. Queries, Mutations and
/// v8 Actions are instead routed through the FunctionRouter and its
/// FunctionRunner implementation.
pub struct ApplicationFunctionRunner<RT: Runtime> {
runtime: RT,
pub(crate) database: Database<RT>,
Expand All @@ -480,6 +510,7 @@ pub struct ApplicationFunctionRunner<RT: Runtime> {

cache_manager: CacheManager<RT>,
system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
node_action_limiter: Limiter,
}

impl<RT: Runtime> HeapSize for ApplicationFunctionRunner<RT> {
Expand Down Expand Up @@ -529,6 +560,11 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
function_log,
cache_manager,
system_env_vars,
node_action_limiter: Limiter::new(
ModuleEnvironment::Node,
UdfType::Action,
*APPLICATION_MAX_CONCURRENT_NODE_ACTIONS,
),
}
}

Expand Down Expand Up @@ -1072,6 +1108,8 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
.await?
.context("Missing a valid module_version")?;
let (log_line_sender, log_line_receiver) = mpsc::unbounded();

let timer = function_total_timer(module_version.environment, UdfType::Action);
match module_version.environment {
ModuleEnvironment::Isolate => {
// TODO: This is the only use case of clone. We should get rid of clone,
Expand All @@ -1098,6 +1136,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
let memory_in_mb: u64 = (*ISOLATE_MAX_USER_HEAP_SIZE / (1 << 20))
.try_into()
.unwrap();
timer.finish();
Ok(ActionCompletion {
outcome,
execution_time: start.elapsed(),
Expand All @@ -1110,6 +1149,10 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
})
},
ModuleEnvironment::Node => {
let _request_guard = self
.node_action_limiter
.acquire_permit_with_timeout(&self.runtime)
.await?;
let mut source_maps = BTreeMap::new();
if let Some(source_map) = module_version.source_map.clone() {
source_maps.insert(name.module().clone(), source_map);
Expand Down Expand Up @@ -1204,6 +1247,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
syscall_trace: node_outcome.syscall_trace,
udf_server_version,
};
timer.finish();
let memory_in_mb = node_outcome.memory_used_in_mb;
Ok(ActionCompletion {
outcome,
Expand Down
4 changes: 2 additions & 2 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ use common::{
execution_context::ExecutionContext,
http::fetch::FetchClient,
knobs::{
APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS,
BACKEND_ISOLATE_ACTIVE_THREADS_PERCENT,
MAX_JOBS_CANCEL_BATCH,
SNAPSHOT_LIST_LIMIT,
UDF_ISOLATE_MAX_EXEC_THREADS,
V8_ACTION_MAX_ISOLATE_EXEC_THREADS,
},
log_lines::run_function_and_collect_log_lines,
log_streaming::LogSender,
Expand Down Expand Up @@ -510,7 +510,7 @@ impl<RT: Runtime> Application<RT> {
let actions_isolate = IsolateClient::new(
runtime.clone(),
actions_isolate_worker,
*V8_ACTION_MAX_ISOLATE_EXEC_THREADS,
*APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS,
true,
instance_name.clone(),
instance_secret,
Expand Down
Loading

0 comments on commit b0e9358

Please sign in to comment.