Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 13823: Validate and Parse gracefully IngestionPipeline #14461

Merged
merged 3 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 79 additions & 26 deletions ingestion/src/metadata/ingestion/api/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
"""
Helper to parse workflow configurations
"""
from typing import Optional, Type, TypeVar, Union
from typing import Type, TypeVar, Union

from pydantic import BaseModel, ValidationError

from metadata.generated.schema.entity.automations.testServiceConnection import (
TestServiceConnectionRequest,
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardConnection,
Expand All @@ -26,6 +26,9 @@
DatabaseConnection,
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
from metadata.generated.schema.entity.services.messagingService import (
MessagingConnection,
MessagingServiceType,
Expand Down Expand Up @@ -270,13 +273,11 @@ def _parse_inner_connection(config_dict: dict, source_type: str) -> None:
:param config_dict: JSON configuration
:param source_type: source type name, e.g., Airflow.
"""
inner_source_type = config_dict["source"]["serviceConnection"]["config"][
"connection"
]["type"]
inner_source_type = config_dict["type"]
inner_service_type = get_service_type(inner_source_type)
inner_connection_class = get_connection_class(inner_source_type, inner_service_type)
_unsafe_parse_config(
config=config_dict["source"]["serviceConnection"]["config"]["connection"],
config=config_dict,
cls=inner_connection_class,
message=f"Error parsing the inner service connection for {source_type}",
)
Expand All @@ -303,7 +304,12 @@ def parse_service_connection(config_dict: dict) -> None:

if source_type in HAS_INNER_CONNECTION:
# We will first parse the inner `connection` configuration
_parse_inner_connection(config_dict, source_type)
_parse_inner_connection(
config_dict["source"]["serviceConnection"]["config"]["connection"][
"config"
]["connection"],
source_type,
)

# Parse the service connection dictionary with the scoped class
_unsafe_parse_config(
Expand Down Expand Up @@ -400,37 +406,84 @@ def parse_workflow_config_gracefully(
raise ParsingConfigurationError("Uncaught error when parsing the workflow!")


def parse_test_connection_request_gracefully(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the method since it wasn't being used anywhere except for tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

def parse_ingestion_pipeline_config_gracefully(
config_dict: dict,
) -> Optional[TestServiceConnectionRequest]:
) -> IngestionPipeline:
"""
This function either correctly parses the pydantic class,
or throws a scoped error while fetching the required source
connection class
This function either correctly parses the pydantic class, or
throws a scoped error while fetching the required source connection
class.

:param config_dict: JSON workflow config
:return: TestServiceConnectionRequest or scoped error
:param config_dict: JSON ingestion pipeline config
:return:Ingestion Pipeline config or scoped error
"""

try:
test_service_connection = TestServiceConnectionRequest.parse_obj(config_dict)
return test_service_connection
ingestion_pipeline = IngestionPipeline.parse_obj(config_dict)
return ingestion_pipeline

except ValidationError as err:
# Unsafe access to the keys. Allow a KeyError if the config is not well formatted
source_type = config_dict["connection"]["config"]["type"]
logger.warning(
f"Error parsing the Workflow Configuration for {source_type} ingestion: {err}"
except ValidationError:
source_config_type = config_dict["sourceConfig"]["config"].get("type")

if source_config_type is None:
raise InvalidWorkflowException("Missing type in the sourceConfig config")

source_config_class = get_source_config_class(source_config_type)

_unsafe_parse_config(
config=config_dict["sourceConfig"]["config"],
cls=source_config_class,
message="Error parsing the source config",
)

raise ParsingConfigurationError(
"Uncaught error when parsing the Ingestion Pipeline!"
)


def parse_automation_workflow_gracefully(
config_dict: dict,
) -> AutomationWorkflow:
"""
This function either correctly parses the pydantic class, or
throws a scoped error while fetching the required source connection
class.

:param config_dict: JSON AutomationWorkflow config
:return: AutomationWorkflow config or scoped error
"""

try:
automation_workflow = AutomationWorkflow.parse_obj(config_dict)
return automation_workflow

except ValidationError:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we take care of the serviceConnection? Not only the sourceConfig

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that's where we get also a lot of noise, for example when we configure an URI without the scheme or something

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed, that was the wrong comment. We'll be adding it to the run_automations endpoint though

source_type = config_dict["request"]["connection"]["config"].get("type")

if source_type is None:
raise InvalidWorkflowException("Missing type in the connection config")

logger.debug(
f"Error parsing the Workflow Configuration for {source_type} ingestion"
)

service_type = get_service_type(source_type)
connection_class = get_connection_class(source_type, service_type)

# Parse the dictionary with the scoped class
if source_type in HAS_INNER_CONNECTION:
# We will first parse the inner `connection` configuration
_parse_inner_connection(
config_dict["request"]["connection"]["config"]["connection"],
source_type,
)

# Parse the service connection dictionary with the scoped class
_unsafe_parse_config(
config=config_dict["connection"]["config"],
config=config_dict["request"]["connection"]["config"],
cls=connection_class,
message="Error parsing the connection config",
message="Error parsing the service connection",
)

raise ParsingConfigurationError("Uncaught error when parsing the workflow!")
raise ParsingConfigurationError(
"Uncaught error when parsing the Ingestion Pipeline!"
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
IngestionPipeline,
PipelineStatus,
)
from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully
from metadata.ingestion.ometa.client import REST
from metadata.utils.logger import ometa_logger

Expand Down Expand Up @@ -79,7 +80,7 @@ def run_pipeline(self, ingestion_pipeline_id: str) -> IngestionPipeline:
f"{self.get_suffix(IngestionPipeline)}/trigger/{ingestion_pipeline_id}"
)

return IngestionPipeline.parse_obj(resp)
return parse_ingestion_pipeline_config_gracefully(resp)

def get_pipeline_status_between_ts(
self,
Expand Down Expand Up @@ -125,6 +126,6 @@ def get_ingestion_pipeline_by_name(
)

if hasattr(resp, "sourceConfig"):
return IngestionPipeline.parse_obj(resp)
return parse_ingestion_pipeline_config_gracefully(resp)

return None
Loading
Loading