From 6592b86be9e78f5fd9b1222406d9dc5c530a7ed8 Mon Sep 17 00:00:00 2001 From: Alexander Malysh Date: Tue, 10 Jan 2023 12:17:22 +0100 Subject: [PATCH] updated PMAX indicator Updated PMAX indicator: 1. added normalize ATR option 2. added fill_na option 3. removed multiple for loops and use only one: speedup --- technical/indicators/indicators.py | 125 ++++++++++++++++------------- 1 file changed, 70 insertions(+), 55 deletions(-) diff --git a/technical/indicators/indicators.py b/technical/indicators/indicators.py index e72a645a..5a73199c 100644 --- a/technical/indicators/indicators.py +++ b/technical/indicators/indicators.py @@ -1101,7 +1101,7 @@ def SSLChannels(dataframe, length=10, mode="sma"): return df["sslDown"], df["sslUp"] -def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noqa: C901 +def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1, normalize_atr=False, fill_na=True): # noqa: C901 """ Function to compute PMAX Source: https://www.tradingview.com/script/sU9molfV/ @@ -1113,6 +1113,8 @@ def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noq multiplier : Integer indicates value to multiply the ATR length: moving averages length MAtype: type of the moving average + normalize_atr: normalize ATR? + fill_na: fill NA with 0? Returns : df : Pandas DataFrame with new columns added for @@ -1121,6 +1123,7 @@ def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noq PMAX Direction (pmX_$period_$multiplier_$length_$Matypeint) """ import talib.abstract as ta + from numpy import minimum, maximum df = dataframe.copy() mavalue = "MA_" + str(MAtype) + "_" + str(length) @@ -1161,63 +1164,75 @@ def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noq df[mavalue] = vwma(df, length) elif MAtype == 9: df[mavalue] = zema(df, period=length) - # Compute basic upper and lower bands - df["basic_ub"] = df[mavalue] + (multiplier * df[atr]) - df["basic_lb"] = df[mavalue] - (multiplier * df[atr]) - # Compute final upper and lower bands - df["final_ub"] = 0.00 - df["final_lb"] = 0.00 - for i in range(period, len(df)): - df["final_ub"].iat[i] = ( - df["basic_ub"].iat[i] - if ( - df["basic_ub"].iat[i] < df["final_ub"].iat[i - 1] - or df[mavalue].iat[i - 1] > df["final_ub"].iat[i - 1] - ) - else df["final_ub"].iat[i - 1] - ) - df["final_lb"].iat[i] = ( - df["basic_lb"].iat[i] - if ( - df["basic_lb"].iat[i] > df["final_lb"].iat[i - 1] - or df[mavalue].iat[i - 1] < df["final_lb"].iat[i - 1] - ) - else df["final_lb"].iat[i - 1] - ) - # Set the Pmax value - df[pm] = 0.00 - for i in range(period, len(df)): - df[pm].iat[i] = ( - df["final_ub"].iat[i] - if ( - df[pm].iat[i - 1] == df["final_ub"].iat[i - 1] - and df[mavalue].iat[i] <= df["final_ub"].iat[i] - ) - else df["final_lb"].iat[i] - if ( - df[pm].iat[i - 1] == df["final_ub"].iat[i - 1] - and df[mavalue].iat[i] > df["final_ub"].iat[i] - ) - else df["final_lb"].iat[i] - if ( - df[pm].iat[i - 1] == df["final_lb"].iat[i - 1] - and df[mavalue].iat[i] >= df["final_lb"].iat[i] - ) - else df["final_ub"].iat[i] - if ( - df[pm].iat[i - 1] == df["final_lb"].iat[i - 1] - and df[mavalue].iat[i] < df["final_lb"].iat[i] - ) - else 0.00 - ) + ''' + MAvg=getMA(src, length) + // lowerband + longStop = Normalize ? MAvg - Multiplier*atr/close : MAvg - Multiplier*atr + longStopPrev = nz(longStop[1], longStop) + longStop := MAvg > longStopPrev ? max(longStop, longStopPrev) : longStop + // upperband + shortStop = Normalize ? MAvg + Multiplier*atr/close : MAvg + Multiplier*atr + shortStopPrev = nz(shortStop[1], shortStop) + shortStop := MAvg < shortStopPrev ? min(shortStop, shortStopPrev) : shortStop + dir = 1 + dir := nz(dir[1], dir) + dir := dir == -1 and MAvg > shortStopPrev ? 1 : dir == 1 and MAvg < longStopPrev ? -1 : dir + PMax = dir==1 ? longStop: shortStop + ''' + # Calculate Results + m = df['close'].size + dir_, pmax = [1] * m, [0] * m - # Mark the trend direction up/down - df[pmx] = np.where((df[pm] > 0.00), np.where((df[mavalue] < df[pm]), "down", "up"), np.NaN) - # Remove basic and final bands from the columns - df.drop(["basic_ub", "basic_lb", "final_ub", "final_lb"], inplace=True, axis=1) + # Compute basic upper and lower bands + upperband = df[mavalue] + (multiplier * df[atr]) / (df['close'] if normalize_atr else 1) + lowerband = df[mavalue] - (multiplier * df[atr]) / (df['close'] if normalize_atr else 1) + + for i in range(1, m): + ''' + // upperband + shortStop = Normalize ? MAvg + Multiplier*atr/close : MAvg + Multiplier*atr + shortStopPrev = nz(shortStop[1], shortStop) + shortStop := MAvg < shortStopPrev ? min(shortStop, shortStopPrev) : shortStop + ''' + if df[mavalue].iloc[i] < upperband.iloc[i - 1]: + upperband.iloc[i] = minimum(upperband.iloc[i], upperband.iloc[i - 1]) + + ''' + // lowerband + longStop = Normalize ? MAvg - Multiplier*atr/close : MAvg - Multiplier*atr + longStopPrev = nz(longStop[1], longStop) + longStop := MAvg > longStopPrev ? max(longStop, longStopPrev) : longStop + ''' + if df[mavalue].iloc[i] > lowerband.iloc[i - 1]: + lowerband.iloc[i] = maximum(lowerband.iloc[i - 1], lowerband.iloc[i]) + + ''' + dir = 1 + dir := nz(dir[1], dir) + dir := dir == -1 and MAvg > shortStopPrev ? 1 : dir == 1 and MAvg < longStopPrev ? -1 : dir + PMax = dir==1 ? longStop: shortStop + ''' + dir_[i] = dir_[i - 1] + if dir_[i] < 0 and df[mavalue].iloc[i] > upperband.iloc[i - 1]: + dir_[i] = 1 + elif dir_[i] > 0 and df[mavalue].iloc[i] < lowerband.iloc[i - 1]: + dir_[i] = -1 + + if dir_[i] > 0: + pmax[i] = lowerband.iloc[i] + else: + pmax[i] = upperband.iloc[i] - df.fillna(0, inplace=True) + # Prepare DataFrame to return + df = DataFrame({ + mavalue: df[mavalue], + pmx: dir_, + pm: pmax, + }, index=df['close'].index) + + if fill_na: + df.fillna(0, inplace=True) return df