-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcrypto_API.py
90 lines (77 loc) · 3.56 KB
/
crypto_API.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import time
import numpy as np
import pandas as pd
from numpy import dtype
# Column name -> NumPy dtype. Used as the default ``dtype`` argument when
# reading the CSV and to cast the frame handed to the API emulator.
dtypes = {
    'timestamp': np.int64,
    'Asset_ID': np.int8,
    'Count': np.int32,
    # All remaining columns share the same 64-bit floating-point type.
    **dict.fromkeys(
        ('Open', 'High', 'Low', 'Close', 'Volume', 'VWAP', 'Target'),
        np.float64,
    ),
}
def datestring_to_timestamp(ts):
    """Convert *ts* to an integer number of seconds since the epoch.

    *ts* is anything ``pd.Timestamp`` accepts. The fractional part of the
    resulting POSIX timestamp is discarded by the ``int`` conversion.
    """
    parsed = pd.Timestamp(ts)
    epoch_seconds = parsed.timestamp()
    return int(epoch_seconds)
def read_csv_slice(file_path, dtypes=dtypes, use_window=None):
    """Read a CSV file and optionally keep only rows inside a timestamp window.

    Args:
        file_path: Passed straight to ``pd.read_csv``.
        dtypes: Column-to-dtype mapping forwarded as ``dtype`` to ``pd.read_csv``.
        use_window: ``None`` to return every row, or an indexable pair
            ``(start, stop)``; rows with ``start <= timestamp < stop`` are kept
            (the upper bound is exclusive).

    Returns:
        The loaded (and possibly filtered) DataFrame.
    """
    frame = pd.read_csv(file_path, dtype=dtypes)
    if use_window is None:
        return frame

    lower, upper = use_window[0], use_window[1]
    inside = (frame.timestamp >= lower) & (frame.timestamp < upper)
    return frame[inside]
def read_sql_slice(con, dtypes=dtypes, use_window=None):
    """Read rows of ``mycrypto.kaggle_train`` through *con* into a DataFrame.

    Args:
        con: Database connection handed to ``pd.read_sql``.
        dtypes: Accepted for signature parity with ``read_csv_slice``; it is
            not used by this function.
        use_window: ``None`` to select the whole table, or an indexable pair
            ``(start, stop)``; rows with ``start <= timestamp <= stop`` are
            selected. Both bounds are inclusive here, whereas
            ``read_csv_slice`` excludes its upper bound.

    Returns:
        The query result as a DataFrame.
    """
    query = "select * from mycrypto.kaggle_train"
    # The default ``use_window=None`` used to be indexed unconditionally and
    # raised TypeError; only add the filter when a window is supplied.
    if use_window is not None:
        # NOTE(review): the bounds are interpolated into the SQL text, so only
        # trusted numeric values should be passed here.
        query += f" where timestamp>={use_window[0]} and timestamp <= {use_window[1]}"
    return pd.read_sql(query, con)
def weighted_correlation(a, b, weights):
    """Return the weighted correlation coefficient between *a* and *b*.

    All three inputs are flattened with ``np.ravel`` and combined
    element-wise, so they must flatten to compatible lengths. The result is
    the weighted covariance divided by the square root of the product of the
    two weighted variances.
    """
    wts = np.ravel(weights)
    xs = np.ravel(a)
    ys = np.ravel(b)
    total = np.sum(wts)

    def _weighted_mean(values):
        return np.sum(values * wts) / total

    def _weighted_var(values, centre):
        return np.sum(wts * np.square(values - centre)) / total

    x_bar = _weighted_mean(xs)
    y_bar = _weighted_mean(ys)

    # Covariance via E[xy] - E[x]E[y], using the same weights throughout.
    covariance = np.sum(xs * ys * wts) / total - x_bar * y_bar
    spread = np.sqrt(_weighted_var(xs, x_bar) * _weighted_var(ys, y_bar))
    return covariance / spread
#Here's code for the local API emulator.
class API:
    """Local emulator of the batch-by-batch prediction API.

    Iterating yields one batch per distinct ``timestamp``. Every batch must be
    answered through ``predict`` before the next one is served. Targets and
    submitted predictions are collected so ``score`` can evaluate them.
    """

    def __init__(self, df):
        """Cast *df* with the module-level ``dtypes``, group it by timestamp
        and reset the call counters and collected results."""
        frame = df.astype(dtypes)
        # Expose the index as a column so predictions can be matched to rows.
        frame['row_id'] = frame.index
        by_time = frame.groupby('timestamp')

        self.data_iter = iter(by_time)
        self.init_num_times = len(by_time)
        self.next_calls = 0
        self.pred_calls = 0
        self.predictions = []
        self.targets = []
        print("This version of the API is not optimized and should not be used to estimate the runtime of your code on the hidden test set. ;)")

    def __iter__(self):
        """The emulator is its own iterator."""
        return self

    def __len__(self):
        """Number of timestamp batches that have not been served yet."""
        remaining = self.init_num_times - self.next_calls
        return remaining

    def __next__(self):
        """Serve the next batch.

        Returns:
            ``(data_df, pred_df)``: the batch without its ``Target`` column,
            and a ``row_id``/``Target`` frame whose targets are zero-filled
            for the caller to overwrite.

        Raises:
            AssertionError: if the previous batch has not been predicted.
            StopIteration: when every timestamp has been served.
        """
        assert self.pred_calls == self.next_calls, "You must call `predict()` successfully before you can get the next batch of data."
        _, batch = next(self.data_iter)
        self.next_calls += 1

        features = batch.drop(columns=['Target'])
        # Keep the ground truth (with asset ids for weighting) for scoring.
        truth = batch[['row_id', 'Target', 'Asset_ID']]
        self.targets.append(truth)

        template = truth.drop(columns=['Asset_ID']).assign(Target=0.)
        return features, template

    def predict(self, pred_df):
        """Record the prediction for the batch most recently served.

        Raises:
            AssertionError: if no unanswered batch is outstanding, or if the
                columns of *pred_df* are not exactly ``row_id`` and ``Target``.
        """
        assert self.next_calls - 1 == self.pred_calls, "You must get the next batch of data from the API before making a new prediction."
        assert list(pred_df.columns) == ['row_id', 'Target'], "Prediction dataframe should have columns `row_id` and `Target`."
        typed = pred_df.astype({'row_id': np.dtype('int64'), 'Target': np.dtype('float64')})
        self.predictions.append(typed)
        self.pred_calls += 1

    def score(self, id_2_weight):
        """Score all recorded predictions against the collected targets.

        Args:
            id_2_weight: Mapping applied to ``Asset_ID`` to get each row's weight.

        Returns:
            ``(scoring_df, score)``. ``scoring_df`` holds targets, predictions
            and weights for rows whose target is not NaN. ``score`` is ``-1``
            when the predictions' population variance is below ``1e-10``,
            otherwise the weighted correlation of prediction and target.
        """
        submitted = pd.concat(self.predictions).rename(columns={'Target': 'Prediction'})
        actual = pd.concat(self.targets)

        scoring_df = actual.merge(submitted, on='row_id', how='left')
        scoring_df['Weight'] = scoring_df.Asset_ID.map(id_2_weight)
        scoring_df = scoring_df[scoring_df.Target.notna()]

        # (Near-)constant predictions get the fixed score -1 instead of a
        # correlation.
        if scoring_df.Prediction.var(ddof=0) < 1e-10:
            return scoring_df, -1
        return scoring_df, weighted_correlation(scoring_df.Prediction, scoring_df.Target, scoring_df.Weight)