Skip to content

Commit

Permalink
Merge pull request #3328 from seleniumbase/cdp-mode-patch-17
Browse files Browse the repository at this point in the history
CDP Mode - Patch 17
  • Loading branch information
mdmintz authored Dec 10, 2024
2 parents d200f49 + 0cf3609 commit a76dab4
Show file tree
Hide file tree
Showing 17 changed files with 318 additions and 126 deletions.
4 changes: 1 addition & 3 deletions examples/cdp_mode/raw_albertsons.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
info_selector = 'span[data-test-id*="recipe-thumb-title"]'
items = sb.cdp.find_elements("%s %s" % (item_selector, info_selector))
for item in items:
sb.sleep(0.03)
item.scroll_into_view()
sb.sleep(0.025)
sb.sleep(0.06)
if required_text in item.text:
item.flash(color="44CC88")
sb.sleep(0.025)
Expand Down
16 changes: 16 additions & 0 deletions examples/cdp_mode/raw_cf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Using CDP Mode with PyAutoGUI to bypass CAPTCHAs."""
from seleniumbase import SB

with SB(uc=True, test=True, locale_code="en") as sb:
url = "https://www.cloudflare.com/login"
sb.activate_cdp_mode(url)
sb.sleep(2)
sb.uc_gui_handle_captcha() # PyAutoGUI press Tab and Spacebar
sb.sleep(2)

with SB(uc=True, test=True, locale_code="en") as sb:
url = "https://www.cloudflare.com/login"
sb.activate_cdp_mode(url)
sb.sleep(2)
sb.uc_gui_click_captcha() # PyAutoGUI click. (Linux needs it)
sb.sleep(2)
2 changes: 1 addition & 1 deletion examples/cdp_mode/raw_nordstrom.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
sb.sleep(2.2)
for i in range(16):
sb.cdp.scroll_down(16)
sb.sleep(0.16)
sb.sleep(0.14)
print('*** Nordstrom Search for "%s":' % search)
unique_item_text = []
items = sb.cdp.find_elements("article")
Expand Down
19 changes: 19 additions & 0 deletions examples/cdp_mode/raw_pixelscan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from seleniumbase import SB

with SB(uc=True, incognito=True, test=True) as sb:
sb.activate_cdp_mode("https://pixelscan.net/")
sb.sleep(3)
sb.remove_elements("div.banner") # Remove the banner
sb.remove_elements("jdiv") # Remove chat widgets
sb.cdp.scroll_down(15)
not_masking_text = "You are not masking your fingerprint"
sb.assert_text(not_masking_text, "pxlscn-fingerprint-masking")
no_automation_detected = "No automation framework detected"
sb.assert_text(no_automation_detected, "pxlscn-bot-detection")
sb.highlight("span.text-success", loops=8)
sb.sleep(1)
fingerprint_masking_div = "pxlscn-fingerprint-masking div"
sb.highlight(fingerprint_masking_div, loops=9)
sb.sleep(1)
sb.highlight(".bot-detection-context", loops=10)
sb.sleep(2)
44 changes: 44 additions & 0 deletions examples/cdp_mode/raw_res_nike.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Using CDP.network.RequestWillBeSent and CDP.network.ResponseReceived."""
import colorama
import mycdp
import sys
from seleniumbase import SB

c1 = colorama.Fore.BLUE + colorama.Back.LIGHTYELLOW_EX
c2 = colorama.Fore.BLUE + colorama.Back.LIGHTGREEN_EX
cr = colorama.Style.RESET_ALL
if "linux" in sys.platform:
c1 = c2 = cr = ""


async def send_handler(event: mycdp.network.RequestWillBeSent):
r = event.request
s = f"{r.method} {r.url}"
for k, v in r.headers.items():
s += f"\n\t{k} : {v}"
print(c1 + "*** ==> RequestWillBeSent <== ***" + cr)
print(s)


async def receive_handler(event: mycdp.network.ResponseReceived):
print(c2 + "*** ==> ResponseReceived <== ***" + cr)
print(event.response)


