Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/mode): add connection timeouts to avoid RemoteDisconnected errors #11245

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import yaml
from liquid import Template, Undefined
from pydantic import Field, validator
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import ConnectionError
from requests.models import HTTPBasicAuth, HTTPError
from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential
Expand Down Expand Up @@ -127,6 +129,10 @@ class ModeAPIConfig(ConfigModel):
max_attempts: int = Field(
default=5, description="Maximum number of attempts to retry before failing"
)
timeout: int = Field(
default=40,
description="Timout setting, how long to wait for the Mode rest api to send data before giving up",
)


class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase):
Expand Down Expand Up @@ -299,7 +305,15 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig):
self.report = ModeSourceReport()
self.ctx = ctx

self.session = requests.session()
self.session = requests.Session()
# Handling retry and backoff
retries = 3
backoff_factor = 10
retry = Retry(total=retries, backoff_factor=backoff_factor)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)

self.session.auth = HTTPBasicAuth(
self.config.token,
self.config.password.get_secret_value(),
Expand Down Expand Up @@ -1469,15 +1483,16 @@ def _get_request_json(self, url: str) -> Dict:
multiplier=self.config.api_options.retry_backoff_multiplier,
max=self.config.api_options.max_retry_interval,
),
retry=retry_if_exception_type(HTTPError429),
retry=retry_if_exception_type((HTTPError429, ConnectionError)),
stop=stop_after_attempt(self.config.api_options.max_attempts),
)

@r.wraps
def get_request():
try:
response = self.session.get(url)
response.raise_for_status()
response = self.session.get(
url, timeout=self.config.api_options.timeout
)
return response.json()
except HTTPError as http_error:
error_response = http_error.response
Expand Down
10 changes: 7 additions & 3 deletions metadata-ingestion/tests/integration/mode/test_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ def __init__(self, error_list, status_code):
def json(self):
return self.json_data

def get(self, url):
def mount(self, prefix, adaptor):
return self

def get(self, url, timeout=40):
self.url = url
self.timeout = timeout
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
with open(response_json_path) as file:
data = json.loads(file.read())
Expand Down Expand Up @@ -74,7 +78,7 @@ def mocked_requests_failure(*args, **kwargs):
@freeze_time(FROZEN_TIME)
def test_mode_ingest_success(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.session",
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_sucess,
):
pipeline = Pipeline.create(
Expand Down Expand Up @@ -111,7 +115,7 @@ def test_mode_ingest_success(pytestconfig, tmp_path):
@freeze_time(FROZEN_TIME)
def test_mode_ingest_failure(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.session",
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_failure,
):
global test_resources_dir
Expand Down
Loading