Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ecto support based on ClickhouseEcto #69

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
5 changes: 4 additions & 1 deletion lib/pillar.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ defmodule Pillar do
end

def query(%Connection{} = connection, query, params \\ %{}, options \\ %{}) do
final_sql = QueryBuilder.query(query, params)
final_sql =
QueryBuilder.query(query, params)
|> IO.inspect()
Zarathustra2 marked this conversation as resolved.
Show resolved Hide resolved

execute_sql(connection, final_sql, options)
end

Expand Down
5 changes: 5 additions & 0 deletions lib/pillar/ecto.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Pillar.Ecto do
use Ecto.Adapters.SQL,
driver: Pillar.Ecto.Driver,
migration_lock: "FOR UPDATE"
Zarathustra2 marked this conversation as resolved.
Show resolved Hide resolved
end
117 changes: 117 additions & 0 deletions lib/pillar/ecto/conn_mod.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
defmodule Pillar.Ecto.ConnMod do
@moduledoc false

use DBConnection

def connect(_) do
conn = Pillar.Connection.new("http://localhost:8123")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all Ecto requests will be used 1 connection?
Is pool needed here? Like in BulkInsertBuffer module

{:ok, %{conn: conn}}
end

def disconnect(_err, _state) do
:ok
end

@doc false
def ping(state) do
{:ok, state}
end

@doc false
def reconnect(new_opts, state) do
with :ok <- disconnect("Reconnecting", state),
do: connect(new_opts)
end

@doc false
def checkin(state) do
{:ok, state}
end

@doc false
def checkout(state) do
{:ok, state}
end

@doc false
def handle_status(_, state) do
{:idle, state}
end

@doc false
def handle_prepare(query, _, state) do
{:ok, query, state}
end

@doc false
def handle_execute(query, _params, _opts, state) do
IO.inspect(["here--------------", query])

case Pillar.query(state.conn, query.statement <> " FORMAT JSON") |> IO.inspect() do
{:error, reason} ->
{:error, reason, state}

{:ok, result} ->
IO.inspect(["RETURN CLICKHOUSE", result])

{
:ok,
query,
to_result(result),
state
}
end
end

defp to_result(res) do
case res do
xs when is_list(xs) -> %{num_rows: Enum.count(xs), rows: Enum.map(xs, &to_row/1)}
nil -> %{num_rows: 0, rows: [nil]}
_ -> %{num_rows: 1, rows: [res]}
end
|> IO.inspect()
end

defp to_row(xs) when is_list(xs), do: xs
defp to_row(x) when is_map(x), do: Map.values(x)
defp to_row(x), do: x

