#%% [markdown]
# # Stage : Create UPM dataflows
# Uses a config file to drive which UPM dataflows we should create
#%%
# Import all of the libraries we need to use...
import pandas as pd
import azureml.dataprep as dprep
import os
import re
import collections
from azureml.dataprep import col, ReplacementsValue
from azureml.dataprep import Dataflow
from commonDataFlowProcessingLoop import dataFlowProcessingLoop
from commonInventoryCreation import getColumnStats, getDataFlowStats
from commonPackageHandling import openDataFlowPackage, saveDataFlowPackage, createNewPackageDirectory
from mappingCode import load_transformation_configuration, get_lookups_from_transforms, get_destination_column_name
# Let's also set up the global variables that drive this stage...
previousStageNumber = '41'
thisStageNumber = '50'
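#%% [markdown]
# The mapping config is a CSV under `./Config/` with at least `SourceTable`, `SourceColumn`,
# `TargetColumn` and `TargetTable` columns (those are the columns the function below reads).
# The cell below is a minimal sketch of that shape; the table and column values in it are
# illustrative assumptions, not taken from the real config file.
#%%
exampleMappingConfig = pd.DataFrame({
    'SourceTable':  ['PEOPLE', 'PEOPLE'],          # hypothetical source table name
    'SourceColumn': ['FORENAME', 'SURNAME'],       # hypothetical source column names
    'TargetColumn': ['FirstName', 'LastName'],     # hypothetical UPM target column names
    'TargetTable':  ['UPM_Person', 'UPM_Person']   # hypothetical target package name
})
exampleMappingConfig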
#%%
def createUPMDataflow(dataName, previousStageNumber, thisStageNumber, qualityFlag, operatorToUse, operationFlag):
    dataFlow, fullPackagePath = openDataFlowPackage(dataName, previousStageNumber, qualityFlag)

    if dataFlow:
        print('{0}: loaded package from path {1}'.format(dataName, fullPackagePath))

        if operationFlag != '':
            # Load the mapping config that describes how source columns map onto UPM target columns
            mappingConfig = dprep.read_csv('./Config/' + operationFlag).to_pandas_dataframe()

            targetDataFlow = dataFlow
            columnsToKeep = ''

            for sourceTable in mappingConfig[mappingConfig.SourceTable == dataName]['SourceTable'].unique():
                for sourceColumn, targetColumn in mappingConfig[mappingConfig.SourceTable == sourceTable][['SourceColumn', 'TargetColumn']].values:
                    # Build up a pipe-delimited pattern of the target columns we want to keep
                    if columnsToKeep == '':
                        columnsToKeep = targetColumn
                    else:
                        columnsToKeep = columnsToKeep + '|' + targetColumn
                    # Rename the source column to its UPM target name
                    targetDataFlow = targetDataFlow.rename_columns({sourceColumn: targetColumn})

            # Drop every column that is not one of the mapped target columns
            targetDataFlow = targetDataFlow.drop_columns(dprep.ColumnSelector(columnsToKeep, True, True, invert=True))

            # Save the mapped data flow under the target (UPM) package name
            newPackageName = next(iter(mappingConfig[mappingConfig.SourceTable == dataName]['TargetTable'].unique()))
            createNewPackageDirectory(newPackageName)
            saveDataFlowPackage(targetDataFlow, newPackageName, thisStageNumber, 'A')
        else:
            print('{0}: no UPM mapping processing required'.format(dataName))

        dataProfile = dataFlow.get_profile()

        # Now generate column and data flow inventories
        columnInventory = getColumnStats(dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)
        dataFlowInventory = getDataFlowStats(dataFlow, dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)

        # Finally save the data flow so it can be passed onto the next stage of the process...
        targetPackagePath = saveDataFlowPackage(dataFlow, dataName, thisStageNumber, qualityFlag)
        print('{0}: saved package to {1}'.format(dataName, targetPackagePath))

        return dataFlow, columnInventory, dataFlowInventory
    else:
        print('{0}: no package file found at location {1}'.format(dataName, fullPackagePath))
        return None, None, None
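#%% [markdown]
# For debugging a single table it can be handy to call `createUPMDataflow` directly rather than
# via `dataFlowProcessingLoop`. The cell below is a minimal sketch of such a call: the data name,
# operator label and config file name are hypothetical, and it assumes a stage-41, quality 'A'
# package for that name already exists on disk. It is left commented out so the normal
# end-to-end run below is unaffected.
#%%
# singleDataFlow, singleColumnInventory, singleDataFlowInventory = createUPMDataflow(
#     'PEOPLE',                 # hypothetical data name - must match an existing stage-41 package
#     previousStageNumber,      # '41'
#     thisStageNumber,          # '50'
#     'A',                      # quality flag, as used by the main loop below
#     'CreateUPMDataflow',      # free-text operator label recorded in the inventories
#     'upmMapping.csv')         # hypothetical mapping CSV under ./Config/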
#%%
dataFlowInventoryAll = dataFlowProcessingLoop(previousStageNumber, thisStageNumber, 'A', 'UPMDataFlowMapping', createUPMDataflow)
#%%
dataFlowInventoryAll