Skip to content

Commit eb6b6ad

Browse files
authored
Add support for catalog federation in the CLI (apache#1912)
Previously, the CLI only supported EXTERNAL catalogs as they existed in 0.9.0. EXTERNAL catalogs can now be configured with a range of federation-related settings. This PR updates the CLI to match the REST API more closely so that federated catalogs can be set up easily from the CLI.
1 parent 108989d commit eb6b6ad

File tree

17 files changed

+570
-649
lines changed

17 files changed

+570
-649
lines changed

client/python/cli/command/__init__.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def options_get(key, f=lambda x: x):
3939
properties = Parser.parse_properties(options_get(Arguments.PROPERTY))
4040
set_properties = Parser.parse_properties(options_get(Arguments.SET_PROPERTY))
4141
remove_properties = options_get(Arguments.REMOVE_PROPERTY)
42+
catalog_client_scopes = options_get(Arguments.CATALOG_CLIENT_SCOPE)
4243

4344
command = None
4445
if options.command == Commands.CATALOGS:
@@ -61,9 +62,24 @@ def options_get(key, f=lambda x: x):
6162
catalog_name=options_get(Arguments.CATALOG),
6263
properties={} if properties is None else properties,
6364
set_properties={} if set_properties is None else set_properties,
64-
remove_properties=[]
65-
if remove_properties is None
66-
else remove_properties,
65+
hadoop_warehouse=options_get(Arguments.HADOOP_WAREHOUSE),
66+
iceberg_remote_catalog_name=options_get(Arguments.ICEBERG_REMOTE_CATALOG_NAME),
67+
remove_properties=[] if remove_properties is None else remove_properties,
68+
catalog_connection_type=options_get(Arguments.CATALOG_CONNECTION_TYPE),
69+
catalog_authentication_type=options_get(Arguments.CATALOG_AUTHENTICATION_TYPE),
70+
catalog_service_identity_type=options_get(Arguments.CATALOG_SERVICE_IDENTITY_TYPE),
71+
catalog_service_identity_iam_arn=options_get(Arguments.CATALOG_SERVICE_IDENTITY_IAM_ARN),
72+
catalog_uri=options_get(Arguments.CATALOG_URI),
73+
catalog_token_uri=options_get(Arguments.CATALOG_TOKEN_URI),
74+
catalog_client_id=options_get(Arguments.CATALOG_CLIENT_ID),
75+
catalog_client_secret=options_get(Arguments.CATALOG_CLIENT_SECRET),
76+
catalog_client_scopes=[] if catalog_client_scopes is None else catalog_client_scopes,
77+
catalog_bearer_token=options_get(Arguments.CATALOG_BEARER_TOKEN),
78+
catalog_role_arn=options_get(Arguments.CATALOG_ROLE_ARN),
79+
catalog_role_session_name=options_get(Arguments.CATALOG_ROLE_SESSION_NAME),
80+
catalog_external_id=options_get(Arguments.CATALOG_EXTERNAL_ID),
81+
catalog_signing_region=options_get(Arguments.CATALOG_SIGNING_REGION),
82+
catalog_signing_name=options_get(Arguments.CATALOG_SIGNING_NAME)
6783
)
6884
elif options.command == Commands.PRINCIPALS:
6985
from cli.command.principals import PrincipalsCommand

client/python/cli/command/catalogs.py

Lines changed: 123 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,17 @@
1919
from dataclasses import dataclass
2020
from typing import Dict, List
2121

22-
from pydantic import StrictStr
22+
from pydantic import StrictStr, SecretStr
2323

2424
from cli.command import Command
25-
from cli.constants import StorageType, CatalogType, Subcommands, Arguments
25+
from cli.constants import StorageType, CatalogType, CatalogConnectionType, Subcommands, Arguments, AuthenticationType, \
26+
ServiceIdentityType
2627
from cli.options.option_tree import Argument
27-
from polaris.management import (
28-
PolarisDefaultApi,
29-
CreateCatalogRequest,
30-
UpdateCatalogRequest,
31-
StorageConfigInfo,
32-
ExternalCatalog,
33-
AwsStorageConfigInfo,
34-
AzureStorageConfigInfo,
35-
GcpStorageConfigInfo,
36-
PolarisCatalog,
37-
CatalogProperties,
38-
)
28+
from polaris.management import PolarisDefaultApi, CreateCatalogRequest, UpdateCatalogRequest, \
29+
StorageConfigInfo, ExternalCatalog, AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo, \
30+
PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, \
31+
OAuthClientCredentialsParameters, SigV4AuthenticationParameters, HadoopConnectionConfigInfo, \
32+
IcebergRestConnectionConfigInfo, AwsIamServiceIdentityInfo
3933

4034

