diff --git a/scrapegraphai/nodes/fetch_node.py b/scrapegraphai/nodes/fetch_node.py
index 2637a70a..88b73b63 100644
--- a/scrapegraphai/nodes/fetch_node.py
+++ b/scrapegraphai/nodes/fetch_node.py
@@ -4,6 +4,7 @@
 import json
 from typing import List, Optional
 
+import concurrent.futures
 import requests
 from langchain_community.document_loaders import PyPDFLoader
 
@@ -68,6 +69,10 @@ def __init__(
             else node_config.get("openai_md_enabled", False)
         )
 
+        # Timeout in seconds for blocking operations (HTTP requests, PDF parsing, etc.).
+        # Defaults to 30; if set to None, no timeout is applied.
+        self.timeout = 30 if node_config is None else node_config.get("timeout", 30)
+
         self.cut = False if node_config is None else node_config.get("cut", True)
 
         self.browser_base = (
@@ -174,7 +179,19 @@ def load_file_content(self, source, input_type):
         if input_type == "pdf":
             loader = PyPDFLoader(source)
-            return loader.load()
+            # PyPDFLoader.load() can be blocking for large PDFs. Run it in a thread and
+            # enforce the configured timeout if provided.
+            if self.timeout is None:
+                return loader.load()
+            else:
+                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                    future = executor.submit(loader.load)
+                    try:
+                        return future.result(timeout=self.timeout)
+                    except concurrent.futures.TimeoutError as exc:
+                        raise TimeoutError(
+                            f"PDF parsing exceeded timeout of {self.timeout} seconds"
+                        ) from exc
         elif input_type == "csv":
             try:
                 import pandas as pd
 
@@ -260,7 +277,12 @@ def handle_web_source(self, state, source):
         self.logger.info(f"--- (Fetching HTML from: {source}) ---")
 
         if self.use_soup:
-            response = requests.get(source)
+            # Apply the configured timeout to blocking HTTP requests. If timeout is
+            # None, don't pass the timeout argument (the request may then block
+            # indefinitely).
+            if self.timeout is None:
+                response = requests.get(source)
+            else:
+                response = requests.get(source, timeout=self.timeout)
             if response.status_code == 200:
                 if not response.text.strip():
                     raise ValueError("No HTML body content found in the response.")
@@ -286,6 +308,11 @@
         if self.node_config:
             loader_kwargs = self.node_config.get("loader_kwargs", {})
 
+        # If a global timeout is configured on the node and no loader-specific timeout
+        # was provided, propagate it to ChromiumLoader so it can apply the same limit.
+        if "timeout" not in loader_kwargs and self.timeout is not None:
+            loader_kwargs["timeout"] = self.timeout
+
         if self.browser_base:
             try:
                 from ..docloaders.browser_base import browser_base_fetch
diff --git a/tests/test_fetch_node_timeout.py b/tests/test_fetch_node_timeout.py
new file mode 100644
index 00000000..98fa195a
--- /dev/null
+++ b/tests/test_fetch_node_timeout.py
@@ -0,0 +1,241 @@
+"""
+Unit tests for FetchNode timeout functionality.
+
+These tests verify that:
+1. The timeout configuration is properly read and stored
+2. HTTP requests use the configured timeout
+3. PDF parsing respects the timeout
+4. Timeout is propagated to ChromiumLoader via loader_kwargs
+"""
+
+import sys
+import time
+import unittest
+from unittest.mock import Mock, patch, MagicMock
+from pathlib import Path
+
+# Add the project root to the path to import modules
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+
+class TestFetchNodeTimeout(unittest.TestCase):
+    """Test suite for FetchNode timeout configuration and usage."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        # Mock the heavy external dependencies at import time, tracking which
+        # modules this suite installs so tearDown removes exactly those mocks.
+        self.mock_modules = []
+        for module in ['langchain_core', 'langchain_core.documents',
+                       'langchain_community', 'langchain_community.document_loaders',
+                       'langchain_openai', 'minify_html', 'pydantic',
+                       'langchain', 'langchain.prompts']:
+            if module not in sys.modules:
+                sys.modules[module] = MagicMock()
+                self.mock_modules.append(module)
+
+        # Create mock Document class
+        class MockDocument:
+            def __init__(self, page_content, metadata=None):
+                self.page_content = page_content
+                self.metadata = metadata or {}
+
+        sys.modules['langchain_core.documents'].Document = MockDocument
+
+        # Create mock PyPDFLoader
+        class MockPyPDFLoader:
+            def __init__(self, source):
+                self.source = source
+
+            def load(self):
+                time.sleep(0.1)  # Simulate some work
+                return [MockDocument(page_content=f"PDF content from {self.source}")]
+
+        sys.modules['langchain_community.document_loaders'].PyPDFLoader = MockPyPDFLoader
+
+        # Now import FetchNode
+        from scrapegraphai.nodes.fetch_node import FetchNode
+        self.FetchNode = FetchNode
+
+    def tearDown(self):
+        """Clean up after tests."""
+        # Remove the mocked modules (and any langchain modules touched above)
+        # so that later tests import the real dependencies.
+        for module in list(sys.modules.keys()):
+            if module in self.mock_modules or module.startswith('langchain'):
+                sys.modules.pop(module, None)
+
+    def test_timeout_default_value(self):
+        """Test that the default timeout is set to 30 seconds."""
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config={}
+        )
+        self.assertEqual(node.timeout, 30)
+
+    def test_timeout_custom_value(self):
+        """Test that a custom timeout value is properly stored."""
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config={"timeout": 10}
+        )
+        self.assertEqual(node.timeout, 10)
+
+    def test_timeout_none_value(self):
+        """Test that the timeout can be disabled by setting it to None."""
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config={"timeout": None}
+        )
+        self.assertIsNone(node.timeout)
+
+    def test_timeout_no_config(self):
+        """Test that timeout defaults to 30 when no node_config is provided."""
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config=None
+        )
+        self.assertEqual(node.timeout, 30)
+
+    @patch('scrapegraphai.nodes.fetch_node.requests')
+    def test_requests_get_with_timeout(self, mock_requests):
+        """Test that requests.get is called with the timeout when use_soup=True."""
+        mock_response = Mock()
+        mock_response.status_code = 200
+        mock_response.text = "Test content"
+        mock_requests.get.return_value = mock_response
+
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config={"use_soup": True, "timeout": 15}
+        )
+
+        # Execute with a URL
+        state = {"url": "https://example.com"}
+        node.execute(state)
+
+        # Verify requests.get was called with the timeout
+        mock_requests.get.assert_called_once()
+        call_args = mock_requests.get.call_args
+        self.assertEqual(call_args[1].get('timeout'), 15)
+
+    @patch('scrapegraphai.nodes.fetch_node.requests')
+    def test_requests_get_without_timeout_when_none(self, mock_requests):
+        """Test that requests.get is called without timeout argument when timeout=None."""
+        mock_response = Mock()
+        mock_response.status_code = 200
+        mock_response.text = "Test content"
+        mock_requests.get.return_value = mock_response
+
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config={"use_soup": True, "timeout": None}
+        )
+
+        # Execute with a URL
+        state = {"url": "https://example.com"}
+        node.execute(state)
+
+        # Verify requests.get was called without a timeout
+        mock_requests.get.assert_called_once()
+        call_args = mock_requests.get.call_args
+        self.assertNotIn('timeout', call_args[1])
+
+    def test_pdf_parsing_with_timeout(self):
+        """Test that PDF parsing completes within the timeout."""
+        node = self.FetchNode(
+            input="pdf",
+            output=["doc"],
+            node_config={"timeout": 5}
+        )
+
+        # Execute with a PDF file
+        state = {"pdf": "test.pdf"}
+        result = node.execute(state)
+
+        # Should complete successfully
+        self.assertIn("doc", result)
+        self.assertIsNotNone(result["doc"])
+
+    def test_pdf_parsing_timeout_exceeded(self):
+        """Test that PDF parsing raises TimeoutError when the timeout is exceeded."""
+        # Create a mock loader that takes longer than the timeout
+        class SlowPyPDFLoader:
+            def __init__(self, source):
+                self.source = source
+
+            def load(self):
+                time.sleep(2)  # Sleep longer than the timeout
+                return []
+
+        with patch('scrapegraphai.nodes.fetch_node.PyPDFLoader', SlowPyPDFLoader):
+            node = self.FetchNode(
+                input="pdf",
+                output=["doc"],
+                node_config={"timeout": 0.5}  # Very short timeout
+            )
+
+            # Execute should raise TimeoutError
+            state = {"pdf": "slow.pdf"}
+            with self.assertRaises(TimeoutError) as context:
+                node.execute(state)
+
+            self.assertIn("PDF parsing exceeded timeout", str(context.exception))
+
+    @patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
+    def test_timeout_propagated_to_chromium_loader(self, mock_loader_class):
+        """Test that the timeout is propagated to ChromiumLoader via loader_kwargs."""
+        mock_loader = Mock()
+        mock_doc = Mock()
+        mock_doc.page_content = "Test"
+        mock_loader.load.return_value = [mock_doc]
+        mock_loader_class.return_value = mock_loader
+
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config={"timeout": 20, "headless": True}
+        )
+
+        # Execute with a URL (not using soup, so ChromiumLoader is used)
+        state = {"url": "https://example.com"}
+        node.execute(state)
+
+        # Verify ChromiumLoader was instantiated with the timeout in its kwargs
+        mock_loader_class.assert_called_once()
+        call_kwargs = mock_loader_class.call_args[1]
+        self.assertEqual(call_kwargs.get('timeout'), 20)
+
+    @patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
+    def test_timeout_not_overridden_in_loader_kwargs(self, mock_loader_class):
+        """Test that an existing timeout in loader_kwargs is not overridden."""
+        mock_loader = Mock()
+        mock_doc = Mock()
+        mock_doc.page_content = "Test"
+        mock_loader.load.return_value = [mock_doc]
+        mock_loader_class.return_value = mock_loader
+
+        node = self.FetchNode(
+            input="url",
+            output=["doc"],
+            node_config={
+                "timeout": 20,
+                "loader_kwargs": {"timeout": 50}  # Explicit loader timeout
+            }
+        )
+
+        # Execute with a URL
+        state = {"url": "https://example.com"}
+        node.execute(state)
+
+        # Verify ChromiumLoader got the loader_kwargs timeout, not the node timeout
+        mock_loader_class.assert_called_once()
+        call_kwargs = mock_loader_class.call_args[1]
+        self.assertEqual(call_kwargs.get('timeout'), 50)
+
+
+if __name__ == '__main__':
+    unittest.main()
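
For reviewers, a minimal usage sketch of the new option (hypothetical values; it relies only on the `FetchNode(input=..., output=..., node_config=...)` constructor and `execute(state)` call exercised by the tests above):

```python
from scrapegraphai.nodes.fetch_node import FetchNode

# Hypothetical config: bound blocking fetches to 10 seconds. Omitting
# "timeout" keeps the 30-second default; "timeout": None disables the limit.
fetch = FetchNode(
    input="url",
    output=["doc"],
    node_config={"use_soup": True, "timeout": 10},
)

try:
    state = fetch.execute({"url": "https://example.com"})
except TimeoutError as exc:
    # Raised by the PDF path when parsing exceeds the limit; the use_soup path
    # instead surfaces requests.exceptions.Timeout from the underlying requests.get.
    print(f"Fetch aborted: {exc}")
```

When `use_soup` is not set, the same `timeout` value is forwarded to `ChromiumLoader` through `loader_kwargs`, unless the caller already supplied a loader-specific `timeout` there.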