From 598bc051010490fddd47883ab64e450d36aa3d86 Mon Sep 17 00:00:00 2001
From: Qi Yu
Date: Wed, 8 Jan 2025 17:04:36 +0800
Subject: [PATCH] [#5996] feat(python-client): Using credential in Python GVFS client. (#5997)

### What changes were proposed in this pull request?

Support using credentials in the GVFS Python client for cloud storage.

### Why are the changes needed?

It's needed.

Fix: #5996

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Added new integration tests and tested locally.

---
 .../credential/ADLSTokenCredential.java       |   1 +
 .../metadata_object_credential_operations.py  |   2 +-
 .../gravitino/filesystem/gvfs.py              | 302 +++++++++++++++---
 .../gravitino/filesystem/gvfs_config.py       |   8 +
 .../tests/integration/test_gvfs_with_abs.py   |   6 +-
 .../test_gvfs_with_abs_credential.py          | 171 ++++++++++
 .../tests/integration/test_gvfs_with_gcs.py   |   9 +-
 .../test_gvfs_with_gcs_credential.py          | 112 +++++++
 .../test_gvfs_with_oss_credential.py          | 225 +++++++++++++
 .../test_gvfs_with_s3_credential.py           | 151 +++++++++
 docs/how-to-use-gvfs.md                       |  23 +-
 11 files changed, 942 insertions(+), 68 deletions(-)
 create mode 100644 clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
 create mode 100644 clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
 create mode 100644 clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
 create mode 100644 clients/client-python/tests/integration/test_gvfs_with_s3_credential.py

diff --git a/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java b/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
index 249b0ac0b03..6f1c463033b 100644
--- a/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
+++ b/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
@@ -74,6 +74,7 @@ public long expireTimeInMs() {
   public Map credentialInfo() {
     return (new ImmutableMap.Builder())
         .put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
+        .put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
         .build();
   }

diff --git a/clients/client-python/gravitino/client/metadata_object_credential_operations.py b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
index 93d538cfa0e..7184cd797cc 100644
--- a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
+++ b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
@@ -48,7 +48,7 @@ def __init__(
         metadata_object_type = metadata_object.type().value
         metadata_object_name = metadata_object.name()
         self._request_path = (
-            f"api/metalakes/{metalake_name}objects/{metadata_object_type}/"
+            f"api/metalakes/{metalake_name}/objects/{metadata_object_type}/"
             f"{metadata_object_name}/credentials"
         )

diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py
index cd9521dc7a3..0dc020ee90b 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,9 +14,15 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging +import sys + +# Disable C0302: Too many lines in module +# pylint: disable=C0302 +import time from enum import Enum from pathlib import PurePosixPath -from typing import Dict, Tuple +from typing import Dict, Tuple, List import re import importlib import fsspec @@ -28,6 +34,9 @@ from fsspec.utils import infer_storage_options from readerwriterlock import rwlock + +from gravitino.api.catalog import Catalog +from gravitino.api.credential.credential import Credential from gravitino.audit.caller_context import CallerContext, CallerContextHolder from gravitino.audit.fileset_audit_constants import FilesetAuditConstants from gravitino.audit.fileset_data_operation import FilesetDataOperation @@ -35,14 +44,31 @@ from gravitino.auth.default_oauth2_token_provider import DefaultOAuth2TokenProvider from gravitino.auth.oauth2_token_provider import OAuth2TokenProvider from gravitino.auth.simple_auth_provider import SimpleAuthProvider +from gravitino.client.generic_fileset import GenericFileset from gravitino.client.fileset_catalog import FilesetCatalog from gravitino.client.gravitino_client import GravitinoClient -from gravitino.exceptions.base import GravitinoRuntimeException +from gravitino.exceptions.base import ( + GravitinoRuntimeException, +) from gravitino.filesystem.gvfs_config import GVFSConfig from gravitino.name_identifier import NameIdentifier +from gravitino.api.credential.adls_token_credential import ADLSTokenCredential +from gravitino.api.credential.azure_account_key_credential import ( + AzureAccountKeyCredential, +) +from gravitino.api.credential.gcs_token_credential import GCSTokenCredential +from gravitino.api.credential.oss_secret_key_credential import OSSSecretKeyCredential +from gravitino.api.credential.oss_token_credential import OSSTokenCredential +from gravitino.api.credential.s3_secret_key_credential import S3SecretKeyCredential +from gravitino.api.credential.s3_token_credential import S3TokenCredential + +logger = logging.getLogger(__name__) + PROTOCOL_NAME = "gvfs" +TIME_WITHOUT_EXPIRATION = sys.maxsize + class StorageType(Enum): HDFS = "hdfs" @@ -677,8 +703,10 @@ def _get_fileset_context(self, virtual_path: str, operation: FilesetDataOperatio NameIdentifier.of(identifier.namespace().level(2), identifier.name()), sub_path, ) + return FilesetContextPair( - actual_file_location, self._get_filesystem(actual_file_location) + actual_file_location, + self._get_filesystem(actual_file_location, fileset_catalog, identifier), ) def _extract_identifier(self, path): @@ -866,50 +894,90 @@ def _get_fileset_catalog(self, catalog_ident: NameIdentifier): finally: write_lock.release() - def _get_filesystem(self, actual_file_location: str): + def _file_system_expired(self, expire_time: int): + return expire_time <= time.time() * 1000 + + # Disable Too many branches (13/12) (too-many-branches) + # pylint: disable=R0912 + def _get_filesystem( + self, + actual_file_location: str, + fileset_catalog: Catalog, + name_identifier: NameIdentifier, + ): storage_type = self._recognize_storage_type(actual_file_location) read_lock = self._cache_lock.gen_rlock() try: read_lock.acquire() - cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( - storage_type + cache_value: Tuple[int, AbstractFileSystem] = self._cache.get( + name_identifier ) if cache_value is not None: - return cache_value + if not self._file_system_expired(cache_value[0]): + return cache_value[1] finally: read_lock.release() write_lock = self._cache_lock.gen_wlock() try: write_lock.acquire() - cache_value: 
Tuple[StorageType, AbstractFileSystem] = self._cache.get( - storage_type + cache_value: Tuple[int, AbstractFileSystem] = self._cache.get( + name_identifier ) + if cache_value is not None: - return cache_value + if not self._file_system_expired(cache_value[0]): + return cache_value[1] + + new_cache_value: Tuple[int, AbstractFileSystem] if storage_type == StorageType.HDFS: fs_class = importlib.import_module("pyarrow.fs").HadoopFileSystem - fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location)) + new_cache_value = ( + TIME_WITHOUT_EXPIRATION, + ArrowFSWrapper(fs_class.from_uri(actual_file_location)), + ) elif storage_type == StorageType.LOCAL: - fs = LocalFileSystem() + new_cache_value = (TIME_WITHOUT_EXPIRATION, LocalFileSystem()) elif storage_type == StorageType.GCS: - fs = self._get_gcs_filesystem() + new_cache_value = self._get_gcs_filesystem( + fileset_catalog, name_identifier + ) elif storage_type == StorageType.S3A: - fs = self._get_s3_filesystem() + new_cache_value = self._get_s3_filesystem( + fileset_catalog, name_identifier + ) elif storage_type == StorageType.OSS: - fs = self._get_oss_filesystem() + new_cache_value = self._get_oss_filesystem( + fileset_catalog, name_identifier + ) elif storage_type == StorageType.ABS: - fs = self._get_abs_filesystem() + new_cache_value = self._get_abs_filesystem( + fileset_catalog, name_identifier + ) else: raise GravitinoRuntimeException( f"Storage type: `{storage_type}` doesn't support now." ) - self._cache[storage_type] = fs - return fs + self._cache[name_identifier] = new_cache_value + return new_cache_value[1] finally: write_lock.release() - def _get_gcs_filesystem(self): + def _get_gcs_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier): + fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset( + NameIdentifier.of(identifier.namespace().level(2), identifier.name()) + ) + credentials = fileset.support_credentials().get_credentials() + + credential = self._get_most_suitable_gcs_credential(credentials) + if credential is not None: + expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms()) + if isinstance(credential, GCSTokenCredential): + fs = importlib.import_module("gcsfs").GCSFileSystem( + token=credential.token() + ) + return (expire_time, fs) + # get 'service-account-key' from gcs_options, if the key is not found, throw an exception service_account_key_path = self._options.get( GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE @@ -918,11 +986,47 @@ def _get_gcs_filesystem(self): raise GravitinoRuntimeException( "Service account key is not found in the options." 
)
-        return importlib.import_module("gcsfs").GCSFileSystem(
-            token=service_account_key_path
+        return (
+            TIME_WITHOUT_EXPIRATION,
+            importlib.import_module("gcsfs").GCSFileSystem(
+                token=service_account_key_path
+            ),
         )

-    def _get_s3_filesystem(self):
+    def _get_s3_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier):
+        fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset(
+            NameIdentifier.of(identifier.namespace().level(2), identifier.name())
+        )
+        credentials = fileset.support_credentials().get_credentials()
+        credential = self._get_most_suitable_s3_credential(credentials)
+
+        # S3 endpoint from the Gravitino server. Note: the endpoint may not be a real S3 endpoint;
+        # it can be a simulated S3 endpoint such as MinIO, so even though the endpoint is not a
+        # required field for S3FileSystem, we still need to assign the endpoint to the S3FileSystem.
+        s3_endpoint = fileset_catalog.properties().get("s3-endpoint", None)
+        # If the S3 endpoint is not found in the fileset catalog, get it from the client options.
+        if s3_endpoint is None:
+            s3_endpoint = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
+
+        if credential is not None:
+            expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+            if isinstance(credential, S3TokenCredential):
+                fs = importlib.import_module("s3fs").S3FileSystem(
+                    key=credential.access_key_id(),
+                    secret=credential.secret_access_key(),
+                    token=credential.session_token(),
+                    endpoint_url=s3_endpoint,
+                )
+                return (expire_time, fs)
+            if isinstance(credential, S3SecretKeyCredential):
+                fs = importlib.import_module("s3fs").S3FileSystem(
+                    key=credential.access_key_id(),
+                    secret=credential.secret_access_key(),
+                    endpoint_url=s3_endpoint,
+                )
+                return (expire_time, fs)
+
+        # This is the old way to get the S3 file system.
         # get 'aws_access_key_id' from s3_options, if the key is not found, throw an exception
         aws_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY)
         if aws_access_key_id is None:
@@ -939,20 +1043,48 @@ def _get_oss_filesystem(self):
                 "AWS secret access key is not found in the options."
             )

-        # get 'aws_endpoint_url' from s3_options, if the key is not found, throw an exception
-        aws_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
-        if aws_endpoint_url is None:
-            raise GravitinoRuntimeException(
-                "AWS endpoint url is not found in the options."
- ) + return ( + TIME_WITHOUT_EXPIRATION, + importlib.import_module("s3fs").S3FileSystem( + key=aws_access_key_id, + secret=aws_secret_access_key, + endpoint_url=s3_endpoint, + ), + ) - return importlib.import_module("s3fs").S3FileSystem( - key=aws_access_key_id, - secret=aws_secret_access_key, - endpoint_url=aws_endpoint_url, + def _get_oss_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier): + fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset( + NameIdentifier.of(identifier.namespace().level(2), identifier.name()) ) + credentials = fileset.support_credentials().get_credentials() + + # OSS endpoint from gravitino server + oss_endpoint = fileset_catalog.properties().get("oss-endpoint", None) + # If the oss endpoint is not found in the fileset catalog, get it from the client options + if oss_endpoint is None: + oss_endpoint = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT) + + credential = self._get_most_suitable_oss_credential(credentials) + if credential is not None: + expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms()) + if isinstance(credential, OSSTokenCredential): + fs = importlib.import_module("ossfs").OSSFileSystem( + key=credential.access_key_id(), + secret=credential.secret_access_key(), + token=credential.security_token(), + endpoint=oss_endpoint, + ) + return (expire_time, fs) + if isinstance(credential, OSSSecretKeyCredential): + return ( + expire_time, + importlib.import_module("ossfs").OSSFileSystem( + key=credential.access_key_id(), + secret=credential.secret_access_key(), + endpoint=oss_endpoint, + ), + ) - def _get_oss_filesystem(self): # get 'oss_access_key_id' from oss options, if the key is not found, throw an exception oss_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY) if oss_access_key_id is None: @@ -969,20 +1101,38 @@ def _get_oss_filesystem(self): "OSS secret access key is not found in the options." ) - # get 'oss_endpoint_url' from oss options, if the key is not found, throw an exception - oss_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT) - if oss_endpoint_url is None: - raise GravitinoRuntimeException( - "OSS endpoint url is not found in the options." 
- ) + return ( + TIME_WITHOUT_EXPIRATION, + importlib.import_module("ossfs").OSSFileSystem( + key=oss_access_key_id, + secret=oss_secret_access_key, + endpoint=oss_endpoint, + ), + ) - return importlib.import_module("ossfs").OSSFileSystem( - key=oss_access_key_id, - secret=oss_secret_access_key, - endpoint=oss_endpoint_url, + def _get_abs_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier): + fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset( + NameIdentifier.of(identifier.namespace().level(2), identifier.name()) ) + credentials = fileset.support_credentials().get_credentials() + + credential = self._get_most_suitable_abs_credential(credentials) + if credential is not None: + expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms()) + if isinstance(credential, ADLSTokenCredential): + fs = importlib.import_module("adlfs").AzureBlobFileSystem( + account_name=credential.account_name(), + sas_token=credential.sas_token(), + ) + return (expire_time, fs) + + if isinstance(credential, AzureAccountKeyCredential): + fs = importlib.import_module("adlfs").AzureBlobFileSystem( + account_name=credential.account_name(), + account_key=credential.account_key(), + ) + return (expire_time, fs) - def _get_abs_filesystem(self): # get 'abs_account_name' from abs options, if the key is not found, throw an exception abs_account_name = self._options.get( GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME @@ -1001,10 +1151,68 @@ def _get_abs_filesystem(self): "ABS account key is not found in the options." ) - return importlib.import_module("adlfs").AzureBlobFileSystem( - account_name=abs_account_name, - account_key=abs_account_key, + return ( + TIME_WITHOUT_EXPIRATION, + importlib.import_module("adlfs").AzureBlobFileSystem( + account_name=abs_account_name, + account_key=abs_account_key, + ), + ) + + def _get_most_suitable_s3_credential(self, credentials: List[Credential]): + for credential in credentials: + # Prefer to use the token credential, if it does not exist, use the + # secret key credential. + if isinstance(credential, S3TokenCredential): + return credential + + for credential in credentials: + if isinstance(credential, S3SecretKeyCredential): + return credential + return None + + def _get_most_suitable_oss_credential(self, credentials: List[Credential]): + for credential in credentials: + # Prefer to use the token credential, if it does not exist, use the + # secret key credential. + if isinstance(credential, OSSTokenCredential): + return credential + + for credential in credentials: + if isinstance(credential, OSSSecretKeyCredential): + return credential + return None + + def _get_most_suitable_gcs_credential(self, credentials: List[Credential]): + for credential in credentials: + # Prefer to use the token credential, if it does not exist, return None. 
+            if isinstance(credential, GCSTokenCredential):
+                return credential
+        return None
+
+    def _get_most_suitable_abs_credential(self, credentials: List[Credential]):
+        for credential in credentials:
+            # Prefer to use the token credential; if it does not exist, use the
+            # account key credential.
+            if isinstance(credential, ADLSTokenCredential):
+                return credential
+
+        for credential in credentials:
+            if isinstance(credential, AzureAccountKeyCredential):
+                return credential
+        return None
+
+    def _get_expire_time_by_ratio(self, expire_time: int):
+        if expire_time <= 0:
+            return TIME_WITHOUT_EXPIRATION
+
+        ratio = float(
+            self._options.get(
+                GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO,
+                GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO,
+            )
         )
+        return time.time() * 1000 + (expire_time - time.time() * 1000) * ratio


 fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 4261fb48f2a..6fbd8a99d18 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -44,3 +44,11 @@ class GVFSConfig:
     GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME = "abs_account_name"
     GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY = "abs_account_key"
+
+    # This configuration sets the expiration ratio of the credential. For instance, if the credential
+    # fetched from the Gravitino server has an expiration time of 3600 seconds and the
+    # credential_expiration_ratio is 0.5, then the credential will be considered expired after
+    # 1800 seconds, and the client will try to retrieve a new credential.
+    GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO = "credential_expiration_ratio"
+
+    # The default value of credential_expiration_ratio is 0.5
+    DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO = 0.5
diff --git a/clients/client-python/tests/integration/test_gvfs_with_abs.py b/clients/client-python/tests/integration/test_gvfs_with_abs.py
index a218efcfdf9..53c265c539a 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_abs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs.py
@@ -33,8 +33,6 @@
 )
 from gravitino.exceptions.base import GravitinoRuntimeException
 from gravitino.filesystem.gvfs_config import GVFSConfig
-from gravitino.filesystem.gvfs import StorageType
-
 logger = logging.getLogger(__name__)

@@ -281,7 +279,7 @@ def test_mkdir(self):
         self.assertFalse(self.fs.exists(mkdir_actual_dir))
         self.assertFalse(fs.exists(mkdir_dir))
-        self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+        self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))

     def test_makedirs(self):
         mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
@@ -309,7 +307,7 @@ def test_makedirs(self):
         self.assertFalse(self.fs.exists(mkdir_actual_dir))
         self.assertFalse(fs.exists(mkdir_dir))
-        self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+        self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))

     def test_ls(self):
         ls_dir = self.fileset_gvfs_location + "/test_ls"
diff --git a/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
new file mode 100644
index 00000000000..9071679fb7d
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.
See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+
+from adlfs import AzureBlobFileSystem
+
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_abs import TestGvfsWithABS
+
+
+logger = logging.getLogger(__name__)
+
+
+def azure_abs_with_credential_is_prepared():
+    return (
+        os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
+        and os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
+        and os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
+        and os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
+        and os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
+        and os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")
+    )
+
+
+@unittest.skipUnless(
+    azure_abs_with_credential_is_prepared(),
+    "Azure Blob Storage credential test is not prepared.",
+)
+class TestGvfsWithCredentialABS(TestGvfsWithABS):
+    # Before running this test, please make sure azure-bundle-xxx.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    azure_abs_account_key = os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
+    azure_abs_account_name = os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
+    azure_abs_container_name = os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
+    azure_abs_tenant_id = os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
+    azure_abs_client_id = os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
+    azure_abs_client_secret = os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")
+
+    metalake_name: str = "TestGvfsWithCredentialABS_metalake" + str(randint(1, 10000))
+
+    def setUp(self):
+        self.options = {
+            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: self.azure_abs_account_name,
+            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: self.azure_abs_account_key,
+        }
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "abs",
+                "azure-storage-account-name": cls.azure_abs_account_name,
+                "azure-storage-account-key": cls.azure_abs_account_key,
+                "azure-tenant-id": cls.azure_abs_tenant_id,
+                "azure-client-id": cls.azure_abs_client_id,
+                "azure-client-secret": cls.azure_abs_client_secret,
+                "credential-providers": "adls-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
f"{cls.azure_abs_container_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + cls.fileset_gvfs_location = ( + f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + catalog.as_fileset_catalog().create_fileset( + ident=cls.fileset_ident, + fileset_type=Fileset.Type.MANAGED, + comment=cls.fileset_comment, + storage_location=( + f"abfss://{cls.azure_abs_container_name}@{cls.azure_abs_account_name}.dfs.core.windows.net/" + f"{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ), + properties=cls.fileset_properties, + ) + + cls.fs = AzureBlobFileSystem( + account_name=cls.azure_abs_account_name, + account_key=cls.azure_abs_account_key, + ) + + # As the permission provided by the dynamic token is smaller compared to the permission provided by the static token + # like account key and account name, the test case will fail if we do not override the test case. + def test_mkdir(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. + new_bucket = self.azure_abs_container_name + "2" + mkdir_actual_dir = mkdir_actual_dir.replace( + self.azure_abs_container_name, new_bucket + ) + self.fs.mkdir(mkdir_actual_dir, create_parents=True) + + self.assertFalse(self.fs.exists(mkdir_actual_dir)) + + self.assertTrue(self.fs.exists(f"abfss://{new_bucket}")) + + def test_makedirs(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. 
+ new_bucket = self.azure_abs_container_name + "1" + new_mkdir_actual_dir = mkdir_actual_dir.replace( + self.azure_abs_container_name, new_bucket + ) + self.fs.makedirs(new_mkdir_actual_dir) + self.assertFalse(self.fs.exists(mkdir_actual_dir)) diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py b/clients/client-python/tests/integration/test_gvfs_with_gcs.py index 15833aca01a..40e83d00814 100644 --- a/clients/client-python/tests/integration/test_gvfs_with_gcs.py +++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) -def oss_is_configured(): +def gcs_is_configured(): return all( [ os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH") is not None, @@ -45,7 +45,7 @@ def oss_is_configured(): ) -@unittest.skipUnless(oss_is_configured(), "GCS is not configured.") +@unittest.skipUnless(gcs_is_configured(), "GCS is not configured.") class TestGvfsWithGCS(TestGvfsWithHDFS): # Before running this test, please set the make sure gcp-bundle-x.jar has been # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory @@ -254,11 +254,10 @@ def test_mkdir(self): new_bucket = self.bucket_name + "1" mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket) mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket) - fs.mkdir(mkdir_dir, create_parents=True) + with self.assertRaises(OSError): + fs.mkdir(mkdir_dir, create_parents=True) self.assertFalse(self.fs.exists(mkdir_actual_dir)) - self.assertFalse(fs.exists(mkdir_dir)) - self.assertFalse(self.fs.exists("gs://" + new_bucket)) def test_makedirs(self): mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py new file mode 100644 index 00000000000..eec502a13bb --- /dev/null +++ b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+import logging
+import os
+from random import randint
+import unittest
+
+from gcsfs import GCSFileSystem
+
+from gravitino import Catalog, Fileset, GravitinoClient
+from gravitino.filesystem import gvfs
+from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS
+
+logger = logging.getLogger(__name__)
+
+
+def gcs_with_credential_is_configured():
+    return all(
+        [
+            os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL") is not None,
+            os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL") is not None,
+        ]
+    )
+
+
+@unittest.skipUnless(
+    gcs_with_credential_is_configured(), "GCS with credential is not configured."
+)
+class TestGvfsWithGCSCredential(TestGvfsWithGCS):
+    # Before running this test, please make sure gcp-bundle-x.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    key_file = os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL")
+    bucket_name = os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL")
+    metalake_name: str = "TestGvfsWithGCSCredential_metalake" + str(randint(1, 10000))
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "gcs",
+                "gcs-credential-file-path": cls.key_file,
+                "gcs-service-account-file": cls.key_file,
+                "credential-providers": "gcs-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        cls.fs = GCSFileSystem(token=cls.key_file)
+
+    def test_mkdir(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 'create_parents'
+        # is set to True.
+        new_bucket = self.bucket_name + "1"
+        mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+        mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket)
+
+        fs.mkdir(mkdir_dir, create_parents=True)
+        self.assertFalse(self.fs.exists(mkdir_actual_dir))
diff --git a/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
new file mode 100644
index 00000000000..14b8b523129
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
@@ -0,0 +1,225 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.
See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+
+from ossfs import OSSFileSystem
+
+from gravitino import (
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.filesystem import gvfs
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_oss import TestGvfsWithOSS
+
+logger = logging.getLogger(__name__)
+
+
+def oss_with_credential_is_configured():
+    return all(
+        [
+            os.environ.get("OSS_STS_ACCESS_KEY_ID") is not None,
+            os.environ.get("OSS_STS_SECRET_ACCESS_KEY") is not None,
+            os.environ.get("OSS_STS_ENDPOINT") is not None,
+            os.environ.get("OSS_STS_BUCKET_NAME") is not None,
+            os.environ.get("OSS_STS_REGION") is not None,
+            os.environ.get("OSS_STS_ROLE_ARN") is not None,
+        ]
+    )
+
+
+@unittest.skipUnless(
+    oss_with_credential_is_configured(), "OSS with credential is not configured."
+)
+class TestGvfsWithOSSCredential(TestGvfsWithOSS):
+    # Before running this test, please make sure aliyun-bundle-x.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    oss_access_key = os.environ.get("OSS_STS_ACCESS_KEY_ID")
+    oss_secret_key = os.environ.get("OSS_STS_SECRET_ACCESS_KEY")
+    oss_endpoint = os.environ.get("OSS_STS_ENDPOINT")
+    bucket_name = os.environ.get("OSS_STS_BUCKET_NAME")
+    oss_sts_region = os.environ.get("OSS_STS_REGION")
+    oss_sts_role_arn = os.environ.get("OSS_STS_ROLE_ARN")
+
+    metalake_name: str = "TestGvfsWithOSSCredential_metalake" + str(randint(1, 10000))
+
+    def setUp(self):
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY}": self.oss_access_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY}": self.oss_secret_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT}": self.oss_endpoint,
+        }
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "oss",
+                "oss-access-key-id": cls.oss_access_key,
+                "oss-secret-access-key": cls.oss_secret_key,
+                "oss-endpoint": cls.oss_endpoint,
+                "oss-region": cls.oss_sts_region,
+                "oss-role-arn": cls.oss_sts_role_arn,
+                "credential-providers": "oss-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            f"oss://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+
catalog.as_fileset_catalog().create_fileset( + ident=cls.fileset_ident, + fileset_type=Fileset.Type.MANAGED, + comment=cls.fileset_comment, + storage_location=cls.fileset_storage_location, + properties=cls.fileset_properties, + ) + + cls.fs = OSSFileSystem( + key=cls.oss_access_key, + secret=cls.oss_secret_key, + endpoint=cls.oss_endpoint, + ) + + def test_cat_file(self): + cat_dir = self.fileset_gvfs_location + "/test_cat" + cat_actual_dir = self.fileset_storage_location + "/test_cat" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(cat_dir, cat_actual_dir, fs) + + cat_file = self.fileset_gvfs_location + "/test_cat/test.file" + cat_actual_file = self.fileset_storage_location + "/test_cat/test.file" + self.fs.touch(cat_actual_file) + self.assertTrue(self.fs.exists(cat_actual_file)) + self.assertTrue(fs.exists(cat_file)) + + # test open and write file + with fs.open(cat_file, mode="wb") as f: + f.write(b"test_cat_file") + self.assertTrue(fs.info(cat_file)["size"] > 0) + + # test cat file + content = fs.cat_file(cat_file) + self.assertEqual(b"test_cat_file", content) + + @unittest.skip( + "Skip this test case because fs.ls(dir) using credential is always empty" + ) + def test_ls(self): + ls_dir = self.fileset_gvfs_location + "/test_ls" + ls_actual_dir = self.fileset_storage_location + "/test_ls" + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(ls_dir, ls_actual_dir, fs) + + ls_file = self.fileset_gvfs_location + "/test_ls/test.file" + ls_actual_file = self.fileset_storage_location + "/test_ls/test.file" + self.fs.touch(ls_actual_file) + self.assertTrue(self.fs.exists(ls_actual_file)) + + # test detail = false + file_list_without_detail = fs.ls(ls_dir, detail=False) + self.assertEqual(1, len(file_list_without_detail)) + self.assertEqual(file_list_without_detail[0], ls_file[len("gvfs://") :]) + + # test detail = true + file_list_with_detail = fs.ls(ls_dir, detail=True) + self.assertEqual(1, len(file_list_with_detail)) + self.assertEqual(file_list_with_detail[0]["name"], ls_file[len("gvfs://") :]) + + @unittest.skip( + "Skip this test case because fs.info(info_file) using credential is always None" + ) + def test_info(self): + info_dir = self.fileset_gvfs_location + "/test_info" + info_actual_dir = self.fileset_storage_location + "/test_info" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(info_dir, info_actual_dir, fs) + + info_file = self.fileset_gvfs_location + "/test_info/test.file" + info_actual_file = self.fileset_storage_location + "/test_info/test.file" + self.fs.touch(info_actual_file) + self.assertTrue(self.fs.exists(info_actual_file)) + + ## OSS info has different behavior than S3 info. For OSS info, the name of the + ## directory will have a trailing slash if it's a directory and the path + # does not end with a slash, while S3 info will not have a trailing + # slash if it's a directory. 
+
+        # >>> oss.info('bucket-xiaoyu/lisi')
+        # {'name': 'bucket-xiaoyu/lisi/', 'type': 'directory',
+        #  'size': 0, 'Size': 0, 'Key': 'bucket-xiaoyu/lisi/'}
+        # >>> oss.info('bucket-xiaoyu/lisi/')
+        # {'name': 'bucket-xiaoyu/lisi', 'size': 0,
+        #  'type': 'directory', 'Size': 0,
+        #  'Key': 'bucket-xiaoyu/lisi'}
+
+        # >>> s3.info('paimon-bucket/lisi')
+        # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+        #  'StorageClass': 'DIRECTORY'}
+        # >>> s3.info('paimon-bucket/lisi/')
+        # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+        #  'StorageClass': 'DIRECTORY'}
+
+        dir_info = fs.info(info_dir)
+        self.assertEqual(dir_info["name"][:-1], info_dir[len("gvfs://") :])
+
+        file_info = fs.info(info_file)
+        self.assertEqual(file_info["name"], info_file[len("gvfs://") :])
diff --git a/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
new file mode 100644
index 00000000000..35d88c2c826
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from s3fs import S3FileSystem
+
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_s3 import TestGvfsWithS3
+
+logger = logging.getLogger(__name__)
+
+
+def s3_with_credential_is_configured():
+    return all(
+        [
+            os.environ.get("S3_STS_ACCESS_KEY_ID") is not None,
+            os.environ.get("S3_STS_SECRET_ACCESS_KEY") is not None,
+            os.environ.get("S3_STS_ENDPOINT") is not None,
+            os.environ.get("S3_STS_BUCKET_NAME") is not None,
+            os.environ.get("S3_STS_REGION") is not None,
+            os.environ.get("S3_STS_ROLE_ARN") is not None,
+        ]
+    )
+
+
+@unittest.skipUnless(
+    s3_with_credential_is_configured(), "S3 with credential is not configured."
+)
+class TestGvfsWithS3Credential(TestGvfsWithS3):
+    # Before running this test, please make sure aws-bundle-x.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    s3_access_key = os.environ.get("S3_STS_ACCESS_KEY_ID")
+    s3_secret_key = os.environ.get("S3_STS_SECRET_ACCESS_KEY")
+    s3_endpoint = os.environ.get("S3_STS_ENDPOINT")
+    bucket_name = os.environ.get("S3_STS_BUCKET_NAME")
+    s3_sts_region = os.environ.get("S3_STS_REGION")
+    s3_role_arn = os.environ.get("S3_STS_ROLE_ARN")
+
+    metalake_name: str = "TestGvfsWithS3Credential_metalake" + str(randint(1, 10000))
+
+    def setUp(self):
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+        }
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "s3",
+                "s3-access-key-id": cls.s3_access_key,
+                "s3-secret-access-key": cls.s3_secret_key,
+                "s3-endpoint": cls.s3_endpoint,
+                "s3-region": cls.s3_sts_region,
+                "s3-role-arn": cls.s3_role_arn,
+                "credential-providers": "s3-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            f"s3a://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        cls.fs = S3FileSystem(
+            key=cls.s3_access_key,
+            secret=cls.s3_secret_key,
+            endpoint_url=cls.s3_endpoint,
+        )
+
+    # The following tests are copied from tests/integration/test_gvfs_with_s3.py, with some
+    # modifications, as `mkdir` and `makedirs` behave differently on S3; other cloud storages
+    # like GCS, ABS, and OSS are similar.
+ def test_mkdir(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + with self.assertRaises(ValueError): + fs.mkdir(mkdir_dir, create_parents=True) + self.assertFalse(fs.exists(mkdir_dir)) + + def test_makedirs(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md index 31ede3a5374..aff3b74adfd 100644 --- a/docs/how-to-use-gvfs.md +++ b/docs/how-to-use-gvfs.md @@ -455,17 +455,18 @@ to recompile the native libraries like `libhdfs` and others, and completely repl ### Configuration -| Configuration item | Description | Default value | Required | Since version | -|----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-----------------------------------|------------------| -| `server_uri` | The Gravitino server uri, e.g. `http://localhost:8090`. | (none) | Yes | 0.6.0-incubating | -| `metalake_name` | The metalake name which the fileset belongs to. | (none) | Yes | 0.6.0-incubating | -| `cache_size` | The cache capacity of the Gravitino Virtual File System. | `20` | No | 0.6.0-incubating | -| `cache_expired_time` | The value of time that the cache expires after accessing in the Gravitino Virtual File System. The value is in `seconds`. | `3600` | No | 0.6.0-incubating | -| `auth_type` | The auth type to initialize the Gravitino client to use with the Gravitino Virtual File System. Currently supports `simple` and `oauth2` auth types. | `simple` | No | 0.6.0-incubating | -| `oauth2_server_uri` | The auth server URI for the Gravitino client when using `oauth2` auth type. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating | -| `oauth2_credential` | The auth credential for the Gravitino client when using `oauth2` auth type. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating | -| `oauth2_path` | The auth server path for the Gravitino client when using `oauth2` auth type. Please remove the first slash `/` from the path, for example `oauth/token`. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating | -| `oauth2_scope` | The auth scope for the Gravitino client when using `oauth2` auth type with the Gravitino Virtual File System. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating | +| Configuration item | Description | Default value | Required | Since version | +|-------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-----------------------------------|------------------| +| `server_uri` | The Gravitino server uri, e.g. 
`http://localhost:8090`. | (none) | Yes | 0.6.0-incubating |
+| `metalake_name` | The metalake name which the fileset belongs to. | (none) | Yes | 0.6.0-incubating |
+| `cache_size` | The cache capacity of the Gravitino Virtual File System. | `20` | No | 0.6.0-incubating |
+| `cache_expired_time` | The value of time that the cache expires after accessing in the Gravitino Virtual File System. The value is in `seconds`. | `3600` | No | 0.6.0-incubating |
+| `auth_type` | The auth type to initialize the Gravitino client to use with the Gravitino Virtual File System. Currently supports `simple` and `oauth2` auth types. | `simple` | No | 0.6.0-incubating |
+| `oauth2_server_uri` | The auth server URI for the Gravitino client when using `oauth2` auth type. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_credential` | The auth credential for the Gravitino client when using `oauth2` auth type. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_path` | The auth server path for the Gravitino client when using `oauth2` auth type. Please remove the first slash `/` from the path, for example `oauth/token`. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_scope` | The auth scope for the Gravitino client when using `oauth2` auth type with the Gravitino Virtual File System. | (none) | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `credential_expiration_ratio` | The ratio of the expiration time for credentials fetched from Gravitino. This is used when Gravitino Hadoop catalogs have enabled credential vending. If the expiration time of the credential fetched from Gravitino is 1 hour, the GVFS client will try to refresh the credential after 1 * 0.5 = 0.5 hours. | 0.5 | No | 0.8.0-incubating |

#### Extra configuration for S3, GCS, OSS fileset
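To make the configuration above concrete, here is a minimal usage sketch of the Python GVFS client against a catalog with credential vending enabled. The server URI, metalake name, and fileset path below are placeholders, and `credential_expiration_ratio` is shown with its default value.

```python
# A minimal sketch, assuming a Gravitino server at http://localhost:8090 with a
# metalake "my_metalake" and a fileset catalog that has credential vending
# enabled; all names below are placeholders.
from gravitino.filesystem import gvfs

options = {
    # Treat a vended credential as expired once half of its lifetime has
    # elapsed, so a fresh one is fetched before it actually expires
    # (0.5 is also the default).
    "credential_expiration_ratio": 0.5,
}

fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    options=options,
)

# Paths follow the gvfs://fileset/<catalog>/<schema>/<fileset> layout; the
# client resolves them to the real cloud location and authenticates with the
# credential vended by the Gravitino server.
print(fs.ls("gvfs://fileset/my_catalog/my_schema/my_fileset"))
```

Internally, `_get_expire_time_by_ratio` computes the cached filesystem's expiry as `now + (credential_expire_time - now) * ratio`, so a ratio of 0.5 refreshes the cached filesystem halfway through the credential's lifetime.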