forked from EdwardBetts/FincLab
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.py
269 lines (223 loc) · 10.7 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
import queue
import time
import logging
import multiprocessing as mp
import feeder.PrepareData
"""
Class: Event-Driven Engine
Author: Peter Lee
Date created: 2016-Jan-19
Two parallel processes:
1.) Event-driven engine
2.) User Interface
Notes
-----
The "Event-Driven Engine" is the core component of the system that:
- Encapsulates the event-handling logic (by directing the events to appropriate components)
- Manages events in a queue in a timely order, and
- Process all events in the queue and sleeps for the period of "heartbeat" before waking up to process new events.
The Engine object comprises of two while-loops:
- The outer-loop (the "heartbeat" loop) : decides the speed of resolution of the system.
* In a backtesting setting, the "heartbeat" rate is set to zero since all data points are historic and available. The backtesting ends if the DataHandler object lets the Engine ojbect know, by using a boolean .is_running attribute.
* In a live trading setting, the "heartbeat" rate decides the resolution of the system, and is dependent upon the applied strategy. For example, a heart rate of 600 (seconds) means that the "Event Queue" is will query and queue up the MarketEvent every 10 minutes.
- The inner-loop (the life cycle of an "Event") : Starting with receiving a MarketEvent, follow-up events are directly to corresponding components to be processed. As a result, the "Event Queue" is continually being populated and depopulated with events. The loop must be closed before a new MarketEvent can be processed.
Flow-Chart
----------
Main loop : The main process cycle of events.
MarketEvent --> Strategy object (to calculate signal)
--> SignalEvent --> Portfolio object (to transform into orders)
--> (a set of) OrderEvent --> ExecutionHandler object (to simulate the execution in a backtesting or send to broker's API in live trading)
--> FillEvent --> Portfolio object (to update positions given the fill orders)
Side loop : Some events are processed by multiple targets.
MarketEvent --> Portfolio object (to reindex the time and update positions in the portfolio)
"""
logger = logging.getLogger("FincLab.engine")
class Engine(mp.Process):
"""
Core of the event-driven system : Sets up the event queue and directs event to the correct system component.
"""
def __init__(self,
config,
data_handler,
execution_handler,
portfolio,
strategy,
event_queue=queue.Queue()):
"""
Initialises the Engine object.
Parameters
----------
config : configparser()
The system configuration object.
data_handler : DataHandler (class) or an inherited class, default HistoricCSVDataHandler
Handles the market data feed.
execution_handler : ExecutionHandler (Class) or an inherited class, default SimulatedExecutionHandler
Handles the orders, and fills for trades.
portfolio : Portfolio (Class) or an inherited class, default Portfolio.
Keeps track of portfolio current and prior positions.
event_queue : queue.Queue()
Queue to queue up different events
strategy : Strategy (Class) or an inherited class, default MovingAverageCrossoverStrategy.
Generates signals based on market data.
"""
self.config = config
self.initial_capital = float(self.config['general']['initial_capital'])
self.heartbeat = float(self.config['engine']['heartbeat'])
self.data_handler_cls = data_handler
self.execution_handler_cls = execution_handler
self.portfolio_cls = portfolio
self.strategy_cls = strategy
self.event_queue = event_queue
self.start_date = config.dt_start_date
self.end_date = config.dt_end_date
self.symbol_list = []
# Initialise a logger
self.logger = logging.getLogger("FincLab.Engine")
self.logger.propagate = True
self.num_signals = 0 # Number of processed SignalEvents
self.num_orders = 0 # Number of processed OrderEvents
self.num_fills = 0 # Number of process FillEvents
self.num_strats = 1 # Number of strategies
def _initialise_system_components(self):
"""
Attaches the trading objects (DataHandler, Strategy, Portfolio and ExecutionHandler) to internal members.
"""
self.logger.info("Initialising system components: data feeder, strategy, portfolio and execution handler...")
self.data_handler = self.data_handler_cls(
config=self.config,
event_queue=self.event_queue,
symbol_list=self.symbol_list,
)
self.strategy = self.strategy_cls(
bars=self.data_handler,
event_queue=self.event_queue
)
self.portfolio = self.portfolio_cls(
bars=self.data_handler,
event_queue=self.event_queue,
config=self.config
)
self.execution_handler = self.execution_handler_cls(
event_queue=self.event_queue
)
def _run_engine(self):
"""
Using two while-loops to execute the event-driven engine.
The engine is single treaded, and all events are placed in a FIFO queue.
Notes
-----
The core of the engine comprises of two while-loops:
- The outer keeps track of the heartbeat of the system.
- The inner checks if there is an event in the Queue object, and acts on it.
Flow-Chart
----------
1) Main flow: The main process cycle of events.
MarketEvent --> Strategy object (to calculate signal)
--> SignalEvent --> Portfolio object (to transform into orders)
--> (a set of) OrderEvent --> ExecutionHandler object (to simulate the execution in a backtesting or send to broker's API in live trading)
--> FillEvent --> Portfolio object (to update positions given the fill orders)
2) Side loop : Some events are processed by multiple targets.
MarketEvent --> Portfolio object (to reindex the time and update positions in the portfolio)
"""
i = 0
while True:
i += 1
# self.logger.debug(i)
# self.logger.info("Process ID: {}, I Love YOU!".format(os.getpid()))
# Check if DataHandler is running --> if yes, get a mareket update
if self.data_handler.is_running:
self.data_handler.update_bars() # push latest bar and put a market event into the queue
else:
break # request to "QUIT" from DataHandler
# To process all remaining events following this MarketEvent
while True:
try:
event = self.event_queue.get(block=False, timeout=None)
except queue.Empty:
# self.logger.debug("queue empty!")
break # End of queue --> get more data
else:
if event is not None:
# self.logger.debug("event type: " + event.type)
if event.type == 'MARKET':
self.strategy.calculate_signals(event)
self.portfolio.update_timeindex(event)
self.logger.info("[Status]current_datetime:{}".format(event.datetime.strftime("%Y-%m-%d")))
elif event.type == 'SIGNAL':
self.num_signals += 1
self.portfolio.update_signal(event)
elif event.type == 'ORDER':
self.num_orders += 1
self.execution_handler.execute_order(event)
elif event.type == 'FILL':
self.num_fills += 1
self.portfolio.update_fill(event)
time.sleep(self.heartbeat)
def _output_performance(self):
"""
Outputs the strategy performance from the engine.
Notes
-----
The performance of the strategy can be displayed to the terminal/console.
The equity curve pandas DataFrame is created and the summary statistics are displayed, as well as the count of Signals, Orders and Fills.
"""
self.portfolio.create_equity_curve_dataframe()
self.logger.info("Creating summary statistics...")
stats = self.portfolio.output_summary_stats()
self.logger.info("Creating equity curve...")
self.logger.info(self.portfolio.equity_curve.tail(10))
self.logger.info(stats)
self.logger.info("Number of Signals: {}".format(self.num_signals))
self.logger.info("Number of Orders: {}".format(self.num_orders))
self.logger.info("Number of Fills: {}".format(self.num_fills))
def prepare_data(self):
"""
Start to prepare the datasets
"""
self.logger.info("Preparing datasets for backtesting...")
prepare_data = feeder.PrepareData.PrepareData(config=self.config)
self.symbol_list = prepare_data.get_symbol_list()
def run(self):
"""
Starts the event-driven system.
"""
self.logger.info("[Status]Engine:Prepare Data")
self.prepare_data()
# Attach system components to internal members
self.logger.info("[Status]Engine:Initialising")
self._initialise_system_components()
self.logger.info("[Status]Engine:Backtesting")
# Display the strategy description
self.logger.info("[Desc]{}".format(self.strategy.description))
self._run_engine()
self._output_performance()
self.logger.info("[Status]Engine:Completed")
return
if __name__ == '__main__':
from config import config
from feeder.HistoricData import HistoricData as DataHandler
from execution.SimulatedExecutionHandler import SimulatedExecutionHandler as ExecutionHandler
from strategy.MovingAverageCrossover import MovingAverageCrossover as Strategy
from portfolio.Portfolio import Portfolio
from logger import create_logger
import logging.handlers
print(config.dt_end_date, config.dt_start_date)
event_queue = queue.Queue()
heartbeat = 0
initial_capital = 100000
log_queue = queue.Queue()
logger = create_logger(log_queue)
logger.setLevel(logging.DEBUG)
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
listener.start()
system = Engine(
config=config,
data_handler=DataHandler,
execution_handler=ExecutionHandler,
portfolio=Portfolio,
strategy=Strategy,
event_queue=event_queue
)
system.run()
listener.stop()
print("Complete.")