-
Notifications
You must be signed in to change notification settings - Fork 3
/
stage_0_0_indexFiles.py
75 lines (64 loc) · 2.39 KB
/
stage_0_0_indexFiles.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#%% [markdown]
# # Stage 1 - Inventory Files
# The purpose of this stage of the process is to create an inventory of all of the source files
# Some future improvements to process:
# - Connect to Azure storage
# - Work with agreed taxonomy for sotrage so that it is more parameter driven: client/scheme name, data cut version or date etc.
#%%
# Import all of the libraries we need to use...
import pandas as pd
import azureml.dataprep as dprep
import os as os
import re as re
import collections
import time
import shutil
from azureml.dataprep import value
from azureml.dataprep import col
from azureml.dataprep import Dataflow
from commonCode import saveDataFileInventory
# Let's also set up global variables...
# NOTE - is there any way to set these up centrally?
stageNumber = '0'
nextStageNumber = '10'
# Path to the source data
# NOTE - ultimately this could point to other storage sources such as blob on Azure
dataPath = './data'
packagePath = './packages/'
#%%
# List all of the files in the data directory...
fileList = os.listdir(dataPath)
dataFiles = pd.DataFrame({'FileName':fileList})
#%%
fullFilePaths = dataPath + '/' + dataFiles.FileName
fullFilePaths.name = "FullFilePath"
dataFiles = pd.concat([dataFiles, fullFilePaths], axis=1)
#%%
fileSize = []
modifiedTime = []
dataNames = []
archived = []
for index, row in dataFiles.iterrows():
fileSize.append(os.path.getsize(row.FullFilePath))
modifiedTime.append(time.ctime(os.path.getmtime(row.FullFilePath)))
dataNames.append(row.FileName.split('.')[0])
archived.append(False)
fileSizeCol = pd.DataFrame({'FileSize':fileSize})
modifiedTimeCol = pd.DataFrame({'Modified':modifiedTime})
dataNamesCol = pd.DataFrame({'DataName':dataNames})
archivedCol = pd.DataFrame({'Archived': archived})
dataFiles = pd.concat([dataFiles, fileSizeCol], axis=1)
dataFiles = pd.concat([dataFiles, modifiedTimeCol], axis=1)
dataFiles = pd.concat([dataFiles, dataNamesCol], axis=1)
dataFiles = pd.concat([dataFiles, archivedCol], axis=1)
#%%
# Create folders to use as workspaces for each of the files
for index, row in dataFiles.iterrows():
if not os.path.isdir(packagePath):
os.mkdir(packagePath)
if os.path.isdir(packagePath + row.DataName):
shutil.rmtree(packagePath + row.DataName)
os.mkdir(packagePath + row.DataName)
#%%
# Write inventory away as input for next stage in the process
saveDataFileInventory(dataFiles, stageNumber, nextStageNumber)