Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEN-309: OpenAPI Connector #17754

Merged
merged 14 commits into from
Sep 11, 2024
20 changes: 20 additions & 0 deletions ingestion/src/metadata/examples/workflows/rest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Sample ingestion workflow for the OpenAPI / REST connector.
# Reads an OpenAPI (swagger) JSON schema and ingests API collections/endpoints.
source:
  type: rest
  serviceName: openapi_rest
  serviceConnection:
    config:
      type: REST
      # URL of a valid OpenAPI JSON schema document
      openAPISchemaURL: https://docs.open-metadata.org/swagger.json
      # Optional: sent as "Authorization: Bearer <token>" when fetching the schema
      token: <optional_jwt_token>
  sourceConfig:
    config:
      type: ApiMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
13 changes: 13 additions & 0 deletions ingestion/src/metadata/ingestion/api/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.apiService import (
ApiServiceConnection,
APIServiceType,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardConnection,
DashboardServiceType,
Expand Down Expand Up @@ -53,6 +57,10 @@
StorageConnection,
StorageServiceType,
)
from metadata.generated.schema.metadataIngestion.apiServiceMetadataPipeline import (
ApiMetadataConfigType,
ApiServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardMetadataConfigType,
DashboardServiceMetadataPipeline,
Expand Down Expand Up @@ -127,6 +135,7 @@
# Build a service type map dynamically from JSON Schema covered types
SERVICE_TYPE_MAP = {
"Backend": PipelineConnection, # For Airflow backend
**{service: ApiServiceConnection for service in APIServiceType.__members__},
**{service: DatabaseConnection for service in DatabaseServiceType.__members__},
**{service: DashboardConnection for service in DashboardServiceType.__members__},
**{service: MessagingConnection for service in MessagingServiceType.__members__},
Expand All @@ -138,6 +147,7 @@
}

SOURCE_CONFIG_CLASS_MAP = {
ApiMetadataConfigType.ApiMetadata.value: ApiServiceMetadataPipeline,
DashboardMetadataConfigType.DashboardMetadata.value: DashboardServiceMetadataPipeline,
ProfilerConfigType.Profiler.value: DatabaseServiceProfilerPipeline,
DatabaseUsageConfigType.DatabaseUsage.value: DatabaseServiceQueryUsagePipeline,
Expand Down Expand Up @@ -173,6 +183,7 @@ class InvalidWorkflowException(Exception):
def get_service_type(
source_type: str,
) -> Union[
Type[ApiServiceConnection],
Type[DashboardConnection],
Type[DatabaseConnection],
Type[MessagingConnection],
Expand All @@ -196,6 +207,7 @@ def get_service_type(
def get_source_config_class(
source_config_type: str,
) -> Union[
Type[ApiServiceMetadataPipeline],
Type[DashboardServiceMetadataPipeline],
Type[DatabaseServiceProfilerPipeline],
Type[DatabaseServiceQueryUsagePipeline],
Expand All @@ -221,6 +233,7 @@ def get_source_config_class(
def get_connection_class(
source_type: str,
service_type: Union[
Type[ApiServiceConnection],
Type[DashboardConnection],
Type[DatabaseConnection],
Type[MessagingConnection],
Expand Down
2 changes: 2 additions & 0 deletions ingestion/src/metadata/ingestion/models/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import NewType, Union

from metadata.generated.schema.entity.services.apiService import ApiService
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
Expand All @@ -25,6 +26,7 @@
ServiceWithConnectionType = NewType(
"ServiceWithConnectionType",
Union[
ApiService,
DashboardService,
DatabaseService,
MessagingService,
Expand Down
206 changes: 206 additions & 0 deletions ingestion/src/metadata/ingestion/source/api/api_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for ingesting api services
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable, Set

from pydantic import Field
from typing_extensions import Annotated

from metadata.generated.schema.api.data.createAPICollection import (
CreateAPICollectionRequest,
)
from metadata.generated.schema.api.data.createAPIEndpoint import (
CreateAPIEndpointRequest,
)
from metadata.generated.schema.entity.data.apiCollection import APICollection
from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
from metadata.generated.schema.entity.services.apiService import (
ApiService,
ApiServiceConnection,
)
from metadata.generated.schema.metadataIngestion.apiServiceMetadataPipeline import (
ApiServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.delete import delete_entity_from_source
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContextManager,
TopologyNode,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class ApiServiceTopology(ServiceTopology):
    """
    Defines the hierarchy in API Services.
    service -> ApiCollection -> ApiEndpoint

    We could have a topology validator. We can only consume
    data that has been produced by any parent node.

    NOTE: ``producer``/``processor``/``post_process`` entries are method
    names resolved by the topology runner on the source class
    (see ApiServiceSource below) — renaming those methods breaks the run.
    """

    # Root node: creates/reuses the ApiService entity, then descends
    # into its API collections. After all children are processed, the
    # post_process hook marks stale collections as deleted.
    root: Annotated[
        TopologyNode, Field(description="Root node for the topology")
    ] = TopologyNode(
        producer="get_services",
        stages=[
            NodeStage(
                type_=ApiService,
                context="api_service",
                processor="yield_create_request_api_service",
                overwrite=False,
                must_return=True,  # the service entity is mandatory for all children
                cache_entities=True,
            ),
        ],
        children=["api_collection"],
        post_process=["mark_api_collections_as_deleted"],
    )
    # Collection node: for each collection produced by get_api_collections,
    # first emit the APICollection itself, then its APIEndpoints. Both stages
    # consume the service created at the root.
    api_collection: Annotated[
        TopologyNode, Field(description="API Collection Processing Node")
    ] = TopologyNode(
        producer="get_api_collections",
        stages=[
            NodeStage(
                type_=APICollection,
                context="api_collections",
                processor="yield_api_collection",
                consumer=["api_service"],
                use_cache=True,
            ),
            NodeStage(
                type_=APIEndpoint,
                context="api_endpoints",
                processor="yield_api_endpoint",
                consumer=["api_service"],
                use_cache=True,
            ),
        ],
    )


class ApiServiceSource(TopologyRunnerMixin, Source, ABC):
    """
    Base class for API services.
    It implements the topology and context

    Concrete connectors (e.g. the REST/OpenAPI source) implement the
    abstract producers/processors referenced by ApiServiceTopology:
    ``get_api_collections``, ``yield_api_collection``, ``yield_api_endpoint``.
    """

    source_config: ApiServiceMetadataPipeline
    config: WorkflowSource
    # Big union of types we want to fetch dynamically
    service_connection: ApiServiceConnection.model_fields["config"].annotation

    topology = ApiServiceTopology()
    context = TopologyContextManager(topology)
    # FQNs of collections seen during this run; compared against the backend
    # in mark_api_collections_as_deleted to soft-delete stale entities
    api_collection_source_state: Set = set()
    api_endpoint_source_state: Set = set()

    def __init__(
        self,
        config: WorkflowSource,
        metadata: OpenMetadata,
    ):
        """
        Build the source from the workflow config and validate connectivity.

        Raises whatever test_connection() raises if the service
        cannot be reached with the provided connection config.
        """
        super().__init__()
        self.config = config
        self.metadata = metadata
        self.service_connection = self.config.serviceConnection.root.config
        self.source_config: ApiServiceMetadataPipeline = self.config.sourceConfig.config
        self.connection = get_connection(self.service_connection)

        # Flag the connection for the test connection
        self.connection_obj = self.connection
        self.test_connection()

        self.client = self.connection

    @property
    def name(self) -> str:
        # Service type name from the connection config (e.g. "Rest")
        return self.service_connection.type.name

    def get_services(self) -> Iterable[WorkflowSource]:
        """Topology root producer: yields the single workflow source config."""
        yield self.config

    def yield_create_request_api_service(self, config: WorkflowSource):
        """Emit the create request for the ApiService entity itself."""
        yield Either(
            right=self.metadata.get_create_service_from_source(
                entity=ApiService, config=config
            )
        )

    @abstractmethod
    def get_api_collections(self, *args, **kwargs) -> Iterable[Any]:
        """
        Method to list all collections to process.
        Here is where filtering happens
        """

    @abstractmethod
    def yield_api_collection(
        self, *args, **kwargs
    ) -> Iterable[Either[CreateAPICollectionRequest]]:
        """Method to return api collection Entities"""

    @abstractmethod
    def yield_api_endpoint(
        self, *args, **kwargs
    ) -> Iterable[Either[CreateAPIEndpointRequest]]:
        """Method to return api endpoint Entities"""

    def close(self):
        """By default, nothing to close"""

    def test_connection(self) -> None:
        """Run the connector-specific test-connection steps; raises on failure."""
        test_connection_fn = get_test_connection_fn(self.service_connection)
        test_connection_fn(self.metadata, self.connection_obj, self.service_connection)

    def mark_api_collections_as_deleted(self) -> Iterable[Either[DeleteEntity]]:
        """Method to mark the api collection as deleted"""
        # Only runs when the pipeline opts in via markDeletedApiCollections;
        # compares backend state against api_collection_source_state
        if self.source_config.markDeletedApiCollections:
            yield from delete_entity_from_source(
                metadata=self.metadata,
                entity_type=APICollection,
                entity_source_state=self.api_collection_source_state,
                mark_deleted_entity=self.source_config.markDeletedApiCollections,
                params={"service": self.context.get().api_service},
            )

    def register_record(self, collection_request: CreateAPICollectionRequest) -> None:
        """
        Mark the api collection record as scanned and update
        the api_collection_source_state
        """
        api_collection_fqn = fqn.build(
            self.metadata,
            entity_type=APICollection,
            service_name=collection_request.service.root,
            api_collection_name=collection_request.name.root,
        )

        self.api_collection_source_state.add(api_collection_fqn)

    def prepare(self):
        """By default, nothing to prepare"""
87 changes: 87 additions & 0 deletions ingestion/src/metadata/ingestion/source/api/rest/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Source connection handler
"""
from typing import Optional

import requests
from requests.models import Response

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.apiService.restConnection import (
RESTConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class SchemaURLError(Exception):
    """Raised when the configured schema URL does not point to a usable JSON document."""


class InvalidOpenAPISchemaError(Exception):
    """Raised when the fetched document is not a valid OpenAPI schema."""


def get_connection(connection: RESTConnection) -> Response:
    """
    Create connection.

    Fetches the OpenAPI schema document from ``connection.openAPISchemaURL``.
    When a token is configured, it is forwarded as a Bearer Authorization
    header. The returned Response doubles as the "client" for this source.
    """
    headers = None
    if connection.token:
        headers = {"Authorization": f"Bearer {connection.token.get_secret_value()}"}
    # Explicit timeout: requests.get without one can block the ingestion
    # workflow indefinitely on an unresponsive endpoint
    return requests.get(connection.openAPISchemaURL, headers=headers, timeout=30)


def test_connection(
    metadata: OpenMetadata,
    client: Response,
    service_connection: RESTConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test connection. This can be executed either as part
    of a metadata workflow or during an Automation Workflow

    ``client`` is the Response obtained by get_connection; the steps below
    validate that it is a successful JSON response containing an OpenAPI
    schema.
    """

    def custom_url_exec():
        """Step 1: the URL answered 200 with a JSON content type."""
        # Default to "" — headers.get returns None when the header is absent,
        # and `"..." in None` would raise TypeError instead of the clear
        # SchemaURLError below. Check the status first so a failed request
        # without a content-type header is reported correctly too.
        if (
            client.status_code == 200
            and "application/json" in client.headers.get("content-type", "")
        ):
            return []
        raise SchemaURLError(
            "Failed to parse JSON schema url. Please check if provided url is valid JSON schema."
        )

    def custom_schema_exec():
        """Step 2: the JSON document declares an `openapi` version field."""
        if client.json().get("openapi") is not None:
            return []
        raise InvalidOpenAPISchemaError(
            "Provided schema is not valid OpenAPI JSON schema"
        )

    test_fn = {"CheckURL": custom_url_exec, "CheckSchema": custom_schema_exec}

    test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )
Loading
Loading