Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated PMAX indicator #308

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 70 additions & 55 deletions technical/indicators/indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ def SSLChannels(dataframe, length=10, mode="sma"):
return df["sslDown"], df["sslUp"]


def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noqa: C901
def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1, normalize_atr=False, fill_na=True): # noqa: C901
"""
Function to compute PMAX
Source: https://www.tradingview.com/script/sU9molfV/
Expand All @@ -1113,6 +1113,8 @@ def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noq
multiplier : Integer indicates value to multiply the ATR
length: moving averages length
MAtype: type of the moving average
normalize_atr: normalize ATR?
fill_na: fill NA with 0?

Returns :
df : Pandas DataFrame with new columns added for
Expand All @@ -1121,6 +1123,7 @@ def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noq
PMAX Direction (pmX_$period_$multiplier_$length_$Matypeint)
"""
import talib.abstract as ta
from numpy import minimum, maximum

df = dataframe.copy()
mavalue = "MA_" + str(MAtype) + "_" + str(length)
Expand Down Expand Up @@ -1161,63 +1164,75 @@ def PMAX(dataframe, period=10, multiplier=3, length=12, MAtype=1, src=1): # noq
df[mavalue] = vwma(df, length)
elif MAtype == 9:
df[mavalue] = zema(df, period=length)
# Compute basic upper and lower bands
df["basic_ub"] = df[mavalue] + (multiplier * df[atr])
df["basic_lb"] = df[mavalue] - (multiplier * df[atr])
# Compute final upper and lower bands
df["final_ub"] = 0.00
df["final_lb"] = 0.00
for i in range(period, len(df)):
df["final_ub"].iat[i] = (
df["basic_ub"].iat[i]
if (
df["basic_ub"].iat[i] < df["final_ub"].iat[i - 1]
or df[mavalue].iat[i - 1] > df["final_ub"].iat[i - 1]
)
else df["final_ub"].iat[i - 1]
)
df["final_lb"].iat[i] = (
df["basic_lb"].iat[i]
if (
df["basic_lb"].iat[i] > df["final_lb"].iat[i - 1]
or df[mavalue].iat[i - 1] < df["final_lb"].iat[i - 1]
)
else df["final_lb"].iat[i - 1]
)

# Set the Pmax value
df[pm] = 0.00
for i in range(period, len(df)):
df[pm].iat[i] = (
df["final_ub"].iat[i]
if (
df[pm].iat[i - 1] == df["final_ub"].iat[i - 1]
and df[mavalue].iat[i] <= df["final_ub"].iat[i]
)
else df["final_lb"].iat[i]
if (
df[pm].iat[i - 1] == df["final_ub"].iat[i - 1]
and df[mavalue].iat[i] > df["final_ub"].iat[i]
)
else df["final_lb"].iat[i]
if (
df[pm].iat[i - 1] == df["final_lb"].iat[i - 1]
and df[mavalue].iat[i] >= df["final_lb"].iat[i]
)
else df["final_ub"].iat[i]
if (
df[pm].iat[i - 1] == df["final_lb"].iat[i - 1]
and df[mavalue].iat[i] < df["final_lb"].iat[i]
)
else 0.00
)
'''
MAvg=getMA(src, length)
// lowerband
longStop = Normalize ? MAvg - Multiplier*atr/close : MAvg - Multiplier*atr
longStopPrev = nz(longStop[1], longStop)
longStop := MAvg > longStopPrev ? max(longStop, longStopPrev) : longStop
// upperband
shortStop = Normalize ? MAvg + Multiplier*atr/close : MAvg + Multiplier*atr
shortStopPrev = nz(shortStop[1], shortStop)
shortStop := MAvg < shortStopPrev ? min(shortStop, shortStopPrev) : shortStop
dir = 1
dir := nz(dir[1], dir)
dir := dir == -1 and MAvg > shortStopPrev ? 1 : dir == 1 and MAvg < longStopPrev ? -1 : dir
PMax = dir==1 ? longStop: shortStop
'''
# Calculate Results
m = df['close'].size
dir_, pmax = [1] * m, [0] * m

# Mark the trend direction up/down
df[pmx] = np.where((df[pm] > 0.00), np.where((df[mavalue] < df[pm]), "down", "up"), np.NaN)
# Remove basic and final bands from the columns
df.drop(["basic_ub", "basic_lb", "final_ub", "final_lb"], inplace=True, axis=1)
# Compute basic upper and lower bands
upperband = df[mavalue] + (multiplier * df[atr]) / (df['close'] if normalize_atr else 1)
lowerband = df[mavalue] - (multiplier * df[atr]) / (df['close'] if normalize_atr else 1)

for i in range(1, m):
'''
// upperband
shortStop = Normalize ? MAvg + Multiplier*atr/close : MAvg + Multiplier*atr
shortStopPrev = nz(shortStop[1], shortStop)
shortStop := MAvg < shortStopPrev ? min(shortStop, shortStopPrev) : shortStop
'''
if df[mavalue].iloc[i] < upperband.iloc[i - 1]:
upperband.iloc[i] = minimum(upperband.iloc[i], upperband.iloc[i - 1])

'''
// lowerband
longStop = Normalize ? MAvg - Multiplier*atr/close : MAvg - Multiplier*atr
longStopPrev = nz(longStop[1], longStop)
longStop := MAvg > longStopPrev ? max(longStop, longStopPrev) : longStop
'''
if df[mavalue].iloc[i] > lowerband.iloc[i - 1]:
lowerband.iloc[i] = maximum(lowerband.iloc[i - 1], lowerband.iloc[i])

'''
dir = 1
dir := nz(dir[1], dir)
dir := dir == -1 and MAvg > shortStopPrev ? 1 : dir == 1 and MAvg < longStopPrev ? -1 : dir
PMax = dir==1 ? longStop: shortStop
'''
dir_[i] = dir_[i - 1]
if dir_[i] < 0 and df[mavalue].iloc[i] > upperband.iloc[i - 1]:
dir_[i] = 1
elif dir_[i] > 0 and df[mavalue].iloc[i] < lowerband.iloc[i - 1]:
dir_[i] = -1

if dir_[i] > 0:
pmax[i] = lowerband.iloc[i]
else:
pmax[i] = upperband.iloc[i]

df.fillna(0, inplace=True)
# Prepare DataFrame to return
df = DataFrame({
mavalue: df[mavalue],
pmx: dir_,
pm: pmax,
}, index=df['close'].index)

if fill_na:
df.fillna(0, inplace=True)

return df

Expand Down