diff --git a/smartmoneyconcepts/smc.py b/smartmoneyconcepts/smc.py index 115af09..034ca28 100644 --- a/smartmoneyconcepts/smc.py +++ b/smartmoneyconcepts/smc.py @@ -6,6 +6,8 @@ from datetime import datetime # Docs: https://github.com/joshyattridge/smart-money-concepts/blob/master/README.md + + def inputvalidator(input_="ohlc"): def dfcheck(func): @wraps(func) @@ -13,7 +15,8 @@ def wrap(*args, **kwargs): args = list(args) i = 0 if isinstance(args[0], pd.DataFrame) else 1 - args[i] = args[i].rename(columns={c: c.lower() for c in args[i].columns}) + args[i] = args[i].rename( + columns={c: c.lower() for c in args[i].columns}) inputs = { "o": "open", @@ -29,7 +32,8 @@ def wrap(*args, **kwargs): for l in input_: if inputs[l] not in args[i].columns: raise LookupError( - 'Must have a dataframe column named "{0}"'.format(inputs[l]) + 'Must have a dataframe column named "{0}"'.format( + inputs[l]) ) return func(*args, **kwargs) @@ -116,33 +120,32 @@ def fvg(cls, ohlc: DataFrame, join_consecutive=False) -> Series: for i in np.where(~np.isnan(fvg))[0]: mask = np.zeros(len(ohlc), dtype=np.bool_) if fvg[i] == 1: - mask = ohlc["low"][i + 2 :] <= top[i] + mask = ohlc["low"][i + 2:] <= top[i] elif fvg[i] == -1: - mask = ohlc["high"][i + 2 :] >= bottom[i] + mask = ohlc["high"][i + 2:] >= bottom[i] if np.any(mask): j = np.argmax(mask) + i + 2 mitigated_index[i] = j mitigated_index = np.where(np.isnan(fvg), np.nan, mitigated_index) - return pd.concat( [ - pd.Series(fvg, name="FVG"), - pd.Series(top, name="Top"), - pd.Series(bottom, name="Bottom"), - pd.Series(mitigated_index, name="MitigatedIndex"), - ], + return pd.concat([ + pd.Series(fvg, name="FVG"), + pd.Series(top, name="Top"), + pd.Series(bottom, name="Bottom"), + pd.Series(mitigated_index, name="MitigatedIndex"), + ], axis=1, ) - class SwingMethodEvaluator(Enum): COMBINED = "combined" FRACTALS = "fractals" MOMENTUM = "momentum" WEIGHTED_ROLLING_WINDOW = "weighted_rolling_window" DEFAULT = "default" - + @classmethod def swing_highs_lows(cls, ohlc: DataFrame, swing_evaluator: SwingMethodEvaluator = SwingMethodEvaluator.DEFAULT, - swing_length: int = 5, short_swing_length: int = 10, long_swing_length=50) -> Series: + swing_length: int = 10, short_swing_length: int = 10, long_swing_length=50) -> Series: """ Swing Highs and Lows without lookahead bias. @@ -160,7 +163,6 @@ def swing_highs_lows(cls, ohlc: DataFrame, swing_evaluator: SwingMethodEvaluator HighLow = 1 if swing high, -1 if swing low Level = the level of the swing high or low """ - def fractals(): """ Identifies swing highs/lows based on a smaller subset of candles rather than a fixed-length rolling window. @@ -170,33 +172,32 @@ def fractals(): Trade-off: Less robust for noisy markets. """ swing_highs = np.where( - (ohlc["high"] > ohlc["high"].shift(1)) & - (ohlc["high"] > ohlc["high"].shift(2)) & - (ohlc["high"] > ohlc["high"].shift(-1)) & - (ohlc["high"] > ohlc["high"].shift(-2)), + (ohlc["high"] > ohlc["high"].shift(1)) & + (ohlc["high"] > ohlc["high"].shift(2)) & + (ohlc["high"] > ohlc["high"].rolling(window=swing_length, center=False).max().shift(1)), 1, np.nan, ) swing_lows = np.where( - (ohlc["low"] < ohlc["low"].shift(1)) & - (ohlc["low"] < ohlc["low"].shift(2)) & - (ohlc["low"] < ohlc["low"].shift(-1)) & - (ohlc["low"] < ohlc["low"].shift(-2)), + (ohlc["low"] < ohlc["low"].shift(1)) & + (ohlc["low"] < ohlc["low"].shift(2)) & + (ohlc["low"] < ohlc["low"].rolling(window=swing_length, center=False).min().shift(1)), -1, np.nan, ) # Combine swing highs and lows - swing_highs_lows = np.nan_to_num(swing_highs) + np.nan_to_num(swing_lows) + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) # Determine swing levels level = np.where( swing_highs_lows == 1, ohlc["high"], # Level for swing highs - np.where(swing_highs_lows == -1, ohlc["low"], np.nan), # Level for swing lows + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), ) return swing_highs_lows, level - def momentum(): """ Use momentum (e.g., rate of change or RSI) to confirm swing points earlier by identifying @@ -219,17 +220,18 @@ def momentum(): np.nan, ) # Combine swing highs and lows - swing_highs_lows = np.nan_to_num(swing_highs) + np.nan_to_num(swing_lows) + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) # Determine swing levels level = np.where( swing_highs_lows == 1, ohlc["high"], # Level for swing highs - np.where(swing_highs_lows == -1, ohlc["low"], np.nan), # Level for swing lows + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), ) return swing_highs_lows, level - def weighted_rolling_window(): """ Instead of treating all candles equally in the rolling window, give more weight to recent candles. @@ -245,17 +247,18 @@ def weighted_rolling_window(): swing_highs = np.where(ohlc["high"] >= ohlc["high_ema"], 1, np.nan) swing_lows = np.where(ohlc["low"] <= ohlc["low_ema"], -1, np.nan) # Combine swing highs and lows - swing_highs_lows = np.nan_to_num(swing_highs) + np.nan_to_num(swing_lows) + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) # Determine swing levels level = np.where( swing_highs_lows == 1, ohlc["high"], # Level for swing highs - np.where(swing_highs_lows == -1, ohlc["low"], np.nan), # Level for swing lows + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), ) return swing_highs_lows, level - def combined(): """ Combine Short and Long Swing Lengths for Swing Highs and Lows. @@ -264,25 +267,23 @@ def combined(): ohlc: DataFrame - Contains columns 'high' and 'low'. short_swing_length: int - Number of candles for short-term swings. long_swing_length: int - Number of candles for long-term swings. - - Returns: - DataFrame with columns: - - HighLow: 1 for short-term swing high, -1 for short-term swing low, - 2 for long-term swing high, -2 for long-term swing low, - NaN otherwise. - - Level: Swing high or low price level. """ # Short-term swing highs and lows - short_highs = ohlc["high"].rolling(window=short_swing_length, min_periods=1).max() - short_lows = ohlc["low"].rolling(window=short_swing_length, min_periods=1).min() + short_highs = ohlc["high"].rolling( + window=short_swing_length, min_periods=1).max() + short_lows = ohlc["low"].rolling( + window=short_swing_length, min_periods=1).min() - short_swing_highs = np.where(ohlc["high"] == short_highs, 1, np.nan) + short_swing_highs = np.where( + ohlc["high"] == short_highs, 1, np.nan) short_swing_lows = np.where(ohlc["low"] == short_lows, -1, np.nan) # Long-term swing highs and lows - long_highs = ohlc["high"].rolling(window=long_swing_length, min_periods=1).max() - long_lows = ohlc["low"].rolling(window=long_swing_length, min_periods=1).min() + long_highs = ohlc["high"].rolling( + window=long_swing_length, min_periods=1).max() + long_lows = ohlc["low"].rolling( + window=long_swing_length, min_periods=1).min() long_swing_highs = np.where(ohlc["high"] == long_highs, 2, np.nan) long_swing_lows = np.where(ohlc["low"] == long_lows, -2, np.nan) @@ -302,12 +303,13 @@ def combined(): swing_highs_lows == -1, ohlc["low"], # Short-term low np.where( swing_highs_lows == 2, ohlc["high"], # Long-term high - np.where(swing_highs_lows == -2, ohlc["low"], np.nan) # Long-term low + np.where(swing_highs_lows == -2, + ohlc["low"], np.nan) # Long-term low ) ) ) return swing_highs_lows, level - + def default(): """ Swing Highs and Lows without lookahead bias. @@ -320,29 +322,33 @@ def default(): # Calculate swing highs swing_highs = np.where( - ohlc["high"] == ohlc["high"].rolling(window=swing_length, min_periods=1).max(), + ohlc["high"] == ohlc["high"].rolling( + window=swing_length, min_periods=1).max(), 1, np.nan, ) # Calculate swing lows swing_lows = np.where( - ohlc["low"] == ohlc["low"].rolling(window=swing_length, min_periods=1).min(), + ohlc["low"] == ohlc["low"].rolling( + window=swing_length, min_periods=1).min(), -1, np.nan, ) # Combine swing highs and lows - swing_highs_lows = np.nan_to_num(swing_highs) + np.nan_to_num(swing_lows) + swing_highs_lows = np.nan_to_num( + swing_highs) + np.nan_to_num(swing_lows) # Determine swing levels level = np.where( swing_highs_lows == 1, ohlc["high"], # Level for swing highs - np.where(swing_highs_lows == -1, ohlc["low"], np.nan), # Level for swing lows + # Level for swing lows + np.where(swing_highs_lows == -1, ohlc["low"], np.nan), ) return swing_highs_lows, level - + match swing_evaluator.value: case "momentum": swing_highs_lows, level = momentum() @@ -356,8 +362,7 @@ def default(): swing_highs_lows, level = default() case _: swing_highs_lows, level = default() - - + # Return results as a DataFrame return pd.DataFrame({ "HighLow": swing_highs_lows, @@ -483,10 +488,10 @@ def bos_choch( mask = np.zeros(len(ohlc), dtype=np.bool_) # if the bos is 1 then check if the candles high has gone above the level if bos[i] == 1 or choch[i] == 1: - mask = ohlc["close" if close_break else "high"][i + 2 :] > level[i] + mask = ohlc["close" if close_break else "high"][i + 2:] > level[i] # if the bos is -1 then check if the candles low has gone below the level elif bos[i] == -1 or choch[i] == -1: - mask = ohlc["close" if close_break else "low"][i + 2 :] < level[i] + mask = ohlc["close" if close_break else "low"][i + 2:] < level[i] if np.any(mask): j = np.argmax(mask) + i + 2 broken[i] = j @@ -628,7 +633,8 @@ def ob( + _volume[close_index - 2] ) lowVolume[obIndex] = _volume[close_index - 2] - highVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] + highVolume[obIndex] = _volume[close_index] + \ + _volume[close_index - 1] percentage[obIndex] = ( np.min([highVolume[obIndex], lowVolume[obIndex]], axis=0) / np.max([highVolume[obIndex], lowVolume[obIndex]], axis=0) @@ -702,7 +708,8 @@ def ob( + _volume[close_index - 1] + _volume[close_index - 2] ) - lowVolume[obIndex] = _volume[close_index] + _volume[close_index - 1] + lowVolume[obIndex] = _volume[close_index] + \ + _volume[close_index - 1] highVolume[obIndex] = _volume[close_index - 2] percentage[obIndex] = ( np.min([highVolume[obIndex], lowVolume[obIndex]], axis=0) @@ -722,7 +729,8 @@ def ob( top_series = pd.Series(top, name="Top") bottom_series = pd.Series(bottom, name="Bottom") obVolume_series = pd.Series(obVolume, name="OBVolume") - mitigated_index_series = pd.Series(mitigated_index, name="MitigatedIndex") + mitigated_index_series = pd.Series( + mitigated_index, name="MitigatedIndex") percentage_series = pd.Series(percentage, name="Percentage") return pd.concat( @@ -783,13 +791,15 @@ def liquidity( and range_low <= swing_highs_lows["Level"][c] <= range_high ): end = c - temp_liquidity_level.append(swing_highs_lows["Level"][c]) + temp_liquidity_level.append( + swing_highs_lows["Level"][c]) swing_highs_lows.loc[c, "HighLow"] = 0 if ohlc["high"].iloc[c] >= range_high: swept = c break if len(temp_liquidity_level) > 1: - average_high = sum(temp_liquidity_level) / len(temp_liquidity_level) + average_high = sum(temp_liquidity_level) / \ + len(temp_liquidity_level) liquidity[i] = 1 liquidity_level[i] = average_high liquidity_end[i] = end @@ -811,22 +821,26 @@ def liquidity( and range_low <= swing_highs_lows["Level"][c] <= range_high ): end = c - temp_liquidity_level.append(swing_highs_lows["Level"][c]) + temp_liquidity_level.append( + swing_highs_lows["Level"][c]) swing_highs_lows.loc[c, "HighLow"] = 0 if ohlc["low"].iloc[c] <= range_low: swept = c break if len(temp_liquidity_level) > 1: - average_low = sum(temp_liquidity_level) / len(temp_liquidity_level) + average_low = sum(temp_liquidity_level) / \ + len(temp_liquidity_level) liquidity[i] = -1 liquidity_level[i] = average_low liquidity_end[i] = end liquidity_swept[i] = swept liquidity = np.where(liquidity != 0, liquidity, np.nan) - liquidity_level = np.where(~np.isnan(liquidity), liquidity_level, np.nan) + liquidity_level = np.where( + ~np.isnan(liquidity), liquidity_level, np.nan) liquidity_end = np.where(~np.isnan(liquidity), liquidity_end, np.nan) - liquidity_swept = np.where(~np.isnan(liquidity), liquidity_swept, np.nan) + liquidity_swept = np.where( + ~np.isnan(liquidity), liquidity_swept, np.nan) liquidity = pd.Series(liquidity, name="Liquidity") level = pd.Series(liquidity_level, name="Level") @@ -884,7 +898,7 @@ def previous_high_low(cls, ohlc: DataFrame, time_frame: str = "1D") -> Series: currently_broken_low = False last_broken_time = resampled_previous_index - previous_high[i] = resampled_ohlc["high"].iloc[resampled_previous_index] + previous_high[i] = resampled_ohlc["high"].iloc[resampled_previous_index] previous_low[i] = resampled_ohlc["low"].iloc[resampled_previous_index] currently_broken_high = ohlc["high"].iloc[i] > previous_high[i] or currently_broken_high currently_broken_low = ohlc["low"].iloc[i] < previous_low[i] or currently_broken_low @@ -994,7 +1008,8 @@ def sessions( and (start_time <= current_time or current_time <= end_time) ): active[i] = 1 - high[i] = max(ohlc["high"].iloc[i], high[i - 1] if i > 0 else 0) + high[i] = max(ohlc["high"].iloc[i], + high[i - 1] if i > 0 else 0) low[i] = min( ohlc["low"].iloc[i], low[i - 1] if i > 0 and low[i - 1] != 0 else float("inf"), @@ -1043,7 +1058,8 @@ def retracements(cls, ohlc: DataFrame, swing_highs_lows: DataFrame) -> Series: if direction[i - 1] == 1: current_retracement[i] = round( - 100 - (((ohlc["low"].iloc[i] - bottom) / (top - bottom)) * 100), 1 + 100 - (((ohlc["low"].iloc[i] - bottom) / + (top - bottom)) * 100), 1 ) deepest_retracement[i] = max( ( @@ -1055,7 +1071,8 @@ def retracements(cls, ohlc: DataFrame, swing_highs_lows: DataFrame) -> Series: ) if direction[i] == -1: current_retracement[i] = round( - 100 - ((ohlc["high"].iloc[i] - top) / (bottom - top)) * 100, 1 + 100 - ((ohlc["high"].iloc[i] - top) / + (bottom - top)) * 100, 1 ) deepest_retracement[i] = max( ( @@ -1088,7 +1105,9 @@ def retracements(cls, ohlc: DataFrame, swing_highs_lows: DataFrame) -> Series: break direction = pd.Series(direction, name="Direction") - current_retracement = pd.Series(current_retracement, name="CurrentRetracement%") - deepest_retracement = pd.Series(deepest_retracement, name="DeepestRetracement%") + current_retracement = pd.Series( + current_retracement, name="CurrentRetracement%") + deepest_retracement = pd.Series( + deepest_retracement, name="DeepestRetracement%") return pd.concat([direction, current_retracement, deepest_retracement], axis=1)