Use use_blob in write_async functions
Fixes #848 and #774
MariusWirtz committed Mar 28, 2023
1 parent ea39c39 commit b11d817
Showing 1 changed file with 36 additions and 28 deletions.
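The user-facing effect of this commit: write_async and write_dataframe_async now route their chunks through the blob writer and ship with defaults for slice size and worker count. A minimal sketch of calling the changed API; the connection details, cube name, and element names are placeholders:

```python
from TM1py import TM1Service

# Placeholder credentials, cube, and elements; adjust for your environment.
with TM1Service(address="localhost", port=12354, user="admin", password="apple", ssl=True) as tm1:
    cells = {("2023", "Actual", "Revenue"): 125_000.0}

    # After this commit, write_async slices the cellset and writes each
    # slice through the blob path (use_blob=True) instead of use_ti=True.
    tm1.cells.write_async(
        cube_name="Sales",
        cells=cells,
        slice_size=250_000,  # new default
        max_workers=8)       # new default
```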
64 changes: 36 additions & 28 deletions TM1py/Services/CellService.py
@@ -639,10 +639,10 @@ def write_dataframe(self, cube_name: str, data: 'pd.DataFrame', dimensions: Iter
:param reactivate_transaction_log:
:param sandbox_name:
:param use_ti:
- :param use_blob:
+ :param use_blob: Uses blob to write. Requires admin permissions. 10x faster compared to use_ti
:param use_changeset: Enable ChangesetID: True or False
:param precision: max precision when writing through unbound process.
- Necessary when dealing with large numbers to avoid "number too long" TI syntax error.
+ Necessary when dealing with large numbers to avoid "number too long" TI syntax error
:param skip_non_updateable: skip cells that are not updateable (e.g. rule derived or consolidated)
:param measure_dimension_elements: dictionary of measure elements and their types to improve
performance when `use_ti` is `True`.
@@ -677,7 +677,7 @@ def write_dataframe(self, cube_name: str, data: 'pd.DataFrame', dimensions: Iter
**kwargs)

@manage_transaction_log
- def write_async(self, cube_name: str, cells: Dict, slice_size: int, max_workers: int,
+ def write_async(self, cube_name: str, cells: Dict, slice_size: int = 250_000, max_workers: int = 8,
dimensions: Iterable[str] = None, increment: bool = False,
deactivate_transaction_log: bool = False, reactivate_transaction_log: bool = False,
sandbox_name: str = None, precision: int = None, measure_dimension_elements: Dict = None,
@@ -714,7 +714,7 @@ def _chunks(data: Dict):

def _write(chunk: Dict):
return self.write(cube_name=cube_name, cellset_as_dict=chunk, dimensions=dimensions, increment=increment,
- use_ti=True, sandbox_name=sandbox_name, precision=precision,
+ use_blob=True, sandbox_name=sandbox_name, precision=precision,
measure_dimension_elements=measure_dimension_elements, **kwargs)

async def _write_async(data: Dict):
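The body of _write_async is collapsed above, but it follows the usual chunk-and-fan-out pattern: slice the cellset, then push each slice through a thread pool from an asyncio loop. A self-contained sketch of that pattern, with a stand-in _write instead of the library's method:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

def _chunks(data: Dict, slice_size: int) -> List[Dict]:
    # slice the cellset dict into chunks of at most slice_size cells
    items = list(data.items())
    return [dict(items[i:i + slice_size]) for i in range(0, len(items), slice_size)]

def _write(chunk: Dict):
    # stand-in for self.write(..., use_blob=True, ...)
    print(f"writing {len(chunk)} cells")

async def _write_async(data: Dict, slice_size: int, max_workers: int):
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers) as executor:
        futures = [loop.run_in_executor(executor, _write, chunk)
                   for chunk in _chunks(data, slice_size)]
        await asyncio.gather(*futures)

asyncio.run(_write_async({("e1", "eA"): 1.0, ("e2", "eA"): 2.0, ("e3", "eA"): 3.0},
                         slice_size=2, max_workers=2))
```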
@@ -745,8 +745,8 @@ async def _write_async(data: Dict):

@require_pandas
@manage_transaction_log
- def write_dataframe_async(self, cube_name: str, data: 'pd.DataFrame', slice_size_of_dataframe: int,
-                           max_workers: int, dimensions: Iterable[str] = None, increment: bool = False,
+ def write_dataframe_async(self, cube_name: str, data: 'pd.DataFrame', slice_size_of_dataframe: int = 250_000,
+                           max_workers: int = 8, dimensions: Iterable[str] = None, increment: bool = False,
sandbox_name: str = None, deactivate_transaction_log: bool = False,
reactivate_transaction_log: bool = False, **kwargs):
""" Write DataFrame into a cube using unbound TI processes in a multi-threading way. Requires admin permissions.
Expand Down Expand Up @@ -778,7 +778,7 @@ def _chunks(df: 'pd.DataFrame'):

def _write(chunk: 'pd.DataFrame'):
return self.write_dataframe(cube_name=cube_name, data=chunk, dimensions=dimensions, increment=increment,
- use_ti=True, sandbox_name=sandbox_name, **kwargs)
+ use_blob=True, sandbox_name=sandbox_name, **kwargs)

async def _write_async(df: 'pd.DataFrame'):
loop = asyncio.get_event_loop()
@@ -850,7 +850,7 @@ def write(self, cube_name: str, cellset_as_dict: Dict, dimensions: Iterable[str]
:param reactivate_transaction_log: reactivate after writing
:param sandbox_name: str
:param use_ti: Use unbound process to write. Requires admin permissions. Causes a massive performance improvement.
- :param use_blob: Upload CSV to tm1 server and runs an unbound process to write. Requires admin permissions.
+ :param use_blob: Uses blob to write. Requires admin permissions. 10x faster compared to use_ti
It causes massive performance improvement.
:param use_changeset: Enable ChangesetID: True or False
:param precision: max precision when writing through unbound process.
@@ -884,6 +884,7 @@ def write(self, cube_name: str, cellset_as_dict: Dict, dimensions: Iterable[str]
deactivate_transaction_log=deactivate_transaction_log,
reactivate_transaction_log=reactivate_transaction_log,
skip_non_updateable=skip_non_updateable,
+ dimensions=dimensions,
**kwargs)

return self.write_through_cellset(cube_name, cellset_as_dict, dimensions, increment, deactivate_transaction_log,
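Note the new dimensions=dimensions pass-through above: supplying the dimension order saves write_through_blob a metadata lookup per call. A hypothetical caller, reusing a tm1 connection as in the first sketch; the dimension names are made up:

```python
# Dimension names given in their natural cube order let write_through_blob
# skip the cube_service.get_dimension_names() round trip.
tm1.cells.write(
    cube_name="Sales",
    cellset_as_dict={("2023", "Actual", "Revenue"): 125_000.0},
    dimensions=["Year", "Version", "Sales Measure"],  # hypothetical order
    use_blob=True)
```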
@@ -1016,6 +1017,7 @@ def write_through_unbound_process(self, cube_name: str, cellset_as_dict: Dict, i
@require_pandas
def write_through_blob(self, cube_name: str, cellset_as_dict: dict, increment: bool = False,
sandbox_name: str = None, skip_non_updateable: bool = False,
+ folder_name: str = "tm1py", delete_blob=True, dimensions: str = None,
**kwargs):
"""
Writes data back to TM1 via an unbound TI process that uses an uploaded CSV as its data source
@@ -1024,6 +1026,9 @@ def write_through_blob(self, cube_name: str, cellset_as_dict: dict, increment: b
:param increment: increment or update cell values
:param sandbox_name: str
:param skip_non_updateable: skip cells that are not updateable (e.g. rule derived or consolidated)
+ :param folder_name: name of the folder to which the CSV is uploaded
+ :param delete_blob: choose False for debugging purposes
+ :param dimensions: optional. Dimension names in their natural order. Will speed up the execution!
:param kwargs:
:return: Success: bool, Messages: list, ChangeSet: None
"""
@@ -1033,9 +1038,8 @@ def write_through_blob(self, cube_name: str, cellset_as_dict: dict, increment: b
cube_service = self.get_cube_service()

# Generate hash based on tm1-session-id and local thread to guarantee a unique name
- # ToDo: is this reliably unique?
unique_string = f"{self._rest.session_id}{threading.get_ident()}"
- unique_hash = hashlib.sha256(unique_string.encode('utf-8')).hexdigest()[:8]
+ unique_hash = hashlib.sha256(unique_string.encode('utf-8')).hexdigest()[:12]

# Transform the cellset into a dictionary and export to CSV
csv_content = StringIO()
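The collapsed lines below this point serialize the cellset into csv_content, presumably one row per cell: the element for each dimension, then the value. An illustrative stand-in for that step, not the library's exact serializer:

```python
import csv
from io import StringIO

cells = {("2023", "Actual", "Revenue"): 125_000.0,
         ("2023", "Actual", "Units"): 480.0}

csv_content = StringIO()
writer = csv.writer(csv_content, quoting=csv.QUOTE_ALL)
for elements, value in cells.items():
    # one column per dimension element, value in the last column
    writer.writerow(list(elements) + [value])

print(csv_content.getvalue())
# "2023","Actual","Revenue","125000.0"
# "2023","Actual","Units","480.0"
```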
@@ -1053,7 +1057,7 @@ def write_through_blob(self, cube_name: str, cellset_as_dict: b

# Upload CSV to TM1 server using ApplicationService:
# Create a folder TM1py
- tm1py_folder = FolderApplication(path='', name='TM1py')
+ tm1py_folder = FolderApplication(path='', name=folder_name)
if not application_service.exists(path='', application_type='FOLDER', name=tm1py_folder.name):
application_service.create(tm1py_folder)

@@ -1073,12 +1077,16 @@ def write_through_blob(self, cube_name: str, cellset_as_dict: b

# Create a TI process to load CSV data into the cube
process_name = f"}}tm1py.{unique_hash}"
- process = self._create_blob_write_process(cube_name, process_name, blob_filename,
-                                           cube_service.get_dimension_names(cube_name), increment,
-                                           skip_non_updateable, sandbox_name)
+ process = self._create_blob_write_process(
+     cube_name=cube_name,
+     process_name=process_name,
+     blob_filename=blob_filename,
+     dimensions=dimensions or cube_service.get_dimension_names(cube_name),
+     increment=increment,
+     skip_non_updateable=skip_non_updateable,
+     sandbox_name=sandbox_name)

# Call the TI process with result
- # process_service.create(process)
success, status, log_file = process_service.execute_process_with_return(process=process)
if not success:
if status in ['HasMinorErrors']:
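For reference, execute_process_with_return reports a (success, status, error-log) triple; the branch above is truncated in this hunk, but handling of that triple could look like the following sketch. The error strategy here is illustrative, not the library's exact code:

```python
# A sketch only: the library's own error branch is collapsed in this hunk.
success, status, log_file = process_service.execute_process_with_return(process=process)
if not success:
    if status in ['HasMinorErrors']:
        # some records were rejected; details land in the TI error log file
        print(f"write completed with minor errors, see: {log_file}")
    else:
        raise RuntimeError(f"blob write failed with status '{status}'")
```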
@@ -1088,16 +1096,17 @@ def write_through_blob(self, cube_name: str, cellset_as_dict: b

# delete application in TM1
finally:
- application_service.delete(
-     path='TM1py',
-     application_type='DOCUMENT',
-     application_name=filename,
-     private=True)
-
- def _create_blob_write_process(self, cube_name, dataload_process_name, blob_filename, cube_dimensions, increment,
-                                skip_non_updateable, sandbox_name):
+ if delete_blob:
+     application_service.delete(
+         path=folder_name,
+         application_type='DOCUMENT',
+         application_name=filename,
+         private=True)
+
+ def _create_blob_write_process(self, cube_name: str, process_name: str, blob_filename: str, dimensions: List[str],
+                                increment: bool, skip_non_updateable: bool, sandbox_name: str):
dataload_process = Process(
- name=dataload_process_name,
+ name=process_name,
datasource_type='ASCII',
datasource_ascii_header_records=0,
datasource_data_source_name_for_server='.\\}Externals\\' + blob_filename,
@@ -1106,14 +1115,13 @@ def _create_blob_write_process(self, cube_name, dataload_process_name, blob_file
datasource_ascii_decimal_separator='.',
datasource_ascii_thousand_separator='',
datasource_ascii_quote_character='"')
- cube_measure = cube_dimensions[-1]
+ cube_measure = dimensions[-1]

# Create variables as all String
- variable_prefix = 'v'
dimension_variables = [
f"{variable_prefix}{n}"
f"v{n}"
for n
- in range(1, len(cube_dimensions) + 1)]
+ in range(1, len(dimensions) + 1)]
for variable in dimension_variables:
dataload_process.add_variable(name=variable, variable_type='String')
variable_cube_measure = dimension_variables[-1]
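The rest of the generator is collapsed here: it binds the value column and assembles the data-tab statement. A sketch of what that statement plausibly looks like once v1..vN are declared; the value-variable name and the statement assembly are illustrative, not lifted from the source:

```python
dimensions = ["Year", "Version", "Sales Measure"]  # hypothetical cube order
cube_name = "Sales"
increment = False

# one String variable per dimension column, as in the hunk above
dimension_variables = [f"v{n}" for n in range(1, len(dimensions) + 1)]
value_variable = f"v{len(dimensions) + 1}"  # assumed: value is the next column

# CellIncrementN adds to the existing value; CellPutN overwrites it
ti_function = "CellIncrementN" if increment else "CellPutN"
data_statement = (f"{ti_function}({value_variable}, '{cube_name}', "
                  + ", ".join(dimension_variables) + ");")
print(data_statement)  # CellPutN(v4, 'Sales', v1, v2, v3);
```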
