Skip to content

Commit

Permalink
Implement retryable clickhouse queries
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Jul 26, 2024
1 parent b35c498 commit be66cc5
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 42 deletions.
90 changes: 77 additions & 13 deletions lib/sanbase/clickhouse_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,45 @@ defmodule Sanbase.ClickhouseRepo do
end

@doc ~s"""
Execute a query and apply `transform_fn/1` on each row of the result.
Execute a query described by the Query struct or the query text and arguments.
If the execution is successful, transform_fn/1 is used to transform each row
of the result. transform_fn/1 accepts as argument a single list, containing one
value per column.
Example:
ClickhouseRepo.query_transform(
"SELECT toUnixTimestamp(now())",
[],
fn [ts] -> %{datetime: DateTime.from_unix!(ts)} end
)
"""
@spec query_transform(Sanbase.Clickhouse.Query.t(), (list() -> any())) ::
{:ok, any()} | {:error, String.t()}
@spec query_transform(String.t(), list(), (list() -> any())) ::
{:ok, any()} | {:error, String.t()}
def query_transform(%Sanbase.Clickhouse.Query{} = query, transform_fn) do
def query_transform(%Sanbase.Clickhouse.Query{} = query, transform_fn)
when is_function(transform_fn, 1) do
with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do
query_transform(sql, args, transform_fn)
end
end

def query_transform(query, args, transform_fn) do
case execute_query_transform(query, args) do
case execute_query_transform(query, args, caller: "query_transform/{2,3}") do
{:ok, result} ->
{:ok, Enum.map(result.rows, transform_fn)}

{:error, error} ->
{:error, error}
end
rescue
e ->
log_and_return_error_from_exception(e, "query_transform/3", __STACKTRACE__)
end

@doc ~s"""
Execute a query and apply the transform_fn on every row the result.
Return a map with the transformed rows alongside some metadata -
the query id, column names and a short summary of the used resources
the clickhouse query id, column names and a short summary of the used resources
"""
@spec query_transform_with_metadata(Sanbase.Clickhouse.Query.t(), (list() -> list())) ::
{:ok, Map.t()} | {:error, String.t()}
Expand All @@ -81,7 +91,10 @@ defmodule Sanbase.ClickhouseRepo do
end

