Skip to content

Commit

Permalink
🦙 feat(data): alpaca 🦙 (#241)
Browse files Browse the repository at this point in the history
* add alpaca get_ohlc fx

* add test

* update hist script

* update script and workflow #minor

* use tokens in run

* fix feed

* try parenthesis

* provider dir

* test fixed
  • Loading branch information
alkalescent authored Jan 6, 2025
1 parent d94c6f6 commit e2ca93e
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 61 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ohlc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ jobs:
AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
S3_BUCKET: ${{ secrets.S3_BUCKET }}
POLYGON: ${{ secrets.POLYGON }}
ALPACA: ${{ secrets.ALPACA }}
ALPACA_SECRET: ${{ secrets.ALPACA_SECRET }}
run: python scripts/update_ohlc.py
2 changes: 2 additions & 0 deletions .github/workflows/sandbox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ env:
PREF_EXCHANGE: ${{ secrets.PREF_EXCHANGE }}
TEST: true
GLASSNODE_PASS: ${{ secrets.GLASSNODE_PASS }}
ALPACA_PAPER: ${{ secrets.ALPACA_PAPER }}
ALPACA_PAPER_SECRET: ${{ secrets.ALPACA_PAPER_SECRET}}

jobs:
build:
Expand Down
6 changes: 6 additions & 0 deletions hyperdrive/Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ def get_env_bool(var_name):
IDX_DIR = 'indices'
# providers
POLY_DIR = 'polygon'
ALPACA_DIR = 'alpaca'
# models
MODELS_DIR = 'models'

folders = {
'polygon': POLY_DIR,
'alpaca': ALPACA_DIR
}

# Column Names
Expand Down Expand Up @@ -109,6 +111,8 @@ def get_env_bool(var_name):
'X%3ALTCUSD', 'X%3AXMRUSD', 'X%3AIOTUSD'
]

ALPC_CRYPTO_SYMBOLS = ['BTC/USD', 'ETH/USD', 'LTC/USD']