@doc false
def handle_declare(_query, _params, _opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_deallocate(_query, _cursor, _opts, state) do
{:error, :cursors_not_supported, state}
end

def handle_fetch(_query, _cursor, _opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_begin(_opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_close(_query, _opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_commit(_opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_info(_msg, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_rollback(_opts, state) do
{:error, :cursors_not_supported, state}
end
end
58 changes: 58 additions & 0 deletions lib/pillar/ecto/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule Pillar.Ecto.Connection do
alias Pillar.Ecto.Query

def child_spec(opts) do
DBConnection.child_spec(Pillar.Ecto.ConnMod, opts)
end

def prepare_execute(conn, name, prepared_query, params, options) do
query = %Query{name: name, statement: prepared_query}
IO.inspect(["here", query])

case DBConnection.prepare_execute(conn, query, params, options) do
{:ok, query, result} ->
IO.inspect("result")
{:ok, %{query | statement: prepared_query}, result}

{:error, error} ->
raise error
end
end

def execute(conn, query, params, options) do
IO.inspect(["here", query])

case DBConnection.prepare_execute(conn, query, params, options) do
{:ok, _query, result} ->
{:ok, result}

{:error, error} ->
raise error
end
end

def to_constraints(_error), do: []

def stream(_conn, _prepared, _params, _options), do: raise("not implemented")

## Queries
def all(query) do
Query.all(query)
end

def update_all(query, prefix \\ nil), do: Query.update_all(query, prefix)

def delete_all(query), do: Query.delete_all(query)

def insert(prefix, table, header, rows, on_conflict, returning),
do: Query.insert(prefix, table, header, rows, on_conflict, returning)

def update(prefix, table, fields, filters, returning),
do: Query.update(prefix, table, fields, filters, returning)

def delete(prefix, table, filters, returning),
do: Query.delete(prefix, table, filters, returning)

## Migration
def execute_ddl(_), do: raise("No")
end
22 changes: 22 additions & 0 deletions lib/pillar/ecto/driver.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Pillar.Ecto.Driver do
alias Pillar.Ecto.Query

def start_link(opts \\ []) do
DBConnection.start_link(
Pillar.Ecto.ConnMod,
opts |> Keyword.put(:show_sensitive_data_on_connection_error, true)
)
end

def child_spec(opts) do
DBConnection.child_spec(Pillar.Ecto.ConnMod, opts)
end

def query(conn, statement, params \\ [], opts \\ []) do
DBConnection.prepare_execute(conn, %Query{name: "", statement: statement}, params, opts)
end

def query!(conn, statement, params \\ [], opts \\ []) do
DBConnection.prepare_execute!(conn, %Query{name: "", statement: statement}, params, opts)
end
end
112 changes: 112 additions & 0 deletions lib/pillar/ecto/helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
defmodule Pillar.Ecto.Helpers do
def get_source(query, sources, ix, source) do
{expr, name, _schema} = elem(sources, ix)
{expr || Pillar.Ecto.QueryBuilder.paren_expr(source, sources, query), name}
end

def quote_qualified_name(name, sources, ix) do
{_, source, _} = elem(sources, ix)
[source, ?. | quote_name(name)]
end

def quote_name(name, quoter \\ ?")
def quote_name(nil, _), do: []

def quote_name(names, quoter) when is_list(names) do
names
|> Enum.filter(&(not is_nil(&1)))
|> intersperse_map(?., &quote_name(&1, nil))
|> wrap_in(quoter)
end

def quote_name(name, quoter) when is_atom(name) do
quote_name(Atom.to_string(name), quoter)
end

def quote_name(name, quoter) do
if String.contains?(name, "\"") do
error!(nil, "bad name #{inspect(name)}")
end

wrap_in(name, quoter)
end

def wrap_in(value, nil), do: value

def wrap_in(value, {left_wrapper, right_wrapper}) do
[left_wrapper, value, right_wrapper]
end

def wrap_in(value, wrapper) do
[wrapper, value, wrapper]
end

def quote_table(prefix, name)
def quote_table(nil, name), do: quote_name(name)
def quote_table(prefix, name), do: intersperse_map([prefix, name], ?., &quote_name/1)

def single_quote(value), do: value |> escape_string |> wrap_in(?')

def intersperse_map(list, separator, mapper, acc \\ [])

def intersperse_map([], _separator, _mapper, acc),
do: acc

def intersperse_map([elem], _separator, mapper, acc),
do: [acc | mapper.(elem)]

def intersperse_map([elem | rest], separator, mapper, acc),
do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator])

def intersperse_reduce(list, separator, user_acc, reducer, acc \\ [])

def intersperse_reduce([], _separator, user_acc, _reducer, acc),
do: {acc, user_acc}

def intersperse_reduce([elem], _separator, user_acc, reducer, acc) do
{elem, user_acc} = reducer.(elem, user_acc)
{[acc | elem], user_acc}
end

def intersperse_reduce([elem | rest], separator, user_acc, reducer, acc) do
{elem, user_acc} = reducer.(elem, user_acc)
intersperse_reduce(rest, separator, user_acc, reducer, [acc, elem, separator])
end

def if_do(condition, value) do
if condition, do: value, else: []
end

def if_do(condition, value, else_value) do
if condition, do: value, else: else_value
end

def escape_string(value) when is_binary(value) do
:binary.replace(value, "'", "''", [:global])
end

def ecto_to_db({:array, t}), do: "Array(#{ecto_to_db(t)})"
def ecto_to_db(:id), do: "UInt32"
def ecto_to_db(:binary_id), do: "FixedString(36)"
def ecto_to_db(:uuid), do: "FixedString(36)"
def ecto_to_db(:string), do: "String"
def ecto_to_db(:binary), do: "FixedString(4000)"
def ecto_to_db(:integer), do: "Int32"
def ecto_to_db(:bigint), do: "Int64"
def ecto_to_db(:float), do: "Float32"
def ecto_to_db(:decimal), do: "Float64"
def ecto_to_db(:boolean), do: "UInt8"
def ecto_to_db(:date), do: "Date"
def ecto_to_db(:utc_datetime), do: "DateTime"
def ecto_to_db(:naive_datetime), do: "DateTime"
def ecto_to_db(:timestamp), do: "DateTime"
def ecto_to_db(other), do: Atom.to_string(other)

def error!(nil, message) do
raise ArgumentError, message
end

def error!(query, message) do
raise Ecto.QueryError, query: query, message: message
end
end
Loading