Continuous ingestion of realtime data from live brokers #26
Here is how I attempted to implement this in a separate project. During the live algorithm initialization, I created minute writer and reader objects and assigned them to the broker:

```python
def _create_minute_writer(self):
    root = get_broker_minute_writer_root(self.broker.name)
    filename = os.path.join(root, 'metadata.json')
    if os.path.isfile(filename):
        writer = BcolzMinuteBarWriter.open(
            root, self.sim_params.end_session)
    else:
        writer = BcolzMinuteBarWriter(
            rootdir=root,
            calendar=self.trading_calendar,
            minutes_per_day=NYSE,
            start_session=self.sim_params.start_session,
            end_session=self.sim_params.end_session,
            write_metadata=True
        )
    self.broker.minute_writer = writer
    self.broker.minute_reader = BcolzMinuteBarReader(root)
```

Then, in the spot-value lookup:

```python
# Should I not use a timezone?
dt = pd.Timestamp.utcnow().floor('1 min')
value = None
if self.minute_reader is not None:
    try:
        # Slight delay to minimize the chances that multiple algos
        # might try to hit the cache at the exact same time.
        sleep_time = random.uniform(0.5, 0.8)
        sleep(sleep_time)
        # TODO: This does not always return a value! Why is that?
        value = self.minute_reader.get_value(
            sid=asset.sid,
            dt=dt,
            field=field
        )
    except Exception as e:
        log.warn('minute data not found: {}'.format(e))

if value is None or np.isnan(value):
    # This does not directly apply to the current IB broker implementation.
    # I have a separate get_candles method which returns an OHLCV dict;
    # I use it to get spot values and history.
    ohlc = self.get_candles(data_frequency, asset)
    if field not in ohlc:
        raise KeyError('Invalid column: %s' % field)
    if self.minute_writer is not None:
        df = pd.DataFrame(
            [ohlc],
            index=pd.DatetimeIndex([dt]),
            columns=['open', 'high', 'low', 'close', 'volume']
        )
        try:
            self.minute_writer.write_sid(
                sid=asset.sid,
                df=df
            )
            log.debug('wrote minute data: {}'.format(dt))
        except Exception as e:
            # Since the reader comes out empty, it is going to hit this
            # exception when trying to get the spot value for a single
            # asset more than once within the same minute bar.
            log.warn(
                'unable to write minute data: {} {}'.format(dt, e))
```

Here is the issue: the data seems to be written correctly but
@fredfortier: Thanks for sharing! For some reason I missed this post. Glad to see that you made progress. Do you have any update on this?
I'm not exactly sure about this, but I suspect that you need to update the asset_db to advance the symbol's end_date after minutely ingestion.
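For what that update could look like: this is a hypothetical sketch using an in-memory SQLite table with made-up table and column names (not zipline's exact asset-db schema), showing the idea of advancing `end_date` forward after new bars arrive:

```python
import sqlite3

# Illustrative stand-in for an asset db: one row per equity with an
# end_date stored as an epoch integer.
conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE equities (sid INTEGER PRIMARY KEY, symbol TEXT, end_date INTEGER)")
conn.execute("INSERT INTO equities VALUES (1, 'AAPL', 1496275200)")

def advance_end_date(conn, sid, new_end_epoch):
    # Only ever move end_date forward, never backward.
    conn.execute(
        "UPDATE equities SET end_date = MAX(end_date, ?) WHERE sid = ?",
        (new_end_epoch, sid))

advance_end_date(conn, 1, 1496361600)
print(conn.execute(
    "SELECT end_date FROM equities WHERE sid = 1").fetchone()[0])  # 1496361600
```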
Yes, the code above seems to be working. If I recall correctly, the issue was with the calendar parameter: I'm trading an asset with a 24H schedule, so I had to change it to OPEN. I don't personally use the asset_db, but it may need some updates when trading equities, as you are suggesting.
Continuous ingestion (i.e. loading data directly into the bundles) is not easily achievable, so I implemented a much simpler solution: collect the bars and dump them out to CSV at the end of the session. The collected realtime bars are merged with the historical bars to provide data continuity. What's needed from the user side is to ingest the dumped CSV files EOD. I'll keep this issue open until we provide an 'official' way of ingesting EOD.
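The merge step described above can be sketched with pandas; this is an illustrative example with made-up data, where realtime bars take precedence over historical bars on overlapping minutes:

```python
import io
import pandas as pd

# Historical and realtime minute bars, indexed by timestamp
# (only 'close' shown here for brevity).
hist = pd.DataFrame(
    {'close': [100.0, 101.0]},
    index=pd.to_datetime(['2017-06-01 14:30', '2017-06-01 14:31']))
live = pd.DataFrame(
    {'close': [101.5, 102.0]},
    index=pd.to_datetime(['2017-06-01 14:31', '2017-06-01 14:32']))

# Concatenate, then keep the last (realtime) row for duplicated minutes.
merged = pd.concat([hist, live])
merged = merged[~merged.index.duplicated(keep='last')].sort_index()

# Dump to CSV for EOD ingestion (a buffer stands in for a file here).
buf = io.StringIO()
merged.to_csv(buf)

print(merged['close'].tolist())  # [100.0, 101.5, 102.0]
```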
Zipline-Live will support a live data feed from brokers. To match Quantopian's original design, the algorithms will access the live feed through data.current(). As it is important to have reproducible runs for the algorithms, it would be beneficial to store (or continuously ingest) the live feed. Data is precious.

Live feed storage should be transparent to the algo and broker agnostic.
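One way to read "transparent to the algo and broker agnostic" is a caching layer that sits between data.current() and the broker feed: try local storage first, fall back to the live broker, and persist what comes back. This is only a hypothetical sketch (the class, the `broker_fetch` callable, and the in-memory dict standing in for persistent storage are all made up for illustration):

```python
class CachingFeed:
    """Broker-agnostic feed: serve from local storage, fetch-and-store on miss."""

    def __init__(self, broker_fetch):
        self._fetch = broker_fetch   # callable(asset, dt, field) -> float
        self._cache = {}             # stand-in for persistent minute storage

    def current(self, asset, dt, field):
        key = (asset, dt, field)
        if key not in self._cache:
            # Cache miss: hit the live broker once, then persist.
            self._cache[key] = self._fetch(asset, dt, field)
        return self._cache[key]

calls = []
def fake_broker(asset, dt, field):
    calls.append(1)
    return 42.0

feed = CachingFeed(fake_broker)
print(feed.current('AAPL', '2017-06-01 14:30', 'close'))  # 42.0
print(feed.current('AAPL', '2017-06-01 14:30', 'close'))  # 42.0 (served from cache)
```

The algorithm only ever sees `current()`, so swapping brokers or storage backends does not touch algo code, which is the property the issue is asking for.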