@@ -1702,6 +1702,7 @@ def _prepare_query_configuration(
         schema_update_options: Iterable | None = None,
         priority: str | None = None,
         time_partitioning: dict | None = None,
+        range_partitioning: dict | None = None,
         api_resource_configs: dict | None = None,
         cluster_fields: list[str] | None = None,
         encryption_configuration: dict | None = None,
@@ -1714,6 +1715,10 @@ def _prepare_query_configuration(
 
         if time_partitioning is None:
             time_partitioning = {}
+        if range_partitioning is None:
+            range_partitioning = {}
+        if time_partitioning and range_partitioning:
+            raise ValueError("Only one of time_partitioning or range_partitioning can be set.")
 
         if not api_resource_configs:
             api_resource_configs = self.hook.api_resource_configs
@@ -1766,6 +1771,7 @@ def _prepare_query_configuration(
             (maximum_billing_tier, "maximumBillingTier", None, int),
             (maximum_bytes_billed, "maximumBytesBilled", None, float),
             (time_partitioning, "timePartitioning", {}, dict),
+            (range_partitioning, "rangePartitioning", {}, dict),
             (schema_update_options, "schemaUpdateOptions", None, list),
             (destination_dataset_table, "destinationTable", None, dict),
             (cluster_fields, "clustering", None, dict),
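
For context: the new argument is forwarded as-is into the job's API resource configuration, so it must follow the BigQuery rangePartitioning resource shape. A minimal sketch, with the column name and range values taken from the tests added later in this PR:

# The dict shape the new range_partitioning argument expects. Per the BigQuery
# API specification, "field" must name a top-level INTEGER column, "start" is
# inclusive, "end" is exclusive, and "interval" is the width of each partition.
range_partitioning = {
    "field": "grade",
    "range": {"start": 0, "end": 100, "interval": 20},  # five partitions of width 20
}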
@@ -144,6 +144,9 @@ class GCSToBigQueryOperator(BaseOperator):
         partition by field, type and expiration as per API specifications.
         Note that 'field' is not available in concurrency with
         dataset.table$partition.
+        Ignored if 'range_partitioning' is set.
+    :param range_partitioning: configure optional range partitioning fields i.e.
+        partition by field and integer interval as per API specifications.
     :param cluster_fields: Request that the result of this load be stored sorted
         by one or more columns. BigQuery supports clustering for both partitioned and
         non-partitioned tables. The order of columns given determines the sort order.
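
A usage sketch for the documented parameter; the bucket, object, and table names below are placeholders, not values from this PR:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Load a CSV from GCS into a table partitioned on integer ranges of "grade".
load_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bq_range_partitioned",
    bucket="my-bucket",                    # placeholder bucket
    source_objects=["data/grades.csv"],    # placeholder object
    destination_project_dataset_table="my_dataset.grades",  # placeholder table
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "grade", "type": "INTEGER", "mode": "REQUIRED"},
    ],
    range_partitioning={"field": "grade", "range": {"start": 0, "end": 100, "interval": 20}},
    write_disposition="WRITE_TRUNCATE",
)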
@@ -219,6 +222,7 @@ def __init__(
         src_fmt_configs=None,
         external_table=False,
         time_partitioning=None,
+        range_partitioning=None,
         cluster_fields=None,
         autodetect=True,
         encryption_configuration=None,
@@ -246,6 +250,10 @@ def __init__(
             src_fmt_configs = {}
         if time_partitioning is None:
             time_partitioning = {}
+        if range_partitioning is None:
+            range_partitioning = {}
+        if range_partitioning and time_partitioning:
+            raise ValueError("Only one of time_partitioning or range_partitioning can be set.")
         self.bucket = bucket
         self.source_objects = source_objects
         self.schema_object = schema_object
@@ -283,6 +291,7 @@ def __init__(
         self.schema_update_options = schema_update_options
         self.src_fmt_configs = src_fmt_configs
         self.time_partitioning = time_partitioning
+        self.range_partitioning = range_partitioning
         self.cluster_fields = cluster_fields
         self.autodetect = autodetect
         self.encryption_configuration = encryption_configuration
@@ -627,6 +636,8 @@ def _use_existing_table(self):
         )
         if self.time_partitioning:
             self.configuration["load"].update({"timePartitioning": self.time_partitioning})
+        if self.range_partitioning:
+            self.configuration["load"].update({"rangePartitioning": self.range_partitioning})
 
         if self.cluster_fields:
             self.configuration["load"].update({"clustering": {"fields": self.cluster_fields}})
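
With the branch above, a load job that sets the new parameter carries a configuration fragment roughly like the following; a sketch reusing the illustrative dict from earlier, with the other load keys omitted:

# Abridged sketch of self.configuration after _use_existing_table runs.
configuration = {
    "load": {
        # ...destination table, source URIs, schema, write disposition, etc.
        "rangePartitioning": {"field": "grade", "range": {"start": 0, "end": 100, "interval": 20}},
        "clustering": {"fields": ["name"]},  # only present when cluster_fields is given
    }
}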
13 changes: 10 additions & 3 deletions providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -819,13 +819,20 @@ def test_create_table_succeed(self, mock_bq_client, mock_table):
 
     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Table.from_api_repr")
     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
-    def test_create_table_with_extras_succeed(self, mock_bq_client, mock_table):
+    @pytest.mark.parametrize(
+        "partitioning",
+        [
+            {"timePartitioning": {"field": "created", "type": "DAY"}},
+            {"rangePartitioning": {"field": "grade", "range": {"start": 0, "end": 100, "interval": 20}}},
+        ],
+    )
+    def test_create_table_with_extras_succeed(self, mock_bq_client, mock_table, partitioning):
         schema_fields = [
             {"name": "id", "type": "STRING", "mode": "REQUIRED"},
             {"name": "name", "type": "STRING", "mode": "NULLABLE"},
             {"name": "created", "type": "DATE", "mode": "REQUIRED"},
+            {"name": "grade", "type": "INTEGER", "mode": "REQUIRED"},
         ]
-        time_partitioning = {"field": "created", "type": "DAY"}
         cluster_fields = ["name"]
         body = {
             "tableReference": {
@@ -834,9 +841,9 @@ def test_create_table_with_extras_succeed(self, mock_bq_client, mock_table):
                 "datasetId": DATASET_ID,
             },
             "schema": {"fields": schema_fields},
-            "timePartitioning": time_partitioning,
             "clustering": {"fields": cluster_fields},
         }
+        body.update(partitioning)
         self.hook.create_table(
             project_id=PROJECT_ID,
             dataset_id=DATASET_ID,
@@ -221,6 +221,31 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook
 
         hook.return_value.insert_job.assert_has_calls(calls)
 
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_two_partitionings_should_fail(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        with pytest.raises(
+            ValueError, match=r"Only one of time_partitioning or range_partitioning can be set."
+        ):
+            GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                schema_fields=SCHEMA_FIELDS,
+                max_id_key=MAX_ID_KEY,
+                write_disposition=WRITE_DISPOSITION,
+                external_table=False,
+                project_id=JOB_PROJECT_ID,
+                time_partitioning={"field": "created", "type": "DAY"},
+                range_partitioning={"field": "grade", "range": {"start": 0, "end": 100, "interval": 20}},
+            )
+
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
         hook.return_value.insert_job.side_effect = [