with SB(uc=True, test=True, locale_code="en", ad_block=True) as sb:
url = "https://www.nike.com/"
sb.activate_cdp_mode(url)
sb.cdp.add_handler(mycdp.network.RequestWillBeSent, send_handler)
sb.cdp.add_handler(mycdp.network.ResponseReceived, receive_handler)
sb.sleep(2.5)
sb.cdp.gui_click_element('div[data-testid="user-tools-container"]')
sb.sleep(1.5)
search = "Nike Air Force 1"
sb.cdp.press_keys('input[type="search"]', search)
sb.sleep(4)
elements = sb.cdp.select_all('ul[data-testid*="products"] figure .details')
if elements:
print('**** Found results for "%s": ****' % search)
for element in elements:
print("* " + element.text)
sb.sleep(2)
2 changes: 1 addition & 1 deletion examples/cdp_mode/raw_tiktok.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
print(sb.cdp.get_text('h2[data-e2e="user-bio"]'))
for i in range(55):
sb.cdp.scroll_down(12)
sb.sleep(0.06)
sb.sleep(0.05)
sb.sleep(1)
2 changes: 1 addition & 1 deletion examples/raw_pixelscan.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from seleniumbase import SB

with SB(uc=True, incognito=True, test=True) as sb:
sb.driver.uc_open_with_reconnect("https://pixelscan.net/", 10)
sb.driver.uc_open_with_reconnect("https://pixelscan.net/", 7)
sb.remove_elements("div.banner") # Remove the banner
sb.remove_elements("jdiv") # Remove chat widgets
no_automation_detected = "No automation framework detected"
Expand Down
2 changes: 1 addition & 1 deletion seleniumbase/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# seleniumbase package
__version__ = "4.33.6"
__version__ = "4.33.7"
74 changes: 47 additions & 27 deletions seleniumbase/core/browser_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ def _uc_gui_handle_captcha_(driver, frame="iframe", ctype=None):
ctype = "cf_t"
else:
return
if not driver.is_connected():
if not driver.is_connected() and not __is_cdp_swap_needed(driver):
driver.connect()
time.sleep(2)
install_pyautogui_if_missing(driver)
Expand All @@ -1425,15 +1425,18 @@ def _uc_gui_handle_captcha_(driver, frame="iframe", ctype=None):
)
with gui_lock: # Prevent issues with multiple processes
needs_switch = False
is_in_frame = js_utils.is_in_frame(driver)
if not __is_cdp_swap_needed(driver):
is_in_frame = js_utils.is_in_frame(driver)
else:
is_in_frame = False
selector = "#challenge-stage"
if ctype == "g_rc":
selector = "#recaptcha-token"
if is_in_frame and driver.is_element_present(selector):
driver.switch_to.parent_frame()
needs_switch = True
is_in_frame = js_utils.is_in_frame(driver)
if not is_in_frame:
if not is_in_frame and not __is_cdp_swap_needed(driver):
# Make sure the window is on top
page_actions.switch_to_window(
driver, driver.current_window_handle, 2, uc_lock=False
Expand Down Expand Up @@ -1500,17 +1503,18 @@ def _uc_gui_handle_captcha_(driver, frame="iframe", ctype=None):
and frame == "iframe"
):
frame = 'iframe[title="reCAPTCHA"]'
if not is_in_frame or needs_switch:
# Currently not in frame (or nested frame outside CF one)
try:
if visible_iframe or ctype == "g_rc":
driver.switch_to_frame(frame)
except Exception:
if visible_iframe or ctype == "g_rc":
if driver.is_element_present("iframe"):
driver.switch_to_frame("iframe")
else:
return
if not __is_cdp_swap_needed(driver):
if not is_in_frame or needs_switch:
# Currently not in frame (or nested frame outside CF one)
try:
if visible_iframe or ctype == "g_rc":
driver.switch_to_frame(frame)
except Exception:
if visible_iframe or ctype == "g_rc":
if driver.is_element_present("iframe"):
driver.switch_to_frame("iframe")
else:
return
try:
selector = "div.cf-turnstile"
if ctype == "g_rc":
Expand All @@ -1526,11 +1530,11 @@ def _uc_gui_handle_captcha_(driver, frame="iframe", ctype=None):
tab_count += 1
time.sleep(0.027)
active_element_css = js_utils.get_active_element_css(driver)
print(active_element_css)
if (
active_element_css.startswith(selector)
or active_element_css.endswith(" > div" * 2)
or (special_form and active_element_css.endswith(" div"))
or (ctype == "g_rc" and "frame[name" in active_element_css)
):
found_checkbox = True
sb_config._saved_cf_tab_count = tab_count
Expand All @@ -1550,6 +1554,7 @@ def _uc_gui_handle_captcha_(driver, frame="iframe", ctype=None):
)
and hasattr(sb_config, "_saved_cf_tab_count")
and sb_config._saved_cf_tab_count
and not __is_cdp_swap_needed(driver)
):
driver.uc_open_with_disconnect(driver.current_url, 3.8)
with suppress(Exception):
Expand Down Expand Up @@ -1764,17 +1769,27 @@ def _add_chrome_proxy_extension(
):
# Single-threaded
if zip_it:
proxy_helper.create_proxy_ext(
proxy_string, proxy_user, proxy_pass, bypass_list
)
proxy_zip = proxy_helper.PROXY_ZIP_PATH
chrome_options.add_extension(proxy_zip)
proxy_zip_lock = fasteners.InterProcessLock(PROXY_ZIP_LOCK)
with proxy_zip_lock:
proxy_helper.create_proxy_ext(
proxy_string, proxy_user, proxy_pass, bypass_list
)
proxy_zip = proxy_helper.PROXY_ZIP_PATH
chrome_options.add_extension(proxy_zip)
else:
proxy_helper.create_proxy_ext(
proxy_string, proxy_user, proxy_pass, bypass_list, zip_it=False
)
proxy_dir_path = proxy_helper.PROXY_DIR_PATH
chrome_options = add_chrome_ext_dir(chrome_options, proxy_dir_path)
proxy_dir_lock = fasteners.InterProcessLock(PROXY_DIR_LOCK)
with proxy_dir_lock:
proxy_helper.create_proxy_ext(
proxy_string,
proxy_user,
proxy_pass,
bypass_list,
zip_it=False,
)
proxy_dir_path = proxy_helper.PROXY_DIR_PATH
chrome_options = add_chrome_ext_dir(
chrome_options, proxy_dir_path
)
else:
# Multi-threaded
if zip_it:
Expand Down Expand Up @@ -1803,7 +1818,7 @@ def _add_chrome_proxy_extension(
proxy_user,
proxy_pass,
bypass_list,
False,
zip_it=False,
)
chrome_options = add_chrome_ext_dir(
chrome_options, proxy_helper.PROXY_DIR_PATH
Expand Down Expand Up @@ -4845,7 +4860,12 @@ def get_local_driver(
)
uc_activated = True
except URLError as e:
if cert in e.args[0] and IS_MAC:
if (
IS_MAC
and hasattr(e, "args")
and isinstance(e.args, (list, tuple))
and cert in e.args[0]
):
mac_certificate_error = True
else:
raise
Expand Down
14 changes: 10 additions & 4 deletions seleniumbase/core/log_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from seleniumbase import config as sb_config
from seleniumbase.config import settings
from seleniumbase.fixtures import constants
from seleniumbase.fixtures import shared_utils

python3_11_or_newer = False
if sys.version_info >= (3, 11):
Expand All @@ -33,6 +34,8 @@ def log_screenshot(test_logpath, driver, screenshot=None, get=False):
if screenshot != screenshot_warning:
with open(screenshot_path, "wb") as file:
file.write(screenshot)
with suppress(Exception):
shared_utils.make_writable(screenshot_path)
else:
print("WARNING: %s" % screenshot_warning)
if get:
Expand Down Expand Up @@ -282,13 +285,14 @@ def log_test_failure_data(test, test_logpath, driver, browser, url=None):
sb_config._report_time = the_time
sb_config._report_traceback = traceback_message
sb_config._report_exception = exc_message
with suppress(Exception):
if not os.path.exists(test_logpath):
if not os.path.exists(test_logpath):
with suppress(Exception):
os.makedirs(test_logpath)
with suppress(Exception):
log_file = codecs.open(basic_file_path, "w+", encoding="utf-8")
log_file.writelines("\r\n".join(data_to_save))
log_file.close()
shared_utils.make_writable(basic_file_path)


def log_skipped_test_data(test, test_logpath, driver, browser, reason):
Expand Down Expand Up @@ -343,6 +347,7 @@ def log_skipped_test_data(test, test_logpath, driver, browser, reason):
log_file = codecs.open(file_path, "w+", encoding="utf-8")
log_file.writelines("\r\n".join(data_to_save))
log_file.close()
shared_utils.make_writable(file_path)


def log_page_source(test_logpath, driver, source=None):
Expand All @@ -365,14 +370,15 @@ def log_page_source(test_logpath, driver, source=None):
"unresponsive, or closed prematurely!</h4>"
)
)
with suppress(Exception):
if not os.path.exists(test_logpath):
if not os.path.exists(test_logpath):
with suppress(Exception):
os.makedirs(test_logpath)
html_file_path = os.path.join(test_logpath, html_file_name)
with suppress(Exception):
html_file = codecs.open(html_file_path, "w+", encoding="utf-8")
html_file.write(page_source)
html_file.close()
shared_utils.make_writable(html_file_path)


def get_test_id(test):
Expand Down
Loading

0 comments on commit a76dab4

Please sign in to comment.