# begin_compare.py
import argparse
import os
import shutil
import sys
import time
from multiprocessing import Pool

from collect_result import collect
from config import csv_config, redis_config
from core.common.config import log
from src.client.kyligence import KE
from src.database.reader import CsvReader, RedisReader
from src.database.writer import CsvWriter, RedisWriter, clean_dirs
from src.entry.csv_format import CsvFormat
from src.entry.response import GoreplayReceive

parser = argparse.ArgumentParser(description='command line arguments')
parser.add_argument('--process', type=int,
                    help='Number of child processes.', required=False,
                    default=10)
parser.add_argument('--date', type=str,
                    help='The SQL data date. Format: yyyy-MM-dd.', required=False,
                    default="")
parser.add_argument('--batch', type=str,
                    help='The execution (batch) id.', required=True)
parser.add_argument('--mod', type=str,
                    help='Run mode: "date" replays the daily capture, '
                         '"error" replays failed queries from a backup.',
                    required=True)


def sub_process(process_number: int, batch_name: str):
    """Worker: consume goreplay messages from Redis, run each against KE,
    and write the response to this batch's result CSV."""
    log.info('Run child process %s (%s)...' % (str(process_number), os.getpid()))
    r = RedisReader()
    if not r.ready():
        log.warn("Redis not ready.")
        log.info('Stop child process %s (%s)...' % (str(process_number), os.getpid()))
        return
    clean_dirs(csv_config["server_result"] + os.sep + batch_name)
    csv_writer = CsvWriter(csv_config["server_result"] + os.sep + batch_name)
    while True:
        source_message = r.read_goreplay(redis_config["daily"])
        if source_message is None:
            log.info('Stop child process %s (%s)...' % (str(process_number), os.getpid()))
            return
        if source_message == "":
            # Queue is momentarily empty; back off briefly before polling again.
            time.sleep(1)
            continue
        log.info('Process %s (%s) consume 1 message.' % (str(process_number), os.getpid()))
        ke = KE(source_message)
        csv_writer.insert(str(process_number), ke.query())
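
# Assumed read_goreplay() contract (inferred from the loops above, not
# documented in this file): None signals the queue is finished, "" signals
# it is momentarily empty, and any other string is one captured request.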


def prepare_redis_data(batch_name: str, dt: str, m: str) -> bool:
    r = RedisReader()
    # Drain anything left in the queue, e.g. messages saved by the last job.
    while True:
        source_message = r.read_goreplay(redis_config["daily"])
        if source_message is None:
            break
        log.info("Clean redis data, batch: %s. Maybe saved by last job.", batch_name)
    r.close()
    w = RedisWriter()

    def read_to_redis(value: CsvFormat):
        w.insert_goreplay(redis_config["daily"], value.to_redis_format())

    if m == "error":
        # Error mode: re-enqueue every result CSV from the given backup
        # directory, skipping the NOT_FOUND and SUCCESS summaries.
        reader = CsvReader(csv_config["backup"] + os.sep + dt)
        for file in os.listdir(reader.file_dir):
            if file.endswith(".csv") and file not in ("NOT_FOUND.csv", "SUCCESS.csv"):
                result = GoreplayReceive()
                if not reader.read_to_other(file, result, read_to_redis):
                    return False
        return True
    # Date mode: enqueue the daily goreplay capture named by dt.
    reader = CsvReader(csv_config["goreplay_data_dir_name"])
    result = GoreplayReceive()
    return reader.read_to_other(dt, result, read_to_redis)
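
# Data flow assumed here: prepare_redis_data() pushes every captured request
# onto the redis_config["daily"] queue, and the sub_process() workers pop from
# the same queue, so Redis acts as the work queue between the two stages.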


def clean_history_dirs(dirs: str, cur: int, dt: str, md: str):
    reader = CsvReader(dirs)
    files = []
    for file in os.listdir(reader.file_dir):
        if os.path.isdir(reader.file_dir + os.sep + file):
            files.append(int(file))
    files.sort()
    if len(files) < 20:
        return
    # Delete the oldest batch directories, but never the current batch nor,
    # in error mode, the backup directory being replayed.
    for index in range(0, len(files) - 20 + 1):
        if cur == files[index] or (md == "error" and int(dt) == files[index]):
            continue
        log.info("Clean history dirs %d", files[index])
        shutil.rmtree(reader.file_dir + os.sep + str(files[index]))
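
# Retention arithmetic: with n >= 20 directories the loop visits indices
# 0 .. n - 20, i.e. the n - 19 oldest, so at most 19 directories survive
# (plus any that the current-batch / error-mode exemptions skip over).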


def clean_history(bt: str, dt: str, md: str):
    current = int(bt)
    clean_history_dirs(csv_config["server_result"], current, dt, md)
    clean_history_dirs(csv_config["compare_result"], current, dt, md)
    clean_history_dirs(csv_config["backup"], current, dt, md)


if __name__ == '__main__':
    args = vars(parser.parse_args())
    process_num = args["process"]
    date = args["date"]
    batch = args["batch"]
    mod = args["mod"]
    # Clamp the worker count to the range [1, 20].
    if process_num <= 0:
        process_num = 1
    if process_num >= 20:
        process_num = 20
    clean_history(batch, date, mod)
    if not prepare_redis_data(batch, date, mod):
        log.error("Prepare data failed.")
        sys.exit(-1)
    pool = Pool(process_num)
    for i in range(process_num):
        pool.apply_async(sub_process, args=(i, batch))
    pool.close()
    pool.join()
    # Aggregate the per-process result CSVs for this batch.
    collect(batch)
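
# Note: apply_async() discards worker exceptions unless .get() is called on
# the returned AsyncResult, so a crashed worker fails silently and
# collect(batch) aggregates whatever CSVs were actually written.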