4135
@dataclass
@@ -68,19 +62,56 @@ class CatalogsCommand(Command):
6862
properties: Dict[str, StrictStr]
6963
set_properties: Dict[str, StrictStr]
7064
remove_properties: List[str]
65+
hadoop_warehouse: str
66+
iceberg_remote_catalog_name: str
67+
catalog_connection_type: str
68+
catalog_authentication_type: str
69+
catalog_service_identity_type: str
70+
catalog_service_identity_iam_arn: str
71+
catalog_uri: str
72+
catalog_token_uri: str
73+
catalog_client_id: str
74+
catalog_client_secret: str
75+
catalog_client_scopes: List[str]
76+
catalog_bearer_token: str
77+
catalog_role_arn: str
78+
catalog_role_session_name: str
79+
catalog_external_id: str
80+
catalog_signing_region: str
81+
catalog_signing_name: str
7182

7283
def validate(self):
7384
if self.catalogs_subcommand == Subcommands.CREATE:
74-
if not self.storage_type:
75-
raise Exception(
76-
f"Missing required argument:"
77-
f" {Argument.to_flag_name(Arguments.STORAGE_TYPE)}"
78-
)
79-
if not self.default_base_location:
80-
raise Exception(
81-
f"Missing required argument:"
82-
f" {Argument.to_flag_name(Arguments.DEFAULT_BASE_LOCATION)}"
83-
)
85+
if self.catalog_type != CatalogType.EXTERNAL.value:
86+
if not self.storage_type:
87+
raise Exception(f'Missing required argument:'
88+
f' {Argument.to_flag_name(Arguments.STORAGE_TYPE)}')
89+
if not self.default_base_location:
90+
raise Exception(f'Missing required argument:'
91+
f' {Argument.to_flag_name(Arguments.DEFAULT_BASE_LOCATION)}')
92+
else:
93+
if self.catalog_authentication_type == AuthenticationType.OAUTH.value:
94+
if not self.catalog_token_uri or not self.catalog_client_id \
95+
or not self.catalog_client_secret or len(self.catalog_client_scopes) == 0:
96+
raise Exception(f"Authentication type 'OAUTH' requires"
97+
f" {Argument.to_flag_name(Arguments.CATALOG_TOKEN_URI)},"
98+
f" {Argument.to_flag_name(Arguments.CATALOG_CLIENT_ID)},"
99+
f" {Argument.to_flag_name(Arguments.CATALOG_CLIENT_SECRET)},"
100+
f" and at least one {Argument.to_flag_name(Arguments.CATALOG_CLIENT_SCOPE)}.")
101+
elif self.catalog_authentication_type == AuthenticationType.BEARER.value:
102+
if not self.catalog_bearer_token:
103+
raise Exception(f"Missing required argument for authentication type 'BEARER':"
104+
f" {Argument.to_flag_name(Arguments.CATALOG_BEARER_TOKEN)}")
105+
elif self.catalog_authentication_type == AuthenticationType.SIGV4.value:
106+
if not self.catalog_role_arn or not self.catalog_signing_region:
107+
raise Exception(f"Authentication type 'SIGV4 requires"
108+
f" {Argument.to_flag_name(Arguments.CATALOG_ROLE_ARN)}"
109+
f" and {Argument.to_flag_name(Arguments.CATALOG_SIGNING_REGION)}")
110+
111+
if self.catalog_service_identity_type == ServiceIdentityType.AWS_IAM.value:
112+
if not self.catalog_service_identity_iam_arn:
113+
raise Exception(f"Missing required argument for service identity type 'AWS_IAM':"
114+
f" {Argument.to_flag_name(Arguments.CATALOG_SERVICE_IDENTITY_IAM_ARN)}")
84115

85116
if self.storage_type == StorageType.S3.value:
86117
if not self.role_arn:
@@ -166,31 +197,94 @@ def _build_storage_config_info(self):
166197
)
167198
return config
168199