SENTIMENT_SYMBOLS_IGNORE = {
'SPYD', 'VWDRY', 'BPMP',
'FOX', 'YYY', 'SDIV',
Expand All @@ -124,6 +128,8 @@ def get_env_bool(var_name):
FEW_DAYS = str(FEW) + 'd'
SCRIPT_FAILURE_THRESHOLD = 0.95

ALPACA_FREE_DELAY = 0.5

# Exchanges
BINANCE = 'BINANCE'
KRAKEN = 'KRAKEN'
Expand Down
132 changes: 110 additions & 22 deletions hyperdrive/DataSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,16 @@ def save_ndx(self, **kwargs):
if os.path.exists(filename):
return filename

def log_api_call_time(self):
self.last_api_call_time = time()

def obey_free_limit(self, free_delay):
if self.free and hasattr(self, 'last_api_call_time'):
time_since_last_call = time() - self.last_api_call_time
delay = free_delay - time_since_last_call
if delay > 0:
sleep(delay)


class Indices(MarketData):
def __init__(self):
Expand All @@ -448,6 +458,102 @@ def get_ndx(self, date=datetime.now()):
return self.standardize_ndx(df)


class Alpaca(MarketData):
# AlpacaData
def __init__(
self,
token=os.environ.get('ALPACA'),
secret=os.environ.get('ALPACA_SECRET'),
free=True,
paper=False
):
super().__init__()
self.base = 'https://data.alpaca.markets'
self.token = os.environ.get(
'ALPACA_PAPER') if paper or C.TEST else token
self.secret = os.environ.get(
'ALPACA_PAPER_SECRET') if paper or C.TEST else secret
if not (self.token and self.secret):
raise Exception('missing Alpaca credentials')
self.provider = 'alpaca'
self.free = free

# def get_dividends(self, **kwargs):
# pass
# def get_splits(self, **kwargs):
# pass

def get_ohlc(self, **kwargs):
def _get_ohlc(symbol, timeframe='max'):
is_crypto = symbol in C.ALPC_CRYPTO_SYMBOLS
version = 'v1beta3' if is_crypto else 'v2'
page_token = None
start, _ = self.traveller.convert_dates(timeframe)
parts = [
self.base,
version,
'crypto/us' if is_crypto else 'stocks',
'bars',
]
url = '/'.join(parts)
pre_params = {
'symbols': symbol,
'timeframe': '1D',
'start': start,
'limit': 10000,
} | ({} if is_crypto else {'adjustment': 'all', 'feed': 'iex'})
headers = {
'APCA-API-KEY-ID': self.token,
'APCA-API-SECRET-KEY': self.secret
}
results = []
while True:
self.obey_free_limit(C.ALPACA_FREE_DELAY)
try:
post_params = {
'page_token': page_token} if page_token else {}
params = pre_params | post_params
response = requests.get(url, params, headers=headers)
if not response.ok:
raise Exception(
'Invalid response from Alpaca for OHLC',
response.status_code,
response.text
)
data = response.json()
if data.get('bars') and data['bars'].get(symbol):
results += data['bars'][symbol]
finally:
self.log_api_call_time()
if data.get('next_page_token'):
page_token = data['next_page_token']
else:
break
df = pd.DataFrame(results)
columns = {
't': 'date',
'o': 'open',
'h': 'high',
'l': 'low',
'c': 'close',
'v': 'volume',
'vw': 'average',
'n': 'trades'
}
df = df.rename(columns=columns)
df['date'] = pd.to_datetime(df['date']).dt.tz_convert(
C.TZ).dt.tz_localize(None)
df = self.standardize_ohlc(symbol, df)
return self.reader.data_in_timeframe(df, C.TIME, timeframe)
return self.try_again(func=_get_ohlc, **kwargs)

# def get_intraday(self, **kwargs):
# pass

# def get_news(self, **kwargs):
# pass


class Polygon(MarketData):
def __init__(self, token=os.environ.get('POLYGON'), free=True):
super().__init__()
Expand All @@ -465,19 +571,9 @@ def paginate(self, gen, apply):
results.append(apply(item))
return results

def obey_free_limit(self):
if self.free and hasattr(self, 'last_api_call_time'):
time_since_last_call = time() - self.last_api_call_time
delay = C.POLY_FREE_DELAY - time_since_last_call
if delay > 0:
sleep(delay)

def log_api_call_time(self):
self.last_api_call_time = time()

def get_dividends(self, **kwargs):
def _get_dividends(symbol, timeframe='max'):
self.obey_free_limit()
self.obey_free_limit(C.POLY_FREE_DELAY)
try:
start, _ = self.traveller.convert_dates(timeframe)
response = self.paginate(
Expand All @@ -495,8 +591,6 @@ def _get_dividends(symbol, timeframe='max'):
'amount': div.cash_amount
}
)
except Exception as e:
raise e
finally:
self.log_api_call_time()
raw = pd.DataFrame(response)
Expand All @@ -506,7 +600,7 @@ def _get_dividends(symbol, timeframe='max'):

def get_splits(self, **kwargs):
def _get_splits(symbol, timeframe='max'):
self.obey_free_limit()
self.obey_free_limit(C.POLY_FREE_DELAY)
try:
start, _ = self.traveller.convert_dates(timeframe)
response = self.paginate(
Expand All @@ -522,8 +616,6 @@ def _get_splits(symbol, timeframe='max'):
'ratio': split.split_from / split.split_to
}
)
except Exception as e:
raise e
finally:
self.log_api_call_time()
raw = pd.DataFrame(response)
Expand All @@ -536,14 +628,12 @@ def _get_ohlc(symbol, timeframe='max'):
is_crypto = symbol.find('X%3A') == 0
formatted_start, formatted_end = self.traveller.convert_dates(
timeframe)
self.obey_free_limit()
self.obey_free_limit(C.POLY_FREE_DELAY)
try:
response = self.client.get_aggs(
symbol, 1, 'day',
from_=formatted_start, to=formatted_end, adjusted=True, limit=C.POLY_MAX_AGGS_LIMIT
)
except Exception as e:
raise e
finally:
self.log_api_call_time()

Expand Down Expand Up @@ -573,7 +663,7 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True):
raise Exception(f'No dates in timeframe: {timeframe}.')

for _, date in enumerate(dates):
self.obey_free_limit()
self.obey_free_limit(C.POLY_FREE_DELAY)
try:
response = self.client.get_aggs(
symbol, min, 'minute', from_=date, to=date,
Expand All @@ -582,8 +672,6 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True):
except exceptions.NoResultsError:
# This is to prevent breaking the loop over weekends
continue
except Exception as e:
raise e
finally:
self.log_api_call_time()

