forked from Smartitect/dataprep
-
Notifications
You must be signed in to change notification settings - Fork 0
/
newstage_3_31_renameColumns.py
59 lines (44 loc) · 2.26 KB
/
newstage_3_31_renameColumns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#%% [markdown]
# # Stage : Rename Columns
# - Based on parameters in config, rename the columns outlined in the config file
# - Save the data flow that has been created for each file away so that it can be referenced and used later on
#%%
# Import all of the libraries we need to use...
import pandas as pd
import azureml.dataprep as dprep
import os as os
import re as re
import collections
from azureml.dataprep import col
from azureml.dataprep import Dataflow
from commonDataFlowProcessingLoop import dataFlowProcessingLoop
from commonInventoryCreation import getColumnStats, getDataFlowStats
from commonPackageHandling import openDataFlowPackage, saveDataFlowPackage
# Let's also set up global variables and common functions...
previousStageNumber = '30'
thisStageNumber = '31'
#%%
def renameColumns(dataName, previousStageNumber, thisStageNumber, qualityFlag, operatorToUse, operationFlag):
dataFlow, fullPackagePath = openDataFlowPackage(dataName, previousStageNumber, qualityFlag)
if dataFlow:
print('{0}: loaded package from path {1}'.format(dataName, fullPackagePath))
if operationFlag != '':
# Do the operation on columns to rename them...
print('{0}: renamed {1} columns'.format(dataName, operationFlag))
else:
print('{0}: no operation to perform'.format(dataName))
dataProfile = dataFlow.get_profile()
# Now generate column and data flow inventories
columnInventory = getColumnStats(dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)
dataFlowInventory = getDataFlowStats(dataFlow, dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)
# Finally save the data flow so it can be passed onto the next stage of the process...
targetPackagePath = saveDataFlowPackage(dataFlow, dataName, thisStageNumber, 'A')
print('{0}: saved package to {1}'.format(dataName, targetPackagePath))
return dataFlow, columnInventory, dataFlowInventory
else:
print('{0}: no package file found at location {1}'.format(dataName, fullPackagePath))
return None, None, None
#%%
dataFlowInventoryAll = dataFlowProcessingLoop(previousStageNumber, thisStageNumber, 'A', 'RenameColumns', renameColumns)
#%%
dataFlowInventoryAll