Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 10587 - Storage Service doesn't handle deleted containers #14497

Merged
merged 23 commits into from
Dec 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8219062
global manifest option for storage services
cristiancalugaru Jun 17, 2023
2915f0e
Merge branch 'main' into ISSUE-10587
cristiancalugaru Jun 17, 2023
7793a45
Merge branch 'main' into ISSUE-10587
cristiancalugaru Jun 19, 2023
7bbc570
added a no metadata config source option for global manifest s3 servi…
cristiancalugaru Jun 19, 2023
9b59d61
Merge branch 'main' into ISSUE-10587
cristiancalugaru Jun 19, 2023
5813c59
merged master, fixed conflicts
cristiancalugaru Sep 25, 2023
bc1e52a
merge fixes
cristiancalugaru Sep 25, 2023
d822a39
more merge fixes.
cristiancalugaru Sep 25, 2023
d2605be
black stuff
cristiancalugaru Sep 25, 2023
7a8e099
Merge branch 'main' into ISSUE-10587
cristiancalugaru Sep 25, 2023
6bc910c
Merge branch 'main' into ISSUE-10587
cristiancalugaru Sep 25, 2023
dff33a6
test fixes
cristiancalugaru Sep 25, 2023
eccf26c
formatting
cristiancalugaru Sep 25, 2023
42d56df
merged main
cristiancalugaru Dec 24, 2023
a7955c9
storage service container automatic deletion support
cristiancalugaru Dec 25, 2023
a7a25f9
reverted auto-modified files.
cristiancalugaru Dec 25, 2023
94cf32b
Merge branch 'main' into ISSUE-10587
cristiancalugaru Dec 25, 2023
10aa96c
styling
cristiancalugaru Dec 30, 2023
7e74940
Merge branch 'main' into ISSUE-10587
cristiancalugaru Dec 30, 2023
3f404ee
Merge branch 'ISSUE-10587' of github.com:cristiancalugaru/OpenMetadat…
cristiancalugaru Dec 30, 2023
e449153
Merge branch 'main' into ISSUE-10587
cristiancalugaru Dec 30, 2023
c608457
linting
cristiancalugaru Dec 30, 2023
5c50806
Merge branch 'ISSUE-10587' of github.com:cristiancalugaru/OpenMetadat…
cristiancalugaru Dec 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,19 @@ def get_containers(self) -> Iterable[S3ContainerDetails]:
def yield_create_container_requests(
self, container_details: S3ContainerDetails
) -> Iterable[Either[CreateContainerRequest]]:
yield Either(
right=CreateContainerRequest(
name=container_details.name,
prefix=container_details.prefix,
numberOfObjects=container_details.number_of_objects,
size=container_details.size,
dataModel=container_details.data_model,
service=self.context.objectstore_service,
parent=container_details.parent,
sourceUrl=container_details.sourceUrl,
fileFormats=container_details.file_formats,
)
container_request = CreateContainerRequest(
name=container_details.name,
prefix=container_details.prefix,
numberOfObjects=container_details.number_of_objects,
size=container_details.size,
dataModel=container_details.data_model,
service=self.context.objectstore_service,
parent=container_details.parent,
sourceUrl=container_details.sourceUrl,
fileFormats=container_details.file_formats,
)
yield Either(right=container_request)
self.register_record(container_request=container_request)

def _generate_container_details(
self,
Expand Down Expand Up @@ -427,7 +427,7 @@ def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConf
"""
try:
logger.info(
f"Found metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
f"Looking for metadata template file at - s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
)
response_object = self.s3_reader.read(
path=OPENMETADATA_TEMPLATE_FILE_NAME,
Expand All @@ -438,7 +438,9 @@ def _load_metadata_file(self, bucket_name: str) -> Optional[StorageContainerConf
metadata_config = StorageContainerConfig.parse_obj(content)
return metadata_config
except ReadException:
pass
logger.warning(
f"No metadata file found at s3://{bucket_name}/{OPENMETADATA_TEMPLATE_FILE_NAME}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Base class for ingesting Object Storage services
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional
from typing import Any, Iterable, List, Optional, Set

from pandas import DataFrame

Expand All @@ -35,9 +35,11 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.delete import delete_entity_from_source
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
Expand All @@ -50,6 +52,7 @@
from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.readers.dataframe.reader_factory import SupportedTypes
from metadata.readers.models import ConfigSource
from metadata.utils import fqn
from metadata.utils.datalake.datalake_utils import fetch_dataframe, get_columns
from metadata.utils.logger import ingestion_logger
from metadata.utils.storage_metadata_config import (
Expand Down Expand Up @@ -78,6 +81,7 @@ class StorageServiceTopology(ServiceTopology):
),
],
children=["container"],
post_process=["mark_containers_as_deleted"],
)

container = TopologyNode(
Expand Down Expand Up @@ -109,6 +113,7 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):

topology = StorageServiceTopology()
context = create_source_context(topology)
container_source_state: Set = set()

global_manifest: Optional[ManifestMetadataConfig]

Expand Down Expand Up @@ -167,10 +172,43 @@ def get_services(self) -> Iterable[WorkflowSource]:
def prepare(self):
"""By default, nothing needs to be taken care of when loading the source"""

def register_record(self, container_request: CreateContainerRequest) -> None:
"""
Mark the container record as scanned and update
the storage_source_state
"""
parent_container = (
self.metadata.get_by_id(
entity=Container, entity_id=container_request.parent.id
).fullyQualifiedName.__root__
if container_request.parent
else None
)
container_fqn = fqn.build(
self.metadata,
entity_type=Container,
service_name=self.context.objectstore_service,
parent_container=parent_container,
container_name=container_request.name.__root__,
)

self.container_source_state.add(container_fqn)

def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)
test_connection_fn(self.metadata, self.connection_obj, self.service_connection)

def mark_containers_as_deleted(self) -> Iterable[Either[DeleteEntity]]:
"""Method to mark the containers as deleted"""
if self.source_config.markDeletedContainers:
yield from delete_entity_from_source(
metadata=self.metadata,
entity_type=Container,
entity_source_state=self.container_source_state,
mark_deleted_entity=self.source_config.markDeletedContainers,
params={"service": self.context.objectstore_service},
)

def yield_create_request_objectstore_service(self, config: WorkflowSource):
yield Either(
right=self.metadata.get_create_service_from_source(
Expand Down
20 changes: 20 additions & 0 deletions ingestion/src/metadata/utils/fqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from metadata.generated.antlr.FqnParser import FqnParser
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
from metadata.generated.schema.entity.data.database import Database
Expand Down Expand Up @@ -284,6 +285,25 @@ def _(
return _build(service_name, topic_name)


@fqn_build_registry.add(Container)
def _(
_: Optional[OpenMetadata], # ES Index not necessary for Container FQN building
*,
service_name: str,
parent_container: str,
container_name: str,
) -> str:
if not service_name or not container_name:
raise FQNBuildingException(
f"Args should be informed, but got service=`{service_name}`, container=`{container_name}``"
)
return (
_build(parent_container, container_name, quote=False)
if parent_container
else (_build(service_name, container_name))
)


@fqn_build_registry.add(SearchIndex)
def _(
_: Optional[OpenMetadata], # ES Index not necessary for Search Index FQN building
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
"$ref": "./storage/storageMetadataADLSConfig.json"
}
]
},
"markDeletedContainers": {
"description": "Optional configuration to soft delete containers in OpenMetadata if the source containers are deleted. Also, if the topic is deleted, all the associated entities with that containers will be deleted",
"type": "boolean",
"default": true,
"title": "Mark Deleted Containers"
}
},
"additionalProperties": false
Expand Down
Loading