forked from kartikhans/HotCarbon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathIoHeavyJob.py
84 lines (61 loc) · 3.15 KB
/
IoHeavyJob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
from google.cloud import bigquery
from google.oauth2 import service_account
import names
import random
import uuid
from datetime import datetime
from dotenv import load_dotenv
# Value pools sampled by the random row generators below.
card_types = ['VISA', 'MASTERCARD', 'RUPAY']
status_types = ['ACTIVE', 'INACTIVE']
transaction_status = ['INPROGRESS', 'COMPLETE']

# Load the local .env file first: the os.environ lookups below depend on it.
load_dotenv(dotenv_path='.env')

# Logical table name -> table id taken from the environment.
# A missing variable raises KeyError as soon as the module is imported.
table_id_directory = {
    'user': os.environ['USER_TABLE'],
    'credit_card': os.environ['CREDIT_TABLE'],
    'transaction': os.environ['TRANSACTION_TABLE'],
}
def generate_random_transaction_data(credit_card_id):
    """Build one random transaction row for the given credit card.

    Args:
        credit_card_id: Identifier stored in the row's ``credit_card_id`` field.

    Returns:
        A single-element list holding the row dict, with a fresh UUID as
        ``transaction_id``, an integer ``amount`` in 1..1000, the current
        local time rendered with ``str()`` as ``datetime``, and a ``status``
        drawn from ``transaction_status``.
    """
    row = {
        'transaction_id': str(uuid.uuid4()),
        'amount': random.randint(1, 1000),
        'datetime': str(datetime.now()),
        'status': random.choice(transaction_status),
        'credit_card_id': credit_card_id,
    }
    return [row]
def generate_random_credit_data(user_id):
    """Build one random credit-card row owned by the given user.

    Args:
        user_id: Identifier stored in the row's ``user_id`` field.

    Returns:
        A single-element list holding the row dict, with a fresh UUID as
        ``card_id``, an integer ``last_four`` in 1001..9999, a ``category``
        drawn from ``card_types`` and a ``status`` drawn from ``status_types``.
    """
    row = {
        'card_id': str(uuid.uuid4()),
        'last_four': random.randint(1001, 9999),
        'category': random.choice(card_types),
        'user_id': user_id,
        'status': random.choice(status_types),
    }
    return [row]
def generate_random_user_data():
    """Build one random user row.

    Returns:
        A single-element list holding the row dict, with a fresh UUID as
        ``user_id``, a generated full ``name``, the fixed ``country`` 'US'
        and an integer ``age`` in 1..100.
    """
    row = {
        'user_id': str(uuid.uuid4()),
        'name': names.get_full_name(),
        'country': 'US',
        'age': random.randint(1, 100),
    }
    return [row]
class IoHeavyJob:
    """Drive a BigQuery workload made of many small inserts and periodic reads.

    For every generated user the job inserts the user row, three credit-card
    rows and five transaction rows per card, one ``insert_rows_json`` call per
    row. After every third user it also runs a lookup query on the user table.
    """

    def __init__(self, key_path, user_count):
        """Create the BigQuery client from a service-account key file.

        Args:
            key_path: Path to the service-account JSON key file.
            user_count: Number of users ``execute`` generates.
        """
        self.credentials = service_account.Credentials.from_service_account_file(
            key_path,
            scopes=['https://www.googleapis.com/auth/bigquery'],
        )
        self.client = bigquery.Client(
            credentials=self.credentials,
            project=self.credentials.project_id,
        )
        # user_id of the most recently inserted user; None until the first insert.
        self.last_userid = None
        self.user_count = user_count

    def execute_query(self, query):
        """Submit ``query`` through the client and return its ``result()``."""
        return self.client.query(query).result()

    def clean_db(self):
        """Delete every row from each table listed in ``table_id_directory``."""
        for table_id in table_id_directory.values():
            query = f'DELETE FROM {table_id} WHERE true;'
            print(query)
            self.execute_query(query)

    def push_data(self, data, table_id):
        """Insert ``data`` into ``table_id`` and return the reported errors.

        Args:
            data: List of JSON-compatible row dicts.
            table_id: Destination table identifier.

        Returns:
            Whatever ``insert_rows_json`` returns: the per-row insert errors,
            empty when every row was accepted.
        """
        print(data, table_id)
        errors = self.client.insert_rows_json(table_id, data)
        if errors:
            # Callers ignore the return value, so surface failed inserts here
            # instead of letting them disappear silently.
            print(f'insert errors for {table_id}: {errors}')
        return errors

    def execute(self):
        """Run the workload for ``self.user_count`` users.

        Existing rows are not removed first; call ``clean_db()`` beforehand
        when a fresh start is wanted.
        """
        # Resolve the table ids once; they do not change while the job runs.
        user_table = table_id_directory.get('user')
        credit_card_table = table_id_directory.get('credit_card')
        transaction_table = table_id_directory.get('transaction')

        for i in range(1, self.user_count + 1):
            user_data = generate_random_user_data()
            user_id = user_data[0]['user_id']
            self.push_data(user_data, user_table)

            for _ in range(3):
                credit_card_data = generate_random_credit_data(user_id)
                self.push_data(credit_card_data, credit_card_table)
                card_id = credit_card_data[0]['card_id']

                for _ in range(5):
                    transaction_data = generate_random_transaction_data(card_id)
                    self.push_data(transaction_data, transaction_table)

            if i % 3 == 0:
                # Every third user, read back the previously inserted user.
                # The table id is interpolated from a local so the f-string
                # does not reuse its own quote character, which is a
                # SyntaxError before Python 3.12.
                query = f'SELECT * FROM {user_table} WHERE user_id = "{self.last_userid}" LIMIT 1000;'
                self.execute_query(query)

            self.last_userid = user_id
if __name__ == '__main__':
    # Script entry point: the service-account key location comes from the
    # KEY_PATH environment variable; the run generates four users.
    job = IoHeavyJob(key_path=os.environ['KEY_PATH'], user_count=4)
    job.execute()