diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 5fec3a31b8ddd..6a4345bfe2ed8 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1702,6 +1702,7 @@ def _prepare_query_configuration(
         schema_update_options: Iterable | None = None,
         priority: str | None = None,
         time_partitioning: dict | None = None,
+        range_partitioning: dict | None = None,
         api_resource_configs: dict | None = None,
         cluster_fields: list[str] | None = None,
         encryption_configuration: dict | None = None,
@@ -1714,6 +1715,10 @@

         if time_partitioning is None:
             time_partitioning = {}
+        if range_partitioning is None:
+            range_partitioning = {}
+        if time_partitioning and range_partitioning:
+            raise ValueError("Only one of time_partitioning or range_partitioning can be set.")

         if not api_resource_configs:
             api_resource_configs = self.hook.api_resource_configs
@@ -1766,6 +1771,7 @@
             (maximum_billing_tier, "maximumBillingTier", None, int),
             (maximum_bytes_billed, "maximumBytesBilled", None, float),
             (time_partitioning, "timePartitioning", {}, dict),
+            (range_partitioning, "rangePartitioning", {}, dict),
             (schema_update_options, "schemaUpdateOptions", None, list),
             (destination_dataset_table, "destinationTable", None, dict),
             (cluster_fields, "clustering", None, dict),
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 0b778cdae4335..cf57ddf6b4f7b 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -144,6 +144,9 @@ class GCSToBigQueryOperator(BaseOperator):
         partition by field, type and expiration as per API specifications.
         Note that 'field' is not available in concurrency with
         dataset.table$partition.
+        Ignored if 'range_partitioning' is set.
+    :param range_partitioning: configure optional range partitioning fields i.e.
+        partition by field and integer interval as per API specifications.
     :param cluster_fields: Request that the result of this load be stored sorted
         by one or more columns. BigQuery supports clustering for both partitioned and
         non-partitioned tables. The order of columns given determines the sort order.
@@ -219,6 +222,7 @@ def __init__(
         src_fmt_configs=None,
         external_table=False,
         time_partitioning=None,
+        range_partitioning=None,
         cluster_fields=None,
         autodetect=True,
         encryption_configuration=None,
@@ -246,6 +250,10 @@
             src_fmt_configs = {}
         if time_partitioning is None:
             time_partitioning = {}
+        if range_partitioning is None:
+            range_partitioning = {}
+        if range_partitioning and time_partitioning:
+            raise ValueError("Only one of time_partitioning or range_partitioning can be set.")
         self.bucket = bucket
         self.source_objects = source_objects
         self.schema_object = schema_object
@@ -283,6 +291,7 @@ def __init__(
         self.schema_update_options = schema_update_options
         self.src_fmt_configs = src_fmt_configs
         self.time_partitioning = time_partitioning
+        self.range_partitioning = range_partitioning
         self.cluster_fields = cluster_fields
         self.autodetect = autodetect
         self.encryption_configuration = encryption_configuration
@@ -627,6 +636,8 @@ def _use_existing_table(self):
         )
         if self.time_partitioning:
             self.configuration["load"].update({"timePartitioning": self.time_partitioning})
+        if self.range_partitioning:
+            self.configuration["load"].update({"rangePartitioning": self.range_partitioning})
         if self.cluster_fields:
             self.configuration["load"].update({"clustering": {"fields": self.cluster_fields}})

diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index b5d7310ff4f32..650eed7a0359c 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -819,13 +819,20 @@ def test_create_table_succeed(self, mock_bq_client, mock_table):

     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Table.from_api_repr")
     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
-    def test_create_table_with_extras_succeed(self, mock_bq_client, mock_table):
+    @pytest.mark.parametrize(
+        "partitioning",
+        [
+            {"timePartitioning": {"field": "created", "type": "DAY"}},
+            {"rangePartitioning": {"field": "grade", "range": {"start": 0, "end": 100, "interval": 20}}},
+        ],
+    )
+    def test_create_table_with_extras_succeed(self, mock_bq_client, mock_table, partitioning):
         schema_fields = [
             {"name": "id", "type": "STRING", "mode": "REQUIRED"},
             {"name": "name", "type": "STRING", "mode": "NULLABLE"},
             {"name": "created", "type": "DATE", "mode": "REQUIRED"},
+            {"name": "grade", "type": "INTEGER", "mode": "REQUIRED"},
         ]
-        time_partitioning = {"field": "created", "type": "DAY"}
         cluster_fields = ["name"]
         body = {
             "tableReference": {
@@ -834,9 +841,9 @@ def test_create_table_with_extras_succeed(self, mock_bq_client, mock_table):
                 "projectId": PROJECT_ID,
                 "datasetId": DATASET_ID,
             },
             "schema": {"fields": schema_fields},
-            "timePartitioning": time_partitioning,
             "clustering": {"fields": cluster_fields},
         }
+        body.update(partitioning)
         self.hook.create_table(
             project_id=PROJECT_ID,
             dataset_id=DATASET_ID,
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
index 9f2fd692eecc9..ee2cc8ff2c204 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -221,6 +221,31 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook):

         hook.return_value.insert_job.assert_has_calls(calls)

+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_two_partitionings_should_fail(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        with pytest.raises(
+            ValueError, match=r"Only one of time_partitioning or range_partitioning can be set."
+        ):
+            GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                schema_fields=SCHEMA_FIELDS,
+                max_id_key=MAX_ID_KEY,
+                write_disposition=WRITE_DISPOSITION,
+                external_table=False,
+                project_id=JOB_PROJECT_ID,
+                time_partitioning={"field": "created", "type": "DAY"},
+                range_partitioning={"field": "grade", "range": {"start": 0, "end": 100, "interval": 20}},
+            )
+
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
         hook.return_value.insert_job.side_effect = [
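Example usage (reviewer note, not part of the patch): a minimal sketch of the new range_partitioning parameter on GCSToBigQueryOperator. The bucket, object path, destination table, and schema below are illustrative placeholders; the dict shape mirrors the BigQuery API's rangePartitioning spec exercised in the tests above.

    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

    load_grades = GCSToBigQueryOperator(
        task_id="gcs_to_bq_range_partitioned",
        bucket="example-bucket",  # placeholder
        source_objects=["data/grades/*.csv"],  # placeholder
        destination_project_dataset_table="example-project.example_dataset.grades",  # placeholder
        schema_fields=[
            {"name": "id", "type": "STRING", "mode": "REQUIRED"},
            {"name": "grade", "type": "INTEGER", "mode": "REQUIRED"},
        ],
        write_disposition="WRITE_TRUNCATE",
        external_table=False,
        # Partition the destination table into integer buckets on `grade`;
        # mutually exclusive with time_partitioning (raises ValueError if both are set).
        range_partitioning={"field": "grade", "range": {"start": 0, "end": 100, "interval": 20}},
    )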