Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import datetime
import os
import shutil
from glob import glob
from unittest import mock

Expand All @@ -40,20 +41,22 @@ class TestFileToGcsOperator:
"chunk_size": 262144,
}

def setup_method(self):
@pytest.fixture(autouse=True)
def setup_method_fixture(self, tmp_path):
args = {"owner": "airflow", "start_date": datetime.datetime(2017, 1, 1)}
self.dag = DAG("test_dag_id", schedule=None, default_args=args)
self.testfile1 = "/tmp/fake1.csv"
tmp_dir = tmp_path / "tmp"
tmp_dir.mkdir(exist_ok=True, parents=True)
self.tmpdir_posix = tmp_dir.as_posix()
self.testfile1 = f"{self.tmpdir_posix}/fake1.csv"
with open(self.testfile1, "wb") as f:
f.write(b"x" * 393216)
self.testfile2 = "/tmp/fake2.csv"
self.testfile2 = f"{self.tmpdir_posix}/fake2.csv"
with open(self.testfile2, "wb") as f:
f.write(b"x" * 393216)
self.testfiles = [self.testfile1, self.testfile2]

def teardown_method(self):
os.remove(self.testfile1)
os.remove(self.testfile2)
yield
shutil.rmtree(tmp_dir, ignore_errors=True)

def test_init(self):
operator = LocalFilesystemToGCSOperator(
Expand Down Expand Up @@ -129,11 +132,15 @@ def test_execute_multiple(self, mock_hook):
def test_execute_wildcard(self, mock_hook):
mock_instance = mock_hook.return_value
operator = LocalFilesystemToGCSOperator(
task_id="file_to_gcs_operator", dag=self.dag, src="/tmp/fake*.csv", dst="test/", **self._config
task_id="file_to_gcs_operator",
dag=self.dag,
src=f"{self.tmpdir_posix}/fake*.csv",
dst="test/",
**self._config,
)
operator.execute(None)
object_names = ["test/" + os.path.basename(fp) for fp in glob("/tmp/fake*.csv")]
files_objects = zip(glob("/tmp/fake*.csv"), object_names)
files_objects = zip(glob(f"{self.tmpdir_posix}/fake*.csv"), object_names)
calls = [
mock.call(
bucket_name=self._config["bucket"],
Expand All @@ -150,9 +157,9 @@ def test_execute_wildcard(self, mock_hook):
@pytest.mark.parametrize(
("src", "dst"),
[
("/tmp/fake*.csv", "test/test1.csv"),
("/tmp/fake*.csv", "test"),
("/tmp/fake*.csv", "test/dir"),
("fake*.csv", "test/test1.csv"),
("fake*.csv", "test"),
("fake*.csv", "test/dir"),
],
)
@mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True)
Expand All @@ -161,7 +168,7 @@ def test_execute_negative(self, mock_hook, src, dst):
operator = LocalFilesystemToGCSOperator(
task_id="file_to_gcs_operator",
dag=self.dag,
src=src,
src=f"{self.tmpdir_posix}/{src}",
dst=dst,
**self._config,
)
Expand All @@ -172,10 +179,10 @@ def test_execute_negative(self, mock_hook, src, dst):
@pytest.mark.parametrize(
("src", "dst", "expected_input", "expected_output", "symlink"),
[
("/tmp/fake*.csv", "test/", "/tmp", "test", True),
("/tmp/../tmp/fake*.csv", "test/", "/tmp", "test", True),
("/tmp/fake1.csv", "test/test1.csv", "/tmp/fake1.csv", "test/test1.csv", False),
("/tmp/fake1.csv", "test/pre", "/tmp/fake1.csv", "test/pre", False),
("fake*.csv", "test/", "", "test", True),
("../tmp/fake*.csv", "test/", "", "test", True),
("fake1.csv", "test/test1.csv", "fake1.csv", "test/test1.csv", False),
("fake1.csv", "test/pre", "fake1.csv", "test/pre", False),
],
)
def test_get_openlineage_facets_on_start_with_string_src(
Expand All @@ -184,11 +191,12 @@ def test_get_openlineage_facets_on_start_with_string_src(
operator = LocalFilesystemToGCSOperator(
task_id="gcs_to_file_sensor",
dag=self.dag,
src=src,
src=f"{self.tmpdir_posix}/{src}",
dst=dst,
**self._config,
)
result = operator.get_openlineage_facets_on_start()
expected_input = self.tmpdir_posix + ("/" + expected_input if expected_input else "")
assert not result.job_facets
assert not result.run_facets
assert len(result.outputs) == 1
Expand All @@ -199,21 +207,22 @@ def test_get_openlineage_facets_on_start_with_string_src(
assert result.inputs[0].name == expected_input
if symlink:
assert result.inputs[0].facets["symlink"] == SymlinksDatasetFacet(
identifiers=[Identifier(namespace="file", name=src, type="file")]
identifiers=[Identifier(namespace="file", name=f"{self.tmpdir_posix}/{src}", type="file")]
)

@pytest.mark.parametrize(
("src", "dst", "expected_inputs", "expected_output"),
[
(["/tmp/fake1.csv", "/tmp/fake2.csv"], "test/", ["/tmp/fake1.csv", "/tmp/fake2.csv"], "test"),
(["/tmp/fake1.csv", "/tmp/fake2.csv"], "", ["/tmp/fake1.csv", "/tmp/fake2.csv"], "/"),
(["fake1.csv", "fake2.csv"], "test/", ["fake1.csv", "fake2.csv"], "test"),
(["fake1.csv", "fake2.csv"], "", ["fake1.csv", "fake2.csv"], "/"),
],
)
def test_get_openlineage_facets_on_start_with_list_src(self, src, dst, expected_inputs, expected_output):
expected_inputs = [f"{self.tmpdir_posix}/{item}" for item in expected_inputs]
operator = LocalFilesystemToGCSOperator(
task_id="gcs_to_file_sensor",
dag=self.dag,
src=src,
src=[f"{self.tmpdir_posix}/{src_item}" for src_item in src],
dst=dst,
**self._config,
)
Expand Down