-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathzipline_data_bundle_v2.py
134 lines (101 loc) · 4.52 KB
/
zipline_data_bundle_v2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import pandas as pd
from os import listdir
from pathlib import Path
path - Path(r"D:\OneDrive - Northlight Group\Data\ETF")
# Change the path to where you have your data
#path = 'C:\\Users\\Andreas Clenow\\BookSamples\\BookModels\\data\\random_stocks\\'
"""
The ingest function needs to have this exact signature,
meaning these arguments passed, as shown below.
"""
def random_stock_data(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
# Get list of files from path
# Slicing off the last part
# 'example.csv'[:-4] = 'example'
symbols = [f[:-4] for f in listdir(path)]
if not symbols:
raise ValueError("No symbols found in folder.")
# Prepare an empty DataFrame for dividends
divs = pd.DataFrame(columns=['sid',
'amount',
'ex_date',
'record_date',
'declared_date',
'pay_date']
)
# Prepare an empty DataFrame for splits
splits = pd.DataFrame(columns=['sid',
'ratio',
'effective_date']
)
# Prepare an empty DataFrame for metadata
metadata = pd.DataFrame(columns=('start_date',
'end_date',
'auto_close_date',
'symbol',
'exchange'
)
)
# Check valid trading dates, according to the selected exchange calendar
sessions = calendar.sessions_in_range(start_session, end_session)
# Get data for all stocks and write to Zipline
daily_bar_writer.write(
process_stocks(symbols, sessions, metadata, divs)
)
# Write the metadata
asset_db_writer.write(equities=metadata)
# Write splits and dividends
adjustment_writer.write(splits=splits,
dividends=divs)
"""
Generator function to iterate stocks,
build historical data, metadata
and dividend data
"""
def process_stocks(symbols, sessions, metadata, divs):
# Loop the stocks, setting a unique Security ID (SID)
for sid, symbol in enumerate(symbols):
print('Loading {}...'.format(symbol))
# Read the stock data from csv file.
df = pd.read_csv('{}/{}.csv'.format(path, symbol), index_col=[0], parse_dates=[0])
# Check first and last date.
start_date = df.index[0]
end_date = df.index[-1]
# Synch to the official exchange calendar
df = df.reindex(sessions.tz_localize(None))[start_date:end_date]
# Forward fill missing data
df.fillna(method='ffill', inplace=True)
# Drop remaining NaN
df.dropna(inplace=True)
# The auto_close date is the day after the last trade.
ac_date = end_date + pd.Timedelta(days=1)
# Add a row to the metadata DataFrame. Don't forget to add an exchange field.
metadata.loc[sid] = start_date, end_date, ac_date, symbol, "NYSE"
# If there's dividend data, add that to the dividend DataFrame
if 'dividend' in df.columns:
# Slice off the days with dividends
tmp = df[df['dividend'] != 0.0]['dividend']
div = pd.DataFrame(data=tmp.index.tolist(), columns=['ex_date'])
# Provide empty columns as we don't have this data for now
div['record_date'] = pd.NaT
div['declared_date'] = pd.NaT
div['pay_date'] = pd.NaT
# Store the dividends and set the Security ID
div['amount'] = tmp.tolist()
div['sid'] = sid
# Start numbering at where we left off last time
ind = pd.Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
div.set_index(ind, inplace=True)
# Append this stock's dividends to the list of all dividends
divs = divs.append(div)
yield sid, df