Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Docker Host Retry #19127

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions ingestion/src/metadata/automations/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.ingestion.connections.test_connections import (
raise_test_connection_exception,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils.ssl_manager import SSLManager, check_ssl_and_init
Expand Down Expand Up @@ -60,19 +63,40 @@ def _(
"""
Run the test connection
"""

ssl_manager = None
ssl_manager: SSLManager = check_ssl_and_init(request.connection.config)
if ssl_manager:
request.connection.config = ssl_manager.setup_ssl(request.connection.config)

connection = get_connection(request.connection.config)

# Find the test_connection function in each <source>/connection.py file
test_connection_fn = get_test_connection_fn(request.connection.config)
test_connection_fn(
metadata, connection, request.connection.config, automation_workflow
)

try:
connection = get_connection(request.connection.config)

host_port_str = str(request.connection.config.hostPort or "")
if "localhost" in host_port_str:
result = test_connection_fn(metadata, connection, request.connection.config)
raise_test_connection_exception(result)

test_connection_fn(
metadata, connection, request.connection.config, automation_workflow
)
except Exception as error:
host_port_str = str(getattr(request.connection.config, "hostPort", None) or "")
if "localhost" not in host_port_str:
raise error

host_port_type = type(request.connection.config.hostPort)
docker_host_port_str = host_port_str.replace(
"localhost", "host.docker.internal"
)
request.connection.config.hostPort = host_port_type(docker_host_port_str)

connection = get_connection(request.connection.config)
test_connection_fn(
metadata, connection, request.connection.config, automation_workflow
)

if ssl_manager:
ssl_manager.cleanup_temp_files()
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ class DashboardServiceTopology(ServiceTopology):
)


from metadata.utils.helpers import retry_with_docker_host


# pylint: disable=too-many-public-methods
class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
"""
Expand All @@ -216,6 +219,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
dashboard_source_state: Set = set()
datamodel_source_state: Set = set()

@retry_with_docker_host()
def __init__(
self,
config: WorkflowSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
from metadata.utils import fqn
from metadata.utils.credentials import GOOGLE_CREDENTIALS
from metadata.utils.filters import filter_by_database, filter_by_schema
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_ddls,
Expand Down Expand Up @@ -162,9 +163,11 @@ def get_columns(bq_schema):
"precision": field.precision,
"scale": field.scale,
"max_length": field.max_length,
"system_data_type": _array_sys_data_type_repr(col_type)
if str(col_type) == "ARRAY"
else str(col_type),
"system_data_type": (
_array_sys_data_type_repr(col_type)
if str(col_type) == "ARRAY"
else str(col_type)
),
"is_complex": is_complex_type(str(col_type)),
"policy_tags": None,
}
Expand Down Expand Up @@ -223,6 +226,7 @@ class BigquerySource(LifeCycleQueryMixin, CommonDbSourceService, MultiDBSource):
Database metadata from Bigquery Source
"""

@retry_with_docker_host()
def __init__(self, config, metadata, incremental_configuration: IncrementalConfig):
# Check if the engine is established before setting project IDs
# This ensures that we don't try to set project IDs when there is no engine
Expand Down Expand Up @@ -685,9 +689,11 @@ def get_table_partition_details(
return True, TablePartition(
columns=[
PartitionColumnDetails(
columnName="_PARTITIONTIME"
if table.time_partitioning.type_ == "HOUR"
else "_PARTITIONDATE",
columnName=(
"_PARTITIONTIME"
if table.time_partitioning.type_ == "HOUR"
else "_PARTITIONDATE"
),
interval=str(table.time_partitioning.type_),
intervalType=PartitionIntervalTypes.INGESTION_TIME,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
calculate_execution_time_generator,
)
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger
from metadata.utils.ssl_manager import SSLManager, check_ssl_and_init

Expand Down Expand Up @@ -108,6 +109,7 @@ class CommonDbSourceService(
- fetch_column_tags implemented at SqlColumnHandler. Sources should override this when needed
"""

@retry_with_docker_host()
def __init__(
self,
config: WorkflowSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from metadata.utils.constants import DEFAULT_DATABASE
from metadata.utils.datalake.datalake_utils import DataFrameColumnParser
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger
from metadata.utils.ssl_manager import check_ssl_and_init

Expand All @@ -79,6 +80,7 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
Database metadata from NoSQL source
"""

@retry_with_docker_host()
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
super().__init__()
self.config = config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ def get_database_names(self) -> Iterable[str]:
)
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else database_name,
(
database_fqn
if self.source_config.useFqnForFiltering
else database_name
),
):
self.status.filter(database_fqn, "Database Filtered out")
else:
Expand Down Expand Up @@ -180,9 +182,11 @@ def get_database_schema_names(self) -> Iterable[str]:

if filter_by_schema(
self.config.sourceConfig.config.schemaFilterPattern,
schema_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else schema_name,
(
schema_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else schema_name
),
):
self.status.filter(schema_fqn, "Bucket Filtered Out")
continue
Expand Down Expand Up @@ -352,9 +356,11 @@ def filter_dl_table(self, table_name: str):

if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern,
table_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else table_name,
(
table_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else table_name
),
):
self.status.filter(
table_fqn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_test_connection_fn
from metadata.utils.helpers import get_start_and_end
from metadata.utils.helpers import get_start_and_end, retry_with_docker_host
from metadata.utils.logger import ingestion_logger
from metadata.utils.ssl_manager import get_ssl_connection

Expand All @@ -49,6 +49,7 @@ class QueryParserSource(Source, ABC):
database_field: str
schema_field: str

@retry_with_docker_host()
def __init__(
self,
config: WorkflowSource,
Expand All @@ -64,9 +65,11 @@ def __init__(
self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
self.source_config = self.config.sourceConfig.config
self.start, self.end = get_start_and_end(self.source_config.queryLogDuration)
self.engine = (
get_ssl_connection(self.service_connection) if get_engine else None
)

self.engine = None
if get_engine:
self.engine = get_ssl_connection(self.service_connection)
self.test_connection()

@property
def name(self) -> str:
Expand Down Expand Up @@ -129,5 +132,5 @@ def close(self):

def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)
result = test_connection_fn(self.engine)
result = test_connection_fn(self.metadata, self.engine, self.service_connection)
raise_test_connection_exception(result)
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,5 @@ def parse_cdata(

def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)
result = test_connection_fn(self.engine)
result = test_connection_fn(self.metadata, self.engine, self.service_connection)
raise_test_connection_exception(result)
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from metadata.ingestion.source.database.unitycatalog.connection import get_connection
from metadata.ingestion.source.database.unitycatalog.models import LineageTableStreams
from metadata.utils import fqn
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand All @@ -50,6 +51,7 @@ class UnitycatalogLineageSource(Source):
Lineage Unity Catalog Source
"""

@retry_with_docker_host()
def __init__(
self,
config: WorkflowSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand All @@ -89,6 +90,7 @@ class UnitycatalogSource(
the unity catalog source
"""

@retry_with_docker_host()
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
super().__init__()
self.config = config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils import fqn
from metadata.utils.filters import filter_by_topic
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand Down Expand Up @@ -125,6 +126,7 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
context = TopologyContextManager(topology)
topic_source_state: Set = set()

@retry_with_docker_host()
def __init__(
self,
config: WorkflowSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand All @@ -71,6 +72,7 @@ class AlationsinkSource(Source):
config: WorkflowSource
alation_sink_client: AlationSinkClient

@retry_with_docker_host()
def __init__(
self,
config: WorkflowSource,
Expand Down Expand Up @@ -117,9 +119,11 @@ def create_datasource_request(
),
),
db_username="Test",
title=om_database.displayName
if om_database.displayName
else model_str(om_database.name),
title=(
om_database.displayName
if om_database.displayName
else model_str(om_database.name)
),
description=model_str(om_database.description),
)
except Exception as exc:
Expand All @@ -140,9 +144,11 @@ def create_schema_request(
key=fqn._build( # pylint: disable=protected-access
str(alation_datasource_id), model_str(om_schema.name)
),
title=om_schema.displayName
if om_schema.displayName
else model_str(om_schema.name),
title=(
om_schema.displayName
if om_schema.displayName
else model_str(om_schema.name)
),
description=model_str(om_schema.description),
)
except Exception as exc:
Expand All @@ -163,9 +169,11 @@ def create_table_request(
key=fqn._build( # pylint: disable=protected-access
str(alation_datasource_id), schema_name, model_str(om_table.name)
),
title=om_table.displayName
if om_table.displayName
else model_str(om_table.name),
title=(
om_table.displayName
if om_table.displayName
else model_str(om_table.name)
),
description=model_str(om_table.description),
table_type=TABLE_TYPE_MAPPER.get(om_table.tableType, "TABLE"),
sql=om_table.schemaDefinition,
Expand Down Expand Up @@ -273,16 +281,22 @@ def create_column_request(
table_name,
model_str(om_column.name),
),
column_type=om_column.dataTypeDisplay.lower()
if om_column.dataTypeDisplay
else om_column.dataType.value.lower(),
title=om_column.displayName
if om_column.displayName
else model_str(om_column.name),
column_type=(
om_column.dataTypeDisplay.lower()
if om_column.dataTypeDisplay
else om_column.dataType.value.lower()
),
title=(
om_column.displayName
if om_column.displayName
else model_str(om_column.name)
),
description=model_str(om_column.description),
position=str(om_column.ordinalPosition)
if om_column.ordinalPosition
else None,
position=(
str(om_column.ordinalPosition)
if om_column.ordinalPosition
else None
),
index=self._get_column_index(
alation_datasource_id, om_column, table_constraints
),
Expand Down
Loading
Loading