Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #13699 - Add separator for Storage Container manifest #13924

Merged
merged 4 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def _manifest_entries_to_metadata_entries_by_container(
structureFormat=entry.structureFormat,
isPartitioned=entry.isPartitioned,
partitionColumns=entry.partitionColumns,
separator=entry.separator,
)
for entry in manifest.entries
if entry.containerName == container_name
Expand Down Expand Up @@ -222,6 +223,7 @@ def extract_column_definitions(
key=sample_key,
bucket_name=bucket_name,
file_extension=SupportedTypes(metadata_entry.structureFormat),
separator=metadata_entry.separator,
),
)
columns = []
Expand Down
8 changes: 6 additions & 2 deletions ingestion/src/metadata/readers/dataframe/dsv.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,9 @@ def _read(self, *, key: str, bucket_name: str, **__) -> DatalakeColumnWrapper:
)


CSVDataFrameReader = functools.partial(DSVDataFrameReader, separator=CSV_SEPARATOR)
TSVDataFrameReader = functools.partial(DSVDataFrameReader, separator=TSV_SEPARATOR)
def get_dsv_reader_by_separator(separator: str) -> functools.partial:
    """Return a ``DSVDataFrameReader`` factory pre-bound to *separator*.

    Calling the returned partial with the usual reader arguments
    (``config_source=...``, ``client=...``) yields a reader that splits
    columns on the given delimiter.
    """
    reader_factory = functools.partial(DSVDataFrameReader, separator=separator)
    return reader_factory


# Ready-made factories for the two common delimiter conventions.
CSVDataFrameReader = get_dsv_reader_by_separator(separator=CSV_SEPARATOR)
TSVDataFrameReader = get_dsv_reader_by_separator(separator=TSV_SEPARATOR)
5 changes: 4 additions & 1 deletion ingestion/src/metadata/readers/dataframe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""
from typing import Any, List, Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field

from metadata.generated.schema.entity.data.table import Column

Expand All @@ -39,6 +39,9 @@ class DatalakeTableSchemaWrapper(BaseModel):
key: str
bucket_name: str
file_extension: Optional[Any]
separator: Optional[str] = Field(
None, description="Used for DSV readers to identify the separator"
)


class DatalakeTableMetadata(BaseModel):
Expand Down
17 changes: 15 additions & 2 deletions ingestion/src/metadata/readers/dataframe/reader_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@

from metadata.readers.dataframe.avro import AvroDataFrameReader
from metadata.readers.dataframe.base import DataFrameReader
from metadata.readers.dataframe.dsv import CSVDataFrameReader, TSVDataFrameReader
from metadata.readers.dataframe.dsv import (
CSVDataFrameReader,
TSVDataFrameReader,
get_dsv_reader_by_separator,
)
from metadata.readers.dataframe.json import JSONDataFrameReader
from metadata.readers.dataframe.parquet import ParquetDataFrameReader
from metadata.readers.models import ConfigSource
Expand Down Expand Up @@ -52,11 +56,20 @@ class SupportedTypes(Enum):


def get_df_reader(
type_: SupportedTypes, config_source: ConfigSource, client: Optional[Any]
type_: SupportedTypes,
config_source: ConfigSource,
client: Optional[Any],
separator: Optional[str] = None,
) -> DataFrameReader:
"""
Load the File Reader based on the Config Source
"""
# If we have a DSV file, build a reader dynamically based on the received separator
if type_ in {SupportedTypes.CSV, SupportedTypes.TSV} and separator:
return get_dsv_reader_by_separator(separator=separator)(
config_source=config_source, client=client
)

if type_.value in DF_READER_MAP:
return DF_READER_MAP[type_.value](config_source=config_source, client=client)

Expand Down
1 change: 1 addition & 0 deletions ingestion/src/metadata/utils/datalake/datalake_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def fetch_dataframe(
type_=file_extension,
config_source=config_source,
client=client,
separator=file_fqn.separator,
)
try:
df_wrapper: DatalakeColumnWrapper = df_reader.read(
Expand Down
19 changes: 19 additions & 0 deletions ingestion/tests/unit/readers/test_df_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ def test_dsv_reader(self):
list(df_list[0].columns), ["transaction_id", "transaction_value"]
)

def test_dsv_reader_with_separator(self):
    """Read a semicolon-delimited file by passing an explicit separator.

    Verifies that ``fetch_dataframe`` honors ``DatalakeTableSchemaWrapper.separator``
    and parses the fixture into the expected 5x2 frame.
    """
    key = ROOT_PATH / "transactions_separator.csv"

    df_list = fetch_dataframe(
        config_source=LocalConfig(),
        client=None,
        file_fqn=DatalakeTableSchemaWrapper(
            key=str(key), bucket_name="unused", separator=";"
        ),
    )

    self.assertIsNotNone(df_list)
    self.assertTrue(len(df_list))

    # assertEquals is a deprecated alias (removed in Python 3.12); use assertEqual
    self.assertEqual(df_list[0].shape, (5, 2))
    self.assertEqual(
        list(df_list[0].columns), ["transaction_id", "transaction_value"]
    )

def test_json_reader(self):
key = ROOT_PATH / "employees.json"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
transaction_id;transaction_value
1;100
2;200
3;300
4;400
5;500
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ need to bring information about:

- **dataPath**: Where we can find the data. This should be a path relative to the top-level container.
- **structureFormat**: What is the format of the data we are going to find. This information will be used to read the data.
- **separator**: Optionally, for delimiter-separated formats such as CSV, you can specify the separator to use when reading the file.
If you don't, we will use `,` for CSV and `\t` for TSV files.

After ingesting this container, we will bring in the schema of the data in the `dataPath`.

Expand Down Expand Up @@ -66,7 +68,8 @@ Again, this information will be added on top of the inferred schema from the dat
```json {% srNumber=2 %}
{
"dataPath": "transactions",
"structureFormat": "csv"
"structureFormat": "csv",
"separator": ","
},
```
```json {% srNumber=3 %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
"type": "string",
"default": null
},
"separator": {
"title": "Separator",
"description": "For delimited files such as CSV, what is the separator being used?",
"type": "string",
"default": null
},
"isPartitioned": {
"title": "Is Partitioned",
"description": "Flag indicating whether the container's data is partitioned",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
"type": "string",
"default": null
},
"separator": {
"title": "Separator",
"description": "For delimited files such as CSV, what is the separator being used?",
"type": "string",
"default": null
},
"isPartitioned": {
"title": "Is Partitioned",
"description": "Flag indicating whether the container's data is partitioned",
Expand Down
Loading