diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py
index 9b4e624bf5de..64b28a91f6a1 100644
--- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py
+++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py
@@ -190,6 +190,7 @@ def _manifest_entries_to_metadata_entries_by_container(
                 structureFormat=entry.structureFormat,
                 isPartitioned=entry.isPartitioned,
                 partitionColumns=entry.partitionColumns,
+                separator=entry.separator,
             )
             for entry in manifest.entries
             if entry.containerName == container_name
@@ -222,6 +223,7 @@ def extract_column_definitions(
                 key=sample_key,
                 bucket_name=bucket_name,
                 file_extension=SupportedTypes(metadata_entry.structureFormat),
+                separator=metadata_entry.separator,
             ),
         )
     columns = []
diff --git a/ingestion/src/metadata/readers/dataframe/dsv.py b/ingestion/src/metadata/readers/dataframe/dsv.py
index d030cce3ae1b..7d0236bf618c 100644
--- a/ingestion/src/metadata/readers/dataframe/dsv.py
+++ b/ingestion/src/metadata/readers/dataframe/dsv.py
@@ -114,5 +114,9 @@ def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper:
         )
 
 
-CSVDataFrameReader = functools.partial(DSVDataFrameReader, separator=CSV_SEPARATOR)
-TSVDataFrameReader = functools.partial(DSVDataFrameReader, separator=TSV_SEPARATOR)
+def get_dsv_reader_by_separator(separator: str) -> functools.partial:
+    return functools.partial(DSVDataFrameReader, separator=separator)
+
+
+CSVDataFrameReader = get_dsv_reader_by_separator(separator=CSV_SEPARATOR)
+TSVDataFrameReader = get_dsv_reader_by_separator(separator=TSV_SEPARATOR)
diff --git a/ingestion/src/metadata/readers/dataframe/models.py b/ingestion/src/metadata/readers/dataframe/models.py
index 8b908a01b509..765e6c1ae783 100644
--- a/ingestion/src/metadata/readers/dataframe/models.py
+++ b/ingestion/src/metadata/readers/dataframe/models.py
@@ -14,7 +14,7 @@
 """
 from typing import Any, List, Optional
 
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 from metadata.generated.schema.entity.data.table import Column
 
@@ -39,6 +39,9 @@ class DatalakeTableSchemaWrapper(BaseModel):
     key: str
     bucket_name: str
     file_extension: Optional[Any]
+    separator: Optional[str] = Field(
+        None, description="Used for DSV readers to identify the separator"
+    )
 
 
 class DatalakeTableMetadata(BaseModel):
diff --git a/ingestion/src/metadata/readers/dataframe/reader_factory.py b/ingestion/src/metadata/readers/dataframe/reader_factory.py
index b8c3f03190f2..7e698699364f 100644
--- a/ingestion/src/metadata/readers/dataframe/reader_factory.py
+++ b/ingestion/src/metadata/readers/dataframe/reader_factory.py
@@ -21,7 +21,11 @@
 from metadata.readers.dataframe.avro import AvroDataFrameReader
 from metadata.readers.dataframe.base import DataFrameReader
-from metadata.readers.dataframe.dsv import CSVDataFrameReader, TSVDataFrameReader
+from metadata.readers.dataframe.dsv import (
+    CSVDataFrameReader,
+    TSVDataFrameReader,
+    get_dsv_reader_by_separator,
+)
 from metadata.readers.dataframe.json import JSONDataFrameReader
 from metadata.readers.dataframe.parquet import ParquetDataFrameReader
 from metadata.readers.models import ConfigSource
 
@@ -52,11 +56,20 @@ class SupportedTypes(Enum):
 
 
 def get_df_reader(
-    type_: SupportedTypes, config_source: ConfigSource, client: Optional[Any]
+    type_: SupportedTypes,
+    config_source: ConfigSource,
+    client: Optional[Any],
+    separator: Optional[str] = None,
 ) -> DataFrameReader:
     """
     Load the File Reader based on the Config Source
     """
+    # If we have a DSV file, build a reader dynamically based on the received separator
+    if type_ in {SupportedTypes.CSV, SupportedTypes.TSV} and separator:
+        return get_dsv_reader_by_separator(separator=separator)(
+            config_source=config_source, client=client
+        )
+
     if type_.value in DF_READER_MAP:
         return DF_READER_MAP[type_.value](config_source=config_source, client=client)
 
diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py
index c49193ce4e2c..a482f8bc0b9a 100644
--- a/ingestion/src/metadata/utils/datalake/datalake_utils.py
+++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py
@@ -65,6 +65,7 @@ def fetch_dataframe(
         type_=file_extension,
         config_source=config_source,
         client=client,
+        separator=file_fqn.separator,
     )
     try:
         df_wrapper: DatalakeColumnWrapper = df_reader.read(
diff --git a/ingestion/tests/unit/readers/test_df_reader.py b/ingestion/tests/unit/readers/test_df_reader.py
index 5ab68f75d9ce..f071ce0faf5e 100644
--- a/ingestion/tests/unit/readers/test_df_reader.py
+++ b/ingestion/tests/unit/readers/test_df_reader.py
@@ -67,6 +67,25 @@ def test_dsv_reader(self):
             list(df_list[0].columns), ["transaction_id", "transaction_value"]
         )
 
+    def test_dsv_reader_with_separator(self):
+        key = ROOT_PATH / "transactions_separator.csv"
+
+        df_list = fetch_dataframe(
+            config_source=LocalConfig(),
+            client=None,
+            file_fqn=DatalakeTableSchemaWrapper(
+                key=str(key), bucket_name="unused", separator=";"
+            ),
+        )
+
+        self.assertIsNotNone(df_list)
+        self.assertTrue(len(df_list))
+
+        self.assertEquals(df_list[0].shape, (5, 2))
+        self.assertEquals(
+            list(df_list[0].columns), ["transaction_id", "transaction_value"]
+        )
+
     def test_json_reader(self):
         key = ROOT_PATH / "employees.json"
 
diff --git a/ingestion/tests/unit/resources/datalake/transactions_separator.csv b/ingestion/tests/unit/resources/datalake/transactions_separator.csv
new file mode 100644
index 000000000000..a99d886d6a0c
--- /dev/null
+++ b/ingestion/tests/unit/resources/datalake/transactions_separator.csv
@@ -0,0 +1,6 @@
+transaction_id;transaction_value
+1;100
+2;200
+3;300
+4;400
+5;500
diff --git a/openmetadata-docs/content/partials/v1.2/connectors/storage/manifest.md b/openmetadata-docs/content/partials/v1.2/connectors/storage/manifest.md
index a4e86545b1ba..289d48a1aecc 100644
--- a/openmetadata-docs/content/partials/v1.2/connectors/storage/manifest.md
+++ b/openmetadata-docs/content/partials/v1.2/connectors/storage/manifest.md
@@ -21,6 +21,8 @@
 need to bring information about:
 
 - **dataPath**: Where we can find the data. This should be a path relative to the top-level container.
 - **structureFormat**: What is the format of the data we are going to find. This information will be used to read the data.
+- **separator**: Optionally, for delimiter-separated formats such as CSV, you can specify the separator to use when reading the file.
+  If you don't, we will use `,` for CSV and `\t` for TSV files.
 
 After ingesting this container, we will bring in the schema of the data in the `dataPath`.
@@ -66,7 +68,8 @@ Again, this information will be added on top of the inferred schema from the data
 ```json {% srNumber=2 %}
 {
     "dataPath": "transactions",
-    "structureFormat": "csv"
+    "structureFormat": "csv",
+    "separator": ","
 },
 ```
 ```json {% srNumber=3 %}
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json
index 6a0d95234f2b..437454b80a09 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json
@@ -22,6 +22,12 @@
       "type": "string",
       "default": null
     },
+    "separator": {
+      "title": "Separator",
+      "description": "For delimited files such as CSV, what is the separator being used?",
+      "type": "string",
+      "default": null
+    },
     "isPartitioned": {
       "title": "Is Partitioned",
       "description": "Flag indicating whether the container's data is partitioned",
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json
index f87f82ed2480..cb781a653436 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/manifestMetadataConfig.json
@@ -26,6 +26,12 @@
       "type": "string",
       "default": null
     },
+    "separator": {
+      "title": "Separator",
+      "description": "For delimited files such as CSV, what is the separator being used?",
+      "type": "string",
+      "default": null
+    },
     "isPartitioned": {
       "title": "Is Partitioned",
       "description": "Flag indicating whether the container's data is partitioned",
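
For reference, the new option can be exercised end to end, mirroring the unit test added above. A minimal sketch, assuming a local run against the test resource added in this patch; the `LocalConfig` import path is not shown in the diff and is assumed from OpenMetadata's generated schema layout:

```python
from metadata.generated.schema.entity.services.connections.database.datalake.localConfig import (
    LocalConfig,  # assumed import path; the diff only shows LocalConfig being used
)
from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.utils.datalake.datalake_utils import fetch_dataframe

# Read a semicolon-delimited file. When `separator` is omitted, the factory
# falls back to the static readers: `,` for CSV and a tab for TSV.
df_list = fetch_dataframe(
    config_source=LocalConfig(),
    client=None,
    file_fqn=DatalakeTableSchemaWrapper(
        key="ingestion/tests/unit/resources/datalake/transactions_separator.csv",
        bucket_name="unused",  # not used by the local reader
        separator=";",
    ),
)
print(list(df_list[0].columns))  # ['transaction_id', 'transaction_value']
```

The same value flows from the storage manifest: `MetadataEntry.separator` is carried into `DatalakeTableSchemaWrapper`, and `get_df_reader` builds a `DSVDataFrameReader` with that separator instead of the fixed CSV/TSV readers.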