Skip to content

Commit 35099b1

Browse files
committed
Add multi df dataframe divergence support
1 parent 9f40c65 commit 35099b1

File tree

4 files changed

+286
-3
lines changed

4 files changed

+286
-3
lines changed

pyindicators/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
has_slope_above_threshold, has_any_lower_then_threshold, \
66
has_values_above_threshold, has_values_below_threshold, is_down_trend, \
77
is_up_trend, up_and_downtrends, detect_peaks, \
8-
bearish_divergence, bullish_divergence, stochastic_oscillator
8+
bearish_divergence, bullish_divergence, stochastic_oscillator, \
9+
bearish_divergence_multi_dataframe
910
from .exceptions import PyIndicatorException
1011
from .date_range import DateRange
1112

@@ -42,4 +43,5 @@
4243
'bullish_divergence',
4344
'is_divergence',
4445
'stochastic_oscillator',
46+
'bearish_divergence_multi_dataframe'
4547
]

pyindicators/indicators/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from .is_up_trend import is_up_trend
1717
from .up_and_down_trends import up_and_downtrends
1818
from .divergence import detect_peaks, bearish_divergence, \
19-
bullish_divergence
19+
bullish_divergence, bearish_divergence_multi_dataframe
2020
from .stochastic_oscillator import stochastic_oscillator
2121

2222
__all__ = [
@@ -50,4 +50,5 @@
5050
'bullish_divergence',
5151
'is_divergence',
5252
'stochastic_oscillator',
53+
'bearish_divergence_multi_dataframe'
5354
]

