-
Notifications
You must be signed in to change notification settings - Fork 0
/
DataWrite.py
151 lines (123 loc) · 6.16 KB
/
DataWrite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import json
from datetime import datetime, timezone

from CockroachHandler import CockroachHandler
class DataWrite:
    """
    This class implements the data write functionality in Cenote.

    Events are flattened into column/value pairs (nested property names are
    joined with ``nested_properties_sep``), the target table is created or
    extended to match the first event of a batch, and the batch is then
    written through a CockroachHandler.
    """

    def __init__(self):
        """
        Initializes this data writer.
        """
        self.ch = CockroachHandler()
        # NOTE(review): not read anywhere in this class; kept in case other code uses it.
        self.excluded_columns = ["uuid"]
        # Joins nested property names into one flat column name, e.g. "cenote$id".
        self.nested_properties_sep = '$'

    def _column_name(self, prev_key, key):
        """
        Return the flat column name for ``key`` nested under ``prev_key``.

        Spaces are removed from ``key`` and it is lower-cased; when ``prev_key``
        is non-empty it is prepended together with the separator.
        """
        name = key.replace(' ', '').lower()
        if prev_key != '':
            return prev_key + self.nested_properties_sep + name
        return name

    def _builtin_column_specs(self):
        """
        Return a fresh list of specs for the columns every table carries.
        """
        sep = self.nested_properties_sep
        return [
            # Primary key; append_cenote_info asks for gen_random_uuid() as its value.
            {"name": "uuid", "type": "UUID", "primary_key": "yes"},
            {"name": "cenote" + sep + "created_at", "type": "timestamp"},
            {"name": "cenote" + sep + "timestamp", "type": "timestamp"},
            {"name": "cenote" + sep + "id", "type": "uuid"},
        ]

    def create_table(self, table_name, column_specs):
        """
        Create ``table_name`` with ``column_specs`` plus the built-in columns.

        Every table gets a ``uuid`` primary key and the cenote created_at,
        timestamp and id columns in addition to the given specs.

        :param table_name: Name of the table to create.
        :param column_specs: List of {"name", "type"} dicts, e.g. from
            create_column_specs. It is not modified; a new list is handed to
            the CockroachHandler.
        :return: The result of CockroachHandler.create_table (write_data
            treats a dict with "response" == 201 as success).
        """
        all_specs = list(column_specs) + self._builtin_column_specs()
        return self.ch.create_table(table_name, all_specs)

    @staticmethod
    def get_table(url):
        """
        Derive the table name from an event URL.

        The part after the last "/projects/" is split on "/events/"; the result
        is "<project_id>_<event_collection>". A single trailing slash is ignored.

        :raises IndexError: if there is no "/events/" after the last "/projects/".
        """
        if url.endswith('/'):
            url = url[:-1]
        tail = url.split('/projects/')[-1]
        parts = tail.split('/events/')
        project_id, event_collection = parts[0], parts[1]
        return project_id + '_' + event_collection

    def create_column_specs(self, obj, col_specs=None, prev_key=''):
        """
        Derive column specs from a (possibly nested) event payload.

        Nested dicts are flattened into "parent<sep>child" column names. Leaf
        values map to a column type: str -> "string", bool -> "bool",
        anything else -> "decimal".

        :param obj: Event payload dict.
        :param col_specs: List to append to; a new list is used when None.
        :param prev_key: Column-name prefix of the enclosing dict ('' at top level).
        :return: ``col_specs`` with one {"name", "type"} dict per leaf value.
        """
        if col_specs is None:
            col_specs = []
        for key, value in obj.items():
            name = self._column_name(prev_key, key)
            if isinstance(value, dict):
                self.create_column_specs(value, col_specs, name)
                continue
            if isinstance(value, str):
                col_type = "string"
            elif isinstance(value, bool):
                col_type = "bool"
            else:
                # NOTE(review): lists and None also land here and become "decimal".
                col_type = "decimal"
            col_specs.append({"name": name, "type": col_type})
        return col_specs

    def create_data_write_obj(self, obj, data=None, prev_key=''):
        """
        Flatten a (possibly nested) event payload into column/value pairs.

        Column names are built exactly as in create_column_specs, so each entry
        lines up with a column derived from the same payload shape.

        :param obj: Event payload dict.
        :param data: List to append to; a new list is used when None.
        :param prev_key: Column-name prefix of the enclosing dict ('' at top level).
        :return: ``data`` with one {"column", "value"} dict per leaf value.
        """
        if data is None:
            data = []
        for key, value in obj.items():
            name = self._column_name(prev_key, key)
            if isinstance(value, dict):
                self.create_data_write_obj(value, data, name)
            else:
                data.append({"column": name, "value": value})
        return data

    def append_cenote_info(self, obj, data=None):
        """
        Append the built-in cenote column values for event ``obj`` to ``data``.

        :param obj: Event dict whose "cenote" entry holds "created_at" (epoch
            milliseconds, hence the division by 1e3), "id" and optionally
            "timestamp".
        :param data: List to append to; a new list is used when None.
        :return: ``data`` with created_at, timestamp, id and uuid entries added.
        """
        if data is None:
            data = []
        cenote = obj["cenote"]
        sep = self.nested_properties_sep
        if "timestamp" in cenote:
            timestamp = cenote["timestamp"]
        else:
            # Naive ISO string in UTC (same output as the former utcnow()).
            timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        # Convert in UTC rather than server-local time, so created_at agrees
        # with the UTC default used for the timestamp column above.
        created_at = datetime.fromtimestamp(cenote["created_at"] / 1e3, tz=timezone.utc)
        data.append({"column": "cenote" + sep + "created_at",
                     "value": created_at.replace(tzinfo=None).isoformat()})
        data.append({"column": "cenote" + sep + "timestamp", "value": timestamp})
        data.append({"column": "cenote" + sep + "id", "value": cenote["id"]})
        # No value is sent for the primary key; CockroachHandler.write_data is
        # presumably expected to emit gen_random_uuid() for it (verify there).
        data.append({"column": "uuid", "built_in_function": "gen_random_uuid()"})
        return data

    def write_data(self, data_instance_array):
        """
        Write a batch of events to the table derived from the first event's URL.

        The table is created from the first event's payload and any of its
        columns missing from the current schema are added. Values for columns
        that are not in the schema (e.g. keys appearing only in later events)
        are dropped rather than written.

        :param data_instance_array: List of event dicts, or a JSON string
            encoding one. Each event needs "data" and a "cenote" dict with
            "url", "created_at", "id" and optionally "timestamp".
        :return: {"response": 201} on success, otherwise
            {"response": 400, "exception": <description>}.
        """
        if isinstance(data_instance_array, str):
            data_instance_array = json.loads(data_instance_array)
        # An empty batch has no first event to derive the table from.
        if not data_instance_array:
            return {"response": 400, "exception": "No data to write"}

        # Check/create table based on first event
        first_event = data_instance_array[0]
        first_url = first_event["cenote"]["url"]
        table = self.get_table(first_url)
        col_specs = self.create_column_specs(first_event["data"], [])
        res = self.create_table(table, col_specs)
        if res["response"] != 201:
            return {"response": 400, "exception": "Can't create table"}

        # create_table adds the built-in columns on the table side only, so
        # include them here when comparing against the schema.
        expected_specs = col_specs + self._builtin_column_specs()
        schema = self.ch.describe_table(table)
        if schema is None:
            current_schema_cols = {spec["name"] for spec in expected_specs}
        else:
            current_schema_cols = set(schema.keys())
            cols_to_be_added = [spec for spec in expected_specs
                                if spec["name"] not in current_schema_cols]
            if cols_to_be_added:
                # NOTE(review): alter_table's return value is not checked.
                self.ch.alter_table(table, cols_to_be_added)
                current_schema_cols = set(self.ch.describe_table(table).keys())

        # Write events
        try:
            data_to_write = []
            for data_instance in data_instance_array:
                # Basic check that all events target the same table
                if data_instance["cenote"]["url"] != first_url:
                    raise Exception("Data don't belong to the same table!")
                data = self.create_data_write_obj(data_instance["data"], [])
                data = self.append_cenote_info(data_instance, data)
                data_to_write.append([val for val in data if val["column"] in current_schema_cols])
            res = self.ch.write_data(table, data_to_write)
            if res["response"] != 201:
                raise Exception(res["exception"])
            return {"response": 201}
        except Exception as e:
            return {"response": 400, "exception": repr(e)}