diff --git a/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py
index 2716ce9a4aab3..8142ebaa18b05 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py
@@ -19,6 +19,7 @@
 
 import datetime
 import os
+import shutil
 from glob import glob
 from unittest import mock
 
@@ -40,20 +41,24 @@ class TestFileToGcsOperator:
         "chunk_size": 262144,
     }
 
-    def setup_method(self):
+    @pytest.fixture(autouse=True)
+    def setup_method_fixture(self, tmp_path):
+        """Build the DAG and two 384 KiB sample files in a per-test directory, then remove it."""
         args = {"owner": "airflow", "start_date": datetime.datetime(2017, 1, 1)}
         self.dag = DAG("test_dag_id", schedule=None, default_args=args)
-        self.testfile1 = "/tmp/fake1.csv"
+        # The directory must be named "tmp": one parametrized src is "../tmp/fake*.csv".
+        tmp_dir = tmp_path / "tmp"
+        tmp_dir.mkdir(exist_ok=True, parents=True)
+        self.tmpdir_posix = tmp_dir.as_posix()
+        self.testfile1 = f"{self.tmpdir_posix}/fake1.csv"
         with open(self.testfile1, "wb") as f:
             f.write(b"x" * 393216)
-        self.testfile2 = "/tmp/fake2.csv"
+        self.testfile2 = f"{self.tmpdir_posix}/fake2.csv"
         with open(self.testfile2, "wb") as f:
             f.write(b"x" * 393216)
         self.testfiles = [self.testfile1, self.testfile2]
-
-    def teardown_method(self):
-        os.remove(self.testfile1)
-        os.remove(self.testfile2)
+        yield
+        shutil.rmtree(tmp_dir, ignore_errors=True)
 
     def test_init(self):
         operator = LocalFilesystemToGCSOperator(
@@ -129,11 +134,15 @@ def test_execute_multiple(self, mock_hook):
     def test_execute_wildcard(self, mock_hook):
         mock_instance = mock_hook.return_value
         operator = LocalFilesystemToGCSOperator(
-            task_id="file_to_gcs_operator", dag=self.dag, src="/tmp/fake*.csv", dst="test/", **self._config
+            task_id="file_to_gcs_operator",
+            dag=self.dag,
+            src=f"{self.tmpdir_posix}/fake*.csv",
+            dst="test/",
+            **self._config,
         )
         operator.execute(None)
-        object_names = ["test/" + os.path.basename(fp) for fp in glob("/tmp/fake*.csv")]
-        files_objects = zip(glob("/tmp/fake*.csv"), object_names)
+        object_names = ["test/" + os.path.basename(fp) for fp in glob(f"{self.tmpdir_posix}/fake*.csv")]
+        files_objects = zip(glob(f"{self.tmpdir_posix}/fake*.csv"), object_names)
         calls = [
             mock.call(
                 bucket_name=self._config["bucket"],
@@ -150,9 +159,9 @@ def test_execute_wildcard(self, mock_hook):
     @pytest.mark.parametrize(
         ("src", "dst"),
         [
-            ("/tmp/fake*.csv", "test/test1.csv"),
-            ("/tmp/fake*.csv", "test"),
-            ("/tmp/fake*.csv", "test/dir"),
+            ("fake*.csv", "test/test1.csv"),
+            ("fake*.csv", "test"),
+            ("fake*.csv", "test/dir"),
         ],
     )
     @mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True)
@@ -161,7 +170,7 @@ def test_execute_negative(self, mock_hook, src, dst):
         operator = LocalFilesystemToGCSOperator(
             task_id="file_to_gcs_operator",
             dag=self.dag,
-            src=src,
+            src=f"{self.tmpdir_posix}/{src}",
             dst=dst,
             **self._config,
         )
@@ -172,10 +181,10 @@ def test_execute_negative(self, mock_hook, src, dst):
     @pytest.mark.parametrize(
         ("src", "dst", "expected_input", "expected_output", "symlink"),
         [
-            ("/tmp/fake*.csv", "test/", "/tmp", "test", True),
-            ("/tmp/../tmp/fake*.csv", "test/", "/tmp", "test", True),
-            ("/tmp/fake1.csv", "test/test1.csv", "/tmp/fake1.csv", "test/test1.csv", False),
-            ("/tmp/fake1.csv", "test/pre", "/tmp/fake1.csv", "test/pre", False),
+            ("fake*.csv", "test/", "", "test", True),
+            ("../tmp/fake*.csv", "test/", "", "test", True),
+            ("fake1.csv", "test/test1.csv", "fake1.csv", "test/test1.csv", False),
+            ("fake1.csv", "test/pre", "fake1.csv", "test/pre", False),
         ],
     )
     def test_get_openlineage_facets_on_start_with_string_src(
@@ -184,11 +193,12 @@ def test_get_openlineage_facets_on_start_with_string_src(
         operator = LocalFilesystemToGCSOperator(
             task_id="gcs_to_file_sensor",
             dag=self.dag,
-            src=src,
+            src=f"{self.tmpdir_posix}/{src}",
             dst=dst,
             **self._config,
         )
         result = operator.get_openlineage_facets_on_start()
+        expected_input = self.tmpdir_posix + ("/" + expected_input if expected_input else "")
         assert not result.job_facets
         assert not result.run_facets
         assert len(result.outputs) == 1
@@ -199,21 +209,22 @@ def test_get_openlineage_facets_on_start_with_string_src(
         assert result.inputs[0].name == expected_input
         if symlink:
             assert result.inputs[0].facets["symlink"] == SymlinksDatasetFacet(
-                identifiers=[Identifier(namespace="file", name=src, type="file")]
+                identifiers=[Identifier(namespace="file", name=f"{self.tmpdir_posix}/{src}", type="file")]
             )
 
     @pytest.mark.parametrize(
         ("src", "dst", "expected_inputs", "expected_output"),
         [
-            (["/tmp/fake1.csv", "/tmp/fake2.csv"], "test/", ["/tmp/fake1.csv", "/tmp/fake2.csv"], "test"),
-            (["/tmp/fake1.csv", "/tmp/fake2.csv"], "", ["/tmp/fake1.csv", "/tmp/fake2.csv"], "/"),
+            (["fake1.csv", "fake2.csv"], "test/", ["fake1.csv", "fake2.csv"], "test"),
+            (["fake1.csv", "fake2.csv"], "", ["fake1.csv", "fake2.csv"], "/"),
         ],
     )
     def test_get_openlineage_facets_on_start_with_list_src(self, src, dst, expected_inputs, expected_output):
+        expected_inputs = [f"{self.tmpdir_posix}/{item}" for item in expected_inputs]
         operator = LocalFilesystemToGCSOperator(
             task_id="gcs_to_file_sensor",
             dag=self.dag,
-            src=src,
+            src=[f"{self.tmpdir_posix}/{src_item}" for src_item in src],
             dst=dst,
             **self._config,
         )