diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 319cdc972593c..3cade5678261c 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -180,7 +180,7 @@
         )
         remote_task_handler_kwargs = {}
     elif remote_base_log_folder.startswith("gs://"):
-        from airflow.providers.google.cloud.logs.gcs_task_handler import GCSRemoteLogIO
+        from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO
 
         key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None)
 
diff --git a/airflow-core/tests/unit/core/test_logging_config.py b/airflow-core/tests/unit/core/test_logging_config.py
index b19e7d9bbe7d6..72864b066b34a 100644
--- a/airflow-core/tests/unit/core/test_logging_config.py
+++ b/airflow-core/tests/unit/core/test_logging_config.py
@@ -295,6 +295,36 @@ def test_log_group_arns_remote_logging_with_cloudwatch_handler(
         assert isinstance(remote_io, CloudWatchRemoteLogIO)
         assert remote_io.log_group_arn == log_group_arn
 
+    def test_loading_remote_logging_with_gcs_handler(self):
+        """Test if logging can be configured successfully for GCS"""
+        import airflow.logging_config
+        from airflow.config_templates import airflow_local_settings
+        from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO
+
+        with conf_vars(
+            {
+                ("logging", "remote_logging"): "True",
+                ("logging", "remote_log_conn_id"): "some_gcs",
+                ("logging", "remote_base_log_folder"): "gs://some-folder",
+                ("logging", "google_key_path"): "/gcs-key.json",
+                (
+                    "logging",
+                    "remote_task_handler_kwargs",
+                ): '{"delete_local_copy": true, "project_id": "test-project", "gcp_keyfile_dict": {},"scopes": ["https://www.googleapis.com/auth/devstorage.read_write"]}',
+            }
+        ):
+            importlib.reload(airflow_local_settings)
+            airflow.logging_config.configure_logging()
+
+        assert isinstance(airflow.logging_config.REMOTE_TASK_LOG, GCSRemoteLogIO)
+        assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "delete_local_copy") is True
+        assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "project_id") == "test-project"
+        assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "gcp_keyfile_dict") == {}
+        assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "scopes") == [
+            "https://www.googleapis.com/auth/devstorage.read_write"
+        ]
+        assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "gcp_key_path") == "/gcs-key.json"
+
     def test_loading_remote_logging_with_kwargs(self):
         """Test if logging can be configured successfully with kwargs"""
         pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider not installed")
diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
index c598e02ba4a30..249cb80aa852f 100644
--- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -61,13 +61,15 @@ class GCSRemoteLogIO(LoggingMixin):  # noqa: D101
     remote_base: str
     base_log_folder: Path = attrs.field(converter=Path)
     delete_local_copy: bool
+    project_id: str
 
     gcp_key_path: str | None
     gcp_keyfile_dict: dict | None
     scopes: Collection[str] | None
-    project_id: str
 
-    def upload(self, path: os.PathLike, ti: RuntimeTI):
+    processors = ()
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
         """Upload the given log path to the remote storage."""
         path = Path(path)
         if path.is_absolute():
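
For context, a minimal sketch (not part of the diff) of the setup this change fixes. It enables GCS remote task logging through the same [logging] options the new test sets via conf_vars, expressed here as AIRFLOW__LOGGING__* environment variables; the bucket, connection id, key path, and kwargs values are illustrative placeholders. With the import path corrected from cloud.logs to cloud.log, configure_logging() should construct a GCSRemoteLogIO instead of failing with an ImportError.

    import os

    # Assumed deployment-style configuration; these mirror the conf_vars used in
    # the new test and must be set before Airflow's configuration is first loaded.
    os.environ["AIRFLOW__LOGGING__REMOTE_LOGGING"] = "True"
    os.environ["AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID"] = "some_gcs"
    os.environ["AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER"] = "gs://some-folder"
    os.environ["AIRFLOW__LOGGING__GOOGLE_KEY_PATH"] = "/gcs-key.json"
    os.environ["AIRFLOW__LOGGING__REMOTE_TASK_HANDLER_KWARGS"] = (
        '{"delete_local_copy": true, "project_id": "test-project"}'
    )

    import airflow.logging_config

    # With the corrected import path (airflow.providers.google.cloud.log.gcs_task_handler),
    # configuring logging wires up the GCS remote log IO.
    airflow.logging_config.configure_logging()
    print(type(airflow.logging_config.REMOTE_TASK_LOG).__name__)  # expected: GCSRemoteLogIO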