200+
def _build_connection_config_info(self):
201+
if self.catalog_type != CatalogType.EXTERNAL.value:
202+
return None
203+
204+
auth_params = None
205+
if self.catalog_authentication_type == AuthenticationType.OAUTH.value:
206+
auth_params = OAuthClientCredentialsParameters(
207+
authentication_type=self.catalog_authentication_type.upper(),
208+
token_uri=self.catalog_token_uri,
209+
client_id=self.catalog_client_id,
210+
client_secret=SecretStr(self.catalog_client_secret),
211+
scopes=self.catalog_client_scopes
212+
)
213+
elif self.catalog_authentication_type == AuthenticationType.BEARER.value:
214+
auth_params = BearerAuthenticationParameters(
215+
authentication_type=self.catalog_authentication_type.upper(),
216+
bearer_token=SecretStr(self.catalog_bearer_token)
217+
)
218+
elif self.catalog_authentication_type == AuthenticationType.SIGV4.value:
219+
auth_params = SigV4AuthenticationParameters(
220+
authentication_type=self.catalog_authentication_type.upper(),
221+
role_arn=self.catalog_role_arn,
222+
role_session_name=self.catalog_role_session_name,
223+
external_id=self.catalog_external_id,
224+
signing_region=self.catalog_signing_region,
225+
signing_name=self.catalog_signing_name,
226+
)
227+
elif self.catalog_authentication_type is not None:
228+
raise Exception("Unknown authentication type:", self.catalog_authentication_type)
229+
230+
service_identity = None
231+
if self.catalog_service_identity_type == ServiceIdentityType.AWS_IAM:
232+
service_identity = AwsIamServiceIdentityInfo(
233+
identity_type=self.catalog_service_identity_type.upper(),
234+
iam_arn=self.catalog_service_identity_iam_arn
235+
)
236+
elif self.catalog_service_identity_type is not None:
237+
raise Exception("Unknown service identity type:", self.catalog_service_identity_type)
238+
239+
config = None
240+
if self.catalog_connection_type == CatalogConnectionType.HADOOP.value:
241+
config = HadoopConnectionConfigInfo(
242+
connection_type=self.catalog_connection_type.upper(),
243+
uri=self.catalog_uri,
244+
authentication_parameters=auth_params,
245+
service_identity=service_identity,
246+
warehouse=self.hadoop_warehouse
247+
)
248+
elif self.catalog_connection_type == CatalogConnectionType.ICEBERG.value:
249+
config = IcebergRestConnectionConfigInfo(
250+
connection_type=self.catalog_connection_type.upper().replace('-', '_'),
251+
uri=self.catalog_uri,
252+
authentication_parameters=auth_params,
253+
service_identity=service_identity,
254+
remote_catalog_name=self.iceberg_remote_catalog_name
255+
)
256+
elif self.catalog_connection_type is not None:
257+
raise Exception("Unknown catalog connection type:", self.catalog_connection_type)
258+
return config
259+
169260
def execute(self, api: PolarisDefaultApi) -> None:
170261
if self.catalogs_subcommand == Subcommands.CREATE:
171-
config = self._build_storage_config_info()
262+
storage_config = self._build_storage_config_info()
263+
connection_config = self._build_connection_config_info()
172264
if self.catalog_type == CatalogType.EXTERNAL.value:
173265
request = CreateCatalogRequest(
174266
catalog=ExternalCatalog(
175267
type=self.catalog_type.upper(),
176268
name=self.catalog_name,
177-
storage_config_info=config,
269+
storage_config_info=storage_config,
178270
properties=CatalogProperties(
179271
default_base_location=self.default_base_location,
180-
additional_properties=self.properties,
272+
additional_properties=self.properties
181273
),
274+
connection_config_info=connection_config
182275
)
183276
)
184277
else:
185278
request = CreateCatalogRequest(
186279
catalog=PolarisCatalog(
187280
type=self.catalog_type.upper(),
188281
name=self.catalog_name,
189-
storage_config_info=config,
282+
storage_config_info=storage_config,
190283
properties=CatalogProperties(
191284
default_base_location=self.default_base_location,
192-
additional_properties=self.properties,
285+
additional_properties=self.properties
193286
),
287+
connection_config_info=connection_config
194288
)
195289
)
196290
api.create_catalog(request)

client/python/cli/command/namespaces.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,7 @@
2626
from cli.command import Command
2727
from cli.constants import Subcommands, Arguments, UNIT_SEPARATOR
2828
from cli.options.option_tree import Argument
29-
from polaris.catalog import (
30-
IcebergCatalogAPI,
31-
CreateNamespaceRequest,
32-
ApiClient,
33-
Configuration,
34-
)
29+
from polaris.catalog import IcebergCatalogAPI, CreateNamespaceRequest, ApiClient, Configuration
3530
from polaris.management import PolarisDefaultApi
3631

3732

client/python/cli/command/principal_roles.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,8 @@
2424
from cli.command import Command
2525
from cli.constants import Subcommands, Arguments
2626
from cli.options.option_tree import Argument
27-
from polaris.management import (
28-
PolarisDefaultApi,
29-
CreatePrincipalRoleRequest,
30-
PrincipalRole,
31-
UpdatePrincipalRoleRequest,
32-
GrantPrincipalRoleRequest,
33-
)
27+
from polaris.management import PolarisDefaultApi, CreatePrincipalRoleRequest, PrincipalRole, UpdatePrincipalRoleRequest, \
28+
GrantPrincipalRoleRequest
3429

3530

3631
@dataclass

0 commit comments

Comments
 (0)