preprocess.py
import os
import json
import yaml
import csv
import pandas as pd
import argparse
from time import time
import io
import sqlite3
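
# Command-line interface: the single positional argument names the dataset
# directory under cohana/, which is expected to contain data.csv and table.yaml.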
parser = argparse.ArgumentParser()
parser.add_argument("file_name", type=str)
args = parser.parse_args()
file_name = args.file_name
root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
input_file_path = root_path + "/cohana/%s/data.csv" % file_name
print(input_file_path)
print(os.listdir(root_path+"/cohana/%s"%file_name))
output_path = root_path + "/cohana/%s" % file_name
raw_output = output_path + "/raw.csv"
yaml_input = output_path + "/table.yaml"
yaml_input2 = output_path + "/cube.yaml"
dim_output = output_path + "/dim.csv"
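

# simpleRead: load data.csv, replace missing values with the string "null" and
# write it back as raw.csv (no header, no index), then build dim.csv from the
# table.yaml spec: one row per distinct value for String fields, and a
# "min|max" range for ActionTime and Int32 fields.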
def simpleRead():
    rawdata = pd.read_csv(input_file_path)
    rawdata.fillna("null", inplace=True)
    rawdata.to_csv(raw_output, header=False, index=False)
    print("raw save finished")
    spec = {}
    with open(yaml_input, 'r') as stream:
        try:
            spec = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
    with open(dim_output, 'w') as csvfile:
        dimwriter = csv.writer(csvfile)
        for field in spec['fields']:
            if field['dataType'] == 'String':
                for key in rawdata[field['name']].astype('str').unique():
                    try:
                        dimwriter.writerow([field['name'], key])
                    except Exception:
                        pass
            elif field['fieldType'] == 'ActionTime':
                dimwriter.writerow([field['name'], str(rawdata[field['name']].min()) + '|' + str(rawdata[field['name']].max())])
            elif field['dataType'] == 'Int32':
                dimwriter.writerow([field['name'], str(int(rawdata[field['name']].min())) + '|' + str(int(rawdata[field['name']].max()))])
            else:
                pass
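

# create_dim: load dim.csv into a local SQLite database (dim.db). A table named
# after the dataset is created if missing, otherwise emptied, and the (col, value)
# pairs are inserted in batches of 200.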
def create_dim():
    table = file_name
    conn = None
    print('creating dim...')
    try:
        conn = sqlite3.connect('dim.db')
        c = conn.cursor()
        sql = 'select name from sqlite_master where type = "table" and name = "%s";' % table
        if all(t[0] != table for t in c.execute(sql)):
            sql = 'create table "%s" (col VARCHAR(200), value VARCHAR(200));' % table
            c.execute(sql)
            print('table "%s" created' % table)
        else:
            sql = 'delete from "%s"' % table
            c.execute(sql)
            print('table "%s" cleared' % table)
        insert_sql = 'INSERT INTO "%s" (col, value) VALUES ' % table
        sql = insert_sql
        i = 0
        with io.open(dim_output) as ifile:
            for raw_line in ifile:
                line = raw_line.strip('\n').split(',')
                if len(line) < 2:
                    break
                sql += '("%s", "%s"),' % (line[0], line[1])
                i += 1
                if i == 200:
                    # flush a full batch of 200 rows
                    c.execute(sql.rstrip(',') + ';')
                    sql = insert_sql
                    i = 0
        if i > 0:
            # flush the final partial batch
            c.execute(sql.rstrip(',') + ';')
        print('value inserted')
        conn.commit()
        conn.close()
    except Exception as e:
        if conn is not None:
            conn.close()
        raise e
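

# Driver: run the two preprocessing steps, then hand the results to the loader.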
t0 = time()
print("Preprocessing Started")
simpleRead()
create_dim()
print("Preprocessing Finished in "+str(time()-t0))
t0 = time()
print("Start Loading Engine")
os.system("mkdir cohana/"+file_name+"/000000")
os.system("java -jar utils/LocalLoader.jar '"+yaml_input+"' '"+dim_output+"' '"+raw_output+"' 'cohana/"+file_name+"/000000' 65536")
print("java -jar utils/LocalLoader.jar '"+yaml_input+"' '"+dim_output+"' '"+raw_output+"' 'cohana/"+file_name+"/000000' 65536")
print("Loading Finished in "+str(time()-t0))