From 2b3ab949413b85c3b96344bf0dfe0e832f88f38b Mon Sep 17 00:00:00 2001 From: Anton Nitochkin Date: Wed, 24 Sep 2025 17:57:42 +0000 Subject: [PATCH] Deprecate CreateAutoMLVideoTrainingJobOperator and removed system tests for video tracking and video training. Update generative_model_tuning system test. Update documentation for vertex ai. --- .../unit/always/test_project_structure.py | 1 + .../google/docs/operators/cloud/vertex_ai.rst | 29 ++- .../cloud/operators/vertex_ai/auto_ml.py | 7 + ...xample_vertex_ai_auto_ml_video_tracking.py | 174 ------------------ ...xample_vertex_ai_auto_ml_video_training.py | 162 ---------------- ...ample_vertex_ai_generative_model_tuning.py | 56 +++++- .../cloud/vertex_ai/resources/__init__.py | 16 ++ .../resources/video_tuning_dataset.jsonl | 1 + .../google/cloud/operators/test_vertex_ai.py | 54 +++--- 9 files changed, 116 insertions(+), 384 deletions(-) delete mode 100644 providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py delete mode 100644 providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py create mode 100644 providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py create mode 100644 providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl diff --git a/airflow-core/tests/unit/always/test_project_structure.py b/airflow-core/tests/unit/always/test_project_structure.py index 7ff2e92c5e3b3..cd8f653b924fd 100644 --- a/airflow-core/tests/unit/always/test_project_structure.py +++ b/airflow-core/tests/unit/always/test_project_structure.py @@ -428,6 +428,7 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest "airflow.providers.google.cloud.operators.automl.AutoMLDeleteModelOperator", "airflow.providers.google.cloud.operators.automl.AutoMLListDatasetOperator", "airflow.providers.google.cloud.operators.automl.AutoMLDeleteDatasetOperator", + "airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator", "airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator", "airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator", "airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator", diff --git a/providers/google/docs/operators/cloud/vertex_ai.rst b/providers/google/docs/operators/cloud/vertex_ai.rst index 831d44454e865..5bd81679dd129 100644 --- a/providers/google/docs/operators/cloud/vertex_ai.rst +++ b/providers/google/docs/operators/cloud/vertex_ai.rst @@ -265,36 +265,22 @@ put dataset id to ``dataset_id`` parameter in operator. :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator] :end-before: [END how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator] +.. warning:: + This operator is deprecated and will be removed after March 24, 2026. Please use + :class:`~airflow.providers.google.cloud.operators.vertex_ai.generative_model.SupervisedFineTuningTrainOperator`. + How to run AutoML Video Training Job :class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator` Before start running this Job you must prepare and create ``Video`` dataset. After that you should put dataset id to ``dataset_id`` parameter in operator. -.. exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py - :language: python - :dedent: 4 - :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator] - :end-before: [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator] - Additionally, you can create new version of existing AutoML Video Training Job. In this case, the result will be new version of existing Model instead of new Model created in Model Registry. This can be done by specifying ``parent_model`` parameter when running AutoML Video Training Job. -.. exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py - :language: python - :dedent: 4 - :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator] - :end-before: [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator] - Also you can use vertex_ai AutoML model for video tracking. -.. exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py - :language: python - :dedent: 4 - :start-after: [START how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator] - :end-before: [END how_to_cloud_vertex_ai_create_auto_ml_video_tracking_job_operator] - You can get a list of AutoML Training Jobs using :class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.ListAutoMLTrainingJobOperator`. @@ -620,6 +606,13 @@ The operator returns the tuned model's endpoint name in :ref:`XCom > move_dataset_file, - create_video_dataset, - ] - >> import_video_dataset - # TEST BODY - >> create_auto_ml_video_training_job - # TEST TEARDOWN - >> delete_auto_ml_video_training_job - >> delete_video_dataset - >> delete_bucket - ) - - from tests_common.test_utils.watcher import watcher - - # This test needs watcher in order to properly mark success/failure - # when "tearDown" task with trigger rule is part of the DAG - list(dag.tasks) >> watcher() - -from tests_common.test_utils.system_tests import get_test_run # noqa: E402 - -# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) -test_run = get_test_run(dag) diff --git a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py deleted file mode 100644 index 34e57496910bd..0000000000000 --- a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py +++ /dev/null @@ -1,162 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -""" -Example Airflow DAG for Google Vertex AI service testing Auto ML operations. -""" - -from __future__ import annotations - -import os -from datetime import datetime - -from google.cloud.aiplatform import schema -from google.protobuf.struct_pb2 import Value - -from airflow.models.dag import DAG -from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ( - CreateAutoMLVideoTrainingJobOperator, - DeleteAutoMLTrainingJobOperator, -) -from airflow.providers.google.cloud.operators.vertex_ai.dataset import ( - CreateDatasetOperator, - DeleteDatasetOperator, - ImportDataOperator, -) - -try: - from airflow.sdk import TriggerRule -except ImportError: - # Compatibility for Airflow < 3.1 - from airflow.utils.trigger_rule import TriggerRule # type: ignore[no-redef,attr-defined] - -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") -PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -DAG_ID = "vertex_ai_auto_ml_operations" -REGION = "us-central1" -VIDEO_DISPLAY_NAME = f"auto-ml-video-{ENV_ID}" -MODEL_DISPLAY_NAME = f"auto-ml-video-model-{ENV_ID}" - -RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" -VIDEO_GCS_BUCKET_NAME = f"bucket_video_{DAG_ID}_{ENV_ID}".replace("_", "-") - -VIDEO_DATASET = { - "display_name": f"video-dataset-{ENV_ID}", - "metadata_schema_uri": schema.dataset.metadata.video, - "metadata": Value(string_value="video-dataset"), -} -VIDEO_DATA_CONFIG = [ - { - "import_schema_uri": schema.dataset.ioformat.video.classification, - "gcs_source": {"uris": [f"gs://{RESOURCE_DATA_BUCKET}/automl/datasets/video/classification.csv"]}, - }, -] - -with DAG( - f"{DAG_ID}_video_training_job", - schedule="@once", - start_date=datetime(2021, 1, 1), - catchup=False, - tags=["example", "vertex_ai", "auto_ml"], -) as dag: - create_video_dataset = CreateDatasetOperator( - task_id="video_dataset", - dataset=VIDEO_DATASET, - region=REGION, - project_id=PROJECT_ID, - ) - video_dataset_id = create_video_dataset.output["dataset_id"] - - import_video_dataset = ImportDataOperator( - task_id="import_video_data", - dataset_id=video_dataset_id, - region=REGION, - project_id=PROJECT_ID, - import_configs=VIDEO_DATA_CONFIG, - ) - - # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator] - create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator( - task_id="auto_ml_video_task", - display_name=VIDEO_DISPLAY_NAME, - prediction_type="classification", - model_type="CLOUD", - dataset_id=video_dataset_id, - model_display_name=MODEL_DISPLAY_NAME, - region=REGION, - project_id=PROJECT_ID, - ) - model_id_v1 = create_auto_ml_video_training_job.output["model_id"] - # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator] - - # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator] - create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator( - task_id="auto_ml_video_v2_task", - display_name=VIDEO_DISPLAY_NAME, - prediction_type="classification", - model_type="CLOUD", - dataset_id=video_dataset_id, - model_display_name=MODEL_DISPLAY_NAME, - parent_model=model_id_v1, - region=REGION, - project_id=PROJECT_ID, - ) - # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_v2_operator] - - delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator( - task_id="delete_auto_ml_video_training_job", - training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_video_task', " - "key='training_id') }}", - region=REGION, - project_id=PROJECT_ID, - trigger_rule=TriggerRule.ALL_DONE, - ) - - delete_video_dataset = DeleteDatasetOperator( - task_id="delete_video_dataset", - dataset_id=video_dataset_id, - region=REGION, - project_id=PROJECT_ID, - trigger_rule=TriggerRule.ALL_DONE, - ) - - ( - # TEST SETUP - create_video_dataset - >> import_video_dataset - # TEST BODY - >> create_auto_ml_video_training_job - >> create_auto_ml_video_training_job_v2 - # TEST TEARDOWN - >> delete_auto_ml_video_training_job - >> delete_video_dataset - ) - - # ### Everything below this line is not part of example ### - # ### Just for system tests purpose ### - from tests_common.test_utils.watcher import watcher - - # This test needs watcher in order to properly mark success/failure - # when "tearDown" task with trigger rule is part of the DAG - list(dag.tasks) >> watcher() - -from tests_common.test_utils.system_tests import get_test_run # noqa: E402 - -# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) -test_run = get_test_run(dag) diff --git a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py index 7beafbdb56e4c..8e7b21893520a 100644 --- a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py +++ b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py @@ -24,14 +24,23 @@ import os from datetime import datetime +from pathlib import Path import requests +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator + try: from airflow.sdk import task except ImportError: # Airflow 2 path from airflow.decorators import task # type: ignore[attr-defined,no-redef] +try: + from airflow.sdk import TriggerRule +except ImportError: + # Compatibility for Airflow < 3.1 + from airflow.utils.trigger_rule import TriggerRule # type: ignore[no-redef,attr-defined] from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.vertex_ai.generative_model import ( SupervisedFineTuningTrainOperator, @@ -52,9 +61,6 @@ def _get_actual_model(key) -> str: try: model_name = model["name"].split("/")[-1] splited_model_name = model_name.split("-") - if not splited_model_name[-1].isdigit(): - # We are not using model aliases because sometimes it is not guaranteed to work - continue if not source_model and "flash" in model_name: source_model = model_name elif ( @@ -88,6 +94,13 @@ def _get_actual_model(key) -> str: TRAIN_DATASET = "gs://cloud-samples-data/ai-platform/generative_ai/gemini-2_0/text/sft_train_data.jsonl" TUNED_MODEL_DISPLAY_NAME = "my_tuned_gemini_model" +BUCKET_NAME = f"bucket_tuning_dag_{PROJECT_ID}" +FILE_NAME = "video_tuning_dataset.jsonl" +UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) +TRAIN_VIDEO_DATASET = f"gs://{BUCKET_NAME}/{FILE_NAME}" +TUNED_VIDEO_MODEL_DISPLAY_NAME = "my_tuned_gemini_video_model" + + with DAG( dag_id=DAG_ID, description="Sample DAG with generative model tuning tasks.", @@ -110,6 +123,21 @@ def get_actual_model(key): get_actual_model_task = get_actual_model(get_gemini_api_key_task) + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=BUCKET_NAME, + project_id=PROJECT_ID, + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=UPLOAD_FILE_PATH, + dst=FILE_NAME, + bucket=BUCKET_NAME, + ) + + delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME) + # [START how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator] sft_train_task = SupervisedFineTuningTrainOperator( task_id="sft_train_task", @@ -121,7 +149,27 @@ def get_actual_model(key): ) # [END how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator] - get_gemini_api_key_task >> get_actual_model_task >> sft_train_task + # [START how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video] + sft_video_task = SupervisedFineTuningTrainOperator( + task_id="sft_train_video_task", + project_id=PROJECT_ID, + location=REGION, + source_model=SOURCE_MODEL, + train_dataset=TRAIN_VIDEO_DATASET, + tuned_model_display_name=TUNED_VIDEO_MODEL_DISPLAY_NAME, + ) + # [END how_to_cloud_vertex_ai_supervised_fine_tuning_train_operator_for_video] + + delete_bucket.trigger_rule = TriggerRule.ALL_DONE + + ( + get_gemini_api_key_task + >> get_actual_model_task + >> create_bucket + >> upload_file + >> [sft_train_task, sft_video_task] + >> delete_bucket + ) from tests_common.test_utils.watcher import watcher diff --git a/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py b/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/google/tests/system/google/cloud/vertex_ai/resources/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl b/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl new file mode 100644 index 0000000000000..17d3bfd17ec29 --- /dev/null +++ b/providers/google/tests/system/google/cloud/vertex_ai/resources/video_tuning_dataset.jsonl @@ -0,0 +1 @@ +{"contents": [{"role": "user", "parts": [{"fileData": {"fileUri": "https://www.youtube.com/watch?v=nGeKSiCQkPw", "mimeType": "video/mp4"}}, {"text": "\n You are a video analysis expert. Detect which animal appears in the\n video.The video can only have one of the following animals: dog, cat,\n rabbit.\n Output Format:\n Generate output in the following JSON\n format:\n\n [{\n\n \"animal_name\": \"\",\n\n }]\n"}]}, {"role": "model", "parts": [{"text": "```json\n[{\"animal_name\": \"dog\"}]\n```"}]}], "generationConfig": {"mediaResolution": "MEDIA_RESOLUTION_LOW"}} diff --git a/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py b/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py index e595eb33944c3..e34cd5593b113 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py +++ b/providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py @@ -1869,19 +1869,20 @@ class TestVertexAICreateAutoMLVideoTrainingJobOperator: @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook")) def test_execute(self, mock_hook, mock_dataset): mock_hook.return_value.create_auto_ml_video_training_job.return_value = (None, "training_id") - op = CreateAutoMLVideoTrainingJobOperator( - task_id=TASK_ID, - gcp_conn_id=GCP_CONN_ID, - impersonation_chain=IMPERSONATION_CHAIN, - display_name=DISPLAY_NAME, - dataset_id=TEST_DATASET_ID, - prediction_type="classification", - model_type="CLOUD", - sync=True, - region=GCP_LOCATION, - project_id=GCP_PROJECT, - parent_model=TEST_PARENT_MODEL, - ) + with pytest.warns(AirflowProviderDeprecationWarning): + op = CreateAutoMLVideoTrainingJobOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + display_name=DISPLAY_NAME, + dataset_id=TEST_DATASET_ID, + prediction_type="classification", + model_type="CLOUD", + sync=True, + region=GCP_LOCATION, + project_id=GCP_PROJECT, + parent_model=TEST_PARENT_MODEL, + ) op.execute(context={"ti": mock.MagicMock(), "task": mock.MagicMock()}) mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN) mock_dataset.assert_called_once_with(dataset_name=TEST_DATASET_ID) @@ -1912,19 +1913,20 @@ def test_execute(self, mock_hook, mock_dataset): @mock.patch(VERTEX_AI_PATH.format("auto_ml.AutoMLHook")) def test_execute__parent_model_version_index_is_removed(self, mock_hook, mock_dataset): mock_hook.return_value.create_auto_ml_video_training_job.return_value = (None, "training_id") - op = CreateAutoMLVideoTrainingJobOperator( - task_id=TASK_ID, - gcp_conn_id=GCP_CONN_ID, - impersonation_chain=IMPERSONATION_CHAIN, - display_name=DISPLAY_NAME, - dataset_id=TEST_DATASET_ID, - prediction_type="classification", - model_type="CLOUD", - sync=True, - region=GCP_LOCATION, - project_id=GCP_PROJECT, - parent_model=VERSIONED_TEST_PARENT_MODEL, - ) + with pytest.warns(AirflowProviderDeprecationWarning): + op = CreateAutoMLVideoTrainingJobOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + display_name=DISPLAY_NAME, + dataset_id=TEST_DATASET_ID, + prediction_type="classification", + model_type="CLOUD", + sync=True, + region=GCP_LOCATION, + project_id=GCP_PROJECT, + parent_model=VERSIONED_TEST_PARENT_MODEL, + ) op.execute(context={"ti": mock.MagicMock(), "task": mock.MagicMock()}) mock_hook.return_value.create_auto_ml_video_training_job.assert_called_once_with( project_id=GCP_PROJECT,