Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
djobyte committed Dec 11, 2023
1 parent bf2a032 commit 123c83b
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions priceloop_api/priceloop/extras/table_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ class TableUtils:
"""

@staticmethod
@retry(stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(1), retry=retry_if_exception_type(PlatformException))
@retry(
stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(1), retry=retry_if_exception_type(PlatformException)
)
def is_table_on_platform(
workspace_name: str,
table_name: str,
Expand Down Expand Up @@ -319,7 +321,9 @@ def read_table(
columns=columns,
)

@retry(stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(5), retry=retry_if_exception_type(PlatformException))
@retry(
stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(5), retry=retry_if_exception_type(PlatformException)
)
@staticmethod
def truncate(
workspace_name: str,
Expand Down Expand Up @@ -392,7 +396,9 @@ def get_data(
return table_data.rows

@staticmethod
@retry(stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(5), retry=retry_if_exception_type(PlatformException))
@retry(
stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(5), retry=retry_if_exception_type(PlatformException)
)
def get_metadata(
workspace_name: str,
table_name: str,
Expand Down Expand Up @@ -427,7 +433,9 @@ def get_metadata(
raise PlatformException

@staticmethod
@retry(stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(3), retry=retry_if_exception_type(PlatformException))
@retry(
stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(3), retry=retry_if_exception_type(PlatformException)
)
def get_slice_of_data(
workspace_name: str,
table_name: str,
Expand Down Expand Up @@ -506,7 +514,7 @@ def get_rows(
offset,
list_of_columns,
client,
)
)
return table_data

@staticmethod
Expand Down Expand Up @@ -613,7 +621,9 @@ def column_to_values(df: pd.DataFrame, column_name: str) -> List[Any]:
else:
return [str(x) if x is not None else None for x in df[column_name].values.tolist()]

@retry(stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(5), retry=retry_if_exception_type(PlatformException))
@retry(
stop=stop_after_attempt(NUM_OF_RETRIES), wait=wait_fixed(5), retry=retry_if_exception_type(PlatformException)
)
@staticmethod
def import_data(
workspace_name: str,
Expand Down Expand Up @@ -706,21 +716,21 @@ def patch_table(
- PatchMode.InsertOnConflictUpdate: Insert data into the table with an update conflict resolution strategy.
- PatchMode.Update: Update existing rows in the table based on specified columns.
"""
json_body: List[Append | Delete | Insert| Update] = []
match mode:
case PatchMode.Append:
json_body.append(TableUtils.get_patch_append(data_columns, table_df))
case PatchMode.Delete:
json_body.append(TableUtils.get_patch_delete(match_on_columns, "delete_matching", table_df))
case PatchMode.InsertOnConflictFail:
json_body.append(TableUtils.get_patch_insert(match_on_columns, data_columns, table_df, "fail"))
case PatchMode.InsertOnConflictIgnore:
json_body.append(TableUtils.get_patch_insert(match_on_columns, data_columns, table_df, "ignore"))
case PatchMode.InsertOnConflictUpdate:
json_body.append(TableUtils.get_patch_insert(match_on_columns, data_columns, table_df, "update"))
case PatchMode.Update:
json_body.append(TableUtils.get_patch_update(match_on_columns, data_columns, table_df))
json_body: List[Append | Delete | Insert | Update] = []
if mode == PatchMode.Append:
json_body.append(TableUtils.get_patch_append(data_columns, table_df))

elif mode == PatchMode.Delete:
json_body.append(TableUtils.get_patch_delete(match_on_columns, "delete_matching", table_df))
elif mode == PatchMode.InsertOnConflictFail:
json_body.append(TableUtils.get_patch_insert(match_on_columns, data_columns, table_df, "fail"))
elif mode == PatchMode.InsertOnConflictIgnore:
json_body.append(TableUtils.get_patch_insert(match_on_columns, data_columns, table_df, "ignore"))
elif mode == PatchMode.InsertOnConflictUpdate:
json_body.append(TableUtils.get_patch_insert(match_on_columns, data_columns, table_df, "update"))
elif mode == PatchMode.Update:
json_body.append(TableUtils.get_patch_update(match_on_columns, data_columns, table_df))

try:
request = patch_table_data.sync_detailed(
workspace=workspace_name, table=table_name, client=client, json_body=json_body
Expand Down

0 comments on commit 123c83b

Please sign in to comment.