feat(reader): Add ClickHouse HTTPReader #819

Closed · wants to merge 9 commits

Changes from 6 commits
snuba/clickhouse/http.py (112 changes: 99 additions & 13 deletions)
```diff
@@ -1,12 +1,18 @@
 import json
 import re
-from typing import Callable, Iterable
+from typing import Callable, Iterable, Mapping, MutableMapping, Optional
+from urllib.parse import urlencode

+from dateutil.parser import parse as dateutil_parse
 from urllib3.connectionpool import HTTPConnectionPool
 from urllib3.exceptions import HTTPError
+from urllib3.response import HTTPResponse

 from snuba.clickhouse.errors import ClickhouseError
+from snuba.clickhouse.native import transform_date, transform_datetime
+from snuba.clickhouse.query import ClickhouseQuery
 from snuba.datasets.schemas.tables import TableSchema
+from snuba.reader import Reader, Result, build_result_transformer
 from snuba.writer import BatchWriter, WriterTableRow
```


```diff
@@ -16,6 +22,21 @@
 )


+def raise_for_error_response(response: HTTPResponse) -> None:
+    if response.status != 200:
+        # XXX: This should be switched to just parse the JSON body after
+        # https://github.com/yandex/ClickHouse/issues/6272 is available.
+        content = response.data.decode("utf8")
+        details = CLICKHOUSE_ERROR_RE.match(content)
+        if details is not None:
+            code, type, message = details.groups()
+            raise ClickhouseError(int(code), message)
+        else:
+            raise HTTPError(
+                f"Received unexpected {response.status} response: {content}"
+            )
```
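For context on what that helper parses: the `CLICKHOUSE_ERROR_RE` definition is collapsed in this diff, but ClickHouse's plain-text error bodies generally follow a `Code: <n>, e.displayText() = <Type>: <message>` shape. A minimal sketch with a hypothetical stand-in pattern (not the pattern this file actually defines):

```python
import re

# Hypothetical stand-in for CLICKHOUSE_ERROR_RE; the real definition is
# collapsed above. It captures the error code, exception type, and message.
ERROR_RE = re.compile(r"^Code: (\d+), e\.displayText\(\) = (.+?): (.+)")

sample = "Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 8"
code, exc_type, message = ERROR_RE.match(sample).groups()
assert (int(code), exc_type) == (62, "DB::Exception")
```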


```diff
 class HTTPBatchWriter(BatchWriter):
     def __init__(
         self,
         ...
@@ -73,15 +94,80 @@ def write(self, rows: Iterable[WriterTableRow]):
             chunked=True,
         )

-        if response.status != 200:
-            # XXX: This should be switched to just parse the JSON body after
-            # https://github.com/yandex/ClickHouse/issues/6272 is available.
-            content = response.data.decode("utf8")
-            details = CLICKHOUSE_ERROR_RE.match(content)
-            if details is not None:
-                code, type, message = details.groups()
-                raise ClickhouseError(int(code), message)
-            else:
-                raise HTTPError(
-                    f"Received unexpected {response.status} response: {content}"
-                )
+        raise_for_error_response(response)
```


```python
def parse_and_transform_date(value: str) -> str:
    return transform_date(dateutil_parse(value))


def parse_and_transform_datetime(value: str) -> str:
    return transform_datetime(dateutil_parse(value))


transform_column_types = build_result_transformer(
    [
        (re.compile(r"^Date(\(.+\))?$"), parse_and_transform_date),
        (re.compile(r"^DateTime(\(.+\))?$"), parse_and_transform_datetime),
    ]
)
```
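These transforms exist because FORMAT JSON serializes Date/DateTime columns as strings. A quick illustration of the parsing step (assuming, as the test below does, that the `transform_*` helpers then re-render ISO 8601 strings to match the native reader):

```python
from datetime import datetime
from dateutil.parser import parse as dateutil_parse

# "2020-01-02 03:04:05" is how FORMAT JSON renders a DateTime value; parsing
# it yields a datetime that transform_datetime can re-render as ISO 8601.
assert dateutil_parse("2020-01-02 03:04:05") == datetime(2020, 1, 2, 3, 4, 5)
```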


```python
class HTTPReader(Reader[ClickhouseQuery]):
    def __init__(
        self, host: str, port: int, settings: Optional[Mapping[str, str]] = None
    ):
        if settings is not None:
            assert "query_id" not in settings, "query_id cannot be passed as a setting"
```

Comment on lines +120 to +122

Contributor: Is this a new condition we did not have on native?

Contributor Author: Sort of.

The settings passed here are supposed to be closer to "session settings" (client_settings for the ClickhousePool constructor) than they are to "query settings", but we don't use ClickHouse HTTP sessions here due to not wanting to deal with session timeouts, etc. There needed to be some way to allow constant (session-like) settings here to support things like output_format_json_quote_64bit_integers without leaking that into raw_query. This is more of a manifestation of query_id being passed as a setting, rather than some other way (e.g. a header). I don't know how the native driver would deal with query_id here, such as whether or not it considers that a valid setting, or if that's an implementation detail of the way settings are implemented in the HTTP interface. (I don't know what it would do if query_id was provided in both the settings mapping as well as the query_id keyword argument to execute.)

```python
        self.__pool = HTTPConnectionPool(host, port)

        self.__default_settings: MutableMapping[str, str] = (
            {**settings} if settings is not None else {}
        )

        if "output_format_json_quote_64bit_integers" not in self.__default_settings:
            self.__default_settings["output_format_json_quote_64bit_integers"] = "0"
```

```python
    def execute(
        self,
        query: ClickhouseQuery,
        settings: Optional[Mapping[str, str]] = None,
        query_id: Optional[str] = None,
        with_totals: bool = False,  # NOTE: unnecessary with FORMAT JSON
    ) -> Result:
```

Contributor Author: This comment I guess is a bit misleading. FORMAT JSON doesn't implicitly add WITH TOTALS or anything, but it does already split out the totals row from the data rows so that the caller doesn't need to. When the AST is available everywhere, it'd be better to remove with_totals from this function signature and just use the query attribute to decide whether or not the totals row should exist for the native reader.
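For reference, a hand-written approximation (not captured output) of why FORMAT JSON makes with_totals unnecessary: a WITH TOTALS query already arrives with the totals row separated from the data rows, along with extra bookkeeping keys:

```python
# Approximate shape of a ClickHouse FORMAT JSON body for a query like
# "SELECT project_id, count() FROM ... GROUP BY project_id WITH TOTALS".
response_body = {
    "meta": [
        {"name": "project_id", "type": "UInt64"},
        {"name": "count()", "type": "UInt64"},
    ],
    "data": [{"project_id": 1, "count()": 10}],
    "totals": {"project_id": 0, "count()": 10},
    # Extra keys like these are why execute() strips everything outside
    # {"meta", "data", "totals"} below.
    "rows": 1,
    "statistics": {"elapsed": 0.001, "rows_read": 10, "bytes_read": 80},
}
```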
```python
        query_settings: MutableMapping[str, str] = (
            {**settings} if settings is not None else {}
        )

        # XXX: mypy won't allow redefining ``settings`` as mutable, so delete
        # the original variable to avoid accidentally referencing ``settings``
        # instead of ``query_settings``.
        del settings
```
Comment on lines +139 to +146

Contributor Author: This is one of my least favorite things about mypy.


```python
        assert (
            "query_id" not in query_settings
        ), "query_id cannot be passed as a setting"

        if query_id is not None:
            query_settings["query_id"] = query_id

        response = self.__pool.urlopen(
            "POST",
            "/?" + urlencode({**self.__default_settings, **query_settings}),
            headers={"Connection": "keep-alive", "Accept-Encoding": "gzip,deflate"},
            body=query.format_sql("JSON"),
        )
```
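Concretely, the merged settings (and query_id, when given) travel as the URL query string of the POST; a small illustration with a hypothetical query_id value:

```python
from urllib.parse import urlencode

# Per-query settings are merged over the session-like defaults, then encoded.
print("/?" + urlencode({
    "output_format_json_quote_64bit_integers": "0",
    "query_id": "example-query-id",  # hypothetical value
}))
# /?output_format_json_quote_64bit_integers=0&query_id=example-query-id
```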
Comment on lines +155 to +160

Contributor: I think it may be useful to set up retries for network connection errors. Also, what's the default timeout we would have here?

Contributor Author:

> I think it may be useful to set up retries for network connection errors.

Makes sense, will look into that.

> Also what's the default timeout we would have here?

Not sure. Will investigate tomorrow.

Contributor: Context on how we handled the retries and timeout in the queries from sentry to snuba, with the reasoning around retries, if it helps: getsentry/sentry@6d54866

Contributor Author:

> I think it may be useful to set up retries for network connection errors.

I started writing a comment about how, on second thought, I didn't think we should just blindly retry on errors without a reason to, but then I checked the distinction between execute and execute_robust in ClickhousePool, and I guess we retry there as well.

> Also what's the default timeout we would have here?

This basically falls through to socket.getdefaulttimeout:

```
(snuba) ted@veneno % docker run --rm -it getsentry/snuba python -c 'import socket; print(socket.getdefaulttimeout())'
+ '[' python = bash ']'
+ '[' p = - ']'
+ '[' python = api ']'
+ snuba python --help
+ exec gosu snuba python -c 'import socket; print(socket.getdefaulttimeout())'
None
```

I guess this should be able to be parameterized (the writer also has this issue.)

Contributor: About the retries, the difference between execute_robust and using the default Retry object of urllib3 is that execute_robust also waits by default 1 second (besides being more flexible on the types of errors).

I would say we should always retry (a given number of times) on errors when establishing the connection (NetworkError, SocketTimeout, etc.). That covers the transient network issues that we run into. In these cases you will get the exception from urllib3.

Whether to retry on ServerError is debatable. Here (this is only a read path, so idempotent) it is not dangerous, but it puts useless load on the DB if we do that on errors that cannot be transient. I am not sure we have a clean way to make this distinction on ClickHouse ServerErrors, unless the ClickHouse HTTP interface used HTTP 5xx for actual server errors and HTTP 4xx for query errors. If it does not, we would have to selectively blacklist/whitelist error codes, I guess.

I'd be OK being conservative and not retrying at all in these cases. (Sentry retries them, by the way.) I would certainly not retry on ReadTimeout errors.

How does that sound as a strategy?

Contributor: Actually the Retry object from urllib3 is already capable of making the distinction between network errors, read errors, and redirect errors, so you do not need to implement the logic in your code to do network connection retries:
https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry
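A sketch of what that could look like for this pool (parameter values are illustrative, not something this PR settled on):

```python
from urllib3.connectionpool import HTTPConnectionPool
from urllib3.util import Retry, Timeout

# Retry only while establishing the connection; never retry reads, so a
# query that already reached ClickHouse is not implicitly re-executed.
retries = Retry(total=None, connect=3, read=0, redirect=0, backoff_factor=0.1)
timeout = Timeout(connect=1.0, read=30.0)

pool = HTTPConnectionPool("localhost", 8123, retries=retries, timeout=timeout)
```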

Contributor Author:

> About the retries, the difference between execute_robust and using the default Retry object of urllib3 is that execute_robust also waits by default 1 second (besides being more flexible on the types of errors).

Yeah, my original comment was unclear: I meant that I was surprised that execute also had some number of baked-in retries. (That doesn't really change anything about your comment, though.) It makes some more sense why after this conversation.

> Whether to retry on ServerError is debatable.

In my opinion, we should try and avoid implicit retries of application operations as much as possible, and allow (well, not so much allow as require) the caller to retry when necessary. I don't think a one-size-fits-all solution in the reader (or writer, or connection pool, etc.) is a good idea.

Like you mentioned, I do think it makes sense to always retry on transport-level errors (connection timeouts). I don't think of this so much as the responsibility of the Reader as of the transport itself (specifically the connection pool, native or HTTP), so it probably makes sense to just pass the instantiated pool to the constructor, rather than constructing it in the reader itself.

This is also similar to some of the conversations around #844 about connection sharing. It'd be unnecessarily wasteful to create a new connection pool for every reader if they are all communicating with the same cluster.

Contributor: I would be totally OK if we decided to retry only on transport errors. I am not a big fan of baking application-specific retries into the Reader (or lower) layer either.


```python
        raise_for_error_response(response)

        result = json.loads(response.data.decode("utf-8"))

        # Remove any extra keys that are not part of the Result data structure.
        for k in [*result.keys()]:
            if k not in {"meta", "data", "totals"}:
                del result[k]

        transform_column_types(result)

        return result
```
tests/clickhouse/test_reader.py (60 changes: 60 additions & 0 deletions)

@@ -0,0 +1,60 @@
```python
from typing import Sequence, Tuple

import pytest

from snuba import settings
from snuba.clickhouse.errors import ClickhouseError
from snuba.clickhouse.http import HTTPReader
from snuba.clickhouse.native import NativeDriverReader
from snuba.clickhouse.query import ClickhouseQuery
from snuba.environment import clickhouse_ro
from snuba.reader import Reader


class SimpleClickhouseQuery(ClickhouseQuery):
    def __init__(self, columns: Sequence[Tuple[str, str]]) -> None:
        self.__columns = columns

    def _format_query_impl(self) -> str:
        columns = ", ".join(f"{value} as {alias}" for alias, value in self.__columns)
        return f"SELECT {columns}"
```
Comment on lines +14 to +20

Contributor Author: I wanted to use the AST for this, but that would have meant either

1. relying on the other steps of query processing and executing the query against an actual dataset, coupling this test to a lot of unnecessary things, or
2. refactoring the AstClickhouseQuery constructor so that it would be more like a dataclass, independent of the Query object entirely.

Both of those are rabbit holes I have no interest in falling down into right now. We should do the second one eventually.

Contributor: To me it is fine not to use the AST to write a unit test at this level. Anyway, if you want to use it, you don't need to depend on a dataset to build a Query object and transform it into a ClickHouse query, since the Query object depends on a DataSource (which is the abstract class above TableStorage and can be created from scratch without all the heavy weight of the dataset).

Example: if you just want a Query object starting from the AST, you can do it this way:
https://github.com/getsentry/snuba/blob/master/tests/query/parser/test_query.py#L20-L32
(you have to use AstClickhouseQuery instead of DictClickhouseQuery).
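As a quick illustration of what the SimpleClickhouseQuery helper above emits (values made up for the example):

```python
# The helper just flattens (alias, expression) pairs into a SELECT.
query = SimpleClickhouseQuery([("one", "toInt64(1)"), ("two", "toInt64(2)")])
assert query._format_query_impl() == "SELECT toInt64(1) as one, toInt64(2) as two"
```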



```python
@pytest.mark.parametrize(
    "reader",
    [
        NativeDriverReader(clickhouse_ro),
        HTTPReader(settings.CLICKHOUSE_HOST, settings.CLICKHOUSE_HTTP_PORT),
    ],
)
def test_reader(reader: Reader[ClickhouseQuery]) -> None:
    assert reader.execute(
        SimpleClickhouseQuery(
            [
                ("datetime", "toDateTime('2020-01-02 03:04:05')"),
                ("date", "toDate('2020-01-02')"),
                ("int64", "toInt64(1)"),
                ("uuid", "toUUID('00000000-0000-4000-8000-000000000000')"),
            ]
        )
    ) == {
        "meta": [
            {"name": "datetime", "type": "DateTime"},
            {"name": "date", "type": "Date"},
            {"name": "int64", "type": "Int64"},
            {"name": "uuid", "type": "UUID"},
        ],
        "data": [
            {
                "date": "2020-01-02T00:00:00+00:00",
                "datetime": "2020-01-02T03:04:05+00:00",
                "int64": 1,
                "uuid": "00000000-0000-4000-8000-000000000000",
            }
        ],
    }

    with pytest.raises(ClickhouseError) as e:
        reader.execute(SimpleClickhouseQuery([("invalid", '"')]))

    assert e.value.code == 62
```