[batch] restricts operations to GCS buckets using hot storage (#13200)
Closes #13003.
iris-garden authored Sep 13, 2023
1 parent adda775 commit c95f2d1
Showing 26 changed files with 400 additions and 107 deletions.
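The user-facing surface of this change is a new gcs_bucket_allow_list keyword threaded from hl.init through the backends into RouterAsyncFS, alongside a rename of the backends' validate_file_scheme hook to a more general validate_file. A minimal usage sketch (the bucket name is a placeholder, and the list format follows the new docstring and Configuration Reference below):

    import hail as hl

    # 'my-archive-bucket' stands in for a bucket whose default storage class
    # is not hot storage; allowlisting it keeps Hail Query from raising an
    # error when reading from or writing to it.
    hl.init(
        backend='batch',
        gcs_bucket_allow_list=['my-archive-bucket'],
    )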
2 changes: 2 additions & 0 deletions hail/.gitignore
@@ -52,12 +52,14 @@ python/hail/docs/vds/hail.vds.filter_chromosomes.rst
python/hail/docs/vds/hail.vds.filter_intervals.rst
python/hail/docs/vds/hail.vds.filter_samples.rst
python/hail/docs/vds/hail.vds.filter_variants.rst
python/hail/docs/vds/hail.vds.impute_sex_chr_ploidy_from_interval_coverage.rst
python/hail/docs/vds/hail.vds.impute_sex_chromosome_ploidy.rst
python/hail/docs/vds/hail.vds.interval_coverage.rst
python/hail/docs/vds/hail.vds.lgt_to_gt.rst
python/hail/docs/vds/hail.vds.read_vds.rst
python/hail/docs/vds/hail.vds.sample_qc.rst
python/hail/docs/vds/hail.vds.split_multi.rst
python/hail/docs/vds/hail.vds.store_ref_block_max_length.rst
python/hail/docs/vds/hail.vds.to_dense_mt.rst
python/hail/docs/vds/hail.vds.to_merged_sparse_mt.rst
python/hail/docs/vds/hail.vds.local_to_global.rst
6 changes: 3 additions & 3 deletions hail/python/hail/backend/backend.py
@@ -69,11 +69,11 @@ def __init__(self):
         self._references = {}
 
     @abc.abstractmethod
-    def stop(self):
-        pass
+    def validate_file(self, uri: str):
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def validate_file_scheme(self, url):
+    def stop(self):
         pass
 
     @abc.abstractmethod
7 changes: 4 additions & 3 deletions hail/python/hail/backend/local_backend.py
@@ -19,6 +19,7 @@
 
 from hailtop.utils import find_spark_home
 from hailtop.fs.router_fs import RouterFS
+from hailtop.aiotools.validators import validate_file
 
 
 _installed = False
@@ -181,6 +182,9 @@ def __init__(self, tmpdir, log, quiet, append, branching_factor,
 
         self._initialize_flags({})
 
+    def validate_file(self, uri: str) -> None:
+        validate_file(uri, self._fs.afs)
+
     def jvm(self):
         return self._jvm
 
@@ -213,9 +217,6 @@ def register_ir_function(self,
     def _is_registered_ir_function_name(self, name: str) -> bool:
         return name in self._registered_ir_function_names
 
-    def validate_file_scheme(self, url):
-        pass
-
     def stop(self):
         self._jhc.stop()
         self._jhc = None
22 changes: 12 additions & 10 deletions hail/python/hail/backend/service_backend.py
@@ -36,6 +36,7 @@
 from ..builtin_references import BUILTIN_REFERENCES
 from ..ir import BaseIR
 from ..utils import ANY_REGION
+from hailtop.aiotools.validators import validate_file
 
 
 ReferenceGenomeConfig = Dict[str, Any]
@@ -205,7 +206,8 @@ async def create(*,
                      name_prefix: Optional[str] = None,
                      token: Optional[str] = None,
                      regions: Optional[List[str]] = None,
-                     gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None):
+                     gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None,
+                     gcs_bucket_allow_list: Optional[List[str]] = None):
         billing_project = configuration_of(ConfigVariable.BATCH_BILLING_PROJECT, billing_project, None)
         if billing_project is None:
             raise ValueError(
@@ -216,7 +218,10 @@
         gcs_requester_pays_configuration = get_gcs_requester_pays_configuration(
             gcs_requester_pays_configuration=gcs_requester_pays_configuration,
         )
-        async_fs = RouterAsyncFS(gcs_kwargs={'gcs_requester_pays_configuration': gcs_requester_pays_configuration})
+        async_fs = RouterAsyncFS(
+            gcs_kwargs={'gcs_requester_pays_configuration': gcs_requester_pays_configuration},
+            gcs_bucket_allow_list=gcs_bucket_allow_list
+        )
         sync_fs = RouterFS(async_fs)
         if batch_client is None:
             batch_client = await aiohb.BatchClient.create(billing_project, _token=token)
@@ -279,7 +284,7 @@ async def create(*,
             worker_cores=worker_cores,
             worker_memory=worker_memory,
             name_prefix=name_prefix or '',
-            regions=regions,
+            regions=regions
         )
         sb._initialize_flags(flags)
         return sb
@@ -321,6 +326,9 @@ def __init__(self,
         self.name_prefix = name_prefix
         self.regions = regions
 
+    def validate_file(self, uri: str) -> None:
+        validate_file(uri, self._async_fs, validate_scheme=True)
+
     def debug_info(self) -> Dict[str, Any]:
         return {
             'jar_spec': str(self.jar_spec),
@@ -343,12 +351,6 @@ def fs(self) -> FS:
     def logger(self):
         return log
 
-    def validate_file_scheme(self, url):
-        assert isinstance(self._async_fs, RouterAsyncFS)
-        if self._async_fs.get_scheme(url) == 'file':
-            raise ValueError(
-                f'Found local filepath {url} when using Query on Batch. Specify a remote filepath instead.')
-
     def stop(self):
         async_to_blocking(self._async_fs.close())
         async_to_blocking(self.async_bc.close())
@@ -663,7 +665,7 @@ async def inputs(infile, _):
     def add_sequence(self, name, fasta_file, index_file):  # pylint: disable=unused-argument
         # FIXME Not only should this be in the cloud, it should be in the *right* cloud
         for blob in (fasta_file, index_file):
-            self.validate_file_scheme(blob)
+            self.validate_file(blob)
 
     def remove_sequence(self, name):  # pylint: disable=unused-argument
         pass
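The validate_file helper imported here lives in hailtop.aiotools.validators, a module whose diff did not render on this page. The following is a hypothetical sketch of its shape, inferred only from the call sites above (validate_file(uri, fs) and validate_file(uri, fs, validate_scheme=True)) and from the validate_file_scheme logic it replaces; it is not the actual implementation:

    # Hypothetical reconstruction; the real hailtop/aiotools/validators.py is
    # not shown in this commit view.
    from hailtop.aiotools.router_fs import RouterAsyncFS


    def validate_file(uri: str, fs: RouterAsyncFS, *, validate_scheme: bool = False) -> None:
        if validate_scheme:
            # Mirrors the removed ServiceBackend.validate_file_scheme:
            # Query-on-Batch workers cannot read the client's local disk,
            # so 'file' URIs are rejected.
            if fs.get_scheme(uri) == 'file':
                raise ValueError(
                    f'Found local filepath {uri} when using Query on Batch. Specify a remote filepath instead.')
        # Presumably it then consults the filesystem (constructed above with
        # gcs_bucket_allow_list) to reject GCS buckets whose default storage
        # class is cold, which is the behavior named in the commit title.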
12 changes: 9 additions & 3 deletions hail/python/hail/backend/spark_backend.py
@@ -18,6 +18,8 @@
 from hail.ir.renderer import CSERenderer
 from hail.table import Table
 from hail.matrixtable import MatrixTable
+from hailtop.aiotools.router_fs import RouterAsyncFS
+from hailtop.aiotools.validators import validate_file
 
 from .py4j_backend import Py4JBackend, handle_java_exception
 from ..hail_logging import Logger
@@ -242,6 +244,13 @@ def __init__(self, idempotent, sc, spark_conf, app_name, master,
 
         self._initialize_flags({})
 
+        self._router_async_fs = RouterAsyncFS(
+            gcs_kwargs={"gcs_requester_pays_configuration": gcs_requester_pays_project}
+        )
+
+    def validate_file(self, uri: str) -> None:
+        validate_file(uri, self._router_async_fs)
+
     def jvm(self):
         return self._jvm
 
@@ -251,9 +260,6 @@ def hail_package(self):
     def utils_package_object(self):
         return self._utils_package_object
 
-    def validate_file_scheme(self, url):
-        pass
-
     def stop(self):
         self._jbackend.close()
         self._jhc.stop()
22 changes: 15 additions & 7 deletions hail/python/hail/context.py
@@ -1,4 +1,4 @@
-from typing import Optional, Union, Tuple, List
+from typing import Optional, Union, Tuple, List, Dict
 import warnings
 import sys
 import os
@@ -170,7 +170,8 @@ def stop(self):
            worker_cores=nullable(oneof(str, int)),
            worker_memory=nullable(str),
            gcs_requester_pays_configuration=nullable(oneof(str, sized_tupleof(str, sequenceof(str)))),
-           regions=nullable(sequenceof(str)))
+           regions=nullable(sequenceof(str)),
+           gcs_bucket_allow_list=nullable(dictof(str, sequenceof(str))))
 def init(sc=None,
          app_name=None,
          master=None,
@@ -195,7 +196,8 @@ def init(sc=None,
          worker_cores=None,
          worker_memory=None,
          gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None,
-         regions: Optional[List[str]] = None):
+         regions: Optional[List[str]] = None,
+         gcs_bucket_allow_list: Optional[Dict[str, List[str]]] = None):
     """Initialize and configure Hail.
 
     This function will be called with default arguments if any Hail functionality is used. If you
@@ -311,7 +313,9 @@ def init(sc=None,
         List of regions to run jobs in when using the Batch backend. Use :data:`.ANY_REGION` to specify any region is allowed
         or use `None` to use the underlying default regions from the hailctl environment configuration. For example, use
         `hailctl config set batch/regions region1,region2` to set the default regions to use.
+    gcs_bucket_allow_list:
+        A list of buckets that Hail should be permitted to read from or write to, even if their default policy is to
+        use "cold" storage. Should look like ``["bucket1", "bucket2"]``.
     """
     if Env._hc:
         if idempotent:
@@ -347,7 +351,8 @@ def init(sc=None,
             worker_memory=worker_memory,
             name_prefix=app_name,
             gcs_requester_pays_configuration=gcs_requester_pays_configuration,
-            regions=regions
+            regions=regions,
+            gcs_bucket_allow_list=gcs_bucket_allow_list
         ))
     if backend == 'spark':
         return init_spark(
@@ -469,7 +474,8 @@ def init_spark(sc=None,
            name_prefix=nullable(str),
            token=nullable(str),
            gcs_requester_pays_configuration=nullable(oneof(str, sized_tupleof(str, sequenceof(str)))),
-           regions=nullable(sequenceof(str))
+           regions=nullable(sequenceof(str)),
+           gcs_bucket_allow_list=nullable(sequenceof(str))
 )
 async def init_batch(
     *,
@@ -492,6 +498,7 @@ async def init_batch(
     token: Optional[str] = None,
     gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None,
     regions: Optional[List[str]] = None,
+    gcs_bucket_allow_list: Optional[List[str]] = None
 ):
     from hail.backend.service_backend import ServiceBackend
     # FIXME: pass local_tmpdir and use on worker and driver
@@ -506,7 +513,8 @@ async def init_batch(
         name_prefix=name_prefix,
         token=token,
         regions=regions,
-        gcs_requester_pays_configuration=gcs_requester_pays_configuration)
+        gcs_requester_pays_configuration=gcs_requester_pays_configuration,
+        gcs_bucket_allow_list=gcs_bucket_allow_list)
 
     log = _get_log(log)
     if tmpdir is None:
53 changes: 53 additions & 0 deletions hail/python/hail/docs/configuration_reference.rst
@@ -0,0 +1,53 @@
.. role:: python(code)
   :language: python
   :class: highlight

.. role:: bash(code)
   :language: bash
   :class: highlight

.. _sec-configuration-reference:

Configuration Reference
=======================

Configuration variables can be set for Hail Query by:

#. passing them as keyword arguments to :func:`.init`,
#. running a command of the form :bash:`hailctl config set <VARIABLE_NAME> <VARIABLE_VALUE>` from the command line, or
#. setting them as shell environment variables by running a command of the form
   :bash:`export <VARIABLE_NAME>=<VARIABLE_VALUE>` in a terminal, which will set the variable for the current terminal
   session.

Each method for setting configuration variables listed above overrides variables set by any and all methods below it.
For example, setting a configuration variable by passing it to :func:`.init` will override any values set for the
variable using either :bash:`hailctl` or shell environment variables.

.. warning::
   Some environment variables are shared between Hail Query and Hail Batch. Setting one of these variables via
   :func:`.init`, :bash:`hailctl`, or environment variables will affect both Query and Batch. However, when
   instantiating a class specific to one of the two, passing configuration to that class will not affect the other.
   For example, if one value for :python:`gcs_bucket_allow_list` is passed to :func:`.init`, a different value
   may be passed to the constructor for Batch's :python:`ServiceBackend`, which will only affect that instance of the
   class (which can only be used within Batch), and won't affect Query.

Supported Configuration Variables
---------------------------------

.. list-table:: GCS Bucket Allowlist
   :widths: 50 50

   * - Keyword Argument Name
     - :python:`gcs_bucket_allow_list`
   * - Keyword Argument Format
     - :python:`["bucket1", "bucket2"]`
   * - :bash:`hailctl` Variable Name
     - :bash:`gcs/bucket_allow_list`
   * - Environment Variable Name
     - :bash:`HAIL_GCS_BUCKET_ALLOW_LIST`
   * - :bash:`hailctl` and Environment Variable Format
     - :bash:`bucket1,bucket2`
   * - Effect
     - Prevents Hail Query from erroring if the default storage policy for any of the given locations is to use cold storage.
   * - Shared between Query and Batch
     - Yes
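Per the table above, the same allowlist can be expressed three ways; a sketch using placeholder bucket names, with the hailctl and environment-variable forms shown as comments:

    # hailctl:     hailctl config set gcs/bucket_allow_list bucket1,bucket2
    # environment: export HAIL_GCS_BUCKET_ALLOW_LIST=bucket1,bucket2
    # keyword argument, which overrides both of the above per the precedence
    # rules in this document:
    import hail as hl

    hl.init(gcs_bucket_allow_list=["bucket1", "bucket2"])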
1 change: 1 addition & 0 deletions hail/python/hail/docs/index.rst
@@ -20,6 +20,7 @@ Contents
     Hail on the Cloud <hail_on_the_cloud>
     Tutorials <tutorials-landing>
     Reference (Python API) <root_api>
+    Configuration Reference <configuration_reference>
     Overview <overview/index>
     How-To Guides <guides>
     Cheatsheets <cheatsheets>
10 changes: 5 additions & 5 deletions hail/python/hail/linalg/blockmatrix.py
@@ -619,7 +619,7 @@ def write(self, path, overwrite=False, force_row_major=False, stage_locally=Fals
             If ``True``, major output will be written to temporary local storage
             before being copied to ``output``.
         """
-        hl.current_backend().validate_file_scheme(path)
+        hl.current_backend().validate_file(path)
 
         writer = BlockMatrixNativeWriter(path, overwrite, force_row_major, stage_locally)
         Env.backend().execute(BlockMatrixWrite(self._bmir, writer))
@@ -647,7 +647,7 @@ def checkpoint(self, path, overwrite=False, force_row_major=False, stage_locally
             If ``True``, major output will be written to temporary local storage
             before being copied to ``output``.
         """
-        hl.current_backend().validate_file_scheme(path)
+        hl.current_backend().validate_file(path)
         self.write(path, overwrite, force_row_major, stage_locally)
         return BlockMatrix.read(path, _assert_type=self._bmir._type)
 
@@ -729,7 +729,7 @@ def write_from_entry_expr(entry_expr, path, overwrite=False, mean_impute=False,
         block_size: :obj:`int`, optional
             Block size. Default given by :meth:`.BlockMatrix.default_block_size`.
         """
-        hl.current_backend().validate_file_scheme(path)
+        hl.current_backend().validate_file(path)
 
         if not block_size:
             block_size = BlockMatrix.default_block_size()
@@ -1193,7 +1193,7 @@ def tofile(self, uri):
         --------
         :meth:`.to_numpy`
         """
-        hl.current_backend().validate_file_scheme(uri)
+        hl.current_backend().validate_file(uri)
 
         _check_entries_size(self.n_rows, self.n_cols)
 
@@ -1975,7 +1975,7 @@ def export(path_in, path_out, delimiter='\t', header=None, add_index=False, para
             Describes which entries to export. One of:
             ``'full'``, ``'lower'``, ``'strict_lower'``, ``'upper'``, ``'strict_upper'``.
         """
-        hl.current_backend().validate_file_scheme(path_out)
+        hl.current_backend().validate_file(path_out)
 
         export_type = ExportType.default(parallel)
 
4 changes: 2 additions & 2 deletions hail/python/hail/matrixtable.py
@@ -2677,7 +2677,7 @@ def checkpoint(self, output: str, overwrite: bool = False, stage_locally: bool =
         --------
         >>> dataset = dataset.checkpoint('output/dataset_checkpoint.mt')
         """
-        hl.current_backend().validate_file_scheme(output)
+        hl.current_backend().validate_file(output)
 
         if not _read_if_exists or not hl.hadoop_exists(f'{output}/_SUCCESS'):
             self.write(output=output, overwrite=overwrite, stage_locally=stage_locally, _codec_spec=_codec_spec)
@@ -2727,7 +2727,7 @@ def write(self, output: str, overwrite: bool = False, stage_locally: bool = Fals
             If ``True``, overwrite an existing file at the destination.
         """
 
-        hl.current_backend().validate_file_scheme(output)
+        hl.current_backend().validate_file(output)
 
         if _partitions is not None:
             _partitions, _partitions_type = hl.utils._dumps_partitions(_partitions, self.row_key.dtype)
8 changes: 4 additions & 4 deletions hail/python/hail/methods/impex.py
@@ -121,7 +121,7 @@ def export_gen(dataset, output, precision=4, gp=None, id1=None, id2=None,
 
     require_biallelic(dataset, 'export_gen')
 
-    hl.current_backend().validate_file_scheme(output)
+    hl.current_backend().validate_file(output)
 
     if gp is None:
         if 'GP' in dataset.entry and dataset.GP.dtype == tarray(tfloat64):
@@ -238,7 +238,7 @@ def export_bgen(mt, output, gp=None, varid=None, rsid=None, parallel=None, compr
     require_row_key_variant(mt, 'export_bgen')
     require_col_key_str(mt, 'export_bgen')
 
-    hl.current_backend().validate_file_scheme(output)
+    hl.current_backend().validate_file(output)
 
     if gp is None:
         if 'GP' in mt.entry and mt.GP.dtype == tarray(tfloat64):
@@ -364,7 +364,7 @@ def export_plink(dataset, output, call=None, fam_id=None, ind_id=None, pat_id=No
 
     require_biallelic(dataset, 'export_plink', tolerate_generic_locus=True)
 
-    hl.current_backend().validate_file_scheme(output)
+    hl.current_backend().validate_file(output)
 
     if ind_id is None:
         require_col_key_str(dataset, "export_plink")
@@ -539,7 +539,7 @@ def export_vcf(dataset, output, append_to_header=None, parallel=None, metadata=N
     **Note**: This feature is experimental, and the interface and defaults
     may change in future versions.
     """
-    hl.current_backend().validate_file_scheme(output)
+    hl.current_backend().validate_file(output)
 
     _, ext = os.path.splitext(output)
     if ext == '.gz':
(Diffs for the remaining changed files did not load on this page.)
