-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathb_wss.py
executable file
·189 lines (146 loc) · 6.97 KB
/
b_wss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/python -W ignore
import websocket
import json
import lib_v2_globals as g
import lib_v2_ohlc as o
import toml
import os, sys, getopt, mmap, time, math
import pandas as pd
from colorama import Fore, Style
from colorama import init as colorama_init
def on_message(ws, message):
# !'Timestamp', 'Open', 'High', 'Low', 'Close', 'Volume'])
global long_file_handle_ary
global long_file_ary
global long_json_file_ary
global spair
dary = json.loads(message)
epoch =dary['E']
# Timestamp = f"{datetime.utcfromtimestamp(epoch / 1000).replace(microsecond = epoch % 1000 * 1000)}"
Timestamp = epoch
Open = float(dary['k']['o'])
High = float(dary['k']['h'])
Low = float(dary['k']['l'])
Close = float(dary['k']['c'])
Volume = float(dary['k']['v'])
line_ary = [Timestamp,Open,High,Low,Close,Volume]
# line_str = f"{Timestamp},{Open},{High},{Low},{Close},{Volume}\n"
if g.gcounter == 1:
g.last_close = Close
_data = []
g.tmp1 = {'columns': ['Date', 'Open', 'High', 'Low', 'Close', 'Volume'], 'index': [], 'data': []}
for f in range(len(g.wss_filters)):
g.running_total[f] += Close - g.last_close
if abs(g.running_total[f]) >= g.wss_filters[f]:
g.recover += 1
if g.verbose:
print(Fore.YELLOW,g.recover, g.gcounter, line_ary, f"f={g.wss_filters[f]}", f'{g.running_total[f]:,.2f}', Style.RESET_ALL)
# * save data to small file
g.wss_small.append(line_ary) # * add each line to list
iloc_s = g.cvars['datawindow'] * -1 # * get size to trim array
g.wss_small = g.wss_small[iloc_s:] # * create resize list
# * update timestamp to be 1sec intervals descing from current time
# * this is to fix matplotlib's issues with non-linear x-axis dates
redate = int(time.time())
# * update timestamp in CSV string also
line_str = f"{redate*1000},{Open},{High},{Low},{Close},{Volume}\n"
for i in range(len(g.wss_small)-1,-1,-1):
if not math.isnan(g.wss_small[i][1]):
g.wss_small[i][0] = redate*1000
redate -= 1
ppjson = json.dumps(g.wss_small) # * create JSON format of data
# spair = g.cvars['pair'].replace("/","") # * get restringed pair name
outfile = f"/tmp/_{spair}_0m_{g.wss_filters[f]}f.tmp"
# print(f"WRITING to {outfile}")
# * save to file - don't keep file open as we are rewriting fromn scratch.
with open(outfile, 'w') as fo: # open the file in write mode
fo.write(ppjson)
fo.close()
# * save data to long file - flush every n writes
long_file_handle_ary[f].write(line_str)
if g.gcounter % 10 == 0:
long_file_handle_ary[f].flush()
# * update long files every datawindow cycle
# if g.gcounter % 1 == 0:
if g.gcounter % g.cvars['datawindow'] == 0:
long_file_handle_ro = open(long_file_ary[f], "r")
line = long_file_handle_ro.readline().strip()
nidx = 0
g.wss_large = []
_index = []
while(line):
line = long_file_handle_ro.readline().strip()
g.wss_large.append(line.split(","))
_index.append(nidx)
nidx += 1
# g.tmp1['columns'] = g.dprep['columns']
g.tmp1['index'] = _index
g.tmp1['data'] = g.wss_large
long_ppjson = json.dumps(g.tmp1)
long_file_handle_ary[f].flush()
with open(long_json_file_ary[f], 'w') as fo: # open the file in write mode
fo.write(long_ppjson)
fo.close()
if g.verbose:
print("wrote final/length OHLC file:", long_json_file_ary[f], len(g.tmp1['data']))
# # * mv when done - atomic action to prevent read error
os.rename(f'/tmp/_{spair}_0m_{g.wss_filters[f]}f.tmp', f'/tmp/_{spair}_0m_{g.wss_filters[f]}f.json')
# shutil.copy2(f'/tmp/_stream_filter_{g.filteramt}_{spair}.json',f'/tmp/cp_stream_filter_{g.filteramt}_{spair}.json')
g.running_total[f] = 0
else:
if g.verbose:
print(Fore.RED, g.recover, g.gcounter, line_ary, f"f={g.wss_filters[f]}", f'{g.running_total[f]:,.2f}', Style.RESET_ALL)
g.last_close = Close
g.gcounter += 1
def on_error(ws,error):
print(error)
def on_close(ws,a,b):
print(f"### closed [{a}] [{b}] ###")
# + ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓
#
# if o.checkIfProcessRunning('/home/jw/store/src/jmcap/v2bot/b_wss.py'):
# print('Already running... exiting')
# exit()
os.chdir("/home/jw/src/jmcap/v2bot")
g.cvars = toml.load(g.cfgfile)
g.wss_filters = g.cvars['wss_filters']
g.filteramt = 0 # * def no filter
# + ≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡
argv = sys.argv[1:]
g.verbose = False
g.pair = g.cvars['pair']
try:
opts, args = getopt.getopt(argv, "-hva:p:", ["help", "verbose", "pair="])
except getopt.GetoptError as err:
sys.exit(2)
for opt, arg in opts:
if opt in ("-h", "--help"):
print("-h, --help")
print("-v, --verbose")
print("-p, --pair")
sys.exit(0)
if opt in ("-v", "--verbose"):
g.verbose = True
if opt in ("-p", "--pair"):
g.pair = arg
# + ≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡
colorama_init()
g.recover = 0
spair = g.pair.replace("/","")
# * define and open here to reduse file i/o
long_file_ary = []
long_json_file_ary = []
long_file_handle_ary = []
g.running_total = []
for f in range(len(g.wss_filters)):
long_file_ary.append(f"data/{spair}_0m_{g.wss_filters[f]}f.csv")
long_json_file_ary.append(f"data/{spair}_0m_{g.wss_filters[f]}f.json")
long_file_handle_ary.append(open(long_file_ary[f], "a")) # append mode
g.running_total.append(0)
dline = [float("Nan"),float("Nan"),float("Nan"),float("Nan"),float("Nan"),float("Nan")]
g.wss_small = [dline]*g.cvars['datawindow']
cc = "btcusdt"
socket = f"wss://stream.binance.com:9443/ws/{cc}@kline_1m"
g.gcounter +=1 # * add one to skipe the first mod 0 condition in the flush routine
ws = websocket.WebSocketApp(socket,on_message = on_message, on_error = on_error, on_close = on_close)
ws.run_forever()