Expand Down
39 changes: 30 additions & 9 deletions scripts/update_hist_ohlc.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import os
import sys
from multiprocessing import Process
sys.path.append('hyperdrive')
from DataSource import Polygon # noqa autopep8
from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS # noqa autopep8

from hyperdrive.DataSource import Polygon, Alpaca
from hyperdrive.Constants import PathFinder
import hyperdrive.Constants as C

alpc = Alpaca(paper=C.TEST)
poly = Polygon(os.environ['POLYGON'])
stock_symbols = poly.get_symbols()
crypto_symbols = POLY_CRYPTO_SYMBOLS
all_symbols = stock_symbols + crypto_symbols
poly_symbols = stock_symbols + C.POLY_CRYPTO_SYMBOLS
alpc_symbols = set(alpc.get_ndx()[C.SYMBOL]).union(stock_symbols)
timeframe = '2m'

# Double redundancy
# 1st pass


def update_poly_ohlc():
for symbol in all_symbols:
for symbol in poly_symbols:
filename = PathFinder().get_ohlc_path(
symbol=symbol, provider=poly.provider)
try:
Expand All @@ -23,10 +25,29 @@ def update_poly_ohlc():
print(f'Polygon.io OHLC update failed for {symbol}.')
print(e)
finally:
if CI and os.path.exists(filename):
if C.CI and os.path.exists(filename):
os.remove(filename)

# 2nd pass


def update_alpc_ohlc():
for symbol in alpc_symbols:
filename = PathFinder().get_ohlc_path(
symbol=symbol, provider=alpc.provider)
try:
alpc.save_ohlc(symbol=symbol, timeframe=timeframe)
except Exception as e:
print(f'Alpaca OHLC update failed for {symbol}.')
print(e)
finally:
if C.CI and os.path.exists(filename):
os.remove(filename)


p1 = Process(target=update_poly_ohlc)
p2 = Process(target=update_alpc_ohlc)
p1.start()
p2.start()
p1.join()
p2.join()
39 changes: 29 additions & 10 deletions scripts/update_ohlc.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import os
import sys
from multiprocessing import Process, Value
sys.path.append('hyperdrive')
from DataSource import Polygon # noqa autopep8
from Constants import PathFinder, POLY_CRYPTO_SYMBOLS, FEW_DAYS # noqa autopep8
import Constants as C # noqa autopep8
from hyperdrive.DataSource import Polygon, Alpaca
from hyperdrive.Constants import PathFinder
import hyperdrive.Constants as C

counter = Value('i', 0)
alpc = Alpaca(paper=C.TEST)
poly = Polygon(os.environ['POLYGON'])
stock_symbols = poly.get_symbols()
crypto_symbols = POLY_CRYPTO_SYMBOLS
all_symbols = stock_symbols + crypto_symbols
poly_symbols = stock_symbols + C.POLY_CRYPTO_SYMBOLS
alpc_symbols = set(alpc.get_ndx()[C.SYMBOL]).union(stock_symbols)


def update_poly_ohlc():
for symbol in all_symbols:
for symbol in poly_symbols:
try:
filename = poly.save_ohlc(
symbol=symbol, timeframe=FEW_DAYS, retries=1)
symbol=symbol, timeframe=C.FEW_DAYS, retries=1)
with counter.get_lock():
counter.value += 1
except Exception as e:
Expand All @@ -30,9 +29,29 @@ def update_poly_ohlc():
os.remove(filename)


def update_alpc_ohlc():
for symbol in alpc_symbols:
try:
filename = alpc.save_ohlc(
symbol=symbol, timeframe=C.FEW_DAYS, retries=1)
with counter.get_lock():
counter.value += 1
except Exception as e:
print(f'Alpaca OHLC update failed for {symbol}.')
print(e)
finally:
filename = PathFinder().get_ohlc_path(
symbol=symbol, provider=alpc.provider)
if C.CI and os.path.exists(filename):
os.remove(filename)


p1 = Process(target=update_poly_ohlc)
p2 = Process(target=update_alpc_ohlc)
p1.start()
p2.start()
p1.join()
p2.join()

if counter.value / len(all_symbols) < 0.95:
if counter.value / (len(poly_symbols) + len(alpc_symbols)) < 0.95:
exit(1)
Loading

0 comments on commit e2ca93e

Please sign in to comment.