
Align data source enrollment with Immuta defaults and fix tagging (#19)
* Data sources now enroll by default under their original schema name and table name in the Query Engine
* Schema monitoring also enrolls data sources by default under their original table name
* Data source-level tags apply only to data sources created by the dataset configuration file in which they are defined, and use Unix shell-style wildcard matching
zturechek authored Apr 9, 2021
1 parent d6e3416 commit b5273e4
Showing 8 changed files with 374 additions and 74 deletions.
30 changes: 18 additions & 12 deletions doc/managing_configs.md
@@ -41,7 +41,7 @@ Note the following:

* Enabling or disabling schema evolution requires a schema value is present in the `schemas_to_bulk_enroll` key in the
configuration. **When enabling schema evolution for the first time for a remote database**, a bulk enroll of a
non-enrolled schema must be run to correctly create the schema evolution record in Immuta
non-enrolled schema must be run to correctly create the schema evolution record in Immuta.
* Data sources enrolled with schema evolution automatically have table evolution enabled. See the internal Immuta
documentation page titled "Schema Monitoring" for more information on table evolution.
* Schemas and tables are enrolled with the currently authenticated user running `fh-immuta-utils` as the owner
@@ -66,7 +66,7 @@ An example of a state file for a PostgreSQL database is as follows:
hostname: my-database.foo.com
port: 5439
database: db-name
# Prefix to prepend to name of data source created in Immuta
# Prefix to add to data source names and query engine table names created in Immuta
user_prefix:
handler_type: PostgreSQL
# List of schemas to enroll where for each schema,
@@ -78,22 +78,25 @@ schemas_to_enroll:
# List of schemas where we want to enroll all tables in each schema
schemas_to_bulk_enroll:
- schema_prefix: "baz"
# Schema evolution enablement and naming templates
query_engine_target_schema: "foobar" # Override target schema name in the Query Engine. Defaults to original schema
prefix_query_engine_names_with_handler: false # Prefix handler to query engine table names. Defaults to false
prefix_query_engine_names_with_schema: false # Prefix schema to query engine table names. Defaults to false
# Schema evolution enablement and naming templates (defaults shown below)
schema_evolution:
disable_schema_evolution: true
datasource_name_format: "<user_prefix>_<handler_prefix>_<schema>_<tablename>"
query_engine_table_name_format: "<user_prefix>_<handler_prefix>_<schema>_<tablename>"
query_engine_table_name_format: "<user_prefix>_<tablename>"
query_engine_schema_name_format: "<schema>"
credentials:
# Read from environment variable
source: ENV
key: USER_PASSWORD
username: service_user
# Tags to apply directly to data sources created by this config file.
# Key follows the pattern <prefix>_<schema>, where prefix matches with PREFIX_MAP in data_source.py
# Keys will match on data source names using Unix shell-style wildcard matching and apply the tags
tags:
pg_baz: ["tag1", "tag2.subtag2"]
pg_foo: ["tag3", "tag4"]
"pg_baz*": ["tag1", "tag2.subtag2"]
"pg_foo*": ["tag3", "tag4"]
```
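The `tags` keys above are matched against data source names with Unix shell-style wildcards. A minimal sketch of how such matching could work, using the standard library's `fnmatch` (the helper function and sample names here are illustrative, not the library's actual API):

```python
import fnmatch

# Hypothetical excerpt mirroring the `tags` block in the config above
TAGS = {
    "pg_baz*": ["tag1", "tag2.subtag2"],
    "pg_foo*": ["tag3", "tag4"],
}

def tags_for(data_source_name: str) -> list:
    """Collect the tags whose wildcard key matches the data source name."""
    matched = []
    for pattern, tag_list in TAGS.items():
        if fnmatch.fnmatch(data_source_name, pattern):
            matched.extend(tag_list)
    return matched

print(tags_for("pg_baz_customers"))  # -> ['tag1', 'tag2.subtag2']
```

A name that matches no pattern simply receives no data source-level tags.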

**Note:** For AWS Redshift, use the same format as above, replacing the `handler_type` value with `Redshift`.
@@ -108,7 +111,7 @@ An example of a state file for an AWS Athena database is as follows:
region: us-east-1
hostname: us-east-1
database: my-database
# Prefix to prepend to name of data source created in Immuta
# Prefix to add to data source names and query engine table names created in Immuta
user_prefix:
handler_type: Amazon Athena
queryResultLocationBucket: bucket-where-results-should-be-stored
@@ -119,22 +122,25 @@ schemas_to_enroll:
# Will glob in database for all schemas starting with this prefix.
- schema_prefix: foo
table_prefix: bar
query_engine_target_schema: "foobar" # Override target schema name in the Query Engine. Defaults to original schema
prefix_query_engine_names_with_handler: false # Prefix handler to query engine table names. Defaults to false
prefix_query_engine_names_with_schema: false # Prefix schema to query engine table names. Defaults to false
# List of schemas where we want to enroll all tables in each schema
schemas_to_bulk_enroll:
# Schema evolution enablement and naming templates
# Schema evolution enablement and naming templates (defaults shown below)
schema_evolution:
disable_schema_evolution: true
datasource_name_format: "<user_prefix>_<handler_prefix>_<schema>_<tablename>"
query_engine_table_name_format: "<user_prefix>_<handler_prefix>_<schema>_<tablename>"
query_engine_table_name_format: "<user_prefix>_<tablename>"
query_engine_schema_name_format: "<schema>"
credentials:
# Read from an instance of Hashicorp Vault
source: VAULT
key: path/to/vault/secret
# Tags to apply directly to data sources created by this config file.
# Key follows the pattern <prefix>_<schema>, where prefix matches with PREFIX_MAP in data_source.py
# Keys will match on data source names using Unix shell-style wildcard matching and apply the tags
tags:
ath_foo: ["tag1", "tag2.subtag2"]
"ath_foo*": ["tag1", "tag2.subtag2"]
```
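The `schema_evolution` name formats are placeholder templates over `<user_prefix>`, `<handler_prefix>`, `<schema>`, and `<tablename>`. A hedged sketch of how such a template might expand (the substitution logic and sample values are assumptions for illustration, not the library's code):

```python
def render(template: str, **values: str) -> str:
    """Replace each <placeholder> token in the template with its value."""
    for key, value in values.items():
        template = template.replace(f"<{key}>", value)
    return template

name = render(
    "<user_prefix>_<handler_prefix>_<schema>_<tablename>",
    user_prefix="svc", handler_prefix="ath", schema="foo", tablename="bar",
)
print(name)  # svc_ath_foo_bar
```

With the new defaults, the Query Engine table name template drops the handler and schema pieces, so `<user_prefix>_<tablename>` would expand to just `svc_bar`.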

## Data Source Column Tags
56 changes: 34 additions & 22 deletions fh_immuta_utils/data_source.py
@@ -1,4 +1,4 @@
from typing import Dict, Any, Optional, List, Tuple
from typing import Dict, Any, Optional, List, Tuple, Union
import logging

from pydantic import BaseModel, Field
@@ -53,11 +53,9 @@ def make_immuta_datasource_name(
Returns a table name that's guaranteed to be unique and within the Immuta data source name max char limit (255)
"""
table_name = ""
if user_prefix:
table_name = f"{user_prefix}_"
table_name += f"{user_prefix}_" if user_prefix else ""
table_name += f"{PREFIX_MAP[handler_type]}_{schema}_{table}"
if table_name is None:
return None

if len(table_name) <= MAX_IMMUTA_NAME_LIMIT:
return table_name
import hashlib
@@ -72,12 +70,13 @@ def make_postgres_table_name(
handler_type: str, schema: str, table: str, user_prefix: Optional[str]
) -> str:
"""
Returns table name that has a shortened prefix and conforms to the Immuta-designated Postgres max char limit (255)
Returns table name that conforms to the Immuta-designated Postgres max char limit (255)
"""
table_name = ""
if user_prefix:
table_name = f"{user_prefix}_"
table_name += f"{PREFIX_MAP[handler_type]}_{schema}_{table}"
table_name += f"{user_prefix}_" if user_prefix else ""
table_name += f"{PREFIX_MAP[handler_type]}_" if handler_type else ""
table_name += f"{schema}_" if schema else ""
table_name += f"{table}"
if len(table_name) < MAX_POSTGRES_NAME_LIMIT:
return table_name
trunc_table_name = table_name[:MAX_POSTGRES_NAME_LIMIT]
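The docstrings above describe the same idea in both helpers: names over the 255-character limit are truncated, and (as in `make_immuta_datasource_name`, which imports `hashlib`) a hash suffix keeps the shortened names unique. A rough sketch of that pattern, assuming a 255-character limit and an md5-based suffix (the exact suffix scheme is illustrative, not the library's):

```python
import hashlib

MAX_NAME_LIMIT = 255  # Immuta-designated Postgres name limit

def shorten(name: str) -> str:
    """Truncate an over-long name, appending a short hash to keep it unique."""
    if len(name) <= MAX_NAME_LIMIT:
        return name
    digest = hashlib.md5(name.encode()).hexdigest()[:8]
    # Reserve 9 characters for the underscore plus 8-char digest
    return f"{name[:MAX_NAME_LIMIT - 9]}_{digest}"

short = shorten("t" * 300)
print(len(short))  # 255
```

Two distinct long names that share a 246-character prefix would still differ in their digest suffix.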
@@ -163,8 +162,10 @@ class HandlerMetadata(BaseModel):
staleDataTolerance: int
# Don't know what this is. . .
# blobId: List[str]
# Table name that is exposed by Immuta
# Table name that is exposed by the Query Engine
bodataTableName: str = ""
# Schema name for the table in the Query Engine
bodataSchemaName: str = ""
# The name of the Data Source to which this handler corresponds
dataSourceName: str = ""
columns: Optional[List[DataSourceColumn]] = None
@@ -213,6 +214,9 @@ def make_bulk_create_objects(
schema: str,
tables: List[str],
user_prefix: Optional[str] = None,
bodata_schema_name: str = "",
prefix_query_engine_names_with_schema: Union[str, bool] = False,
prefix_query_engine_names_with_handler: Union[str, bool] = False,
) -> Tuple[DataSource, List[Handler], SchemaEvolutionMetadata]:
"""
Returns a (data source, metadata) tuple containing relevant details to bulk create new data
@@ -221,8 +225,10 @@
handlers = []
for table in tables:
postgres_table_name = make_postgres_table_name(
handler_type=config["handler_type"],
schema=schema,
handler_type=config["handler_type"]
if prefix_query_engine_names_with_handler
else "",
schema=schema if prefix_query_engine_names_with_schema else "",
table=table,
user_prefix=user_prefix,
)
@@ -237,6 +243,7 @@
schema=schema,
config=config,
bodataTableName=postgres_table_name,
bodataSchemaName=bodata_schema_name,
dataSourceName=immuta_datasource_name,
)
handlers.append(handler)
@@ -255,14 +262,19 @@ def to_immuta_objects(
table: str,
columns: List[DataSourceColumn],
user_prefix: Optional[str] = None,
bodata_schema_name: str = "",
prefix_query_engine_names_with_schema: Union[str, bool] = False,
prefix_query_engine_names_with_handler: Union[str, bool] = False,
) -> Tuple[DataSource, Handler, SchemaEvolutionMetadata]:
"""
Returns a tuple containing relevant details to create a new data source
in Immuta from the source schema
"""
postgres_table_name = make_postgres_table_name(
handler_type=config["handler_type"],
schema=schema,
handler_type=config["handler_type"]
if prefix_query_engine_names_with_handler
else "",
schema=schema if prefix_query_engine_names_with_schema else "",
table=table,
user_prefix=user_prefix,
)
@@ -278,6 +290,7 @@
config=config,
columns=columns,
bodataTableName=postgres_table_name,
bodataSchemaName=bodata_schema_name,
dataSourceName=immuta_datasource_name,
)
ds = DataSource(
@@ -331,21 +344,19 @@ def make_handler_metadata(

def make_schema_evolution_metadata(config: Dict[str, Any]) -> SchemaEvolutionMetadata:
"""
Builds metadata for the schema evolution object. Immuta table name and SQL table name template defaults match the
pattern defined in make_table_name()
Builds metadata for the schema evolution object. Immuta data source name and Query Engine table name template
defaults match the patterns set in the Immuta UI.
:param config: dataset configuration dictionary
:return: SchemaEvolutionMetadata object
"""
user_prefix = ""
if config.get("prefix"):
user_prefix = f"{config.get('prefix')}_"
if config.get("user_prefix"):
user_prefix = f"{config.get('user_prefix')}_"
handler_prefix = PREFIX_MAP[config["handler_type"]]
datasource_name_format_default = (
f"{user_prefix}{handler_prefix}_<schema>_<tablename>"
)
query_engine_table_name_format_default = (
f"{user_prefix}{handler_prefix}_<schema>_<tablename>"
)
query_engine_table_name_format_default = f"{user_prefix}<tablename>"
query_engine_schema_name_format_default = "<schema>"

return SchemaEvolutionMetadata(
@@ -356,7 +367,8 @@ def make_schema_evolution_metadata(config: Dict[str, Any]) -> SchemaEvolutionMet
config=SchemaEvolutionMetadataConfig(
nameTemplate=SchemaEvolutionMetadataConfigTemplate(
dataSourceNameFormat=config.get("schema_evolution", {}).get(
"datasource_name_format", datasource_name_format_default
"datasource_name_format",
datasource_name_format_default,
),
queryEngineTableNameFormat=config.get("schema_evolution", {}).get(
"query_engine_table_name_format",
24 changes: 21 additions & 3 deletions fh_immuta_utils/scripts/manage_data_sources.py
@@ -157,14 +157,25 @@ def data_sources_enroll_iterator(
continue
LOGGER.info("Processing table: %s.%s", schema, table["tableName"])
handler = make_handler_metadata(
config=config, table=table["tableName"], schema=schema
config=config,
table=table["tableName"],
schema=schema,
)
columns = client.get_column_types(
config=config, data_source_type=config["handler_type"], handler=handler
)

data_source, handler, schema_evolution = to_immuta_objects(
schema=schema, table=table["tableName"], columns=columns, config=config
schema=schema,
table=table["tableName"],
columns=columns,
config=config,
bodata_schema_name=schema_obj.get("query_engine_target_schema", schema),
prefix_query_engine_names_with_schema=schema_obj.get(
"prefix_query_engine_names_with_schema", False
),
prefix_query_engine_names_with_handler=schema_obj.get(
"prefix_query_engine_names_with_handler", False
),
)
yield data_source, handler, schema_evolution

@@ -190,6 +201,13 @@
tables=[table["tableName"] for table in tables],
config=config,
user_prefix=config.get("user_prefix"),
bodata_schema_name=schema_obj.get("query_engine_target_schema", schema),
prefix_query_engine_names_with_schema=schema_obj.get(
"prefix_query_engine_names_with_schema", False
),
prefix_query_engine_names_with_handler=schema_obj.get(
"prefix_query_engine_names_with_handler", False
),
)
yield data_source, handlers, schema_evolution

15 changes: 12 additions & 3 deletions fh_immuta_utils/scripts/tag_existing_data_sources.py
@@ -53,17 +53,26 @@ def main(config_file: str, search_text: str, dry_run: bool, debug: bool):
with Paginator(client.get_data_source_list, search_text=search_text) as paginator:
for data_source in paginator:
data_sources_to_tag.append(
{"id": data_source["id"], "name": data_source["name"]}
{
"id": data_source["id"],
"name": data_source["name"],
"handler_type": data_source["blobHandlerType"],
"connection_string": data_source["connectionString"],
}
)

progress_iterator = tqdm(data_sources_to_tag)
for data_source in progress_iterator:
progress_iterator.set_description(
desc=f"Tagging ID: {data_source['id']}, Name: {data_source['name']} :"
)
data_source_tags = tagger.get_tags_for_data_source(name=data_source["name"])
data_source_tags = tagger.get_tags_for_data_source(
name=data_source["name"],
handler_type=data_source["handler_type"],
connection_string=data_source["connection_string"],
)
if data_source_tags:
logging.debug(f"Adding data source tags to {data_source['name']}.")
logging.debug(f"\nAdding data source-level tags to {data_source['name']}.")
if not dry_run:
client.tag_data_source(id=data_source["id"], tag_data=data_source_tags)
dictionary = client.get_data_source_dictionary(id=data_source["id"])
