diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 9aaa238b618f7..9cf33278cff84 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -258,7 +258,9 @@ def compute_table_name(row): from apache_beam.io.filesystems import FileSystems from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata +from apache_beam.io.gcp.bigquery_read_internal import ReadFromBigQueryRequest from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit +from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder from apache_beam.io.gcp.bigquery_read_internal import _PassThroughThenCleanup from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri from apache_beam.io.gcp.bigquery_tools import RetryStrategy @@ -637,7 +639,7 @@ def __init__( self.project = project self.validate = validate self.flatten_results = flatten_results - self.coder = coder + self.coder = coder or _JsonToDictCoder() self.kms_key = kms_key self.split_result = None self.options = pipeline_options @@ -1926,45 +1928,3 @@ def expand(self, pcoll): sources_to_read | SDFBoundedSourceReader() | _PassThroughThenCleanup(beam.pvalue.AsIter(cleanup_locations))) - - -class ReadFromBigQueryRequest: - """ - Class that defines data to read from BQ. - """ - def __init__( - self, - query: str = None, - use_standard_sql: bool = False, - table: Union[str, TableReference] = None, - flatten_results: bool = False): - """ - Only one of query or table should be specified. - - :param query(str): SQL query to fetch data. - :param use_standard_sql(boolean): - Specifies whether to use BigQuery's standard SQL dialect for this query. - The default value is :data:`True`. If set to :data:`False`, - the query will use BigQuery's legacy SQL dialect. - This parameter is ignored for table inputs. - :param table(str): - The ID of the table to read. The ID must contain only letters - ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should - define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``). - :param flatten_results(boolean): - Flattens all nested and repeated fields in the query results. - The default value is :data:`True`. - """ - self.flatten_results = flatten_results - self.query = query - self.use_standard_sql = use_standard_sql - self.table = table - self.validate() - - def validate(self): - if self.table is not None and self.query is not None: - raise ValueError( - 'Both a BigQuery table and a query were specified.' - ' Please specify only one of these.') - elif self.table is None and self.query is None: - raise ValueError('A BigQuery table or a query must be specified')