|
12 | 12 | from mock import patch |
13 | 13 |
|
14 | 14 | import dvc |
| 15 | +from dvc.cache import Cache |
15 | 16 | from dvc.exceptions import DvcException |
16 | 17 | from dvc.exceptions import RecursiveAddingWhileUsingFilename |
17 | 18 | from dvc.exceptions import StageFileCorruptedError |
|
24 | 25 | from dvc.utils import file_md5 |
25 | 26 | from dvc.utils import LARGE_DIR_SIZE |
26 | 27 | from dvc.utils import relpath |
27 | | -from dvc.utils.compat import range |
| 28 | +from dvc.utils.compat import range, fspath |
| 29 | +from dvc.utils.fs import path_isin |
28 | 30 | from dvc.utils.stage import load_stage_file |
29 | 31 | from tests.basic_env import TestDvc |
30 | 32 | from tests.utils import get_gitignore_content |
@@ -511,15 +513,56 @@ def test(self): |
511 | 513 | self.assertTrue(System.is_hardlink(self.FOO)) |
512 | 514 |
|
513 | 515 |
|
| 516 | +@pytest.fixture |
| 517 | +def temporary_windows_drive(tmp_path_factory): |
| 518 | + import string |
| 519 | + import win32api |
| 520 | + from ctypes import windll |
| 521 | + from win32con import DDD_REMOVE_DEFINITION |
| 522 | + |
| 523 | + drives = [ |
| 524 | + s[0].upper() |
| 525 | + for s in win32api.GetLogicalDriveStrings().split("\000") |
| 526 | + if len(s) > 0 |
| 527 | + ] |
| 528 | + |
| 529 | + new_drive_name = [ |
| 530 | + letter for letter in string.ascii_uppercase if letter not in drives |
| 531 | + ][0] |
| 532 | + new_drive = "{}:".format(new_drive_name) |
| 533 | + |
| 534 | + target_path = tmp_path_factory.mktemp("tmp_windows_drive") |
| 535 | + |
| 536 | + set_up_result = windll.kernel32.DefineDosDeviceW( |
| 537 | + 0, new_drive, fspath(target_path) |
| 538 | + ) |
| 539 | + if set_up_result == 0: |
| 540 | + raise RuntimeError("Failed to mount windows drive!") |
| 541 | + |
| 542 | + # NOTE: new_drive has form of `A:` and joining it with some relative |
| 543 | + # path might result in non-existing path (A:path\\to) |
| 544 | + yield os.path.join(new_drive, os.sep) |
| 545 | + |
| 546 | + tear_down_result = windll.kernel32.DefineDosDeviceW( |
| 547 | + DDD_REMOVE_DEFINITION, new_drive, fspath(target_path) |
| 548 | + ) |
| 549 | + if tear_down_result == 0: |
| 550 | + raise RuntimeError("Could not unmount windows drive!") |
| 551 | + |
| 552 | + |
514 | 553 | @pytest.mark.skipif(os.name != "nt", reason="Windows specific") |
515 | 554 | def test_windows_should_add_when_cache_on_different_drive( |
516 | | - dvc_repo, repo_dir, temporary_windows_drive |
| 555 | + tmp_dir, dvc, temporary_windows_drive |
517 | 556 | ): |
518 | | - ret = main(["config", "cache.dir", temporary_windows_drive]) |
519 | | - assert ret == 0 |
| 557 | + dvc.config.set("cache", "dir", temporary_windows_drive) |
| 558 | + dvc.cache = Cache(dvc) |
| 559 | + |
| 560 | + stage, = tmp_dir.dvc_gen({"file": "file"}) |
| 561 | + cache_path = stage.outs[0].cache_path |
520 | 562 |
|
521 | | - ret = main(["add", repo_dir.DATA]) |
522 | | - assert ret == 0 |
| 563 | + assert path_isin(cache_path, temporary_windows_drive) |
| 564 | + assert os.path.isfile(cache_path) |
| 565 | + filecmp.cmp("file", cache_path) |
523 | 566 |
|
524 | 567 |
|
525 | 568 | def test_readding_dir_should_not_unprotect_all(tmp_dir, dvc, mocker): |
|
0 commit comments