-
Notifications
You must be signed in to change notification settings - Fork 14
/
utils.py
346 lines (284 loc) · 10.8 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
"""A collection of utility classes and methods to be used throughout
the different modules and classes in this project.
Todo:
- [code improvement, low priority] rewrite nearest_index using
next() and enumerate()
- [for fun, low priority] run benchmark on nearest_date_index
"""
import bisect
import calendar
import datetime
from datetime import datetime as dt
import os
import os.path
# Relative directory path. Not referenced in this part of the file --
# presumably where stock data files live; verify against callers.
STOCK_DIR = "data/"
# strptime/strftime pattern (YYYY-MM-DD) used by date_obj() and date_str()
# to convert between date strings and datetime objects.
DATE_FORMAT = "%Y-%m-%d"
######
# CLASSES
#####
class SteppedAvgLookup(object):
    """A lookup table mapping stepped key ranges to average values.

    Given parallel sequences of keys and values and a step size, every
    key is assigned to the smallest step boundary that is strictly
    greater than it, and each boundary stores the mean of the values
    assigned to it. Boundaries that receive no data are dropped from
    the table, so a lookup falls through to the next boundary that
    actually has data.

    e.g. if step = 1, (keys, values) = (1.1, 10), (2.3, 5), (2.5, 3),
    then the LUT it will build is:
        0 to 2   -> 10  (0 to 2 is steps 0 to 1, and 1 to 2,
                         which has pairs (1.1, 10))
        2 to inf -> 4   (2 to inf is steps 2 to 3, 3 to 4, ... etc,
                         which has pairs (2.3, 5), (2.5, 3))

    Used for calculating the average "near" certain values, given
    enough data for such a notion to be useful, but not enough data
    to have an average for every single value.

    Currently, primary use is estimating an ETF's true leverage factor
    at its underlying asset's movement, e.g. UPRO and SPY. There is
    enough data (every day between July 2009 and today) to estimate
    how UPRO moves relative to SPY (supposedly 3x, but in reality it
    varies) at a given movement of SPY.
    """

    def __init__(self, step, keys, vals):
        """Initializes an empty lookup and then builds it based on the
        given values.

        Args:
            step: A value specifying the step size, while a smaller
                step may give more precision, that does not always
                imply more accuracy
            keys: A non-empty array of keys which correspond to the
                values
            vals: An array of values which correspond to the keys

        Raises:
            ValueError: If keys is an empty list (raised by min()).
        """
        # Boundary -> running average of the values filed under it.
        self._lut = {}
        # Boundary -> number of values filed under it.
        self._num_points = {}
        self._build_lut(step, keys, vals)

    def get(self, val):
        """Gets the average at the nearest key greater than the given
        value.

        Args:
            val: A value for which an average is wanted

        Returns:
            The average stored at the smallest key strictly greater
            than val, or None if no key is greater than val
        """
        for key in sorted(self._lut):
            if val < key:
                return self._lut[key]
        return None

    def get_num_points(self, val):
        """Returns the number of data points at the given step.

        Unlike get(), empty steps are kept here, so the result is 0
        for a step that received no data.

        Args:
            val: A value for which the number of data points is wanted

        Returns:
            The number of data points filed under the smallest step
            boundary strictly greater than val, or None if no boundary
            is greater than val
        """
        for key in sorted(self._num_points):
            if val < key:
                return self._num_points[key]
        return None

    def _build_lut(self, step, keys, vals):
        """Internal function for building the LUT.

        Args:
            step: A value specifying the step size
            keys: An array of keys which correspond to the values
            vals: An array of values which correspond to the keys
        """
        first = int(min(keys) // step)
        # + 1 so the boundary at floor(max / step) * step exists; without
        # it the top two steps' worth of data collapse into the "inf"
        # bucket and the class docstring example cannot be reproduced.
        last = int(max(keys) // step) + 1
        for i in range(first, last):
            self._lut[i * step] = 0
            self._num_points[i * step] = 0
        # Catch-all bucket for keys at or above the last finite boundary.
        self._lut[float("inf")] = 0
        self._num_points[float("inf")] = 0

        bounds = sorted(self._lut)
        for key, val in zip(keys, vals):
            for bound in bounds:
                if key < bound:
                    count = self._num_points[bound]
                    # Incremental mean: fold the new value into the
                    # average of the `count` values seen so far.
                    self._lut[bound] = \
                        (self._lut[bound] * count + val) / (count + 1)
                    self._num_points[bound] = count + 1
                    break

        # Drop boundaries that never received data. Decide by count, not
        # by average, so a bucket whose mean is exactly 0 is kept.
        for bound in bounds:
            if self._num_points[bound] == 0:
                del self._lut[bound]
######
# FUNCTIONS
#####
def currency(number):
    """Formats a number as a currency amount with two decimal places.

    Args:
        number: A number (or anything float() accepts) to format

    Returns:
        A string holding the value with exactly two decimals
    """
    amount = float(number)
    return format(amount, ".2f")
def percent(number):
    """Formats a fraction as a percentage with two decimal places.

    The input is multiplied by 100 before it is converted to float, so
    it must already be numeric. No percent sign is appended.

    Args:
        number: A numeric fraction to format, e.g. 0.5 for 50 percent

    Returns:
        A string holding the percentage with exactly two decimals
    """
    scaled = number * 100
    return format(float(scaled), ".2f")
def date_obj(date):
    """Returns the equivalent datetime object for the given date string
    or date/datetime object.

    Args:
        date: A date string in DATE_FORMAT, or a date/datetime object
            (subclasses included)

    Returns:
        A datetime object representing the given date. A datetime input
        is returned unchanged, a plain date becomes midnight of that
        day, and anything else is parsed as a DATE_FORMAT string.
    """
    # datetime is checked first because it is itself a subclass of date.
    # isinstance (rather than an exact type match) lets datetime/date
    # subclasses through instead of sending them to strptime.
    if isinstance(date, dt):
        return date
    if isinstance(date, datetime.date):
        return dt(date.year, date.month, date.day)
    return dt.strptime(date, DATE_FORMAT)
def date_str(date):
    """Returns the equivalent date string for the given date string or
    date/datetime object.

    Args:
        date: A date string, or any object with a strftime() method
            such as a date/datetime object

    Returns:
        A date string representing the given date. Strings (including
        str subclasses) are returned unchanged without validation;
        everything else is formatted with DATE_FORMAT.
    """
    # isinstance so str subclasses are treated as strings instead of
    # failing on the missing strftime() attribute.
    if isinstance(date, str):
        return date
    return date.strftime(DATE_FORMAT)
def days_between(date_a, date_b):
    """Counts the whole days from one date to another.

    Args:
        date_a: A date string or object representing the earlier date
        date_b: A date string or object representing the later date

    Returns:
        The day component of (date_b - date_a); negative when date_b
        is before date_a
    """
    later = date_obj(date_b)
    earlier = date_obj(date_a)
    elapsed = later - earlier
    return elapsed.days
def write_list_to_file(list, filename, overwrite):
    """Appends every item of a list to a file, one item per line.

    Args:
        list: An iterable of strings to write to file
        filename: A filename of a file to which to write
        overwrite: A boolean; when true, an existing regular file is
            deleted first so the output starts empty

    Returns:
        Number of lines written
    """
    # Overwriting is implemented as delete-then-append.
    stale = os.path.isfile(filename) if overwrite else False
    if stale:
        os.remove(filename)

    count = 0
    with open(filename, 'a') as out:
        for count, item in enumerate(list, start=1):
            out.write(item + '\n')
    return count
def list_from_csv(filename, col, s_char, r_chars):
    """Extracts a specific column from a CSV file, given a split char.

    The first line of the file is treated as a header and skipped.
    Lines are split naively on s_char (no quoting support), the chosen
    field is stripped of surrounding whitespace, and every string in
    r_chars is then removed from it.

    Args:
        filename: A filename of a CSV file
        col: A zero-based index of the column to extract
        s_char: A character representing a delimiter
        r_chars: An array of characters to remove

    Returns:
        An array of values corresponding to the stripped column

    Raises:
        IndexError: If a data line has fewer than col + 1 fields
    """
    with open(filename, 'r') as csv_file:
        lines = [line.strip() for line in csv_file]

    column = []
    # lines[1:] skips the header row.
    for line in lines[1:]:
        value = line.split(s_char)[col].strip()
        for r in r_chars:
            value = value.replace(r, '')
        column.append(value)
    return column
def subtract_date(period, unit, date):
    """Subtracts the period from the given date.

    Years and months are subtracted on the calendar (the day of month
    is clamped to the length of the resulting month, e.g. Mar 31 minus
    one month gives the last day of February); days are subtracted as
    a timedelta. Any time-of-day on the input is discarded because the
    result is rebuilt from year, month and day only.

    Args:
        period: A period value, e.g. 3 (for 3 days/months/years)
        unit: A unit for the period: 'y', 'm' or 'd' (case-insensitive).
            Any other unit is ignored and the date comes back unchanged
            apart from losing its time-of-day.
        date: A date string or date/datetime object from which to
            subtract

    Returns:
        A date string if the input was a string, otherwise a datetime
        object (also for plain date inputs)
    """
    start = date_obj(date)
    diffs = {'y': 0, 'm': 0, 'd': 0}
    diffs[unit.lower()] = int(period)

    # Whole years contained in the month count come off the year; only
    # the remainder (0..11) comes off the month.
    year = start.year - diffs['y'] - diffs['m'] // 12
    month = start.month - diffs['m'] % 12
    if month < 1:
        # month is now in -10..0, so a single borrowed year suffices.
        year -= 1
        month += 12

    # Clamp the day so e.g. the 31st maps onto a valid day of a shorter
    # month.
    day = min(calendar.monthrange(year, month)[1], start.day)
    new_date = dt(year, month, day) - datetime.timedelta(diffs['d'])

    if isinstance(date, str):
        return date_str(new_date)
    return new_date
def nearest_index(val, vals, direction, val_type=None):
    """Given a value, finds the index of the nearest value before/after
    said value in an array of values sorted in ascending order.

    An exact match counts as "nearest" in either direction. Passing
    val_type='date' delegates the whole lookup to nearest_date_index.

    Args:
        val: A value for which to find the nearest value in values
        vals: An ascending-sorted array of values to look through
        direction: A 'direction' (-1 or +1) for looking: negative finds
            the last value <= val, positive finds the first value >= val
        val_type: A type of value - used for optimizations. Currently
            only 'date' is recognised.

    Returns:
        An index for the nearest value, or -1 if vals is empty, if no
        value lies in the requested direction, or if direction is 0
    """
    if val_type == 'date':
        return nearest_date_index(val, vals, direction)
    # len() rather than truthiness so array-like inputs also work.
    if len(vals) == 0 or direction == 0:
        return -1
    if direction > 0:
        # First position whose value is >= val.
        idx = bisect.bisect_left(vals, val)
        return idx if idx < len(vals) else -1
    # Last position whose value is <= val; naturally -1 when val is
    # smaller than every element.
    return bisect.bisect_right(vals, val) - 1
def nearest_date_index(date, dates, direction):
    """Optimization for nearest index for date types.

    Approximates where the date would be based on starting and ending
    dates in list and starts a linear walk there. In practice, only
    takes a few steps. Assumes dates is sorted in ascending order.

    An exact match is always returned as-is, whatever the direction.

    NOTE(review): the boundary cases ignore direction -- a date after
    the last entry yields -1 and a date on or before the first entry
    yields 0, even when looking backwards. Kept as-is because callers
    may rely on it; confirm before changing.

    Args:
        date: A date for which to find the nearest date in dates
        dates: An ascending array of date strings/objects to look
            through
        direction: A 'direction' for looking: negative for the nearest
            earlier date, positive for the nearest later date, 0 for
            whichever neighbour is closer in days

    Returns:
        An index for the nearest date, or -1 if dates is empty or the
        date is after the last entry
    """
    # ISO-style date strings compare chronologically, so all ordering
    # checks are done on strings; convert the target only once.
    target = date_str(date)
    if len(dates) == 0 or date_str(dates[-1]) < target:
        return -1
    if date_str(dates[0]) >= target:
        return 0

    first_date = date_obj(dates[0])
    last_date = date_obj(dates[-1])
    target_date = date_obj(date)
    # Entries per calendar day; used to guess the target's position by
    # linear interpolation between the first and last dates.
    approx_factor = len(dates) / (last_date - first_date).days
    i = int((target_date - first_date).days * approx_factor)
    if i > 0:
        i -= 1

    # Walk from the guess to the first index whose date is >= target.
    if date_str(dates[i]) < target:
        while date_str(dates[i]) < target:
            i += 1
    else:
        while date_str(dates[i - 1]) >= target:
            i -= 1

    # Check for an exact hit after the walk, so the answer does not
    # depend on whether the initial guess happened to land on it.
    if date_str(dates[i]) == target:
        return i
    if direction == 0:
        return min([i, i - 1],
                   key=lambda x: abs((date_obj(dates[x])
                                      - target_date).days))
    if direction < 0:
        return i - 1
    return i