diff --git a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 919baa03b19..74a70f0830c 100644
--- a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++ b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -25,8 +25,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GCSFileSystemProvider implements FileSystemProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GCSFileSystemProvider.class);
+
   @Override
   public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
     Configuration configuration = new Configuration();
@@ -35,6 +39,7 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
           configuration.set(k.replace("gravitino.bypass.", ""), v);
         });
 
+    LOGGER.info("Creating GCS file system with config: {}", config);
     return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
   }
diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
index db1d01336ca..cca13b77047 100644
--- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
+++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
@@ -44,7 +44,7 @@ public class HadoopGCSCatalogIT extends HadoopCatalogIT {
 
   @Override
   public void startIntegrationTest() throws Exception {
-    // Do nothing.
+    // Just override the parent method and do nothing.
   }
 
   @BeforeAll
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py
index e5a565ce0d6..8f1b2008ab9 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,11 +14,12 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import os
 from enum import Enum
 from pathlib import PurePosixPath
 from typing import Dict, Tuple
 import re
+import importlib
 
 import fsspec
 from cachetools import TTLCache, LRUCache
@@ -26,7 +27,7 @@
 from fsspec.implementations.local import LocalFileSystem
 from fsspec.implementations.arrow import ArrowFSWrapper
 from fsspec.utils import infer_storage_options
-from pyarrow.fs import HadoopFileSystem
+
 from readerwriterlock import rwlock
 from gravitino.audit.caller_context import CallerContext, CallerContextHolder
 from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
@@ -47,6 +48,7 @@
 class StorageType(Enum):
     HDFS = "hdfs"
     LOCAL = "file"
+    GCS = "gs"
 
 
 class FilesetContextPair:
@@ -66,7 +68,7 @@ def filesystem(self):
 
 
 class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
-    """This is a virtual file system which users can access `fileset` and
+    """This is a virtual file system through which users can access `fileset` and
     other resources.
     It obtains the actual storage location corresponding to the resource from the
@@ -149,6 +151,7 @@ def __init__(
         self._cache_lock = rwlock.RWLockFair()
         self._catalog_cache = LRUCache(maxsize=100)
         self._catalog_cache_lock = rwlock.RWLockFair()
+        self._options = options
 
         super().__init__(**kwargs)
 
@@ -309,7 +312,9 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
         )
         dst_actual_path = dst_context_pair.actual_file_location()
 
-        if storage_type == StorageType.HDFS:
+        # Both HDFS and GCS support renaming paths via the underlying filesystem.
+
+        if storage_type in [StorageType.HDFS, StorageType.GCS]:
             src_context_pair.filesystem().mv(
                 self._strip_storage_protocol(storage_type, src_actual_path),
                 self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -540,7 +545,11 @@ def _convert_actual_path(
         :param virtual_location: Virtual location
         :return A virtual path
         """
-        if storage_location.startswith(f"{StorageType.HDFS.value}://"):
+
+        # If the storage location starts with hdfs:// or gs://, use its path part as the prefix.
+        if storage_location.startswith(
+            f"{StorageType.HDFS.value}://"
+        ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
             actual_prefix = infer_storage_options(storage_location)["path"]
         elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
             actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -681,6 +690,8 @@ def _recognize_storage_type(path: str):
             return StorageType.HDFS
         if path.startswith(f"{StorageType.LOCAL.value}:/"):
             return StorageType.LOCAL
+        if path.startswith(f"{StorageType.GCS.value}://"):
+            return StorageType.GCS
         raise GravitinoRuntimeException(
             f"Storage type doesn't support now. Path:{path}"
         )
@@ -705,10 +716,11 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
         :param path: The path
         :return: The stripped path
         """
-        if storage_type == StorageType.HDFS:
+        if storage_type in (StorageType.HDFS, StorageType.GCS):
             return path
         if storage_type == StorageType.LOCAL:
             return path[len(f"{StorageType.LOCAL.value}:") :]
+
         raise GravitinoRuntimeException(
             f"Storage type:{storage_type} doesn't support now."
         )
@@ -774,9 +786,12 @@ def _get_filesystem(self, actual_file_location: str):
             if cache_value is not None:
                 return cache_value
             if storage_type == StorageType.HDFS:
-                fs = ArrowFSWrapper(HadoopFileSystem.from_uri(actual_file_location))
+                fs_class = importlib.import_module("pyarrow.fs").HadoopFileSystem
+                fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location))
             elif storage_type == StorageType.LOCAL:
                 fs = LocalFileSystem()
+            elif storage_type == StorageType.GCS:
+                fs = ArrowFSWrapper(self._get_gcs_filesystem())
             else:
                 raise GravitinoRuntimeException(
                     f"Storage type: `{storage_type}` doesn't support now."
@@ -786,5 +801,23 @@
         finally:
             write_lock.release()
 
+    def _get_gcs_filesystem(self):
+        # Get all options that start with 'gravitino.bypass.gcs.' and strip the prefix.
+        gcs_options = {
+            key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS) :]: value
+            for key, value in self._options.items()
+            if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS)
+        }
+
+        # Get 'service-account-key-path' from gcs_options; raise an exception if it is missing.
+        service_account_key_path = gcs_options.get(GVFSConfig.GVFS_FILESYSTEM_KEY_FILE)
+        if service_account_key_path is None:
+            raise GravitinoRuntimeException(
+                "Service account key is not found in the options."
+ ) + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path + + return importlib.import_module("pyarrow.fs").GcsFileSystem() + fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py b/clients/client-python/gravitino/filesystem/gvfs_config.py index eb5733b56be..618565c70eb 100644 --- a/clients/client-python/gravitino/filesystem/gvfs_config.py +++ b/clients/client-python/gravitino/filesystem/gvfs_config.py @@ -31,3 +31,7 @@ class GVFSConfig: OAUTH2_CREDENTIAL = "oauth2_credential" OAUTH2_PATH = "oauth2_path" OAUTH2_SCOPE = "oauth2_scope" + + GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass" + GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs." + GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path" diff --git a/clients/client-python/requirements.txt b/clients/client-python/requirements.txt index 7242082b77c..a330f738a1f 100644 --- a/clients/client-python/requirements.txt +++ b/clients/client-python/requirements.txt @@ -22,4 +22,5 @@ dataclasses-json==0.6.6 readerwriterlock==1.0.9 fsspec==2024.3.1 pyarrow==15.0.2 -cachetools==5.3.3 \ No newline at end of file +cachetools==5.3.3 +google-auth==2.35.0 \ No newline at end of file diff --git a/clients/client-python/tests/integration/integration_test_env.py b/clients/client-python/tests/integration/integration_test_env.py index cfe6c0eda09..d0d39a06da7 100644 --- a/clients/client-python/tests/integration/integration_test_env.py +++ b/clients/client-python/tests/integration/integration_test_env.py @@ -21,6 +21,7 @@ import subprocess import time import sys +import shutil import requests @@ -80,6 +81,12 @@ def setUpClass(cls): ) sys.exit(0) + # remove data dir under gravitino_home + data_dir = os.path.join(cls.gravitino_home, "data") + if os.path.exists(data_dir): + logger.info("Remove Gravitino data directory: %s", data_dir) + shutil.rmtree(data_dir) + logger.info("Starting integration test environment...") # Start Gravitino Server @@ -141,6 +148,12 @@ def restart_server(cls): "project root directory." 
) + # remove data dir under gravitino_home + data_dir = os.path.join(gravitino_home, "data") + if os.path.exists(data_dir): + logger.info("Remove Gravitino data directory: %s", data_dir) + shutil.rmtree(data_dir) + # Restart Gravitino Server env_vars = os.environ.copy() env_vars["HADOOP_USER_NAME"] = "anonymous" diff --git a/clients/client-python/tests/integration/test_catalog.py b/clients/client-python/tests/integration/test_catalog.py index 58580b8c518..22fd3a5b303 100644 --- a/clients/client-python/tests/integration/test_catalog.py +++ b/clients/client-python/tests/integration/test_catalog.py @@ -39,7 +39,8 @@ class TestCatalog(IntegrationTestEnv): metalake_name: str = "TestSchema_metalake" + str(randint(1, 10000)) - catalog_name: str = "testCatalog" + catalog_name: str = "testCatalog" + str(randint(1, 10000)) + catalog_name_bak = catalog_name catalog_comment: str = "catalogComment" catalog_location_prop: str = "location" # Fileset Catalog must set `location` catalog_provider: str = "hadoop" @@ -81,21 +82,25 @@ def clean_test_data(self): ) try: logger.info( - "Drop catalog %s[%s]", + "TestCatalog: drop catalog %s[%s]", self.catalog_ident, self.gravitino_client.drop_catalog(name=self.catalog_name, force=True), ) except GravitinoRuntimeException: - logger.warning("Failed to drop catalog %s", self.catalog_name) + logger.warning("TestCatalog: failed to drop catalog %s", self.catalog_name) try: logger.info( - "Drop metalake %s[%s]", + "TestCatalog: drop metalake %s[%s]", self.metalake_name, self.gravitino_admin_client.drop_metalake(self.metalake_name), ) except GravitinoRuntimeException: - logger.warning("Failed to drop metalake %s", self.metalake_name) + logger.warning( + "TestCatalog: failed to drop metalake %s", self.metalake_name + ) + + self.catalog_name = self.catalog_name_bak def test_list_catalogs(self): self.create_catalog(self.catalog_name) diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py b/clients/client-python/tests/integration/test_gvfs_with_gcs.py new file mode 100644 index 00000000000..16f84dff3b1 --- /dev/null +++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py @@ -0,0 +1,173 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
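+
+# This suite exercises the new GCS support in GVFS end to end: it provisions
+# a metalake, catalog, schema, and fileset backed by a GCS bucket, then runs
+# the filesystem tests inherited from TestGvfsWithHDFS against that fileset.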
+
+import logging
+import os
+from random import randint
+import unittest
+
+from fsspec.implementations.arrow import ArrowFSWrapper
+from pyarrow.fs import GcsFileSystem
+
+from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.exceptions.base import GravitinoRuntimeException
+
+
+logger = logging.getLogger(__name__)
+
+
+@unittest.skip("This test requires a GCS service account key file")
+class TestGvfsWithGCS(TestGvfsWithHDFS):
+    # Before running this test, make sure gcp-bundle-x.jar has been copied
+    # to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory.
+    key_file = "your_key_file.json"
+    bucket_name = "your_bucket_name"
+    metalake_name: str = "TestGvfsWithGCS_metalake" + str(randint(1, 10000))
+
+    def setUp(self):
+        self.options = {"gravitino.bypass.gcs.service-account-key-path": self.key_file}
+
+    def tearDown(self):
+        self.options = {}
+
+    @classmethod
+    def setUpClass(cls):
+        cls._get_gravitino_home()
+
+        cls.hadoop_conf_path = f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+        # restart the server
+        cls.restart_server()
+        # create the test entities
+        cls._init_test_entities()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls._clean_test_data()
+        # Reset the server conf in case other ITs (e.g., HDFS) have changed it
+        # and failed to reset it.
+        cls._reset_conf(cls.config, cls.hadoop_conf_path)
+        # restart server
+        cls.restart_server()
+
+    # Remove all non-comment lines from the conf file at conf_path.
+    @classmethod
+    def _reset_conf(cls, config, conf_path):
+        logger.info("Reset %s.", conf_path)
+        if not os.path.exists(conf_path):
+            raise GravitinoRuntimeException(f"Conf file is not found at `{conf_path}`.")
+        filtered_lines = []
+        with open(conf_path, mode="r", encoding="utf-8") as file:
+            origin_lines = file.readlines()
+
+        for line in origin_lines:
+            line = line.strip()
+            if line.startswith("#"):
+                # keep comment lines as-is
+                filtered_lines.append(line + "\n")
+
+        with open(conf_path, mode="w", encoding="utf-8") as file:
+            for line in filtered_lines:
+                file.write(line)
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "gcs",
+                "gravitino.bypass.fs.gs.auth.service.account.enable": "true",
+                "gravitino.bypass.fs.gs.auth.service.account.json.keyfile": cls.key_file,
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.key_file
+        arrow_gcs_fs = GcsFileSystem()
+        cls.fs = ArrowFSWrapper(arrow_gcs_fs)
+
+    def test_modified(self):
+        modified_dir = self.fileset_gvfs_location + "/test_modified"
+        modified_actual_dir = self.fileset_storage_location + "/test_modified"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+        self.fs.mkdir(modified_actual_dir)
+        self.assertTrue(self.fs.exists(modified_actual_dir))
+        self.assertTrue(fs.exists(modified_dir))
+
+        # GCS only supports getting the modification time of an `object`, so the
+        # modified time will be None if the path is a directory.
+        # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3')
+        # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3')
+        # >>> print(r)
+        # None
+        self.assertIsNone(fs.modified(modified_dir))
+
+        # create a file under the dir 'modified_dir'.
+        file_path = modified_dir + "/test.txt"
+        fs.touch(file_path)
+        self.assertTrue(fs.exists(file_path))
+        self.assertIsNotNone(fs.modified(file_path))
+
+    @unittest.skip(
+        "This test will fail due to https://github.com/apache/arrow/issues/44438"
+    )
+    def test_pandas(self):
+        pass
+
+    @unittest.skip(
+        "This test will fail due to https://github.com/apache/arrow/issues/44438"
+    )
+    def test_pyarrow(self):
+        pass
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
index 8bc6597b455..5be5914f15b 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -89,6 +89,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         uri="http://localhost:8090"
     )
     gravitino_client: GravitinoClient = None
+    options = {}
 
     @classmethod
     def setUpClass(cls):
@@ -124,7 +125,8 @@ def tearDownClass(cls):
             BaseHadoopEnvironment.clear_hadoop_env()
         finally:
             # close hdfs container
-            cls.hdfs_container.close()
+            if cls.hdfs_container is not None:
+                cls.hdfs_container.close()
 
     @classmethod
     def _init_test_entities(cls):
@@ -159,7 +161,7 @@ def _init_test_entities(cls):
             properties=cls.fileset_properties,
         )
         arrow_hadoop_fs = HadoopFileSystem(host=cls.hdfs_container.get_ip(), port=9000)
-        cls.hdfs = ArrowFSWrapper(arrow_hadoop_fs)
+        cls.fs = ArrowFSWrapper(arrow_hadoop_fs)
         cls.conf: Dict = {"fs.defaultFS": f"hdfs://{cls.hdfs_container.get_ip()}:9000/"}
 
     @classmethod
@@ -234,15 +236,16 @@ def test_ls(self):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090",
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(ls_actual_dir)
-        self.assertTrue(self.hdfs.exists(ls_actual_dir))
+        self.fs.mkdir(ls_actual_dir)
+        self.assertTrue(self.fs.exists(ls_actual_dir))
 
         ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
         ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
-        self.hdfs.touch(ls_actual_file)
-        self.assertTrue(self.hdfs.exists(ls_actual_file))
+        self.fs.touch(ls_actual_file)
+        self.assertTrue(self.fs.exists(ls_actual_file))
 
         # test detail = false
         file_list_without_detail = fs.ls(ls_dir, detail=False)
@@ -260,15 +263,16 @@ def test_info(self):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090",
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(info_actual_dir)
-        self.assertTrue(self.hdfs.exists(info_actual_dir))
+        self.fs.mkdir(info_actual_dir)
+        self.assertTrue(self.fs.exists(info_actual_dir))
 
         info_file = self.fileset_gvfs_location + "/test_info/test.file"
         info_actual_file = self.fileset_storage_location + "/test_info/test.file"
-        self.hdfs.touch(info_actual_file)
-        
self.assertTrue(self.hdfs.exists(info_actual_file)) + self.fs.touch(info_actual_file) + self.assertTrue(self.fs.exists(info_actual_file)) dir_info = fs.info(info_dir) self.assertEqual(dir_info["name"], info_dir[len("gvfs://") :]) @@ -282,16 +286,17 @@ def test_exist(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(exist_actual_dir) - self.assertTrue(self.hdfs.exists(exist_actual_dir)) + self.fs.mkdir(exist_actual_dir) + self.assertTrue(self.fs.exists(exist_actual_dir)) self.assertTrue(fs.exists(exist_dir)) exist_file = self.fileset_gvfs_location + "/test_exist/test.file" exist_actual_file = self.fileset_storage_location + "/test_exist/test.file" - self.hdfs.touch(exist_actual_file) - self.assertTrue(self.hdfs.exists(exist_actual_file)) + self.fs.touch(exist_actual_file) + self.assertTrue(self.fs.exists(exist_actual_file)) self.assertTrue(fs.exists(exist_file)) def test_cp_file(self): @@ -300,19 +305,20 @@ def test_cp_file(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(cp_file_actual_dir) - self.assertTrue(self.hdfs.exists(cp_file_actual_dir)) + self.fs.mkdir(cp_file_actual_dir) + self.assertTrue(self.fs.exists(cp_file_actual_dir)) self.assertTrue(fs.exists(cp_file_dir)) cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file" cp_file_actual_file = self.fileset_storage_location + "/test_cp_file/test.file" - self.hdfs.touch(cp_file_actual_file) - self.assertTrue(self.hdfs.exists(cp_file_actual_file)) + self.fs.touch(cp_file_actual_file) + self.assertTrue(self.fs.exists(cp_file_actual_file)) self.assertTrue(fs.exists(cp_file_file)) - with self.hdfs.open(cp_file_actual_file, "wb") as f: + with self.fs.open(cp_file_actual_file, "wb") as f: f.write(b"test_file_1") cp_file_new_file = self.fileset_gvfs_location + "/test_cp_file/test_cp.file" @@ -322,7 +328,7 @@ def test_cp_file(self): fs.cp_file(cp_file_file, cp_file_new_file) self.assertTrue(fs.exists(cp_file_new_file)) - with self.hdfs.open(cp_file_new_actual_file, "rb") as f: + with self.fs.open(cp_file_new_actual_file, "rb") as f: result = f.read() self.assertEqual(b"test_file_1", result) @@ -332,10 +338,11 @@ def test_mv(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(mv_actual_dir) - self.assertTrue(self.hdfs.exists(mv_actual_dir)) + self.fs.mkdir(mv_actual_dir) + self.assertTrue(self.fs.exists(mv_actual_dir)) self.assertTrue(fs.exists(mv_dir)) mv_new_dir = self.fileset_gvfs_location + "/test_mv_new" @@ -343,16 +350,17 @@ def test_mv(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(mv_new_actual_dir) - self.assertTrue(self.hdfs.exists(mv_new_actual_dir)) + self.fs.mkdir(mv_new_actual_dir) + self.assertTrue(self.fs.exists(mv_new_actual_dir)) self.assertTrue(fs.exists(mv_new_dir)) mv_file = self.fileset_gvfs_location + "/test_mv/test.file" mv_actual_file = self.fileset_storage_location + "/test_mv/test.file" - self.hdfs.touch(mv_actual_file) - self.assertTrue(self.hdfs.exists(mv_actual_file)) + self.fs.touch(mv_actual_file) + self.assertTrue(self.fs.exists(mv_actual_file)) self.assertTrue(fs.exists(mv_file)) mv_new_file = 
self.fileset_gvfs_location + "/test_mv_new/test_new.file" @@ -362,7 +370,7 @@ def test_mv(self): fs.mv(mv_file, mv_new_file) self.assertTrue(fs.exists(mv_new_file)) - self.assertTrue(self.hdfs.exists(mv_new_actual_file)) + self.assertTrue(self.fs.exists(mv_new_actual_file)) # test rename without sub path, which should throw an exception with self.assertRaises(GravitinoRuntimeException): @@ -374,16 +382,17 @@ def test_rm(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(rm_actual_dir) - self.assertTrue(self.hdfs.exists(rm_actual_dir)) + self.fs.mkdir(rm_actual_dir) + self.assertTrue(self.fs.exists(rm_actual_dir)) self.assertTrue(fs.exists(rm_dir)) rm_file = self.fileset_gvfs_location + "/test_rm/test.file" rm_actual_file = self.fileset_storage_location + "/test_rm/test.file" - self.hdfs.touch(rm_file) - self.assertTrue(self.hdfs.exists(rm_actual_file)) + fs.touch(rm_file) + self.assertTrue(self.fs.exists(rm_actual_file)) self.assertTrue(fs.exists(rm_file)) # test delete file @@ -393,8 +402,8 @@ def test_rm(self): # test delete dir with recursive = false rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file" rm_new_actual_file = self.fileset_storage_location + "/test_rm/test_new.file" - self.hdfs.touch(rm_new_actual_file) - self.assertTrue(self.hdfs.exists(rm_new_actual_file)) + self.fs.touch(rm_new_actual_file) + self.assertTrue(self.fs.exists(rm_new_actual_file)) self.assertTrue(fs.exists(rm_new_file)) with self.assertRaises(ValueError): fs.rm(rm_dir, recursive=False) @@ -409,16 +418,17 @@ def test_rm_file(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(rm_file_actual_dir) - self.assertTrue(self.hdfs.exists(rm_file_actual_dir)) + self.fs.mkdir(rm_file_actual_dir) + self.assertTrue(self.fs.exists(rm_file_actual_dir)) self.assertTrue(fs.exists(rm_file_dir)) rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file" rm_file_actual_file = self.fileset_storage_location + "/test_rm_file/test.file" - self.hdfs.touch(rm_file_actual_file) - self.assertTrue(self.hdfs.exists(rm_file_actual_file)) + self.fs.touch(rm_file_actual_file) + self.assertTrue(self.fs.exists(rm_file_actual_file)) self.assertTrue(fs.exists(rm_file_file)) # test delete file @@ -435,16 +445,17 @@ def test_rmdir(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(rmdir_actual_dir) - self.assertTrue(self.hdfs.exists(rmdir_actual_dir)) + self.fs.mkdir(rmdir_actual_dir) + self.assertTrue(self.fs.exists(rmdir_actual_dir)) self.assertTrue(fs.exists(rmdir_dir)) rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file" rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file" - self.hdfs.touch(rmdir_actual_file) - self.assertTrue(self.hdfs.exists(rmdir_actual_file)) + self.fs.touch(rmdir_actual_file) + self.assertTrue(self.fs.exists(rmdir_actual_file)) self.assertTrue(fs.exists(rmdir_file)) # test delete file @@ -461,16 +472,17 @@ def test_open(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(open_actual_dir) - self.assertTrue(self.hdfs.exists(open_actual_dir)) + self.fs.mkdir(open_actual_dir) + 
self.assertTrue(self.fs.exists(open_actual_dir)) self.assertTrue(fs.exists(open_dir)) open_file = self.fileset_gvfs_location + "/test_open/test.file" open_actual_file = self.fileset_storage_location + "/test_open/test.file" - self.hdfs.touch(open_actual_file) - self.assertTrue(self.hdfs.exists(open_actual_file)) + self.fs.touch(open_actual_file) + self.assertTrue(self.fs.exists(open_actual_file)) self.assertTrue(fs.exists(open_file)) # test open and write file @@ -488,11 +500,12 @@ def test_mkdir(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) fs.mkdir(mkdir_dir) self.assertTrue(fs.exists(mkdir_dir)) - self.assertTrue(self.hdfs.exists(mkdir_actual_dir)) + self.assertTrue(self.fs.exists(mkdir_actual_dir)) # test mkdir dir with create_parents = false parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir" @@ -514,11 +527,12 @@ def test_makedirs(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) fs.makedirs(makedirs_dir) self.assertTrue(fs.exists(makedirs_dir)) - self.assertTrue(self.hdfs.exists(makedirs_actual_dir)) + self.assertTrue(self.fs.exists(makedirs_actual_dir)) # test mkdir dir not exist parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir" @@ -532,10 +546,11 @@ def test_created(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(created_actual_dir) - self.assertTrue(self.hdfs.exists(created_actual_dir)) + self.fs.mkdir(created_actual_dir) + self.assertTrue(self.fs.exists(created_actual_dir)) self.assertTrue(fs.exists(created_dir)) with self.assertRaises(GravitinoRuntimeException): @@ -547,10 +562,11 @@ def test_modified(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(modified_actual_dir) - self.assertTrue(self.hdfs.exists(modified_actual_dir)) + self.fs.mkdir(modified_actual_dir) + self.assertTrue(self.fs.exists(modified_actual_dir)) self.assertTrue(fs.exists(modified_dir)) # test mkdir dir which exists @@ -562,16 +578,17 @@ def test_cat_file(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(cat_actual_dir) - self.assertTrue(self.hdfs.exists(cat_actual_dir)) + self.fs.mkdir(cat_actual_dir) + self.assertTrue(self.fs.exists(cat_actual_dir)) self.assertTrue(fs.exists(cat_dir)) cat_file = self.fileset_gvfs_location + "/test_cat/test.file" cat_actual_file = self.fileset_storage_location + "/test_cat/test.file" - self.hdfs.touch(cat_actual_file) - self.assertTrue(self.hdfs.exists(cat_actual_file)) + self.fs.touch(cat_actual_file) + self.assertTrue(self.fs.exists(cat_actual_file)) self.assertTrue(fs.exists(cat_file)) # test open and write file @@ -589,16 +606,17 @@ def test_get_file(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(get_actual_dir) - self.assertTrue(self.hdfs.exists(get_actual_dir)) + self.fs.mkdir(get_actual_dir) + self.assertTrue(self.fs.exists(get_actual_dir)) self.assertTrue(fs.exists(get_dir)) get_file = self.fileset_gvfs_location + "/test_get/test.file" 
get_actual_file = self.fileset_storage_location + "/test_get/test.file" - self.hdfs.touch(get_actual_file) - self.assertTrue(self.hdfs.exists(get_actual_file)) + self.fs.touch(get_actual_file) + self.assertTrue(self.fs.exists(get_actual_file)) self.assertTrue(fs.exists(get_file)) # test open and write file @@ -628,10 +646,11 @@ def test_pandas(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(pands_actual_dir) - self.assertTrue(self.hdfs.exists(pands_actual_dir)) + self.fs.mkdir(pands_actual_dir) + self.assertTrue(self.fs.exists(pands_actual_dir)) self.assertTrue(fs.exists(pands_dir)) data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) @@ -642,7 +661,7 @@ def test_pandas(self): ) data.to_parquet(parquet_file, filesystem=fs) self.assertTrue(fs.exists(parquet_file)) - self.assertTrue(self.hdfs.exists(parquet_actual_file)) + self.assertTrue(self.fs.exists(parquet_actual_file)) # read parquet ds1 = pandas.read_parquet(path=parquet_file, filesystem=fs) @@ -650,6 +669,7 @@ def test_pandas(self): storage_options = { "server_uri": "http://localhost:8090", "metalake_name": self.metalake_name, + "options": self.options, } # to csv csv_file = self.fileset_gvfs_location + "/test_pandas/test.csv" @@ -660,7 +680,7 @@ def test_pandas(self): storage_options=storage_options, ) self.assertTrue(fs.exists(csv_file)) - self.assertTrue(self.hdfs.exists(csv_actual_file)) + self.assertTrue(self.fs.exists(csv_actual_file)) # read csv ds2 = pandas.read_csv(csv_file, storage_options=storage_options) @@ -672,10 +692,11 @@ def test_pyarrow(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(pyarrow_actual_dir) - self.assertTrue(self.hdfs.exists(pyarrow_actual_dir)) + self.fs.mkdir(pyarrow_actual_dir) + self.assertTrue(self.fs.exists(pyarrow_actual_dir)) self.assertTrue(fs.exists(pyarrow_dir)) data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) @@ -701,16 +722,18 @@ def test_llama_index(self): fs = gvfs.GravitinoVirtualFileSystem( server_uri="http://localhost:8090", metalake_name=self.metalake_name, + options=self.options, **self.conf, ) - self.hdfs.mkdir(llama_actual_dir) - self.assertTrue(self.hdfs.exists(llama_actual_dir)) + self.fs.mkdir(llama_actual_dir) + self.assertTrue(self.fs.exists(llama_actual_dir)) self.assertTrue(fs.exists(llama_dir)) data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) storage_options = { "server_uri": "http://localhost:8090", "metalake_name": self.metalake_name, + "options": self.options, } csv_file = llama_dir + "/test.csv" # to csv diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java index 73a45006f03..312236fe5da 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java @@ -52,6 +52,7 @@ public void startIntegrationTest() { @BeforeAll public void startUp() throws Exception { + // Copy the GCP jars to the gravitino 
server if in deploy mode. copyBundleJarsToHadoop("gcp-bundle"); // Need to download jars to gravitino server super.startIntegrationTest();
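
Example usage of the new GCS support (a minimal sketch, not part of the patch;
the server URI, metalake/catalog/schema/fileset names, and key file path are
illustrative placeholders mirroring the options exercised by the tests above):

    from gravitino import gvfs

    # The service account key file is passed via the new
    # 'gravitino.bypass.gcs.service-account-key-path' option; the client exports
    # it as GOOGLE_APPLICATION_CREDENTIALS before creating pyarrow's GcsFileSystem.
    fs = gvfs.GravitinoVirtualFileSystem(
        server_uri="http://localhost:8090",
        metalake_name="my_metalake",
        options={"gravitino.bypass.gcs.service-account-key-path": "/path/to/key.json"},
    )
    fs.ls("gvfs://fileset/my_catalog/my_schema/my_fileset")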