Skip to content

Commit

Permalink
Add optional independent --remote-cache-address and `--remote-cache…
Browse files Browse the repository at this point in the history
…-headers` options.

# Building wheels and fs_util will be skipped. Delete if not intended.
[ci skip-build-wheels]
  • Loading branch information
stuhood committed Jun 16, 2022
1 parent 8278eca commit 679a2c8
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,13 @@ def __init__(
remoting_options = PyRemotingOptions(
execution_enable=execution_options.remote_execution,
store_address=execution_options.remote_store_address,
cache_address=execution_options.remote_cache_address,
execution_address=execution_options.remote_execution_address,
execution_process_cache_namespace=execution_options.process_execution_cache_namespace,
instance_name=execution_options.remote_instance_name,
root_ca_certs_path=execution_options.remote_ca_certs_path,
store_headers=execution_options.remote_store_headers,
cache_headers=execution_options.remote_cache_headers,
store_chunk_bytes=execution_options.remote_store_chunk_bytes,
store_chunk_upload_timeout=execution_options.remote_store_chunk_upload_timeout_seconds,
store_rpc_retries=execution_options.remote_store_rpc_retries,
Expand Down
97 changes: 85 additions & 12 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ class AuthPluginState(Enum):
class AuthPluginResult:
"""The return type for a function specified via `--remote-auth-plugin`.
The returned `store_headers` and `execution_headers` will replace whatever headers Pants would
have used normally, e.g. what is set with `--remote-store-headers`. This allows you to control
the merge strategy if your plugin sets conflicting headers. Usually, you will want to preserve
the `initial_store_headers` and `initial_execution_headers` passed to the plugin.
The returned `*_headers` will replace whatever headers Pants would have used normally, e.g. what
is set with `--remote-store-headers`. This allows you to control the merge strategy if your
plugin sets conflicting headers. Usually, you will want to preserve the `initial_*_headers`
passed to the plugin.
If set, the returned `instance_name` will override `--remote-instance-name`, `store_address`
will override `--remote-store-address`, and `execution_address` will override
Expand All @@ -136,7 +136,9 @@ class AuthPluginResult:
state: AuthPluginState
store_headers: dict[str, str]
execution_headers: dict[str, str]
cache_headers: dict[str, str] | None = None
store_address: str | None = None
cache_address: str | None = None
execution_address: str | None = None
instance_name: str | None = None
expiration: datetime | None = None
Expand All @@ -152,6 +154,7 @@ def assert_valid_address(addr: str | None, field_name: str) -> None:
)

assert_valid_address(self.store_address, "store_address")
assert_valid_address(self.cache_address, "cache_address")
assert_valid_address(self.execution_address, "execution_address")

@property
Expand All @@ -168,8 +171,10 @@ class DynamicRemoteOptions:
cache_write: bool
instance_name: str | None
store_address: str | None
cache_address: str | None
execution_address: str | None
store_headers: dict[str, str]
cache_headers: dict[str, str]
execution_headers: dict[str, str]
parallelism: int
store_rpc_concurrency: int
Expand All @@ -184,8 +189,10 @@ def disabled(cls) -> DynamicRemoteOptions:
cache_write=False,
instance_name=None,
store_address=None,
cache_address=None,
execution_address=None,
store_headers={},
cache_headers={},
execution_headers={},
parallelism=DEFAULT_EXECUTION_OPTIONS.process_execution_remote_parallelism,
store_rpc_concurrency=DEFAULT_EXECUTION_OPTIONS.remote_store_rpc_concurrency,
Expand All @@ -206,10 +213,12 @@ def from_options(
cache_read = cast(bool, bootstrap_options.remote_cache_read)
cache_write = cast(bool, bootstrap_options.remote_cache_write)
store_address = cast("str | None", bootstrap_options.remote_store_address)
cache_address = cast("str | None", bootstrap_options.remote_cache_address)
execution_address = cast("str | None", bootstrap_options.remote_execution_address)
instance_name = cast("str | None", bootstrap_options.remote_instance_name)
execution_headers = cast("dict[str, str]", bootstrap_options.remote_execution_headers)
store_headers = cast("dict[str, str]", bootstrap_options.remote_store_headers)
cache_headers = cast("dict[str, str]", bootstrap_options.remote_cache_headers)
parallelism = cast(int, bootstrap_options.process_execution_remote_parallelism)
store_rpc_concurrency = cast(int, bootstrap_options.remote_store_rpc_concurrency)
cache_rpc_concurrency = cast(int, bootstrap_options.remote_cache_rpc_concurrency)
Expand Down Expand Up @@ -248,6 +257,7 @@ def from_options(
auth_plugin_func(
initial_execution_headers=execution_headers,
initial_store_headers=store_headers,
initial_cache_headers=cache_headers,
options=full_options,
env=dict(env),
prior_result=prior_result,
Expand Down Expand Up @@ -294,6 +304,17 @@ def from_options(
)
)
store_address = auth_plugin_result.store_address
if (
auth_plugin_result.cache_address is not None
and auth_plugin_result.cache_address != cache_address
):
logger.debug(
overridden_opt_log.format(
f"--remote-cache-address={repr(cache_address)}",
repr(auth_plugin_result.cache_address),
)
)
cache_address = auth_plugin_result.cache_address
if (
auth_plugin_result.execution_address is not None
and auth_plugin_result.execution_address != execution_address
Expand All @@ -313,15 +334,18 @@ def from_options(
re.sub(r"^grpc", "http", execution_address) if execution_address else None
)
store_address = re.sub(r"^grpc", "http", store_address) if store_address else None
cache_address = re.sub(r"^grpc", "http", cache_address) if cache_address else None

opts = DynamicRemoteOptions(
execution=execution,
cache_read=cache_read,
cache_write=cache_write,
instance_name=instance_name,
store_address=store_address,
cache_address=cache_address,
execution_address=execution_address,
store_headers=store_headers,
cache_headers=cache_headers,
execution_headers=execution_headers,
parallelism=parallelism,
store_rpc_concurrency=store_rpc_concurrency,
Expand Down Expand Up @@ -365,6 +389,8 @@ class ExecutionOptions:
remote_store_rpc_concurrency: int
remote_store_batch_api_size_limit: int

remote_cache_address: str | None
remote_cache_headers: dict[str, str]
remote_cache_eager_fetch: bool
remote_cache_warnings: RemoteCacheWarningsBehavior
remote_cache_rpc_concurrency: int
Expand Down Expand Up @@ -409,6 +435,12 @@ def from_options(
remote_store_rpc_concurrency=dynamic_remote_options.store_rpc_concurrency,
remote_store_batch_api_size_limit=bootstrap_options.remote_store_batch_api_size_limit,
# Remote cache setup.
remote_cache_address=(
dynamic_remote_options.cache_address or dynamic_remote_options.store_address
),
remote_cache_headers=(
dynamic_remote_options.cache_headers or dynamic_remote_options.store_headers
),
remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch,
remote_cache_warnings=bootstrap_options.remote_cache_warnings,
remote_cache_rpc_concurrency=dynamic_remote_options.cache_rpc_concurrency,
Expand Down Expand Up @@ -495,6 +527,8 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
remote_store_rpc_concurrency=128,
remote_store_batch_api_size_limit=4194304,
# Remote cache setup.
remote_cache_address=None,
remote_cache_headers={},
remote_cache_eager_fetch=True,
remote_cache_warnings=RemoteCacheWarningsBehavior.backoff,
remote_cache_rpc_concurrency=128,
Expand Down Expand Up @@ -1274,16 +1308,19 @@ class BootstrapOptions:
Format: `path.to.module:my_func`. Pants will import your module and run your
function. Update the `--pythonpath` option to ensure your file is loadable.
The function should take the kwargs `initial_store_headers: dict[str, str]`,
`initial_execution_headers: dict[str, str]`, `options: Options` (from
pants.option.options), `env: dict[str, str]`, and
`prior_result: AuthPluginResult | None`. It should return an instance of
`AuthPluginResult` from `pants.option.global_options`.
The function should take the kwargs:
* `initial_store_headers: dict[str, str]`
* `initial_cache_headers: dict[str, str]`
* `initial_execution_headers: dict[str, str]`
* `options: Options` (from pants.option.options)
* `env: dict[str, str]`
* `prior_result: AuthPluginResult | None`
It should return an instance of `AuthPluginResult` from `pants.option.global_options`.
Pants will replace the headers it would normally use with whatever your plugin
returns; usually, you should include the `initial_store_headers` and
`initial_execution_headers` in your result so that options like
`--remote-store-headers` still work.
returns; usually, you should include the `initial_*_headers` in your result so that
options like `--remote-store-headers` still work.
If you return `instance_name`, Pants will replace `--remote-instance-name`
with this value.
Expand Down Expand Up @@ -1358,6 +1395,40 @@ class BootstrapOptions:
default=DEFAULT_EXECUTION_OPTIONS.remote_store_batch_api_size_limit,
help="The maximum total size of blobs allowed to be sent in a single batch API call to the remote store.",
)
# Declares the advanced `--remote-cache-address` bootstrap option: the URI of the
# server used for the remote action/process cache.
#
# The "defaults to `--remote-store-address`" behaviour described in the help text is
# not implemented here; `ExecutionOptions.from_options` applies it by evaluating
# `dynamic_remote_options.cache_address or dynamic_remote_options.store_address`.
# The value is also checked by `validate_remote_address("remote_cache_address")`.
remote_cache_address = StrOption(
"--remote-cache-address",
advanced=True,
# `ExecutionOptions.remote_cache_address` is typed `str | None`; the `cast` is
# presumably `typing.cast`, i.e. it only narrows the static type and the default may
# still be `None` at runtime — TODO confirm.
default=cast(str, DEFAULT_EXECUTION_OPTIONS.remote_cache_address),
help=softwrap(
"""
The URI of a server used for the remote "action"/process cache.
If not set, defaults to `--remote-store-address`.
Format: `scheme://host:port`. The supported schemes are `grpc` and `grpcs`, i.e. gRPC
with TLS enabled. If `grpc` is used, TLS will be disabled.
"""
),
)
# Declares the advanced `--remote-cache-headers` bootstrap option: headers sent on
# remote cache requests.
#
# The "defaults to `--remote-store-headers`" behaviour described in the help text is
# not implemented here; `ExecutionOptions.from_options` applies it by evaluating
# `dynamic_remote_options.cache_headers or dynamic_remote_options.store_headers`, so an
# empty dict also falls back to the store headers.
# The value is also checked by `validate_remote_headers("remote_cache_headers")`.
remote_cache_headers = DictOption(
"--remote-cache-headers",
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_headers,
help=softwrap(
"""
Headers to set on remote cache requests.
If not set, defaults to `--remote-store-headers`.
Format: header=value. Pants may add additional headers.
See `--remote-execution-headers` and `--remote-store-headers`.
"""
),
# NOTE(review): the visible defaults block sets `remote_cache_headers={}`; if that is
# what DEFAULT_EXECUTION_OPTIONS holds, the repr is "{}" and the VERSION substitution
# below has nothing to replace. Presumably mirrored from the `--remote-store-headers`
# declaration — confirm whether it is still needed.
default_help_repr=repr(DEFAULT_EXECUTION_OPTIONS.remote_cache_headers).replace(
VERSION, "<pants_version>"
),
)
remote_cache_warnings = EnumOption(
"--remote-cache-warnings",
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_warnings,
Expand Down Expand Up @@ -1813,6 +1884,7 @@ def validate_remote_address(opt_name: str) -> None:

validate_remote_address("remote_execution_address")
validate_remote_address("remote_store_address")
validate_remote_address("remote_cache_address")

# Ensure that remote headers are ASCII.
def validate_remote_headers(opt_name: str) -> None:
Expand All @@ -1831,6 +1903,7 @@ def validate_remote_headers(opt_name: str) -> None:

validate_remote_headers("remote_execution_headers")
validate_remote_headers("remote_store_headers")
validate_remote_headers("remote_cache_headers")

illegal_build_ignores = [i for i in opts.build_ignore if i.startswith("!")]
if illegal_build_ignores:
Expand Down
10 changes: 5 additions & 5 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub struct Core {
pub struct RemotingOptions {
pub execution_enable: bool,
pub store_address: Option<String>,
pub cache_address: Option<String>,
pub execution_address: Option<String>,
pub execution_process_cache_namespace: Option<String>,
pub instance_name: Option<String>,
Expand All @@ -89,6 +90,7 @@ pub struct RemotingOptions {
pub store_rpc_retries: usize,
pub store_rpc_concurrency: usize,
pub store_batch_api_size_limit: usize,
pub cache_headers: BTreeMap<String, String>,
pub cache_warnings_behavior: RemoteCacheWarningsBehavior,
pub cache_eager_fetch: bool,
pub cache_rpc_concurrency: usize,
Expand Down Expand Up @@ -277,12 +279,11 @@ impl Core {
}

///
/// Wraps the given runner in any configured caches.
/// Wraps the given runner in the remote cache runner.
///
fn make_remote_cached_runner(
inner_runner: Arc<dyn CommandRunner>,
full_store: &Store,
remote_store_address: &Option<String>,
executor: &Executor,
process_execution_metadata: &ProcessMetadata,
root_ca_certs: &Option<Vec<u8>>,
Expand All @@ -294,9 +295,9 @@ impl Core {
process_execution_metadata.clone(),
executor.clone(),
full_store.clone(),
remote_store_address.as_ref().unwrap(),
remoting_opts.cache_address.as_ref().unwrap(),
root_ca_certs.clone(),
remoting_opts.store_headers.clone(),
remoting_opts.cache_headers.clone(),
Platform::current()?,
exec_strategy_opts.remote_cache_read,
exec_strategy_opts.remote_cache_write,
Expand Down Expand Up @@ -339,7 +340,6 @@ impl Core {
Some(Self::make_remote_cached_runner(
leaf_runner.clone(),
full_store,
&remoting_opts.store_address,
executor,
process_execution_metadata,
root_ca_certs,
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,13 @@ impl PyRemotingOptions {
fn __new__(
execution_enable: bool,
store_address: Option<String>,
cache_address: Option<String>,
execution_address: Option<String>,
execution_process_cache_namespace: Option<String>,
instance_name: Option<String>,
root_ca_certs_path: Option<PathBuf>,
store_headers: BTreeMap<String, String>,
cache_headers: BTreeMap<String, String>,
store_chunk_bytes: usize,
store_chunk_upload_timeout: u64,
store_rpc_retries: usize,
Expand All @@ -297,11 +299,13 @@ impl PyRemotingOptions {
Self(RemotingOptions {
execution_enable,
store_address,
cache_address,
execution_address,
execution_process_cache_namespace,
instance_name,
root_ca_certs_path,
store_headers,
cache_headers,
store_chunk_bytes,
store_chunk_upload_timeout: Duration::from_secs(store_chunk_upload_timeout),
store_rpc_retries,
Expand Down

0 comments on commit 679a2c8

Please sign in to comment.