pyindicators/indicators/divergence.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,3 +462,122 @@ def bearish_divergence(
462462

463463
df[result_column] = result
464464
return pl.DataFrame(df) if is_polars else df
465+
466+
467+
def bearish_divergence_multi_dataframe(
468+
first_df: Union[pd.DataFrame, pl.DataFrame],
469+
second_df: Union[pd.DataFrame, pl.DataFrame],
470+
result_df: Union[pd.DataFrame, pl.DataFrame],
471+
first_column: str,
472+
second_column: str,
473+
window_size: int = 1,
474+
result_column: str = "bearish_divergence",
475+
number_of_neighbors_to_compare: int = 5,
476+
min_consecutive: int = 2
477+
) -> Union[pd.DataFrame, pl.DataFrame]:
478+
"""
479+
Detect bearish divergence between two different DataFrames.
480+
481+
Args:
482+
first_df: DataFrame containing the indicator data (e.g., RSI).
483+
second_df: DataFrame containing the price data.
484+
result_df: DataFrame used to store results. Must be aligned in time.
485+
first_column: Column in first_df (e.g., RSI).
486+
second_column: Column in second_df (e.g., price).
487+
window_size: Number of bars to consider for pattern.
488+
result_column: Output column name.
489+
number_of_neighbors_to_compare: For peak detection.
490+
min_consecutive: Minimum consecutive peaks required.
491+
492+
Returns:
493+
A DataFrame with a new column indicating bearish divergence.
494+
"""
495+
is_polars = isinstance(first_df, pl.DataFrame)
496+
497+
if is_polars:
498+
first_df = first_df.to_pandas()
499+
second_df = second_df.to_pandas()
500+
result_df = result_df.to_pandas()
501+
502+
# Validate columns
503+
for df, col, label in [
504+
(first_df, first_column, "first_df"),
505+
(second_df, second_column, "second_df")
506+
]:
507+
if col not in df.columns:
508+
raise PyIndicatorException(f"{col} column is missing in {label}")
509+
510+
# Determine which df has more granular datetime index
511+
first_freq = first_df.index.to_series().diff().median()
512+
second_freq = second_df.index.to_series().diff().median()
513+
514+
if first_freq < second_freq:
515+
align_index = first_df.index
516+
else:
517+
align_index = second_df.index
518+
519+
if len(result_df) != len(align_index):
520+
raise PyIndicatorException(
521+
"result_df must have the same length as the aligned index"
522+
)
523+
524+
# Reindex all DataFrames to the most granular one
525+
first_df = first_df.reindex(align_index)
526+
second_df = second_df.reindex(align_index)
527+
528+
# Peak detection
529+
first_highs_col = f"{first_column}_highs"
530+
second_highs_col = f"{second_column}_highs"
531+
532+
if first_highs_col not in first_df.columns:
533+
first_df = detect_peaks(
534+
first_df,
535+
source_column=first_column,
536+
number_of_neighbors_to_compare=number_of_neighbors_to_compare,
537+
min_consecutive=min_consecutive
538+
)
539+
540+
if second_highs_col not in second_df.columns:
541+
second_df = detect_peaks(
542+
second_df,
543+
source_column=second_column,
544+
number_of_neighbors_to_compare=number_of_neighbors_to_compare,
545+
min_consecutive=min_consecutive
546+
)
547+
548+
# Now align and merge
549+
merged_df = pd.concat([
550+
first_df[[first_highs_col]],
551+
second_df[[second_highs_col]],
552+
result_df.copy()
553+
], axis=1, join='inner')
554+
555+
# Validate enough data
556+
if len(merged_df) < window_size:
557+
raise PyIndicatorException(
558+
f"Not enough data points (need at least {window_size}, "
559+
f"got {len(merged_df)})"
560+
)
561+
562+
# Apply divergence detection
563+
indicator_highs = merged_df[first_highs_col].values
564+
price_highs = merged_df[second_highs_col].values
565+
result = [False] * len(merged_df)
566+
567+
i = window_size - 1
568+
while i < len(merged_df):
569+
win_a = indicator_highs[i - window_size + 1:i + 1]
570+
win_b = price_highs[i - window_size + 1:i + 1]
571+
572+
if check_divergence_pattern(win_a, win_b):
573+
result[i] = True
574+
i += window_size # Skip forward to avoid overlap
575+
else:
576+
i += 1
577+
578+
merged_df[result_column] = result
579+
580+
# Merge back result column to result_df using the original index
581+
result_df[result_column] = merged_df[result_column]
582+
583+
return pl.DataFrame(result_df) if is_polars else result_df

tests/indicators/test_divergence.py

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import numpy as np
12
from unittest import TestCase
2-
from pyindicators import is_divergence
3+
from pyindicators import is_divergence, bearish_divergence_multi_dataframe, \
4+
PyIndicatorException
35

46
import pandas as pd
57

@@ -100,3 +102,162 @@ def test_detect_bearish_sequence_with_wrong_params_pandas(self):
100102
"DateTime": pd.date_range("2021-01-01", periods=10, freq="D")
101103
})
102104
self.assertFalse(is_divergence(df, window_size=1, number_of_data_points=10, column_one="RSI_highs", column_two="Close_highs"))
105+
106+
107+
class TestBearishDivergenceMultiDataFrame(TestCase):
108+
109+
def test_bearish_divergence_detected(self):
110+
# Setup indicator (e.g., RSI) and price (e.g., Close) with divergence
111+
indicator = pd.DataFrame({
112+
"RSI": [50, 60, 70, 65, 60, 58, 55],
113+
}, index=pd.date_range("2022-01-01", periods=7))
114+
price = pd.DataFrame({
115+
"Close": [100, 102, 105, 108, 110, 112, 115], # Higher highs
116+
}, index=pd.date_range("2022-01-01", periods=7))
117+
118+
result = pd.DataFrame(index=indicator.index)
119+
120+
# Force peaks manually for deterministic test
121+
indicator["RSI_highs"] = [0, 0, 0, 0, -1, 0, 0] # Two indicator highs
122+
price["Close_highs"] = [0, 0, 0, 0, 1, 0, 0] # Two price highs
123+
124+
out = bearish_divergence_multi_dataframe(
125+
first_df=indicator,
126+
second_df=price,
127+
result_df=result,
128+
first_column="RSI",
129+
second_column="Close",
130+
window_size=2,
131+
result_column="bearish_divergence"
132+
)
133+
134+
self.assertIn("bearish_divergence", out.columns)
135+
self.assertTrue(any(out["bearish_divergence"]))
136+
137+
indicator = pd.DataFrame({
138+
"RSI": [50, 60, 70, 65, 60, 58, 55],
139+
}, index=pd.date_range("2022-01-01", periods=7))
140+
price = pd.DataFrame({
141+
"Close": [100, 102, 105, 108, 110, 112, 115], # Higher highs
142+
}, index=pd.date_range("2022-01-01", periods=7))
143+
144+
result = pd.DataFrame(index=indicator.index)
145+
146+
# Force peaks manually for deterministic test
147+
indicator["RSI_highs"] = [0, 0, 0, 0, 1, 0, 0] # Two indicator highs
148+
price["Close_highs"] = [0, 0, 0, 0, 1, 0, 0] # Two price highs
149+
150+
out = bearish_divergence_multi_dataframe(
151+
first_df=indicator,
152+
second_df=price,
153+
result_df=result,
154+
first_column="RSI",
155+
second_column="Close",
156+
window_size=2,
157+
result_column="bearish_divergence"
158+
)
159+
160+
self.assertIn("bearish_divergence", out.columns)
161+
self.assertFalse(any(out["bearish_divergence"]))
162+
163+
indicator = pd.DataFrame({
164+
"RSI": [50, 60, 70, 65, 60, 58, 55],
165+
}, index=pd.date_range("2022-01-01", periods=7))
166+
price = pd.DataFrame({
167+
"Close": [100, 102, 105, 108, 110, 112, 115], # Higher highs
168+
}, index=pd.date_range("2022-01-01", periods=7))
169+
170+
result = pd.DataFrame(index=indicator.index)
171+
172+
# Force peaks manually for deterministic test
173+
indicator["RSI_highs"] = [0, 0, 0, -1, 0, 0, 0] # Two indicator highs
174+
price["Close_highs"] = [0, 0, 0, 0, 0, 1, 0] # Two price highs
175+
176+
out = bearish_divergence_multi_dataframe(
177+
first_df=indicator,
178+
second_df=price,
179+
result_df=result,
180+
first_column="RSI",
181+
second_column="Close",
182+
window_size=2,
183+
result_column="bearish_divergence"
184+
)
185+
186+
self.assertIn("bearish_divergence", out.columns)
187+
self.assertFalse(any(out["bearish_divergence"]))
188+
189+
out = bearish_divergence_multi_dataframe(
190+
first_df=indicator,
191+
second_df=price,
192+
result_df=result,
193+
first_column="RSI",
194+
second_column="Close",
195+
window_size=3,
196+
result_column="bearish_divergence"
197+
)
198+
199+
self.assertIn("bearish_divergence", out.columns)
200+
self.assertTrue(any(out["bearish_divergence"]))
201+
202+
def test_missing_column_exception(self):
203+
df1 = pd.DataFrame({"RSI": [50, 60]}, index=pd.date_range("2022-01-01", periods=2))
204+
df2 = pd.DataFrame({"Close": [100, 110]}, index=pd.date_range("2022-01-01", periods=2))
205+
result = pd.DataFrame(index=df1.index)
206+
207+
with self.assertRaises(PyIndicatorException):
208+
bearish_divergence_multi_dataframe(
209+
first_df=df1.drop("RSI", axis=1),
210+
second_df=df2,
211+
result_df=result,
212+
first_column="RSI",
213+
second_column="Close"
214+
)
215+
216+
def test_not_enough_data_exception(self):
217+
df1 = pd.DataFrame({"RSI": [50]}, index=pd.date_range("2022-01-01", periods=1))
218+
df2 = pd.DataFrame({"Close": [100]}, index=pd.date_range("2022-01-01", periods=1))
219+
result = pd.DataFrame(index=df1.index)
220+
221+
# Assume detect_peaks adds _highs column
222+
df1["RSI_highs"] = [1]
223+
df2["Close_highs"] = [1]
224+
225+
with self.assertRaises(PyIndicatorException):
226+
bearish_divergence_multi_dataframe(
227+
first_df=df1,
228+
second_df=df2,
229+
result_df=result,
230+
first_column="RSI",
231+
second_column="Close",
232+
window_size=3
233+
)
234+
235+
def test_different_timeframes_align_correctly(self):
236+
daily_index = pd.date_range("2022-01-01", periods=2, freq="D")
237+
indicator_df = pd.DataFrame({
238+
"RSI": [65, 60],
239+
}, index=daily_index)
240+
241+
# 2-hour close prices — only some times will match the daily timestamps
242+
two_hour_index = pd.date_range("2022-01-01", periods=12, freq="2h")
243+
price_df = pd.DataFrame({
244+
"Close": [100, 102, 105, 108, 110, 112, 115, 117, 120, 122, 125, 130]
245+
}, index=two_hour_index)
246+
247+
result_df = pd.DataFrame(index=price_df.index)
248+
249+
# Inject fake peaks
250+
indicator_df["RSI_highs"] = [-1, 0]
251+
price_df["Close_highs"] = [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
252+
253+
result = bearish_divergence_multi_dataframe(
254+
first_df=indicator_df,
255+
second_df=price_df,
256+
result_df=result_df,
257+
first_column="RSI",
258+
second_column="Close",
259+
window_size=2,
260+
result_column="bearish_divergence"
261+
)
262+
self.assertIn("bearish_divergence", result.columns)
263+
self.assertTrue(any(result["bearish_divergence"]))

0 commit comments

Comments
 (0)