Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement retryable clickhouse queries #4355

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 77 additions & 13 deletions lib/sanbase/clickhouse_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,45 @@ defmodule Sanbase.ClickhouseRepo do
end

@doc ~s"""
Execute a query and apply `transform_fn/1` on each row of the result.
Execute a query described by the Query struct or the query text and arguments.

If the execution is successful, transform_fn/1 is used to transform each row
of the result. transform_fn/1 accepts as argument a single list, containing one
value per column.

Example:

ClickhouseRepo.query_transform(
"SELECT toUnixTimestamp(now())",
[],
fn [ts] -> %{datetime: DateTime.from_unix!(ts)} end
)
"""
@spec query_transform(Sanbase.Clickhouse.Query.t(), (list() -> any())) ::
{:ok, any()} | {:error, String.t()}
@spec query_transform(String.t(), list(), (list() -> any())) ::
{:ok, any()} | {:error, String.t()}
def query_transform(%Sanbase.Clickhouse.Query{} = query, transform_fn) do
def query_transform(%Sanbase.Clickhouse.Query{} = query, transform_fn)
when is_function(transform_fn, 1) do
with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do
query_transform(sql, args, transform_fn)
end
end

def query_transform(query, args, transform_fn) do
case execute_query_transform(query, args) do
case execute_query_transform(query, args, caller: "query_transform/{2,3}") do
{:ok, result} ->
{:ok, Enum.map(result.rows, transform_fn)}

{:error, error} ->
{:error, error}
end
rescue
e ->
log_and_return_error_from_exception(e, "query_transform/3", __STACKTRACE__)
end

@doc ~s"""
Execute a query and apply the transform_fn on every row the result.
Return a map with the transformed rows alongside some metadata -
the query id, column names and a short summary of the used resources
the clickhouse query id, column names and a short summary of the used resources
"""
@spec query_transform_with_metadata(Sanbase.Clickhouse.Query.t(), (list() -> list())) ::
{:ok, Map.t()} | {:error, String.t()}
Expand All @@ -81,7 +91,10 @@ defmodule Sanbase.ClickhouseRepo do
end

