diff --git a/lib/pillar/ecto.ex b/lib/pillar/ecto.ex new file mode 100644 index 0000000..6b8aedc --- /dev/null +++ b/lib/pillar/ecto.ex @@ -0,0 +1,8 @@ +defmodule Pillar.Ecto do + use Ecto.Adapters.SQL, + driver: Pillar.Ecto.Driver + + def supports_ddl_transaction?(), do: false + + def lock_for_migrations(_, _, _), do: nil +end diff --git a/lib/pillar/ecto/conn_mod.ex b/lib/pillar/ecto/conn_mod.ex new file mode 100644 index 0000000..f027c9f --- /dev/null +++ b/lib/pillar/ecto/conn_mod.ex @@ -0,0 +1,165 @@ +defmodule Pillar.Ecto.ConnMod do + @moduledoc false + + use DBConnection + + alias Pillar.Connection + alias Pillar.HttpClient + alias Pillar.HttpClient.Response + alias Pillar.HttpClient.TransportError + alias Pillar.Ecto.Helpers + + def connect(opts) do + url = Keyword.get(opts, :url, "http://localhost:8123") + conn = Pillar.Connection.new(url) + {:ok, %{conn: conn}} + end + + def disconnect(_err, _state) do + :ok + end + + @doc false + def ping(state) do + {:ok, state} + end + + @doc false + def reconnect(new_opts, state) do + with :ok <- disconnect("Reconnecting", state), + do: connect(new_opts) + end + + @doc false + def checkin(state) do + {:ok, state} + end + + @doc false + def checkout(state) do + {:ok, state} + end + + @doc false + def handle_status(_, state) do + {:idle, state} + end + + @doc false + def handle_prepare(query, _, state) do + {:ok, query, state} + end + + @doc false + def handle_execute(query, _params, _opts, state) do + params = Enum.join(query.params, "&") + + url = + state.conn + |> Connection.url_from_connection() + + url = url <> "&" <> params + + url + |> HttpClient.post( + query.statement <> + " FORMAT JSONCompactEachRowWithNamesAndTypes SETTINGS date_time_output_format='iso', output_format_json_quote_64bit_integers=0", + timeout: 60_000 + ) + |> parse() + |> case do + {:error, reason} -> + {:error, reason, state} + + {:ok, body} -> + [types | rows] = + body + |> String.split("\n", trim: true) + |> Enum.map(&Jason.decode!(&1)) + |> Enum.drop(1) + + rows = + rows + |> Enum.map(fn row -> + Enum.zip(row, types) + |> Enum.map(fn {data, type} -> + Helpers.parse_type(type, data) + end) + end) + + { + :ok, + query, + to_result(rows), + state + } + end + end + + defp parse(%Response{status_code: 200, body: body}) do + {:ok, body} + end + + defp parse(%Response{status_code: _any, body: _body} = resp) do + {:error, resp} + end + + defp parse(%TransportError{} = error) do + {:error, error} + end + + defp parse(%RuntimeError{} = error) do + {:error, error} + end + + defp to_result(res) do + case res do + xs when is_list(xs) -> %{num_rows: Enum.count(xs), rows: Enum.map(xs, &to_row/1)} + nil -> %{num_rows: 0, rows: [nil]} + _ -> %{num_rows: 1, rows: [res]} + end + end + + defp to_row(xs) when is_list(xs), do: xs + defp to_row(x) when is_map(x), do: Map.values(x) + defp to_row(x), do: x + + @doc false + def handle_declare(_query, _params, _opts, state) do + {:error, :cursors_not_supported, state} + end + + @doc false + def handle_deallocate(_query, _cursor, _opts, state) do + {:error, :cursors_not_supported, state} + end + + def handle_fetch(_query, _cursor, _opts, state) do + {:error, :cursors_not_supported, state} + end + + @doc false + def handle_begin(_opts, state) do + {:error, :cursors_not_supported, state} + end + + @doc false + def handle_close(_query, _opts, state) do + {:error, :cursors_not_supported, state} + end + + @doc false + def handle_commit(_opts, state) do + {:error, :cursors_not_supported, state} + end + + @doc false + def handle_info(_msg, state) do 
+ {:error, :cursors_not_supported, state} + end + + @doc false + def handle_rollback(_opts, state) do + {:error, :cursors_not_supported, state} + end +end diff --git a/lib/pillar/ecto/connection.ex b/lib/pillar/ecto/connection.ex new file mode 100644 index 0000000..e9b3fa9 --- /dev/null +++ b/lib/pillar/ecto/connection.ex @@ -0,0 +1,59 @@ +defmodule Pillar.Ecto.Connection do + alias Pillar.Ecto.Query + + def child_spec(opts) do + DBConnection.child_spec(Pillar.Ecto.ConnMod, opts) + end + + def query(conn, query, params, _) when is_binary(query) do + query = %Query{name: "", statement: query, params: params} + execute(conn, query, [], []) + end + + def prepare_execute(conn, name, prepared_query, params, options) do + query = %Query{name: name, statement: prepared_query, params: params} + + case DBConnection.prepare_execute(conn, query, params, options) do + {:ok, query, result} -> + {:ok, %{query | statement: prepared_query}, result} + + {:error, error} -> + raise error + end + end + + def execute(conn, query, params, options) do + case DBConnection.prepare_execute(conn, query, params, options) do + {:ok, _query, result} -> + {:ok, result} + + {:error, error} -> + raise error + end + end + + def to_constraints(_error), do: [] + + def stream(_conn, _prepared, _params, _options), do: raise("not implemented") + + ## Queries + def all(query) do + Query.all(query) + end + + def update_all(query, prefix \\ nil), do: Query.update_all(query, prefix) + + def delete_all(query), do: Query.delete_all(query) + + def insert(prefix, table, header, rows, on_conflict, returning, _), + do: Query.insert(prefix, table, header, rows, on_conflict, returning) + + def update(prefix, table, fields, filters, returning), + do: Query.update(prefix, table, fields, filters, returning) + + def delete(prefix, table, filters, returning), + do: Query.delete(prefix, table, filters, returning) + + ## Migration + def execute_ddl(_), do: raise("No") +end diff --git a/lib/pillar/ecto/driver.ex b/lib/pillar/ecto/driver.ex new file mode 100644 index 0000000..6a7a33a --- /dev/null +++ b/lib/pillar/ecto/driver.ex @@ -0,0 +1,22 @@ +defmodule Pillar.Ecto.Driver do + alias Pillar.Ecto.Query + + def start_link(opts \\ []) do + DBConnection.start_link( + Pillar.Ecto.ConnMod, + opts |> Keyword.put(:show_sensitive_data_on_connection_error, true) + ) + end + + def child_spec(opts) do + DBConnection.child_spec(Pillar.Ecto.ConnMod, opts) + end + + def query(conn, statement, params \\ [], opts \\ []) do + DBConnection.prepare_execute(conn, %Query{name: "", statement: statement}, params, opts) + end + + def query!(conn, statement, params \\ [], opts \\ []) do + DBConnection.prepare_execute!(conn, %Query{name: "", statement: statement}, params, opts) + end +end diff --git a/lib/pillar/ecto/helpers.ex b/lib/pillar/ecto/helpers.ex new file mode 100644 index 0000000..b32cd79 --- /dev/null +++ b/lib/pillar/ecto/helpers.ex @@ -0,0 +1,158 @@ +defmodule Pillar.Ecto.Helpers do + def get_source(query, sources, ix, source, all_params) do + {expr, name, _schema} = elem(sources, ix) + + {expr, all_params} = + case expr do + nil -> Pillar.Ecto.QueryBuilder.paren_expr(source, sources, query, all_params) + _ -> {expr, all_params} + end + + {{expr, name}, all_params} + end + + def quote_qualified_name(name, sources, ix) do + {_, source, _} = elem(sources, ix) + [source, ?. 
| quote_name(name)] + end + + def quote_name(name, quoter \\ ?") + def quote_name(nil, _), do: [] + + def quote_name(names, quoter) when is_list(names) do + names + |> Enum.filter(&(not is_nil(&1))) + |> intersperse_map(?., &quote_name(&1, nil)) + |> wrap_in(quoter) + end + + def quote_name(name, quoter) when is_atom(name) do + quote_name(Atom.to_string(name), quoter) + end + + def quote_name(name, quoter) do + if String.contains?(name, "\"") do + error!(nil, "bad name #{inspect(name)}") + end + + wrap_in(name, quoter) + end + + def wrap_in(value, nil), do: value + + def wrap_in(value, {left_wrapper, right_wrapper}) do + [left_wrapper, value, right_wrapper] + end + + def wrap_in(value, wrapper) do + [wrapper, value, wrapper] + end + + def intersperse_map(list, separator, mapper, acc \\ []) + + def intersperse_map([], _separator, _mapper, acc), + do: acc + + def intersperse_map([elem], _separator, mapper, acc), + do: [acc | mapper.(elem)] + + def intersperse_map([elem | rest], separator, mapper, acc), + do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator]) + + def intersperse_reduce(list, separator, user_acc, reducer, acc \\ []) + + def intersperse_reduce([], _separator, user_acc, _reducer, acc), + do: {acc, user_acc} + + def intersperse_reduce([elem], _separator, user_acc, reducer, acc) do + {elem, user_acc} = reducer.(elem, user_acc) + {[acc | elem], user_acc} + end + + def intersperse_reduce([elem | rest], separator, user_acc, reducer, acc) do + {elem, user_acc} = reducer.(elem, user_acc) + intersperse_reduce(rest, separator, user_acc, reducer, [acc, elem, separator]) + end + + def parse_type(_, nil), do: nil + def parse_type("String", s), do: s + def parse_type("LowCardinality(String)", s), do: s + def parse_type("UInt8", i), do: parse_integer(i) + def parse_type("UInt16", i), do: parse_integer(i) + def parse_type("UInt32", i), do: parse_integer(i) + def parse_type("UInt64", i), do: parse_integer(i) + def parse_type("UInt128", i), do: parse_integer(i) + def parse_type("UInt256", i), do: parse_integer(i) + def parse_type("Int8", i), do: parse_integer(i) + def parse_type("Int16", i), do: parse_integer(i) + def parse_type("Int32", i), do: parse_integer(i) + def parse_type("Int64", i), do: parse_integer(i) + def parse_type("Int128", i), do: parse_integer(i) + def parse_type("Int256", i), do: parse_integer(i) + def parse_type("Float32", i), do: parse_float(i) + def parse_type("Float64", i), do: parse_float(i) + + def parse_type("DateTime", s) do + {:ok, time, _} = DateTime.from_iso8601(s) + time + end + + @decimal_types 1..76 + |> Enum.flat_map(fn i -> + [ + "Decimal32(#{i})", + "Decimal64(#{i})", + "Decimal128(#{i})", + "Decimal256(#{i})" + ] + end) + + for type <- @decimal_types do + def parse_type(unquote(type), i), do: parse_dec(i) + end + + def parse_type(_, i), do: i + + defp parse_dec(i) when is_integer(i), do: Decimal.new(i) + defp parse_dec(i) when is_binary(i), do: Decimal.new(i) + defp parse_dec(i) when is_float(i), do: Decimal.from_float(i) + + defp parse_integer(i) when is_integer(i), do: i + + defp parse_integer(s) when is_binary(s) do + {i, ""} = Integer.parse(s) + i + end + + defp parse_float(f) when is_float(f), do: f + + defp parse_float(s) when is_binary(s) do + {f, ""} = Float.parse(s) + f + end + + def ecto_to_db({:array, t}), do: "Array(#{ecto_to_db(t)})" + def ecto_to_db(:id), do: "UInt32" + def ecto_to_db(:binary_id), do: "FixedString(36)" + def ecto_to_db(:uuid), do: "FixedString(36)" + def ecto_to_db(:string), do: "String" + def 
ecto_to_db(:binary), do: "FixedString(4000)" + def ecto_to_db(:integer), do: "Int32" + def ecto_to_db(:bigint), do: "Int64" + def ecto_to_db(:float), do: "Float32" + def ecto_to_db(:decimal), do: "Float64" + def ecto_to_db(:boolean), do: "UInt8" + def ecto_to_db(:date), do: "Date" + def ecto_to_db(:utc_datetime), do: "DateTime" + def ecto_to_db(:naive_datetime), do: "DateTime" + def ecto_to_db(:timestamp), do: "DateTime" + def ecto_to_db(other), do: Atom.to_string(other) + + def error!(nil, message) do + raise ArgumentError, message + end + + def error!(query, message) do + raise Ecto.QueryError, query: query, message: message + end +end diff --git a/lib/pillar/ecto/query.ex b/lib/pillar/ecto/query.ex new file mode 100644 index 0000000..c2c1488 --- /dev/null +++ b/lib/pillar/ecto/query.ex @@ -0,0 +1,150 @@ +defmodule Pillar.Ecto.Query do + alias Pillar.Ecto.QueryBuilder + + @type t :: %__MODULE__{ + name: iodata, + param_count: integer, + params: iodata | nil, + columns: [String.t()] | nil + } + + defstruct name: nil, + statement: "", + type: :select, + params: [], + param_count: 0, + columns: [] + + def new(statement) do + %__MODULE__{statement: statement} + |> DBConnection.Query.parse([]) + end + + @spec all(query :: Ecto.Query.t()) :: String.t() + def all(query) do + sources = QueryBuilder.create_names(query) + + # We now extract all parameters in the query + # this should be in the same order as we join the query down below together + + all_params = + [ + {:where, query.wheres}, + {:limit, query.limit} + ] + |> Enum.reduce([], fn {for_field, elem}, acc -> + List.wrap(elem) + |> Enum.reduce(acc, fn expr, acc -> + QueryBuilder.param_extractor(for_field, expr, sources, acc) + end) + end) + |> List.flatten() + |> Enum.reverse() + + {select_distinct, order_by_distinct} = QueryBuilder.distinct(query.distinct, sources, query) + + {from, all_params} = QueryBuilder.from(query, sources, all_params) + {select, all_params} = QueryBuilder.select(query, select_distinct, sources, all_params) + # join = QueryBuilder.join(query, sources) + {where, all_params} = QueryBuilder.where(query, sources, all_params) + {group_by, all_params} = QueryBuilder.group_by(query, sources, all_params) + {having, all_params} = QueryBuilder.having(query, sources, all_params) + {order_by, all_params} = QueryBuilder.order_by(query, order_by_distinct, sources, all_params) + {limit, _all_params} = QueryBuilder.limit(query, sources, all_params) + + res = [select, from, where, group_by, having, order_by, limit] + + IO.iodata_to_binary(res) + end + + def insert(_prefix, _table, _header, _rows, _on_conflict, _returning), + do: raise("Not supported") + + def update(_prefix, _table, _fields, _filters, _returning), do: raise("Not supported") + + def delete(_prefix, _table, _filters, _returning), do: raise("Not supported") + + def update_all(_query, _prefix \\ nil), do: raise("Not supported") + + def delete_all(_query), do: raise("Not supported") + + defmacro any_ch(field) do + quote do + fragment("any(?)", unquote(field)) + end + end + + defmacro anyLast(field) do + quote do + fragment("anyLast(?)", unquote(field)) + end + end + + defmacro anyHeavy(field) do + quote do + fragment("anyHeavy(?)", unquote(field)) + end + end + + defmacro uniq(field) do + quote do + fragment("uniq(?)", unquote(field)) + end + end + + defmacro stddevPop(field) do + quote do + fragment("stddevPop(?)", unquote(field)) + end + end + + defmacro argMin(arg, val) do + quote do + fragment("argMin(?, ?)", unquote(arg), unquote(val)) + end + end + + defmacro 
argMax(arg, val) do + quote do + fragment("argMax(?, ?)", unquote(arg), unquote(val)) + end + end +end + +defimpl DBConnection.Query, for: Pillar.Ecto.Query do + def parse(%{statement: statement, params: params} = query, _opts) do + params = + Regex.scan(~r/\{\w+_\d:\w+\}*/, statement) + |> List.flatten() + |> Enum.map(fn param -> + [name, _] = Regex.replace(~r/({|})/, param, "") |> String.split(":") + name + end) + |> Enum.zip(params) + |> Enum.map(fn {name, value} -> + "param_#{name}=#{value}" + end) + + %{query | param_count: length(params), params: params} + end + + def describe(query, _opts) do + query + end + + def encode(%{type: :insert}, _params, _opts), do: raise("Not supported") + + def encode(%{statement: query_part}, _params, _opts) do + query_part + end + + def decode(_query, result, _opts) do + result + end +end + +defimpl String.Chars, for: Pillar.Ecto.Query do + def to_string(%Pillar.Ecto.Query{statement: statement}) do + IO.iodata_to_binary(statement) + end +end diff --git a/lib/pillar/ecto/query_builder.ex b/lib/pillar/ecto/query_builder.ex new file mode 100644 index 0000000..fb309c5 --- /dev/null +++ b/lib/pillar/ecto/query_builder.ex @@ -0,0 +1,541 @@ +defmodule Pillar.Ecto.QueryBuilder do + alias Ecto.Query + alias Ecto.Query.BooleanExpr + alias Ecto.Query.JoinExpr + alias Ecto.Query.QueryExpr + alias Pillar.Ecto.QueryParam + + alias Pillar.Ecto.Helpers + + binary_ops = [ + ==: " = ", + !=: " != ", + <=: " <= ", + >=: " >= ", + <: " < ", + >: " > ", + and: " AND ", + or: " OR ", + ilike: " ILIKE ", + like: " LIKE ", + in: " IN ", + is_nil: " WHERE " + ] + + @binary_ops Keyword.keys(binary_ops) + + Enum.map(binary_ops, fn {op, str} -> + def handle_call(unquote(op), 2), do: {:binary_op, unquote(str)} + end) + + def handle_call(fun, _arity), do: {:fun, Atom.to_string(fun)} + + def select(%Query{select: %{fields: fields}} = query, select_distinct, sources, all_params) do + {ex, all_params} = select_fields(fields, sources, query, all_params) + {["SELECT", select_distinct, ?\s | ex], all_params} + end + + def select_fields([], _sources, _query, all_params), do: {"'TRUE'", all_params} + + def select_fields(fields, sources, query, all_params) do + Helpers.intersperse_reduce(fields, ", ", all_params, fn + {key, value}, all_params -> + {ex, all_params} = expr(value, sources, query, all_params) + {[ex, " AS " | Helpers.quote_name(key)], all_params} + + value, all_params -> + expr(value, sources, query, all_params) + end) + end + + def distinct(nil, _, _), do: {[], []} + def distinct(%QueryExpr{expr: []}, _, _), do: {[], []} + def distinct(%QueryExpr{expr: true}, _, _), do: {" DISTINCT", []} + def distinct(%QueryExpr{expr: false}, _, _), do: {[], []} + + def distinct(%QueryExpr{expr: _exprs}, _sources, query) do + Helpers.error!( + query, + "DISTINCT ON is not supported! Use `distinct: true`, for ex. 
`from rec in MyModel, distinct: true, select: rec.my_field`" + ) + end + + def from(%{from: %{source: source}} = query, sources, all_params) do + {{from, name}, all_params} = Helpers.get_source(query, sources, 0, source, all_params) + {[" FROM ", from, " AS " | name], all_params} + end + + def update_fields(query, _sources) do + Helpers.error!(query, "UPDATE is not supported") + end + + def join(%Query{joins: []}, _sources, all_params), do: {[], all_params} + + def join(%Query{joins: joins} = query, sources, all_params) do + [ + ?\s + | Helpers.intersperse_reduce(joins, ?\s, all_params, fn + %JoinExpr{qual: qual, ix: ix, source: source, on: %QueryExpr{expr: on_expr}}, + all_params -> + {{join, _name}, all_params} = + Helpers.get_source(query, sources, ix, source, all_params) + + {["ANY", join_qual(qual), join, " USING ", on_join_expr(on_expr)], all_params} + end) + ] + end + + def on_join_expr({_, _, [head | tail]}) do + retorno = [on_join_expr(head) | on_join_expr(tail)] + retorno |> Enum.uniq() |> Enum.join(",") + end + + def on_join_expr([head | tail]) do + [on_join_expr(head) | tail] + end + + def on_join_expr({{:., [], [{:&, [], _}, column]}, [], []}) when is_atom(column) do + column |> Atom.to_string() + end + + def on_join_expr({:==, _, [{{_, _, [_, column]}, _, _}, _]}) when is_atom(column) do + column |> Atom.to_string() + end + + def join_qual(:inner), do: " INNER JOIN " + def join_qual(:left), do: " LEFT OUTER JOIN " + + def where(%Query{wheres: wheres} = query, sources, all_params) do + boolean(" WHERE ", wheres, sources, query, all_params) + end + + def having(%Query{havings: havings} = query, sources, all_params) do + boolean(" HAVING ", havings, sources, query, all_params) + end + + def group_by(%Query{group_bys: []}, _sources, all_params), do: {[], all_params} + + def group_by(%Query{group_bys: group_bys} = query, sources, all_params) do + {expr, all_params} = + Helpers.intersperse_reduce(group_bys, ", ", all_params, fn + %QueryExpr{expr: expr}, all_params -> + Helpers.intersperse_reduce(expr, ", ", all_params, &expr(&1, sources, query, &2)) + end) + + xs = [ + " GROUP BY " + | expr + ] + + {xs, all_params} + end + + def order_by(%Query{order_bys: []}, _distinct, _sources, all_params), do: {[], all_params} + + def order_by(%Query{order_bys: order_bys} = query, distinct, sources, all_params) do + order_bys = Enum.flat_map(order_bys, & &1.expr) + + {expr, all_params} = + Helpers.intersperse_reduce( + distinct ++ order_bys, + ", ", + all_params, + &order_by_expr(&1, sources, query, &2) + ) + + xs = [ + " ORDER BY " + | expr + ] + + {xs, all_params} + end + + def order_by_expr({dir, expr}, sources, query, all_params) do + {str, all_params} = expr(expr, sources, query, all_params) + + case dir do + :asc -> {str, all_params} + :desc -> {[str | " DESC"], all_params} + end + end + + def limit(%Query{offset: nil, limit: nil}, _sources, all_params), do: {[], all_params} + + def limit(%Query{offset: nil, limit: %QueryExpr{expr: expr}} = query, sources, all_params) do + {exp, all_params} = expr(expr, sources, query, all_params) + {[" LIMIT ", exp], all_params} + end + + def limit( + %Query{offset: %QueryExpr{expr: expr_offset}, limit: %QueryExpr{expr: expr_limit}} = + query, + sources, + all_params + ) do + {expr_offset, all_params} = expr(expr_offset, sources, query, all_params) + {expr_limit, all_params} = expr(expr_limit, sources, query, all_params) + + {[ + " LIMIT ", + expr_offset, + ", ", + expr_limit + ], all_params} + end + + def boolean(_name, [], _sources, _query, 
all_params), do: {[], all_params} + + def boolean( + name, + [%{expr: expr, op: op} | query_exprs], + sources, + query, + all_params + ) do + {exp, all_params} = paren_expr(expr, sources, query, all_params) + + acc = { + {op, exp}, + all_params + } + + {{_, where_filters}, all_params} = + Enum.reduce(query_exprs, acc, fn + %BooleanExpr{expr: expr, op: op}, {{op, acc}, all_params} -> + {exp, all_params} = paren_expr(expr, sources, query, all_params) + + acc = {op, [acc, operator_to_boolean(op), exp]} + + {acc, all_params} + + %BooleanExpr{expr: expr, op: op}, {{_, acc}, all_params} -> + {exp, all_params} = paren_expr(expr, sources, query, all_params) + + acc = {op, [?(, acc, ?), operator_to_boolean(op), exp]} + + {acc, all_params} + end) + + {[name | where_filters], all_params} + end + + def operator_to_boolean(:and), do: " AND " + def operator_to_boolean(:or), do: " OR " + + def paren_expr(false, _sources, _query, all_params), do: {"false", all_params} + def paren_expr(true, _sources, _query, all_params), do: {"true", all_params} + + def paren_expr(expr, sources, query, all_params) do + {ex, all_params} = expr(expr, sources, query, all_params) + {[?(, ex, ?)], all_params} + end + + def expr({_type, [literal]}, sources, query, all_params) do + expr(literal, sources, query, all_params) + end + + def expr({:^, [], [_ix]}, _sources, _query, all_params) do + [param | rest] = all_params + {to_string(param), rest} + end + + def expr({{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, _query, all_params) + when is_atom(field) do + res = Helpers.quote_qualified_name(field, sources, idx) + {res, all_params} + end + + def expr({:&, _, [idx, fields, _counter]}, sources, query, all_params) do + {_, name, schema} = elem(sources, idx) + + if is_nil(schema) and is_nil(fields) do + Helpers.error!( + query, + "ClickHouse requires a schema module when using selector " <> + "#{inspect(name)} but none was given. " <> + "Please specify a schema or specify exactly which fields from " <> + "#{inspect(name)} you desire" + ) + end + + {Helpers.intersperse_map(fields, ", ", &[name, ?. | Helpers.quote_name(&1)]), all_params} + end + + def expr({:in, _, [_left, []]}, _sources, _query, all_params) do + {"0", all_params} + end + + def expr({:in, _, [left, right]}, sources, query, all_params) when is_list(right) do + {args, all_params} = + Helpers.intersperse_reduce(right, ?,, all_params, &expr(&1, sources, query, all_params)) + + {exp, all_params} = expr(left, sources, query, all_params) + {[exp, " IN (", args, ?)], all_params} + end + + def expr({:in, _, [_, {:^, _, [_, 0]}]}, _sources, _query, all_params) do + {"0", all_params} + end + + def expr({:in, _, [left, {:^, _, [_, length]}]}, sources, query, all_params) do + # TODO + args = Enum.intersperse(List.duplicate(??, length), ?,) + [expr(left, sources, query, all_params), " IN (", args, ?)] + end + + def expr({:in, _, [left, right]}, sources, query, all_params) do + {left, all_params} = expr(left, sources, query, all_params) + {right, all_params} = expr(right, sources, query, all_params) + + # TODO: Any is not being supported + {[ + left, + " = ANY(", + right, + ?) 
+ ], all_params} + end + + def expr({:is_nil, _, [arg]}, sources, query, all_params) do + {exp, all_params} = expr(arg, sources, query, all_params) + {[exp | " IS NULL"], all_params} + end + + def expr({:not, _, [expr]}, sources, query, all_params) do + case expr do + {fun, _, _} when fun in @binary_ops -> + {ex, all_params} = expr(expr, sources, query, all_params) + {["NOT (", ex, ?)], all_params} + + _ -> + {ex, all_params} = expr(expr, sources, query, all_params) + {["~(", ex, ?)], all_params} + end + end + + def expr(%Ecto.SubQuery{query: query, params: _params}, _sources, _query, all_params) do + {Pillar.Ecto.Query.all(query), all_params} + end + + def expr({:fragment, _, [kw]}, _sources, query, _all_params) + when is_list(kw) or tuple_size(kw) == 3 do + Helpers.error!(query, "ClickHouse adapter does not support keyword or interpolated fragments") + end + + def expr({:fragment, _, parts}, sources, query, all_params) do + {expr, all_params} = + Enum.reduce(parts, {[], all_params}, fn + {:raw, part}, {acc, all_params} -> + {[part | acc], all_params} + + {:expr, expr}, {acc, all_params} -> + {expr, all_params} = expr(expr, sources, query, all_params) + {[expr | acc], all_params} + end) + + {Enum.reverse(expr), all_params} + end + + def expr({fun, _, args}, sources, query, all_params) when is_atom(fun) and is_list(args) do + {modifier, args} = + case args do + [rest, :distinct] -> {"DISTINCT ", [rest]} + _ -> {[], args} + end + + case handle_call(fun, length(args)) do + {:binary_op, op} -> + [left, right] = args + + {left, all_params} = op_to_binary(left, sources, query, all_params) + {right, all_params} = op_to_binary(right, sources, query, all_params) + + xs = [ + left, + op | right + ] + + {xs, all_params} + + {:fun, fun} -> + {exp, all_params} = + Helpers.intersperse_reduce(args, ", ", all_params, &expr(&1, sources, query, &2)) + + {[ + fun, + ?(, + modifier, + exp, + ?) + ], all_params} + end + end + + def expr({:count, _, []}, _sources, _query, all_params), do: {"count(*)", all_params} + + def expr(list, sources, query, all_params) when is_list(list) do + {exp, all_params} = + Helpers.intersperse_reduce(list, ?,, all_params, &expr(&1, sources, query, &2)) + + {["ARRAY[", exp, ?]], all_params} + end + + def expr(%Decimal{} = decimal, _sources, _query, all_params) do + {Decimal.to_string(decimal, :normal), all_params} + end + + def expr(%Ecto.Query.Tagged{value: binary, type: :binary}, _sources, _query, all_params) + when is_binary(binary) do + {["0x", Base.encode16(binary, case: :lower)], all_params} + end + + # def expr(%Ecto.Query.Tagged{value: other, type: {_, field}}, sources, query) do + # # We don't support joins for now. 
+ # {_, name, schema} = elem(sources, 0) + # IO.inspect(["elem(sources, 0)", elem(sources, 0)]) + # IO.inspect(["other", other]) + # type = schema.__schema__(:type, field) + # [?", expr(other, sources, query), " AS ", Helpers.ecto_to_db(type), ")"] + # end + + def expr(%Ecto.Query.Tagged{value: other}, sources, query, all_params) do + # ["CAST(", expr(other, sources, query), " AS ", Helpers.ecto_to_db(type), ")"] + expr(other, sources, query, all_params) + end + + def expr(nil, _sources, _query, all_params), do: {"NULL", all_params} + def expr(true, _sources, _query, all_params), do: {"1", all_params} + def expr(false, _sources, _query, all_params), do: {"0", all_params} + + def expr(s, _s, _q, all_params) when is_binary(s) do + {[?\', String.replace(s, "'", "''"), ?\'], all_params} + end + + def expr(i, _s, _q, all_params) when is_integer(i), do: {Integer.to_string(i), all_params} + def expr(f, _s, _q, all_params) when is_float(f), do: {Float.to_string(f), all_params} + + def interval(count, _interval, sources, query, all_params) do + [expr(count, sources, query, all_params)] + end + + def op_to_binary({op, _, [_, _]} = expr, sources, query, all_params) when op in @binary_ops do + paren_expr(expr, sources, query, all_params) + end + + def op_to_binary(expr, sources, query, all_params) do + expr(expr, sources, query, all_params) + end + + def returning(_returning), do: raise("RETURNING is not supported!") + + def create_names(%{sources: sources}) do + create_names(sources, 0, tuple_size(sources)) |> List.to_tuple() + end + + def create_names(sources, pos, limit) when pos < limit do + [create_name(sources, pos) | create_names(sources, pos + 1, limit)] + end + + def create_names(_sources, pos, pos) do + [[]] + end + + def create_name(sources, pos, as_prefix \\ []) do + case elem(sources, pos) do + {:fragment, _, _} -> + {nil, as_prefix ++ [?f | Integer.to_string(pos)], nil} + + {table, schema, prefix} -> + name = as_prefix ++ [create_alias(table) | Integer.to_string(pos)] + {quote_table(prefix, table), name, schema} + + %Ecto.SubQuery{} -> + {nil, as_prefix ++ [?s | Integer.to_string(pos)], nil} + end + end + + defp quote_table(nil, name), do: quote_table(name) + defp quote_table(prefix, name), do: [quote_table(prefix), ?., quote_table(name)] + + defp quote_table(name) when is_atom(name), + do: quote_table(Atom.to_string(name)) + + defp quote_table(name) do + if String.contains?(name, "\"") do + raise "bad table name #{inspect(name)}" + end + + [?", name, ?"] + end + + defp create_alias(<<first, _rest::binary>>) when first in ?a..?z when first in ?A..?Z do + first + end + + defp create_alias(_) do + ?t + end + + alias Ecto.Query.BooleanExpr + + def param_extractor(for_field, %BooleanExpr{expr: expr}, sources, all_params) do + case expr do + {:and, _, xs} -> + Enum.reduce(xs, all_params, fn expr, all_params -> + param_extractor(for_field, expr, sources, all_params) + end) + + expr -> + param_extractor(for_field, expr, sources, all_params) + end + end + + def param_extractor(for_field, %QueryExpr{expr: expr_limit}, sources, all_params) do + maybe_to_param_name(for_field, expr_limit, sources, all_params) + end + + def param_extractor(for_field, {op, [], [param, _]}, sources, all_params) + when op in @binary_ops do + maybe_to_param_name(for_field, param, sources, all_params) + end + + def param_extractor(_, _, _, all_params), do: all_params + + def maybe_to_param_name(limit_offset, {:^, [], [_idx]}, _sources, all_params) + when limit_offset in [ + :limit, + :offset + ] do + type = :integer + + param = %QueryParam{
type: type, + field: nil, + name: "#{Atom.to_string(limit_offset)}_#{length(all_params)}", + value: nil, + clickhouse_type: Helpers.ecto_to_db(type) + } + + [param | all_params] + end + + def maybe_to_param_name(:where, {{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, all_params) + when is_atom(field) do + {_, _name, schema} = elem(sources, idx) + + type = schema.__schema__(:type, field) + + param = %QueryParam{ + type: type, + field: field, + name: "#{Atom.to_string(field)}_#{length(all_params)}", + value: nil, + clickhouse_type: Helpers.ecto_to_db(type) + } + + [param | all_params] + end + + def maybe_to_param_name(_, _, _, all_params), do: all_params +end diff --git a/lib/pillar/ecto/query_param.ex b/lib/pillar/ecto/query_param.ex new file mode 100644 index 0000000..e6d7104 --- /dev/null +++ b/lib/pillar/ecto/query_param.ex @@ -0,0 +1,8 @@ +defmodule Pillar.Ecto.QueryParam do + defstruct [:field, :type, :name, :value, :clickhouse_type] +end + +defimpl String.Chars, for: Pillar.Ecto.QueryParam do + def to_string(%{name: name, clickhouse_type: clickhouse_type}), + do: "{#{name}:#{clickhouse_type}}" +end diff --git a/mix.exs b/mix.exs index 106ff4c..08b46a0 100644 --- a/mix.exs +++ b/mix.exs @@ -38,6 +38,8 @@ defmodule Pillar.MixProject do defp deps do [ + {:db_connection, "~> 2.0"}, + {:ecto_sql, "~> 3.9"}, {:jason, ">= 1.0.0"}, {:tesla, "~> 1.4.0"}, {:mint, "~> 1.4"}, diff --git a/mix.lock b/mix.lock index 30a3828..979fb8a 100644 --- a/mix.lock +++ b/mix.lock @@ -2,10 +2,14 @@ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "castore": {:hex, :castore, "0.1.17", "ba672681de4e51ed8ec1f74ed624d104c0db72742ea1a5e74edbc770c815182f", [:mix], [], "hexpm", "d9844227ed52d26e7519224525cb6868650c272d4a3d327ce3ca5570c12163f9"}, "certifi": {:hex, :certifi, "2.6.1", "dbab8e5e155a0763eea978c913ca280a6b544bfa115633fa20249c3d396d9493", [:rebar3], [], "hexpm", "524c97b4991b3849dd5c17a631223896272c6b0af446778ba4675a1dff53bb7e"}, + "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "credo": {:hex, :credo, "1.1.5", "caec7a3cadd2e58609d7ee25b3931b129e739e070539ad1a0cd7efeeb47014f4", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d0bbd3222607ccaaac5c0340f7f525c627ae4d7aee6c8c8c108922620c5b6446"}, + "db_connection": {:hex, :db_connection, "2.4.3", "3b9aac9f27347ec65b271847e6baeb4443d8474289bd18c1d6f4de655b70c94d", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c127c15b0fa6cfb32eed07465e05da6c815b032508d4ed7c116122871df73c12"}, "decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"}, "dialyxir": {:hex, :dialyxir, "1.0.0", "6a1fa629f7881a9f5aaf3a78f094b2a51a0357c843871b8bc98824e7342d00a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "aeb06588145fac14ca08d8061a142d52753dbc2cf7f0d00fc1013f53f8654654"}, "earmark_parser": {:hex, :earmark_parser, "1.4.12", 
"b245e875ec0a311a342320da0551da407d9d2b65d98f7a9597ae078615af3449", [:mix], [], "hexpm", "711e2cc4d64abb7d566d43f54b78f7dc129308a63bc103fbd88550d2174b3160"}, + "ecto": {:hex, :ecto, "3.9.4", "3ee68e25dbe0c36f980f1ba5dd41ee0d3eb0873bccae8aeaf1a2647242bffa35", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "de5f988c142a3aa4ec18b85a4ec34a2390b65b24f02385c1144252ff6ff8ee75"}, + "ecto_sql": {:hex, :ecto_sql, "3.9.2", "34227501abe92dba10d9c3495ab6770e75e79b836d114c41108a4bf2ce200ad5", [:mix], [{:db_connection, "~> 2.5 or ~> 2.4.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.9.2", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1eb5eeb4358fdbcd42eac11c1fbd87e3affd7904e639d77903c1358b2abd3f70"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"}, "excoveralls": {:hex, :excoveralls, "0.12.3", "2142be7cb978a3ae78385487edda6d1aff0e482ffc6123877bb7270a8ffbcfe0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "568a3e616c264283f5dea5b020783ae40eef3f7ee2163f7a67cbd7b35bcadada"}, @@ -24,6 +28,7 @@ "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, "tesla": {:hex, :tesla, "1.4.0", "1081bef0124b8bdec1c3d330bbe91956648fb008cf0d3950a369cda466a31a87", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.3", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: 
:hackney, repo: "hexpm", optional: true]}, {:ibrowse, "~> 4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "bf1374a5569f5fca8e641363b63f7347d680d91388880979a33bc12a6eb3e0aa"}, "tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, diff --git a/test/pillar/ecto/repo_test.exs b/test/pillar/ecto/repo_test.exs new file mode 100644 index 0000000..32ab0f4 --- /dev/null +++ b/test/pillar/ecto/repo_test.exs @@ -0,0 +1,229 @@ +defmodule StockTrade do + use Ecto.Schema + + @primary_key false + schema "stock_trades" do + field(:ticker, :string) + field(:size, :integer) + field(:price, :float) + field(:time, :utc_datetime) + end +end + +defmodule Repo do + use Ecto.Repo, + adapter: Pillar.Ecto, + loggers: [Ecto.LogEntry], + otp_app: :pillar +end + +defmodule Pillar.Ecto.RepoTest do + use ExUnit.Case + + import Ecto.Query + import Pillar.Ecto.Query + + setup do + defmodule PillarWorker do + use Pillar, + connection_strings: List.wrap(Application.get_env(:pillar, :connection_url)), + name: __MODULE__, + pool_size: 3 + end + + {:ok, _} = PillarWorker.start_link() + + create_table_sql = """ + CREATE TABLE stock_trades ( + ticker LowCardinality(String), + price Float64, + size UInt32, + time DateTime + ) ENGINE = MergeTree + Order By(ticker, time) + """ + + insert_query_sql = """ + INSERT INTO stock_trades + (ticker, price, size, time) + VALUES + ('SPY', 410.12, 10, '2023-04-01 16:04:55'), + ('SPY', 410.11, 4, '2023-04-01 16:04:54'), + ('SPY', 410.11, 7, '2023-04-01 16:04:54'), + ('SPY', 410.11, 6, '2023-04-01 16:04:54'), + ('JPM', 120.12, 1, '2023-04-01 16:04:53'), + ('JPM', 121.32, 10, '2023-04-01 16:04:53'), + ('JPM', 123.01, 45, '2023-04-01 16:04:52'), + ('INTC', 29.59, 22, '2023-04-01 16:04:51') + """ + + assert PillarWorker.query("drop table if exists stock_trades") == {:ok, ""} + assert PillarWorker.query(create_table_sql) == {:ok, ""} + assert PillarWorker.query(insert_query_sql) == {:ok, ""} + + Repo.start_link() + + :ok + end + + test "can select all rows" do + trades = Repo.all(StockTrade) + + assert length(trades) == 8 + end + + test "can select a single row" do + trade = Repo.one(from(st in StockTrade, limit: 1)) + + refute is_nil(trade) + end + + test "can group by ticker & count rows" do + data = + from( + st in StockTrade, + select: %{ + ticker: st.ticker, + total: count(st.ticker), + max_size: max(st.size), + min_size: min(st.size) + }, + group_by: st.ticker + ) + |> Repo.all() + + [ + %{max_size: 10, min_size: 4, ticker: "SPY", total: 4}, + %{max_size: 22, min_size: 22, ticker: "INTC", total: 1}, + %{max_size: 45, min_size: 1, ticker: "JPM", total: 3} + ] = data + end + + test "can filter with dynamic data" do + dynamic_ticker = "AAPL" + dynamic_size = 10 + + res = + from(st in StockTrade, where: st.ticker == 
^dynamic_ticker, where: st.size >= ^dynamic_size) + |> Repo.all() + + assert Enum.empty?(res) + + res = + from(st in StockTrade, where: st.ticker == ^dynamic_ticker and st.size >= ^dynamic_size) + |> Repo.all() + + assert Enum.empty?(res) + end + + test "can filter with fixed values" do + res = from(st in StockTrade, where: st.ticker == "INTC") |> Repo.all() + refute Enum.empty?(res) + + res = from(st in StockTrade, where: st.ticker != "AAPL") |> Repo.all() + refute Enum.empty?(res) + + res = + from(st in StockTrade, where: st.price >= 400.0, select: st.ticker) + |> Repo.all() + |> Enum.uniq() + + assert res == ["SPY"] + end + + test "order by" do + res = + from( + st in StockTrade, + select: st.ticker, + order_by: [desc: st.price], + limit: 1 + ) + |> Repo.one() + + assert res == "SPY" + + res = + from( + st in StockTrade, + select: st.ticker, + order_by: [asc: :price], + limit: 1 + ) + |> Repo.one() + + assert res == "INTC" + + res = + from( + st in StockTrade, + select: %{ + ticker: st.ticker, + vol: sum(st.size) + }, + group_by: st.ticker, + order_by: [desc: sum(st.size)], + limit: 1 + ) + |> Repo.one() + + assert res == %{ticker: "JPM", vol: 56} + + order_field = :price + + res = + from( + st in StockTrade, + select: st.ticker, + order_by: [asc: ^order_field], + limit: 1 + ) + |> Repo.one() + + assert res == "INTC" + end + + test "fragment - sumIf" do + res = + from(st in StockTrade, + select: %{ + res: fragment("sumIf(?, ? = 'JPM')", st.size, st.ticker) + } + ) + |> Repo.all() + + assert res == [%{res: 56}] + end + + test "any" do + res = + from(st in StockTrade, + select: %{ + any_ch: any_ch(st.size), + anyHeavy: anyHeavy(st.size), + anyLast: anyLast(st.size), + ticker: st.ticker + }, + group_by: st.ticker + ) + |> Repo.all() + + assert length(res) == 3 + end + + test "limit" do + res = from(st in StockTrade, limit: 1) |> Repo.all() + + assert length(res) == 1 + + res = from(st in StockTrade, limit: 2) |> Repo.all() + + assert length(res) == 2 + + limit = 3 + + res = from(st in StockTrade, limit: ^limit) |> Repo.all() + + assert length(res) == 3 + end +end
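Usage sketch: the test module above wires the adapter directly into an Ecto repo. For anyone trying the adapter from another application, here is a minimal sketch under stated assumptions: MyApp and MyApp.ClickHouseRepo are hypothetical names, StockTrade is the schema defined in the test above, and the repo configuration is assumed to be forwarded to Pillar.Ecto.ConnMod.connect/1 (which only reads :url and defaults to http://localhost:8123), as Ecto.Adapters.SQL-based adapters normally do. Only the read path is shown, since insert, update, delete and execute_ddl raise in this adapter.

# config/config.exs -- hypothetical host app; :url is the only option ConnMod.connect/1 reads
config :my_app, MyApp.ClickHouseRepo,
  url: "http://localhost:8123"

# lib/my_app/clickhouse_repo.ex
defmodule MyApp.ClickHouseRepo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Pillar.Ecto
end

# Read path only: writes and DDL are not supported by this adapter and raise.
import Ecto.Query

MyApp.ClickHouseRepo.all(
  from st in StockTrade,
    where: st.ticker == ^"SPY",
    limit: 10
)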