Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backward compatibility for elasticsearch<8 #33281

Merged
merged 4 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from operator import attrgetter
from time import time
from typing import TYPE_CHECKING, Any, Callable, List, Tuple
from urllib.parse import quote
from urllib.parse import quote, urlparse

# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
Expand Down Expand Up @@ -98,6 +98,15 @@ def __init__(
log_id_template: str | None = None,
):
es_kwargs = es_kwargs or {}
# For elasticsearch>8,arguments like retry_timeout have changed for elasticsearch to retry_on_timeout
# in Elasticsearch() compared to previous versions.
# Read more at: https://elasticsearch-py.readthedocs.io/en/v8.8.2/api.html#module-elasticsearch
if es_kwargs:
retry_timeout = es_kwargs.get("retry_timeout")
Copy link
Member

@pankajkoti pankajkoti Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, #33135 should be marked as a breaking change and released with a major bump. This is one argument that we have found, but there could be more such arguments and adding back compat for them will be challenging and will be discovered only when users face issues based on the params they are using.

Copy link
Member

@pankajkoti pankajkoti Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@potiuk potiuk Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it indeed HAS potential of breaking things. If we can make it "reasonably compatible" - i.e. fix back-compatibilities that we know and smooth the migration, that would be great, but I would be for marking the next ES release as MAJOR regardless cc: @eladkal

Also there is a little twist to it. Unless I am mistaken, I think elasticsearch handler integration was anyhow broken and not working before :) . So well. it could also be seen as bugfix :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I too think we do a major release since the underline dependency had also changed from 7.* to 8.*

Copy link
Member

@pankajkoti pankajkoti Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. yes +1 to make backward-compatible as much as possible.

I am not aware of how far it was broken. But, we have few tests and users for whom the elasticsearch handler integration was known to be working well. However, only yesterday our tests caught that the PR broke existing working setup :)

Perhaps our QA expert @vatsrahul1001 can provide more confirmation here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#33135 did not break any airflow code.

We agreed that bumping dependency is not a breaking change and as expected every time you bump x.y.z to x+1.y.z something can be broken for users.

I am not convencied next release should be a major one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal , I think if our helm chart can't work with the new Es provider then it should be considered as breaking change.
Also, maybe we should consider bumping to major versions whenever we bump dependencies to major versions?

Copy link
Contributor

@utkarsharma2 utkarsharma2 Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal I'm not entirely sure, but the below code was part of airflow's codebase which led to the issue because of upgrade -

retry_timeout: 'True'

Copy link
Member

@pankajkoti pankajkoti Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I agree bumping some dependency from x.y.z to x+1.y.z may not need to be a breaking change in general. 👍🏽

However, would like to bring up and discuss the following point.
For this provider, elasticsearch is the main underlying dependency, and we're updating it to the next major version from 7.x to 8.x. It does not affect core Airflow code/functionality, but since providers are versioned and released independently, upgrading to this provider release might affect existing deployments for users.

Copy link
Contributor

@eladkal eladkal Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ephraimbuddy I didn't see helm tests were broken?
and to my understanding this PR brings back support for elastic search 7 so what is the motivation for the major release? Once this PR is merged the code is backward compatible.

Also, maybe we should consider bumping to major versions whenever we bump dependencies to major versions?

This was discussed and had lazy consequence if I remember correctly. Need to lookup the thread.

if retry_timeout:
es_kwargs["retry_on_timeout"] = retry_timeout
del es_kwargs["retry_timeout"]
host = self.format_url(host)
super().__init__(base_log_folder, filename_template)
self.closed = False

Expand Down Expand Up @@ -126,6 +135,27 @@ def __init__(
self._doc_type_map: dict[Any, Any] = {}
self._doc_type: list[Any] = []

@staticmethod
def format_url(host: str) -> str:
"""
Formats the given host string to ensure it starts with 'http' or 'https'.
Checks if the host string represents a valid URL.

:params host: The host string to format and check.
"""
parsed_url = urlparse(host)

# Check if the scheme is either http or https
if not parsed_url.scheme:
host = "http://" + host
parsed_url = urlparse(host)

# Basic validation for a valid URL
if not parsed_url.netloc:
raise ValueError(f"'{host}' is not a valid URL.")

return host

def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
with create_session() as session:
dag_run = ti.get_dagrun(session=session)
Expand Down
2 changes: 1 addition & 1 deletion chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ config:
elasticsearch_configs:
max_retries: 3
timeout: 30
retry_timeout: 'True'
retry_on_timeout: 'True'
kerberos:
keytab: '{{ .Values.kerberos.keytabPath }}'
reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
Expand Down
63 changes: 63 additions & 0 deletions tests/providers/elasticsearch/log/test_es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,69 @@ def concat_logs(lines):
"on 2023-07-09 07:47:32+00:00"
)

@pytest.mark.parametrize(
"host, expected",
[
("http://localhost:9200", "http://localhost:9200"),
("https://localhost:9200", "https://localhost:9200"),
("localhost:9200", "http://localhost:9200"),
("someurl", "http://someurl"),
("https://", "ValueError"),
],
)
def test_format_url(self, host, expected):
"""
Test the format_url method of the ElasticsearchTaskHandler class.
"""
if expected == "ValueError":
with pytest.raises(ValueError):
assert ElasticsearchTaskHandler.format_url(host) == expected
else:
assert ElasticsearchTaskHandler.format_url(host) == expected

def test_elasticsearch_constructor_retry_timeout_handling(self):
"""
Test if the ElasticsearchTaskHandler constructor properly handles the retry_timeout argument.
"""
# Mock the Elasticsearch client
with mock.patch(
"airflow.providers.elasticsearch.log.es_task_handler.elasticsearch.Elasticsearch"
) as mock_es:
# Test when 'retry_timeout' is present in es_kwargs
es_kwargs = {"retry_timeout": 10}
ElasticsearchTaskHandler(
base_log_folder="dummy_folder",
end_of_log_mark="end_of_log_mark",
write_stdout=False,
json_format=False,
json_fields="fields",
host_field="host",
offset_field="offset",
es_kwargs=es_kwargs,
)

# Check the arguments with which the Elasticsearch client is instantiated
mock_es.assert_called_once_with("http://localhost:9200", retry_on_timeout=10)

# Reset the mock for the next test
mock_es.reset_mock()

# Test when 'retry_timeout' is not present in es_kwargs
es_kwargs = {}
ElasticsearchTaskHandler(
base_log_folder="dummy_folder",
end_of_log_mark="end_of_log_mark",
write_stdout=False,
json_format=False,
json_fields="fields",
host_field="host",
offset_field="offset",
es_kwargs=es_kwargs,
)

# Check that the Elasticsearch client is instantiated without the 'retry_on_timeout' argument
mock_es.assert_called_once_with("http://localhost:9200")

def test_client(self):
assert isinstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
assert self.es_task_handler.index_patterns == "_all"
Expand Down