def query_transform_with_metadata(query, args, transform_fn) do
case execute_query_transform(query, args, propagate_error: true) do
case execute_query_transform(query, args,
caller: "query_transform_with_metadata/3",
propagate_error: true
) do
{:ok, result} ->
{:ok,
%{
Expand Down Expand Up @@ -114,11 +127,15 @@ defmodule Sanbase.ClickhouseRepo do
when acc: any
def query_reduce(%Sanbase.Clickhouse.Query{} = query, init, reducer) do
with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do
query_reduce(sql, args, init, reducer)
execute_query_reduce(sql, args, init, reducer)
end
end

def query_reduce(query, args, init, reducer) do
execute_query_reduce(query, args, init, reducer)
end

defp execute_query_reduce(query, args, init, reducer, attempts_left \\ 1) do
ordered_params = order_params(query, args)
sanitized_query = sanitize_query(query)

Expand All @@ -130,14 +147,18 @@ defmodule Sanbase.ClickhouseRepo do
{:ok, Enum.reduce(result.rows, init, reducer)}

{:error, error} ->
log_and_return_error(error, "query_reduce/4")
if retryable_error?(error) and attempts_left > 0 do
execute_query_reduce(query, args, init, reducer, attempts_left - 1)
else
log_and_return_error(error, "query_reduce/4")
end
end
rescue
e ->
log_and_return_error_from_exception(e, "query_reduce/4", __STACKTRACE__)
end

defp execute_query_transform(query, args, opts \\ []) do
defp execute_query_transform(query, args, opts, attempts_left \\ 1) do
ordered_params = order_params(query, args)
sanitized_query = sanitize_query(query)

Expand All @@ -149,11 +170,20 @@ defmodule Sanbase.ClickhouseRepo do
{:ok, result}

{:error, error} ->
log_and_return_error(error, "query_transform/3", opts)
if retryable_error?(error) and attempts_left > 0 do
execute_query_transform(query, args, opts, attempts_left - 1)
else
log_and_return_error(error, opts[:caller], opts)
end
end
rescue
e ->
log_and_return_error_from_exception(e, opts[:caller], __STACKTRACE__,
propagate_error: Keyword.get(opts, :propagate_error, false)
)
end

@masked_error_message "Cannot execute database query. If issue persists please contact Santiment Support."
@masked_error_message "Cannot execute ClickHouse database query. If issue persists please contact Santiment Support."
defp log_and_return_error_from_exception(
%{} = exception,
function_executed,
Expand Down Expand Up @@ -330,4 +360,38 @@ defmodule Sanbase.ClickhouseRepo do
)
|> to_string()
end

def retryable_error?(error) do
error_str =
case error do
e when is_binary(e) -> e
%Clickhousex.Error{message: message} -> message
end

non_retryable_errors = [
"(SYNTAX_ERROR)",
"(ILLEGAL_TYPE_OF_ARGUMENT)",
"(UNKNOWN_IDENTIFIER)",
"(ACCESS_DENIED)",
"(UNKNOWN_TABLE)",
"(MEMORY_LIMIT_EXCEEDED)",
"(AMBIGUOUS_COLUMN_NAME)",
["number of params received", "does not match expected"]
]

has_non_retryable_error? =
Enum.any?(non_retryable_errors, fn
error when is_binary(error) ->
String.contains?(error_str, error)

# in case of lists, all elements in the list must be present in the error
# can be done with regexes, too, but this is simpler
errors_list when is_list(errors_list) ->
Enum.all?(errors_list, fn error ->
String.contains?(error_str, error)
end)
end)

not has_non_retryable_error?
end
end
31 changes: 9 additions & 22 deletions lib/sanbase/queries/refresh/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,19 @@ defmodule Sanbase.Queries.RefreshWorker do
defp maybe_remove_scheduled_job(result, nil), do: result

defp maybe_remove_scheduled_job({:error, error_str}, scheduled_job) do
case retryable_error?(error_str) do
true ->
{:error, error_str}

false ->
Oban.cancel_job(@oban_conf_name, scheduled_job)
{:error, error_str}
if clickhouse_error?(error_str) and Sanbase.ClickhouseRepo.retryable_error?(error_str) do
{:error, error_str}
else
Oban.cancel_job(@oban_conf_name, scheduled_job)
{:error, error_str}
end
end

defp maybe_remove_scheduled_job(result, _), do: result

defp retryable_error?(error_str) do
non_retryable_errors = [
"(SYNTAX_ERROR)",
"(ILLEGAL_TYPE_OF_ARGUMENT)",
"(UNKNOWN_IDENTIFIER)",
"(ACCESS_DENIED)",
"(UNKNOWN_TABLE)",
"(MEMORY_LIMIT_EXCEEDED)",
"(AMBIGUOUS_COLUMN_NAME)"
]

has_non_retryable_error? =
Enum.any?(non_retryable_errors, fn error -> String.contains?(error_str, error) end)

not has_non_retryable_error?
defp clickhouse_error?(error_str) do
# Clickhouse errors returned from our ClickhouseRepo
# start with "Cannot execute ClickHouse database query"
String.contains?(error_str, "ClickHouse")
end
end
2 changes: 1 addition & 1 deletion test/sanbase/clickhouse/api_call_data_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ defmodule Sanbase.Clickhouse.ApiCallDataTest do
{:error, error} =
ApiCallData.api_call_history(context.user.id, dt1, dt3, "1d", :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end) =~ error_msg
end)
end
Expand Down
4 changes: 2 additions & 2 deletions test/sanbase/clickhouse/clickhouse_repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Sanbase.Clickhouse.ClickhouseRepoTest do
{:error, error} = ClickhouseRepo.query_transform("SELECT NOW()", [], & &1)

assert error =~
"Cannot execute database query. If issue persists please contact Santiment Support"
"Cannot execute ClickHouse database query. If issue persists please contact Santiment Support"

# assert returned error message does not contain internal details
refute error =~ error_msg
Expand All @@ -38,7 +38,7 @@ defmodule Sanbase.Clickhouse.ClickhouseRepoTest do
{:error, error} = ClickhouseRepo.query_transform("SELECT NOW()", [], & &1)

assert error =~
"Cannot execute database query. If issue persists please contact Santiment Support"
"Cannot execute ClickHouse database query. If issue persists please contact Santiment Support"

# assert returned error message does not contain internal details
refute error =~ error_msg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
|> Sanbase.Mock.run_with_mocks(fn ->
{:error, error} = TrendingWords.get_trending_words(dt1, dt3, "1d", 2, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down Expand Up @@ -250,7 +250,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
|> Sanbase.Mock.run_with_mocks(fn ->
{:error, error} = TrendingWords.get_currently_trending_words(10, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down Expand Up @@ -281,7 +281,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
{:error, error} =
TrendingWords.get_word_trending_history("word", dt1, dt2, "1h", 10, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down Expand Up @@ -316,7 +316,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
{:error, error} =
TrendingWords.get_project_trending_history(project.slug, dt1, dt2, "1h", 10, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down