From 8219062d5f32e3305f2c5fa5ba3eba120cb10170 Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Sat, 17 Jun 2023 23:47:10 +0100 Subject: [PATCH 01/11] global manifest option for storage services --- .../ingestion/source/storage/s3/metadata.py | 99 ++++++++++---- .../source/storage/storage_service.py | 11 +- .../metadata/utils/storage_metadata_config.py | 122 ++++++++++++++++++ .../storage/manifestMetadataConfig.json | 62 +++++++++ .../storage/storageBucketDetails.json | 22 ++++ .../storage/storageMetadataHttpConfig.json | 17 +++ .../storage/storageMetadataLocalConfig.json | 17 +++ .../storage/storageMetadataS3Config.json | 20 +++ .../storageServiceMetadataPipeline.json | 15 +++ 9 files changed, 361 insertions(+), 24 deletions(-) create mode 100644 ingestion/src/metadata/utils/storage_metadata_config.py create mode 100644 openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageBucketDetails.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataHttpConfig.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataLocalConfig.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataS3Config.json diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index a3fbbc34d989..5ea797b1a8d2 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -39,6 +39,9 @@ MetadataEntry, StorageContainerConfig, ) +from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import ( + ManifestMetadataConfig, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -92,51 +95,58 @@ def create(cls, config_dict, metadata_config: OpenMetadataConnection): return cls(config, metadata_config) def get_containers(self) -> Iterable[S3ContainerDetails]: + global_manifest: Optional[ManifestMetadataConfig] = self.get_manifest_file() bucket_results = self.fetch_buckets() for bucket_response in bucket_results: + bucket_name = bucket_response.name try: - - # We always try to generate the parent container (the bucket) + # We always generate the parent container (the bucket) yield self._generate_unstructured_container( bucket_response=bucket_response ) - self._bucket_cache[bucket_response.name] = self.context.container - - metadata_config = self._load_metadata_file( - bucket_name=bucket_response.name + self._bucket_cache[bucket_name] = self.context.container + parent_entity: EntityReference = EntityReference( + id=self._bucket_cache[bucket_name].id.__root__, type="container" ) - if metadata_config: - for metadata_entry in metadata_config.entries: - logger.info( - f"Extracting metadata from path {metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)} " - f"and generating structured container" + if global_manifest: + manifest_entries_for_current_bucket = ( + self._manifest_entries_to_metadata_entries_by_bucket( + bucket=bucket_name, manifest=global_manifest ) - structured_container: Optional[ - S3ContainerDetails - ] = self._generate_container_details( + ) + # Check if we have entries in the manifest file belonging to this bucket + if manifest_entries_for_current_bucket: + # ingest 
all the relevant valid paths from it + yield from self._generate_structured_containers( bucket_response=bucket_response, - metadata_entry=metadata_entry, - parent=EntityReference( - id=self._bucket_cache[bucket_response.name].id.__root__, - type="container", + entries=self._manifest_entries_to_metadata_entries_by_bucket( + bucket=bucket_name, manifest=global_manifest ), + parent=parent_entity, ) - if structured_container: - yield structured_container - + # nothing else do to for the current bucket, skipping to the next + continue + # If no global file, or no valid entries in the manifest, check for bucket level metadata file + metadata_config = self._load_metadata_file(bucket_name=bucket_name) + if metadata_config: + yield from self._generate_structured_containers( + bucket_response=bucket_response, + entries=metadata_config.entries, + parent=parent_entity, + ) except ValidationError as err: error = f"Validation error while creating Container from bucket details - {err}" logger.debug(traceback.format_exc()) logger.warning(error) - self.status.failed(bucket_response.name, error, traceback.format_exc()) + self.status.failed(bucket_name, error, traceback.format_exc()) except Exception as err: error = ( f"Wild error while creating Container from bucket details - {err}" ) logger.debug(traceback.format_exc()) logger.warning(error) - self.status.failed(bucket_response.name, error, traceback.format_exc()) + self.status.failed(bucket_name, error, traceback.format_exc()) def yield_create_container_requests( self, container_details: S3ContainerDetails @@ -188,6 +198,30 @@ def _generate_container_details( ) return None + def _generate_structured_containers( + self, + bucket_response: S3BucketResponse, + entries: List[MetadataEntry], + parent: Optional[EntityReference] = None, + ) -> List[S3ContainerDetails]: + result: List[S3ContainerDetails] = [] + for metadata_entry in entries: + logger.info( + f"Extracting metadata from path {metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)} " + f"and generating structured container" + ) + structured_container: Optional[ + S3ContainerDetails + ] = self._generate_container_details( + bucket_response=bucket_response, + metadata_entry=metadata_entry, + parent=parent, + ) + if structured_container: + result.append(structured_container) + + return result + def _get_columns( self, bucket_name: str, sample_key: str, metadata_entry: MetadataEntry ) -> Optional[List[Column]]: @@ -326,6 +360,25 @@ def _generate_unstructured_container( data_model=None, ) + @staticmethod + def _manifest_entries_to_metadata_entries_by_bucket( + bucket: str, manifest: ManifestMetadataConfig + ) -> List[MetadataEntry]: + """ + Convert manifest entries(which have an extra bucket property) to bucket-level metadata entries, filtered by + a given bucket + """ + return [ + MetadataEntry( + dataPath=entry.dataPath, + structureFormat=entry.structureFormat, + isPartitioned=entry.isPartitioned, + partitionColumns=entry.partitionColumns, + ) + for entry in manifest.entries + if entry.bucketName == bucket + ] + @staticmethod def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]: """ diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 4e2415488fd8..7f1020910535 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -12,7 +12,7 @@ Base class for ingesting Object Storage services """ from 
abc import ABC, abstractmethod -from typing import Any, Iterable +from typing import Any, Iterable, Optional from metadata.generated.schema.api.data.createContainer import CreateContainerRequest from metadata.generated.schema.entity.data.container import Container @@ -23,6 +23,9 @@ StorageConnection, StorageService, ) +from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import ( + ManifestMetadataConfig, +) from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import ( StorageServiceMetadataPipeline, ) @@ -41,6 +44,7 @@ from metadata.ingestion.source.connections import get_connection, get_test_connection_fn from metadata.ingestion.source.storage.s3.connection import S3ObjectStoreClient from metadata.utils.logger import ingestion_logger +from metadata.utils.storage_metadata_config import get_manifest logger = ingestion_logger() @@ -109,6 +113,11 @@ def __init__( self.connection_obj = self.connection self.test_connection() + def get_manifest_file(self) -> Optional[ManifestMetadataConfig]: + if self.source_config.storageMetadataConfigSource: + return get_manifest(self.source_config.storageMetadataConfigSource) + return None + @abstractmethod def get_containers(self) -> Iterable[Any]: """ diff --git a/ingestion/src/metadata/utils/storage_metadata_config.py b/ingestion/src/metadata/utils/storage_metadata_config.py new file mode 100644 index 000000000000..f1f596c56dfe --- /dev/null +++ b/ingestion/src/metadata/utils/storage_metadata_config.py @@ -0,0 +1,122 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Hosts the singledispatch to get Storage Metadata manifest file +""" +import json +import traceback +from functools import singledispatch + +import requests +from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import ( + ManifestMetadataConfig, +) + +from metadata.generated.schema.metadataIngestion.storage.storageMetadataLocalConfig import ( + StorageMetadataLocalConfig, +) +from metadata.generated.schema.metadataIngestion.storage.storageMetadataHttpConfig import ( + StorageMetadataHttpConfig, +) +from metadata.generated.schema.metadataIngestion.storage.storageMetadataS3Config import ( + StorageMetadataS3Config, +) +from metadata.utils.logger import ometa_logger + +logger = ometa_logger() + +STORAGE_METADATA_MANIFEST_FILE_NAME = "storage_metadata_manifest.json" + + +class StorageMetadataConfigException(Exception): + """ + Raise when encountering errors while extracting storage metadata manifest file + """ + + +@singledispatch +def get_manifest(config): + """ + Single dispatch method to get the Storage Metadata manifest file from different sources + """ + + if config: + raise NotImplementedError( + f"Config not implemented for type {type(config)}: {config}" + ) + + +@get_manifest.register +def _(config: StorageMetadataLocalConfig) -> ManifestMetadataConfig: + try: + if config.manifestFilePath is not None: + logger.debug(f"Reading [manifestFilePath] from: {config.manifestFilePath}") + with open(config.manifestFilePath, "r", encoding="utf-8") as manifest: + metadata_manifest = manifest.read() + return ManifestMetadataConfig.parse_obj(json.loads(metadata_manifest)) + except Exception as exc: + logger.debug(traceback.format_exc()) + raise StorageMetadataConfigException( + f"Error fetching manifest file from local: {exc}" + ) + + +@get_manifest.register +def _(config: StorageMetadataHttpConfig) -> ManifestMetadataConfig: + try: + logger.debug(f"Requesting [dbtManifestHttpPath] to: {config.manifestHttpPath}") + http_manifest = requests.get( # pylint: disable=missing-timeout + config.manifestHttpPath + ) + if not http_manifest: + raise StorageMetadataConfigException( + "Manifest file not found in file server" + ) + return ManifestMetadataConfig.parse_obj(http_manifest.json()) + except StorageMetadataConfigException as exc: + raise exc + except Exception as exc: + logger.debug(traceback.format_exc()) + raise StorageMetadataConfigException( + f"Error fetching manifest file from file server: {exc}" + ) + + +@get_manifest.register +def _(config: StorageMetadataS3Config) -> ManifestMetadataConfig: + manifest = None + try: + bucket_name, prefix = ( + config.prefixConfig.bucketName, + config.prefixConfig.objectPrefix, + ) + from metadata.clients.aws_client import ( # pylint: disable=import-outside-toplevel + AWSClient, + ) + + aws_client = AWSClient(config.securityConfig).get_resource("s3") + bucket = aws_client.Bucket(bucket_name) + obj_list = bucket.objects.filter(Prefix=prefix) + for bucket_object in obj_list: + if STORAGE_METADATA_MANIFEST_FILE_NAME in bucket_object.key: + logger.debug(f"{STORAGE_METADATA_MANIFEST_FILE_NAME} found") + manifest = bucket_object.get()["Body"].read().decode() + break + if not manifest: + raise StorageMetadataConfigException("Manifest file not found in s3") + return ManifestMetadataConfig.parse_obj(json.loads(manifest)) + except StorageMetadataConfigException as exc: + raise exc + except Exception as exc: + logger.debug(traceback.format_exc()) + raise StorageMetadataConfigException( + f"Error fetching manifest file from s3: {exc}" + ) diff 
--git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json
new file mode 100644
index 000000000000..501c4046a03b
--- /dev/null
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json
@@ -0,0 +1,62 @@
+{
+  "$id": "https://open-metadata.org/schema/metadataIngestion/manifestMetadataConfig.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "ManifestMetadataConfig",
+  "description": "Definition of the manifest file containing entries to be ingested across multiple buckets as object storage entries",
+  "javaType": "org.openmetadata.schema.metadataIngestion.storage.ManifestMetadataConfig",
+  "definitions": {
+    "manifestMetadataEntry": {
+      "description": "Config properties for a container found in a user-supplied metadata config",
+      "javaType": "org.openmetadata.schema.metadataIngestion.storage.ManifestMetadataEntry",
+      "type": "object",
+      "properties": {
+        "bucketName": {
+          "title": "Bucket Name",
+          "description": "The bucket name containing the data path to be ingested",
+          "type": "string"
+        },
+        "dataPath": {
+          "title": "Data path",
+          "description": "The path where the data resides in the container, excluding the bucket name",
+          "type": "string"
+        },
+        "structureFormat": {
+          "title": "Schema format",
+          "description": "What's the schema format for the container, e.g. avro, parquet, csv.",
+          "type": "string",
+          "default": null
+        },
+        "isPartitioned": {
+          "title": "Is Partitioned",
+          "description": "Flag indicating whether the container's data is partitioned",
+          "type": "boolean",
+          "default": false
+        },
+        "partitionColumns": {
+          "title": "Partition Columns",
+          "description": "What are the partition columns in case the container's data is partitioned",
+          "type": "array",
+          "items": {
+            "$ref": "../../entity/data/table.json#/definitions/column"
+          },
+          "default": null
+        }
+      },
+      "required": [
+        "bucketName", "dataPath"
+      ]
+    }
+  },
+  "properties": {
+    "entries": {
+      "description": "List of metadata entries for the bucket containing information about where data resides and its structure",
+      "type": "array",
+      "items": {
+        "$ref": "#/definitions/manifestMetadataEntry"
+      },
+      "default": null
+    }
+  },
+  "required": ["entries"],
+  "additionalProperties": false
+}
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageBucketDetails.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageBucketDetails.json
new file mode 100644
index 000000000000..ca0223f22053
--- /dev/null
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageBucketDetails.json
@@ -0,0 +1,22 @@
+{
+  "$id": "https://open-metadata.org/schema/metadataIngestion/storage/storageBucketDetails.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "Storage Metadata Bucket Details",
+  "description": "Details of the bucket where the storage metadata manifest file is stored",
+  "javaType": "org.openmetadata.schema.metadataIngestion.storage.StorageMetadataBucketDetails",
+  "type": "object",
+  "properties": {
+    "bucketName": {
+      "title": "Storage Metadata Bucket Name",
+      "description": "Name of the bucket where the storage metadata file is stored",
+      "type": "string"
+    },
+    "objectPrefix": {
+      "title": "Storage Metadata Object Prefix",
+      "description": "Path of the folder where the storage metadata 
file is stored, '/' for root", + "type": "string" + } + }, + "additionalProperties": false, + "required": ["bucketName", "objectPrefix"] +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataHttpConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataHttpConfig.json new file mode 100644 index 000000000000..2c8cbde62308 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataHttpConfig.json @@ -0,0 +1,17 @@ +{ + "$id": "https://open-metadata.org/schema/metadataIngestion/storage/storageMetadataHttpConfig.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Storage Metadata Http Config", + "description": "Storage Metadata Manifest file HTTP path config.", + "javaType": "org.openmetadata.schema.metadataIngestion.storage.StorageMetadataHttpConfig", + "type": "object", + "properties": { + "manifestHttpPath": { + "title": "Storage Metadata Manifest HTTP Path", + "description": "Storage Metadata manifest http file path to extract locations to ingest from.", + "type": "string" + } + }, + "additionalProperties": false, + "required": ["manifestHttpPath"] +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataLocalConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataLocalConfig.json new file mode 100644 index 000000000000..4d7ef83debbd --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataLocalConfig.json @@ -0,0 +1,17 @@ +{ + "$id": "https://open-metadata.org/schema/metadataIngestion/storage/storageMetadataLocalConfig.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Storage Metadata Local Config", + "description": "Storage Metadata Manifest file path config.", + "javaType": "org.openmetadata.schema.metadataIngestion.storage.StorageMetadataLocalConfig", + "type": "object", + "properties": { + "manifestFilePath": { + "title": "Storage Metadata Manifest File Path", + "description": "Storage Metadata manifest file path to extract locations to ingest from.", + "type": "string" + } + }, + "additionalProperties": false, + "required": ["manifestFilePath"] +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataS3Config.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataS3Config.json new file mode 100644 index 000000000000..7c851651178d --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/storageMetadataS3Config.json @@ -0,0 +1,20 @@ +{ + "$id": "https://open-metadata.org/schema/metadataIngestion/storage/storageMetadataS3Config.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Storage Metadata S3 Config", + "description": "Storage Metadata Manifest file S3 path config.", + "javaType": "org.openmetadata.schema.metadataIngestion.storage.StorageMetadataS3Config", + "type": "object", + "properties": { + "securityConfig": { + "title": "S3 Security Config", + "$ref": "../../security/credentials/awsCredentials.json" + }, + "prefixConfig": { + "title": "Storage Metadata Prefix Config", + "$ref": "./storageBucketDetails.json" + } + }, + "additionalProperties": false, + "required": ["prefixConfig"] +} \ No newline at end of 
file diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json index 6d1261f1e620..b7ac98c2a913 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json @@ -20,6 +20,21 @@ "containerFilterPattern": { "description": "Regex to only fetch containers that matches the pattern.", "$ref": "../type/filterPattern.json#/definitions/filterPattern" + }, + "storageMetadataConfigSource": { + "mask": true, + "title": "Storage Metadata Configuration Source", + "oneOf": [ + { + "$ref": "./storage/storageMetadataLocalConfig.json" + }, + { + "$ref": "./storage/storageMetadataHttpConfig.json" + }, + { + "$ref": "./storage/storageMetadataS3Config.json" + } + ] } }, "additionalProperties": false From 7bbc570667d14800c870d6f7cc0b072f2277a4e8 Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Mon, 19 Jun 2023 11:53:31 +0100 Subject: [PATCH 02/11] added a no metadata config source option for global manifest s3 services option --- .../ingestion/source/storage/storage_service.py | 5 +++-- .../storageServiceMetadataPipeline.json | 12 +++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 7f1020910535..13d7d7dadb63 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -27,7 +27,7 @@ ManifestMetadataConfig, ) from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import ( - StorageServiceMetadataPipeline, + StorageServiceMetadataPipeline, NoMetadataConfigurationSource, ) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, @@ -114,7 +114,8 @@ def __init__( self.test_connection() def get_manifest_file(self) -> Optional[ManifestMetadataConfig]: - if self.source_config.storageMetadataConfigSource: + if self.source_config.storageMetadataConfigSource and not isinstance( + self.source_config.storageMetadataConfigSource, NoMetadataConfigurationSource): return get_manifest(self.source_config.storageMetadataConfigSource) return None diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json index b7ac98c2a913..c97369e2797a 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json @@ -9,6 +9,12 @@ "type": "string", "enum": ["StorageMetadata"], "default": "StorageMetadata" + }, + "noMetadataConfigurationSource": { + "title": "No Global Manifest", + "description": "No manifest file available. 
Ingestion would look for bucket-level metadata file instead", + "type": "object", + "additionalProperties": false } }, "properties": { @@ -25,6 +31,9 @@ "mask": true, "title": "Storage Metadata Configuration Source", "oneOf": [ + { + "$ref": "#/definitions/noMetadataConfigurationSource" + }, { "$ref": "./storage/storageMetadataLocalConfig.json" }, @@ -37,5 +46,6 @@ ] } }, - "additionalProperties": false + "additionalProperties": false, + "required": ["storageMetadataConfigSource"] } From bc1e52a8b90f6f1f01e9118292ee9f0739f90a77 Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Mon, 25 Sep 2023 12:56:56 +0100 Subject: [PATCH 03/11] merge fixes --- .../ingestion/source/storage/s3/metadata.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index 3d22dd67d4c5..fba2406ad198 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -366,23 +366,23 @@ def _generate_unstructured_container( ) @staticmethod - def _manifest_entries_to_metadata_entries_by_bucket( - bucket: str, manifest: ManifestMetadataConfig - ) -> List[MetadataEntry]: - """ - Convert manifest entries(which have an extra bucket property) to bucket-level metadata entries, filtered by - a given bucket - """ - return [ - MetadataEntry( - dataPath=entry.dataPath, - structureFormat=entry.structureFormat, - isPartitioned=entry.isPartitioned, - partitionColumns=entry.partitionColumns, - ) - for entry in manifest.entries - if entry.bucketName == bucket - ] + def _manifest_entries_to_metadata_entries_by_bucket( + bucket: str, manifest: ManifestMetadataConfig + ) -> List[MetadataEntry]: + """ + Convert manifest entries(which have an extra bucket property) to bucket-level metadata entries, filtered by + a given bucket + """ + return [ + MetadataEntry( + dataPath=entry.dataPath, + structureFormat=entry.structureFormat, + isPartitioned=entry.isPartitioned, + partitionColumns=entry.partitionColumns, + ) + for entry in manifest.entries + if entry.bucketName == bucket + ] def _get_sample_file_path( self, bucket_name: str, metadata_entry: MetadataEntry @@ -489,4 +489,4 @@ def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConf logger.warning( f"Failed loading metadata file s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}-{exc}" ) - return None \ No newline at end of file + return None From d822a396ba04a7f6a7a19d000fbf4e29a5411300 Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Mon, 25 Sep 2023 18:46:21 +0100 Subject: [PATCH 04/11] more merge fixes. 
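
Aligns the S3 source with the storage base class after the merge: the S3-specific
`_get_columns` and `extract_column_definitions` helpers are dropped (column
extraction is handled by the shared storage layer), the shared `KEY_SEPARATOR`
constant replaces `S3_KEY_SEPARATOR`, and the `MetadataEntry` /
`ManifestMetadataConfig` imports in `storage_service.py` point at their new modules.

For reference, the S3 variant of the new storage metadata source option takes the
following shape (values are illustrative, mirroring the unit tests added later in
this series):

    "storageMetadataConfigSource": {
      "securityConfig": {"awsRegion": "us-east-1"},
      "prefixConfig": {"bucketName": "test_bucket", "objectPrefix": "manifest"}
    }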
--- .../ingestion/source/storage/s3/metadata.py | 33 +------------------ .../source/storage/storage_service.py | 16 ++++++--- 2 files changed, 13 insertions(+), 36 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index fba2406ad198..48ba9bfdb638 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -239,7 +239,7 @@ def _generate_structured_containers( result: List[S3ContainerDetails] = [] for metadata_entry in entries: logger.info( - f"Extracting metadata from path {metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)} " + f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} " f"and generating structured container" ) structured_container: Optional[ @@ -254,37 +254,6 @@ def _generate_structured_containers( return result - def _get_columns( - self, bucket_name: str, sample_key: str, metadata_entry: MetadataEntry - ) -> Optional[List[Column]]: - """ - Get the columns from the file and partition information - """ - extracted_cols = self.extract_column_definitions(bucket_name, sample_key) - return (metadata_entry.partitionColumns or []) + (extracted_cols or []) - - def extract_column_definitions( - self, bucket_name: str, sample_key: str - ) -> List[Column]: - """ - Extract Column related metadata from s3 - """ - connection_args = self.service_connection.awsConfig - data_structure_details = fetch_dataframe( - config_source=S3Config(), - client=self.s3_client, - file_fqn=DatalakeTableSchemaWrapper( - key=sample_key, bucket_name=bucket_name - ), - connection_kwargs=connection_args, - ) - columns = [] - if isinstance(data_structure_details, DataFrame): - columns = DatalakeSource.get_columns(data_structure_details) - if isinstance(data_structure_details, list) and data_structure_details: - columns = DatalakeSource.get_columns(data_structure_details[0]) - return columns - def fetch_buckets(self) -> List[S3BucketResponse]: results: List[S3BucketResponse] = [] try: diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 2e971ee0f436..64e6f51c9fa7 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -12,7 +12,9 @@ Base class for ingesting Object Storage services """ from abc import ABC, abstractmethod -from typing import Any, Iterable +from typing import Any, Iterable, Optional, List + +from pandas import DataFrame from metadata.generated.schema.api.data.createContainer import CreateContainerRequest from metadata.generated.schema.entity.data.container import Container @@ -24,10 +26,14 @@ StorageService, ) from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import ( - MetadataEntry, ManifestMetadataConfig + MetadataEntry, +) +from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import ( + ManifestMetadataConfig, ) from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import ( - StorageServiceMetadataPipeline, NoMetadataConfigurationSource, + StorageServiceMetadataPipeline, + NoMetadataConfigurationSource, ) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, @@ -123,7 +129,9 @@ def __init__( def get_manifest_file(self) -> Optional[ManifestMetadataConfig]: if 
self.source_config.storageMetadataConfigSource and not isinstance( - self.source_config.storageMetadataConfigSource, NoMetadataConfigurationSource): + self.source_config.storageMetadataConfigSource, + NoMetadataConfigurationSource, + ): return get_manifest(self.source_config.storageMetadataConfigSource) return None From d2605be80e6e80ad6fc665bc2d0b4c58c17c995f Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Mon, 25 Sep 2023 18:53:18 +0100 Subject: [PATCH 05/11] black stuff --- .../metadata/ingestion/source/storage/storage_service.py | 4 ++-- ingestion/src/metadata/utils/storage_metadata_config.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 64e6f51c9fa7..47cc1840cded 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -12,7 +12,7 @@ Base class for ingesting Object Storage services """ from abc import ABC, abstractmethod -from typing import Any, Iterable, Optional, List +from typing import Any, Iterable, List, Optional from pandas import DataFrame @@ -32,8 +32,8 @@ ManifestMetadataConfig, ) from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import ( - StorageServiceMetadataPipeline, NoMetadataConfigurationSource, + StorageServiceMetadataPipeline, ) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, diff --git a/ingestion/src/metadata/utils/storage_metadata_config.py b/ingestion/src/metadata/utils/storage_metadata_config.py index f1f596c56dfe..82e8f9b287f1 100644 --- a/ingestion/src/metadata/utils/storage_metadata_config.py +++ b/ingestion/src/metadata/utils/storage_metadata_config.py @@ -16,16 +16,16 @@ from functools import singledispatch import requests + from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import ( ManifestMetadataConfig, ) - -from metadata.generated.schema.metadataIngestion.storage.storageMetadataLocalConfig import ( - StorageMetadataLocalConfig, -) from metadata.generated.schema.metadataIngestion.storage.storageMetadataHttpConfig import ( StorageMetadataHttpConfig, ) +from metadata.generated.schema.metadataIngestion.storage.storageMetadataLocalConfig import ( + StorageMetadataLocalConfig, +) from metadata.generated.schema.metadataIngestion.storage.storageMetadataS3Config import ( StorageMetadataS3Config, ) From dff33a6b0bb7085c7e95f738bdac3899898b91ff Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Mon, 25 Sep 2023 21:24:18 +0100 Subject: [PATCH 06/11] test fixes --- .../unit/topology/storage/test_storage.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py index c3df2a7b121c..35a7e7ae9598 100644 --- a/ingestion/tests/unit/topology/storage/test_storage.py +++ b/ingestion/tests/unit/topology/storage/test_storage.py @@ -62,6 +62,15 @@ "config": { "type": "StorageMetadata", "containerFilterPattern": {"includes": ["^test_*"]}, + "storageMetadataConfigSource": { + "securityConfig": { + "awsRegion": "us-east-1" + }, + "prefixConfig": { + "bucketName": "test_bucket", + "objectPrefix": "manifest", + } + } } }, }, @@ -166,6 +175,15 @@ def test_create_from_invalid_source(self): "sourceConfig": { "config": { "type": "StorageMetadata", + "storageMetadataConfigSource": { + 
"securityConfig": { + "awsRegion": "us-east-1" + }, + "prefixConfig": { + "bucketName": "test_bucket", + "objectPrefix": "manifest", + } + } } }, } From eccf26c60fdefe2f57480da15cee51c9db315623 Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Mon, 25 Sep 2023 21:27:24 +0100 Subject: [PATCH 07/11] formatting --- .../tests/unit/topology/storage/test_storage.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py index 35a7e7ae9598..ae84c9d7d12b 100644 --- a/ingestion/tests/unit/topology/storage/test_storage.py +++ b/ingestion/tests/unit/topology/storage/test_storage.py @@ -63,14 +63,12 @@ "type": "StorageMetadata", "containerFilterPattern": {"includes": ["^test_*"]}, "storageMetadataConfigSource": { - "securityConfig": { - "awsRegion": "us-east-1" - }, + "securityConfig": {"awsRegion": "us-east-1"}, "prefixConfig": { "bucketName": "test_bucket", "objectPrefix": "manifest", - } - } + }, + }, } }, }, @@ -176,14 +174,12 @@ def test_create_from_invalid_source(self): "config": { "type": "StorageMetadata", "storageMetadataConfigSource": { - "securityConfig": { - "awsRegion": "us-east-1" - }, + "securityConfig": {"awsRegion": "us-east-1"}, "prefixConfig": { "bucketName": "test_bucket", "objectPrefix": "manifest", - } - } + }, + }, } }, } From a7955c9143e34365ba846ee1931329fc5dc62dbb Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Mon, 25 Dec 2023 23:28:32 +0200 Subject: [PATCH 08/11] storage service container automatic deletion support --- .../ingestion/source/storage/s3/metadata.py | 11 +++--- .../source/storage/storage_service.py | 36 ++++++++++++++++++- ingestion/src/metadata/utils/fqn.py | 18 ++++++++++ .../storageServiceMetadataPipeline.json | 6 ++++ 4 files changed, 66 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index 44fb726ebab9..c2a805805213 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -177,8 +177,7 @@ def get_containers(self) -> Iterable[S3ContainerDetails]: def yield_create_container_requests( self, container_details: S3ContainerDetails ) -> Iterable[Either[CreateContainerRequest]]: - yield Either( - right=CreateContainerRequest( + container_request = CreateContainerRequest( name=container_details.name, prefix=container_details.prefix, numberOfObjects=container_details.number_of_objects, @@ -188,8 +187,9 @@ def yield_create_container_requests( parent=container_details.parent, sourceUrl=container_details.sourceUrl, fileFormats=container_details.file_formats, - ) ) + yield Either(right=container_request) + self.register_record(container_request=container_request) def _generate_container_details( self, @@ -427,7 +427,7 @@ def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConf """ try: logger.info( - f"Found metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}" + f"Looking for metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}" ) response_object = self.s3_reader.read( path=OPENMETADATA_TEMPLATE_FILE_NAME, @@ -438,6 +438,9 @@ def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConf metadata_config = StorageContainerConfig.parse_obj(content) return metadata_config except ReadException: + logger.warning( + f"No 
metadata file found at s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}" + ) pass except Exception as exc: logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index f960907b1942..cf455d99733c 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -12,8 +12,9 @@ Base class for ingesting Object Storage services """ from abc import ABC, abstractmethod -from typing import Any, Iterable, List, Optional +from typing import Any, Iterable, List, Optional, Set +from metadata.utils import fqn from pandas import DataFrame from metadata.generated.schema.api.data.createContainer import CreateContainerRequest @@ -35,9 +36,11 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.ingestion.api.delete import delete_entity_from_source from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, @@ -78,6 +81,7 @@ class StorageServiceTopology(ServiceTopology): ), ], children=["container"], + post_process=["mark_containers_as_deleted"], ) container = TopologyNode( @@ -109,6 +113,7 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC): topology = StorageServiceTopology() context = create_source_context(topology) + container_source_state: Set = set() global_manifest: Optional[ManifestMetadataConfig] @@ -166,10 +171,39 @@ def get_services(self) -> Iterable[WorkflowSource]: def prepare(self): """By default, nothing needs to be taken care of when loading the source""" + + def register_record(self, container_request: CreateContainerRequest) -> None: + """ + Mark the container record as scanned and update + the storage_source_state + """ + parent_container = self.metadata.get_by_id( + entity=Container, entity_id=container_request.parent.id + ).fullyQualifiedName.__root__ if container_request.parent else None + container_fqn = fqn.build( + self.metadata, + entity_type=Container, + service_name=self.context.objectstore_service, + parent_container=parent_container, + container_name=container_request.name.__root__, + ) + + self.container_source_state.add(container_fqn) def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + + def mark_containers_as_deleted(self) -> Iterable[Either[DeleteEntity]]: + """Method to mark the containers as deleted""" + if self.source_config.markDeletedContainers: + yield from delete_entity_from_source( + metadata=self.metadata, + entity_type=Container, + entity_source_state=self.container_source_state, + mark_deleted_entity=self.source_config.markDeletedContainers, + params={"service": self.context.objectstore_service}, + ) def yield_create_request_objectstore_service(self, config: WorkflowSource): yield Either( diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 07a45095cf73..255a317c43ec 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -21,6 +21,7 @@ from antlr4.error.ErrorStrategy import BailErrorStrategy from antlr4.InputStream 
import InputStream
 from antlr4.tree.Tree import ParseTreeWalker
+from metadata.generated.schema.entity.data.container import Container
 from pydantic import BaseModel, Field
 
 from metadata.antlr.split_listener import FqnSplitListener
@@ -284,6 +285,23 @@ def _(
     return _build(service_name, topic_name)
 
 
+@fqn_build_registry.add(Container)
+def _(
+    _: Optional[OpenMetadata],  # ES Index not necessary for Container FQN building
+    *,
+    service_name: str,
+    parent_container: str,
+    container_name: str,
+) -> str:
+    if not service_name or not container_name:
+        raise FQNBuildingException(
+            f"Args should be informed, but got service=`{service_name}`, container=`{container_name}``"
+        )
+    return _build(parent_container, container_name, quote=False) if parent_container else (
+        _build(service_name, container_name)
+    )
+
+
 @fqn_build_registry.add(SearchIndex)
 def _(
     _: Optional[OpenMetadata],  # ES Index not necessary for Search Index FQN building
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json
index 7c761af4e90a..cd5d5a3ffa53 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storageServiceMetadataPipeline.json
@@ -48,6 +48,12 @@
           "$ref": "./storage/storageMetadataADLSConfig.json"
         }
       ]
+    },
+    "markDeletedContainers": {
+      "description": "Optional configuration to soft delete containers in OpenMetadata if the source containers are deleted. Also, if the container is deleted, all the associated entities with that container will be deleted",
+      "type": "boolean",
+      "default": true,
+      "title": "Mark Deleted Containers"
     }
   },
   "additionalProperties": false

From a7a25f97afbba4452838dbb979f4561be336a3eb Mon Sep 17 00:00:00 2001
From: Cristian Calugaru
Date: Mon, 25 Dec 2023 23:39:29 +0200
Subject: [PATCH 09/11] reverted auto-modified files.
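
Restores docker/validate_compose.py and scripts/update_version.py to their
previous formatting: they had only been touched by an automatic formatter run
and carry no functional changes for this PR.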
--- docker/validate_compose.py | 6 ++-- scripts/update_version.py | 57 ++++++++++++++------------------------ 2 files changed, 23 insertions(+), 40 deletions(-) diff --git a/docker/validate_compose.py b/docker/validate_compose.py index e86b756c3adf..1f135eab825b 100644 --- a/docker/validate_compose.py +++ b/docker/validate_compose.py @@ -25,9 +25,7 @@ def get_last_run_info() -> Tuple[str, str]: log_ansi_encoded_string(message="Waiting for DAG Run data...") time.sleep(5) runs = requests.get( - "http://localhost:8080/api/v1/dags/sample_data/dagRuns", - auth=BASIC_AUTH, - timeout=REQUESTS_TIMEOUT, + "http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT ).json() dag_runs = runs.get("dag_runs") @@ -41,7 +39,7 @@ def print_last_run_logs() -> None: logs = requests.get( "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data", auth=BASIC_AUTH, - timeout=REQUESTS_TIMEOUT, + timeout=REQUESTS_TIMEOUT ).text pprint(logs) diff --git a/scripts/update_version.py b/scripts/update_version.py index e93ffdbd603f..c8bf177bebe4 100644 --- a/scripts/update_version.py +++ b/scripts/update_version.py @@ -11,104 +11,90 @@ # Function to update the Github workflow with search pattern as "input=" or "DOCKER_RELEASE_TAG=" def update_github_action(file_path, release_version): - logger.info( - f"Updating Github workflow's Docker version in {file_path} to version {release_version}\n" - ) + logger.info(f"Updating Github workflow's Docker version in {file_path} to version {release_version}\n") try: - with open(file_path, "r") as file: + with open(file_path, 'r') as file: content = file.read() # Update the input pattern - input_pattern = r"input=\d+(\.\d+)*(\.\d+)?" - input_replacement = f"input={release_version}" + input_pattern = r'input=\d+(\.\d+)*(\.\d+)?' + input_replacement = f'input={release_version}' updated_content = re.sub(input_pattern, input_replacement, content) # Update the DOCKER_RELEASE_TAG pattern - docker_release_tag_pattern = r"DOCKER_RELEASE_TAG=\d+(\.\d+)*(\.\d+)?" - docker_release_tag_replacement = f"DOCKER_RELEASE_TAG={release_version}" - updated_content = re.sub( - docker_release_tag_pattern, docker_release_tag_replacement, updated_content - ) + docker_release_tag_pattern = r'DOCKER_RELEASE_TAG=\d+(\.\d+)*(\.\d+)?' 
+ docker_release_tag_replacement = f'DOCKER_RELEASE_TAG={release_version}' + updated_content = re.sub(docker_release_tag_pattern, docker_release_tag_replacement, updated_content) - with open(file_path, "w") as file: + with open(file_path, 'w') as file: file.write(updated_content) logger.info(f"Patterns updated to {release_version} in {file_path}") except Exception as e: logger.error(f"An error occurred: {e}") - # Function to update the Python files in ingestion with search pattern as "version=" def update_python_files(file_path, release_version): # Logic for updating Python files logger.info(f"Updating version numbers in {file_path} to {release_version}\n") try: - with open(file_path, "r") as file: + with open(file_path, 'r') as file: content = file.read() pattern = r'version\s*=\s*"([^"]+)"' updated_content = re.sub(pattern, f'version="{release_version}"', content) - with open(file_path, "w") as file: + with open(file_path, 'w') as file: file.write(updated_content) logger.info(f"Version numbers updated to {release_version} in {file_path}") except Exception as e: logger.error(f"An error occurred: {e}") - # Function to update the image version in Docker compose files with search pattern where image, docker, getcollate, and openmetadata are used. def update_dockerfile_version(file_path, release_version): # Logic for updating Docker compose version try: - with open(file_path, "r") as file: + with open(file_path, 'r') as file: content = file.read() # Update image versions using regular expression updated_content = re.sub( - r"(image: docker\.getcollate\.io/openmetadata/.*?):[\d.]+", - rf"\1:{release_version}", - content, + r'(image: docker\.getcollate\.io/openmetadata/.*?):[\d.]+', + rf'\1:{release_version}', + content ) - with open(file_path, "w") as file: + with open(file_path, 'w') as file: file.write(updated_content) logger.info(f"Updated image versions in {file_path}") except Exception as e: logger.error(f"An error occurred while updating {file_path}: {e}") - # Function to update the DOCKERFILE used to create the images, search pattern used as "RI_VERSION" def update_ingestion_version(file_path, release_version): - logger.info( - f"Updating ingestion version in {file_path} to version {release_version}\n" - ) + logger.info(f"Updating ingestion version in {file_path} to version {release_version}\n") try: - with open(file_path, "r") as file: + with open(file_path, 'r') as file: content = file.read() pattern = r'RI_VERSION="[\d\.]+"' replacement = f'RI_VERSION="{release_version}"' updated_content = re.sub(pattern, replacement, content) - with open(file_path, "w") as file: + with open(file_path, 'w') as file: file.write(updated_content) logger.info(f"RI_VERSION updated to {release_version} in {file_path}") except Exception as e: logger.error(f"An error occurred: {e}") - def main(): parser = argparse.ArgumentParser(description="Update version information in files.") - parser.add_argument( - "action_type", type=int, choices=range(1, 5), help="Type of action to perform" - ) + parser.add_argument("action_type", type=int, choices=range(1, 5), help="Type of action to perform") parser.add_argument("file_path", type=str, help="Path to the file to update") - parser.add_argument( - "-s", dest="release_version", required=True, help="Release version to set" - ) + parser.add_argument("-s", dest="release_version", required=True, help="Release version to set") args = parser.parse_args() @@ -128,6 +114,5 @@ def main(): logger.error("Invalid action type") sys.exit(1) - if __name__ == "__main__": - main() + 
main() \ No newline at end of file From 10aa96c7c83e8b53d67a750e55b97d95e1af13a3 Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Sat, 30 Dec 2023 21:26:01 +0200 Subject: [PATCH 10/11] styling --- .../ingestion/source/storage/s3/metadata.py | 18 +++++++++--------- .../source/storage/storage_service.py | 16 ++++++++++------ ingestion/src/metadata/utils/fqn.py | 8 +++++--- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index c2a805805213..93713abe23a8 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -178,15 +178,15 @@ def yield_create_container_requests( self, container_details: S3ContainerDetails ) -> Iterable[Either[CreateContainerRequest]]: container_request = CreateContainerRequest( - name=container_details.name, - prefix=container_details.prefix, - numberOfObjects=container_details.number_of_objects, - size=container_details.size, - dataModel=container_details.data_model, - service=self.context.objectstore_service, - parent=container_details.parent, - sourceUrl=container_details.sourceUrl, - fileFormats=container_details.file_formats, + name=container_details.name, + prefix=container_details.prefix, + numberOfObjects=container_details.number_of_objects, + size=container_details.size, + dataModel=container_details.data_model, + service=self.context.objectstore_service, + parent=container_details.parent, + sourceUrl=container_details.sourceUrl, + fileFormats=container_details.file_formats, ) yield Either(right=container_request) self.register_record(container_request=container_request) diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index cf455d99733c..b76a8da33986 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -14,7 +14,6 @@ from abc import ABC, abstractmethod from typing import Any, Iterable, List, Optional, Set -from metadata.utils import fqn from pandas import DataFrame from metadata.generated.schema.api.data.createContainer import CreateContainerRequest @@ -53,6 +52,7 @@ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.readers.dataframe.reader_factory import SupportedTypes from metadata.readers.models import ConfigSource +from metadata.utils import fqn from metadata.utils.datalake.datalake_utils import fetch_dataframe, get_columns from metadata.utils.logger import ingestion_logger from metadata.utils.storage_metadata_config import ( @@ -171,15 +171,19 @@ def get_services(self) -> Iterable[WorkflowSource]: def prepare(self): """By default, nothing needs to be taken care of when loading the source""" - + def register_record(self, container_request: CreateContainerRequest) -> None: """ Mark the container record as scanned and update the storage_source_state """ - parent_container = self.metadata.get_by_id( - entity=Container, entity_id=container_request.parent.id - ).fullyQualifiedName.__root__ if container_request.parent else None + parent_container = ( + self.metadata.get_by_id( + entity=Container, entity_id=container_request.parent.id + ).fullyQualifiedName.__root__ + if container_request.parent + else None + ) container_fqn = fqn.build( self.metadata, entity_type=Container, @@ -193,7 +197,7 @@ 
def register_record(self, container_request: CreateContainerRequest) -> None: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) test_connection_fn(self.metadata, self.connection_obj, self.service_connection) - + def mark_containers_as_deleted(self) -> Iterable[Either[DeleteEntity]]: """Method to mark the containers as deleted""" if self.source_config.markDeletedContainers: diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 255a317c43ec..0140cd2a4d19 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -21,7 +21,6 @@ from antlr4.error.ErrorStrategy import BailErrorStrategy from antlr4.InputStream import InputStream from antlr4.tree.Tree import ParseTreeWalker -from metadata.generated.schema.entity.data.container import Container from pydantic import BaseModel, Field from metadata.antlr.split_listener import FqnSplitListener @@ -29,6 +28,7 @@ from metadata.generated.antlr.FqnParser import FqnParser from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.data.container import Container from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel from metadata.generated.schema.entity.data.database import Database @@ -297,8 +297,10 @@ def _( raise FQNBuildingException( f"Args should be informed, but got service=`{service_name}`, container=`{container_name}``" ) - return _build(parent_container, container_name, quote=False) if parent_container else ( - _build(service_name, container_name) + return ( + _build(parent_container, container_name, quote=False) + if parent_container + else (_build(service_name, container_name)) ) From c6084570bd6ca901812ea4b004a4c1780e153d95 Mon Sep 17 00:00:00 2001 From: Cristian Calugaru Date: Sat, 30 Dec 2023 22:16:56 +0200 Subject: [PATCH 11/11] linting --- ingestion/src/metadata/ingestion/source/storage/s3/metadata.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index 93713abe23a8..863fb26c9fc3 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -441,7 +441,6 @@ def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConf logger.warning( f"No metadata file found at s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}" ) - pass except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(