Skip to content

Commit

Permalink
improve api of RequestMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
dittmar committed Feb 28, 2024
1 parent 25de911 commit b16d6d5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
5 changes: 2 additions & 3 deletions src/cadenzaanalytics/cadenza_analytics_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ def _get_request_data(self, multipart_request) -> AnalyticsRequest:

if metadata.has_columns():
type_mapping = {}
for columns in metadata.get_columns_by_attribute_group().values():
for column in columns:
type_mapping[column.name] = column.data_type.pandas_type()
for column in metadata.get_all_columns():
type_mapping[column.name] = column.data_type.pandas_type()
csv_data = StringIO(multipart_request.form['data'])
df_data = pd.read_csv(csv_data, sep=";", dtype=type_mapping)
else:
Expand Down
31 changes: 21 additions & 10 deletions src/cadenzaanalytics/request/request_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,35 @@ class RequestMetadata:
def __init__(self, request_metadata: dict):
self._request_metadata = request_metadata

def get_column_metadata(self, name: str) -> ColumnMetadata | None:
def get_column(self, name: str) -> ColumnMetadata | None:
if self.has_columns():
for column in self._request_metadata['dataContainers'][0]['columns']:
for column in self._get_columns():
if column["name"] == name:
return ColumnMetadata._from_dict(column)

return None

def get_columns_by_attribute_group(self) -> dict[str, list[ColumnMetadata]] | None:
def get_column_by_attribute_group(self, attribute_group) -> ColumnMetadata | None:
if self.has_columns():
grouped_columns = {}
for column in self._get_columns():
if column['attributeGroupName'] == attribute_group:
return ColumnMetadata._from_dict(column)
return None

for column in self._request_metadata['dataContainers'][0]['columns']:
if column['attributeGroupName'] not in grouped_columns:
grouped_columns[column['attributeGroupName']] = []
def get_all_columns_by_attribute_groups(self) -> dict[str, list[ColumnMetadata]]:
grouped_columns = {}
columns = self._get_columns() if self.has_columns() else []
for column in columns:
if column['attributeGroupName'] not in grouped_columns:
grouped_columns[column['attributeGroupName']] = []

grouped_columns[column['attributeGroupName']].append(ColumnMetadata._from_dict(column))
grouped_columns[column['attributeGroupName']].append(ColumnMetadata._from_dict(column))

return grouped_columns
return grouped_columns

return None
def get_all_columns(self) -> list[ColumnMetadata]:
columns = self._get_columns() if self.has_columns() else []
return [ColumnMetadata._from_dict(column) for column in columns]

def get_parameters(self) -> dict[str, str]:
return self._request_metadata['parameters']
Expand All @@ -39,3 +47,6 @@ def get_parameter(self, name: str) -> str:
def has_columns(self) -> bool:
return (len(self._request_metadata['dataContainers']) > 0
and "columns" in self._request_metadata['dataContainers'][0])

def _get_columns(self):
return self._request_metadata['dataContainers'][0]['columns']

0 comments on commit b16d6d5

Please sign in to comment.