def query_transform_with_metadata(query, args, transform_fn) do
case execute_query_transform(query, args, propagate_error: true) do
case execute_query_transform(query, args,
caller: "query_transform_with_metadata/3",
propagate_error: true
) do
{:ok, result} ->
{:ok,
%{
Expand Down Expand Up @@ -114,11 +127,15 @@ defmodule Sanbase.ClickhouseRepo do
when acc: any
def query_reduce(%Sanbase.Clickhouse.Query{} = query, init, reducer) do
with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do
query_reduce(sql, args, init, reducer)
execute_query_reduce(sql, args, init, reducer)
end
end

def query_reduce(query, args, init, reducer) do
execute_query_reduce(query, args, init, reducer)
end

defp execute_query_reduce(query, args, init, reducer, attempts_left \\ 1) do
ordered_params = order_params(query, args)
sanitized_query = sanitize_query(query)

Expand All @@ -130,14 +147,18 @@ defmodule Sanbase.ClickhouseRepo do
{:ok, Enum.reduce(result.rows, init, reducer)}

{:error, error} ->
log_and_return_error(error, "query_reduce/4")
if retryable_error?(error) and attempts_left > 0 do
execute_query_reduce(query, args, init, reducer, attempts_left - 1)
else
log_and_return_error(error, "query_reduce/4")
end
end
rescue
e ->
log_and_return_error_from_exception(e, "query_reduce/4", __STACKTRACE__)
end

defp execute_query_transform(query, args, opts \\ []) do
defp execute_query_transform(query, args, opts, attempts_left \\ 1) do
ordered_params = order_params(query, args)
sanitized_query = sanitize_query(query)

Expand All @@ -149,11 +170,20 @@ defmodule Sanbase.ClickhouseRepo do
{:ok, result}

{:error, error} ->
log_and_return_error(error, "query_transform/3", opts)
if retryable_error?(error) and attempts_left > 0 do
execute_query_transform(query, args, opts, attempts_left - 1)
else
log_and_return_error(error, opts[:caller], opts)
end
end
rescue
e ->
log_and_return_error_from_exception(e, opts[:caller], __STACKTRACE__,
propagate_error: Keyword.get(opts, :propagate_error, false)
)
end

@masked_error_message "Cannot execute database query. If issue persists please contact Santiment Support."
@masked_error_message "Cannot execute ClickHouse database query. If issue persists please contact Santiment Support."
defp log_and_return_error_from_exception(
%{} = exception,
function_executed,
Expand Down Expand Up @@ -330,4 +360,38 @@ defmodule Sanbase.ClickhouseRepo do
)
|> to_string()
end

def retryable_error?(error) do
error_str =
case error do
e when is_binary(e) -> e
%Clickhousex.Error{message: message} -> message
end

non_retryable_errors = [
"(SYNTAX_ERROR)",
"(ILLEGAL_TYPE_OF_ARGUMENT)",
"(UNKNOWN_IDENTIFIER)",
"(ACCESS_DENIED)",
"(UNKNOWN_TABLE)",
"(MEMORY_LIMIT_EXCEEDED)",
"(AMBIGUOUS_COLUMN_NAME)",
["number of params received", "does not match expected"]
]

has_non_retryable_error? =
Enum.any?(non_retryable_errors, fn
error when is_binary(error) ->
String.contains?(error_str, error)

# in case of lists, all elements in the list must be present in the error
# can be done with regexes, too, but this is simpler
errors_list when is_list(errors_list) ->
Enum.all?(errors_list, fn error ->
String.contains?(error_str, error)
end)
end)

not has_non_retryable_error?
end
end
31 changes: 9 additions & 22 deletions lib/sanbase/queries/refresh/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,19 @@ defmodule Sanbase.Queries.RefreshWorker do
defp maybe_remove_scheduled_job(result, nil), do: result

defp maybe_remove_scheduled_job({:error, error_str}, scheduled_job) do
case retryable_error?(error_str) do
true ->
{:error, error_str}

false ->
Oban.cancel_job(@oban_conf_name, scheduled_job)
{:error, error_str}
if clickhouse_error?(error_str) and Sanbase.ClickhouseRepo.retryable_error?(error_str) do
{:error, error_str}
else
Oban.cancel_job(@oban_conf_name, scheduled_job)
{:error, error_str}
end
end

defp maybe_remove_scheduled_job(result, _), do: result

defp retryable_error?(error_str) do
non_retryable_errors = [
"(SYNTAX_ERROR)",
"(ILLEGAL_TYPE_OF_ARGUMENT)",
"(UNKNOWN_IDENTIFIER)",
"(ACCESS_DENIED)",
"(UNKNOWN_TABLE)",
"(MEMORY_LIMIT_EXCEEDED)",
"(AMBIGUOUS_COLUMN_NAME)"
]

has_non_retryable_error? =
Enum.any?(non_retryable_errors, fn error -> String.contains?(error_str, error) end)

not has_non_retryable_error?
defp clickhouse_error?(error_str) do
# Clickhouse errors returned from our ClickhouseRepo
# start with "Cannot execute ClickHouse database query"
String.contains?(error_str, "ClickHouse")
end
end
2 changes: 1 addition & 1 deletion test/sanbase/clickhouse/api_call_data_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ defmodule Sanbase.Clickhouse.ApiCallDataTest do
{:error, error} =
ApiCallData.api_call_history(context.user.id, dt1, dt3, "1d", :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end) =~ error_msg
end)
end
Expand Down
4 changes: 2 additions & 2 deletions test/sanbase/clickhouse/clickhouse_repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Sanbase.Clickhouse.ClickhouseRepoTest do
{:error, error} = ClickhouseRepo.query_transform("SELECT NOW()", [], & &1)

assert error =~
"Cannot execute database query. If issue persists please contact Santiment Support"
"Cannot execute ClickHouse database query. If issue persists please contact Santiment Support"

# assert returned error message does not contain internal details
refute error =~ error_msg
Expand All @@ -38,7 +38,7 @@ defmodule Sanbase.Clickhouse.ClickhouseRepoTest do
{:error, error} = ClickhouseRepo.query_transform("SELECT NOW()", [], & &1)

assert error =~
"Cannot execute database query. If issue persists please contact Santiment Support"
"Cannot execute ClickHouse database query. If issue persists please contact Santiment Support"

# assert returned error message does not contain internal details
refute error =~ error_msg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
|> Sanbase.Mock.run_with_mocks(fn ->
{:error, error} = TrendingWords.get_trending_words(dt1, dt3, "1d", 2, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down Expand Up @@ -250,7 +250,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
|> Sanbase.Mock.run_with_mocks(fn ->
{:error, error} = TrendingWords.get_currently_trending_words(10, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down Expand Up @@ -281,7 +281,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
{:error, error} =
TrendingWords.get_word_trending_history("word", dt1, dt2, "1h", 10, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down Expand Up @@ -316,7 +316,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do
{:error, error} =
TrendingWords.get_project_trending_history(project.slug, dt1, dt2, "1h", 10, :all)

assert error =~ "Cannot execute database query."
assert error =~ "Cannot execute ClickHouse database query."
end)
end
end
Expand Down

0 comments on commit be66cc5

Please sign in to comment.