Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use decode_rows_cont #9

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 76 additions & 58 deletions lib/ch/row_binary.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ defmodule Ch.RowBinary do
def decode_rows(<<>>, _types), do: []

def decode_rows(<<data::bytes>>, types) do
decode_rows(types, data, [], [], types)
decode_rows(types, data, 1, [], [], types)
end

defp skip_names(<<rest::bytes>>, 0, count), do: decode_types(rest, count, _acc = [])
Expand Down Expand Up @@ -191,7 +191,7 @@ defmodule Ch.RowBinary do

defp decode_types(<<rest::bytes>>, 0, types) do
types = types |> decode_types() |> :lists.reverse()
decode_rows(types, rest, _row = [], _rows = [], types)
decode_rows(types, rest, 1, _row = [], _rows = [], types)
end

defp decode_types(<<size, type::size(size)-bytes, rest::bytes>>, count, acc) do
Expand Down Expand Up @@ -298,133 +298,150 @@ defmodule Ch.RowBinary do
raise ArgumentError, "#{type} type is not supported"
end

@compile inline: [decode_string_decode_rows: 5]
@compile inline: [decode_string_decode_rows: 6]

for {pattern, size} <- varints do
defp decode_string_decode_rows(
<<unquote(pattern), s::size(unquote(size))-bytes, bin::bytes>>,
types_rest,
row_types,
count,
row,
rows,
types
) do
decode_rows(types_rest, bin, [s | row], rows, types)
decode_rows(row_types, bin, count - 1, [s | row], rows, types)
end
end

@compile inline: [decode_array_decode_rows: 6]
defp decode_array_decode_rows(<<0, bin::bytes>>, _type, types_rest, row, rows, types) do
decode_rows(types_rest, bin, [[] | row], rows, types)
end

@compile inline: [decode_array_decode_rows: 7]
for {pattern, size} <- varints do
defp decode_array_decode_rows(
<<unquote(pattern), bin::bytes>>,
type,
types_rest,
row_types_rest,
count,
row,
rows,
types
) do
array_types = List.duplicate(type, unquote(size))
types_rest = array_types ++ [{:array_over, row} | types_rest]
decode_rows(types_rest, bin, [], rows, types)
row_types = [type, {:array_over, type, count, row} | row_types_rest]
decode_rows(row_types, bin, unquote(size), [], rows, types)
end
end

defp decode_rows([type | types_rest], <<bin::bytes>>, row, rows, types) do
# decoded all rows
defp decode_rows([], <<>>, _count, row, rows, _types) do
:lists.reverse([:lists.reverse(row) | rows])
end

# decoded cell
defp decode_rows([_decoded | types_rest], <<bin::bytes>>, _count = 0, row, rows, types) do
decode_rows(types_rest, bin, _count = 1, row, rows, types)
end

# decoded row
defp decode_rows([], <<bin::bytes>>, _count, row, rows, types) do
decode_rows(types, bin, _count = 1, [], [:lists.reverse(row) | rows], types)
end

defp decode_rows([type | row_types_rest] = row_types, <<bin::bytes>>, count, row, rows, types) do
case type do
:u8 ->
<<u, bin::bytes>> = bin
decode_rows(types_rest, bin, [u | row], rows, types)
decode_rows(row_types, bin, count - 1, [u | row], rows, types)

:u16 ->
<<u::16-little, bin::bytes>> = bin
decode_rows(types_rest, bin, [u | row], rows, types)
decode_rows(row_types, bin, count - 1, [u | row], rows, types)

:u32 ->
<<u::32-little, bin::bytes>> = bin
decode_rows(types_rest, bin, [u | row], rows, types)
decode_rows(row_types, bin, count - 1, [u | row], rows, types)

:u64 ->
<<u::64-little, bin::bytes>> = bin
decode_rows(types_rest, bin, [u | row], rows, types)
decode_rows(row_types, bin, count - 1, [u | row], rows, types)

:u128 ->
<<u::128-little, bin::bytes>> = bin
decode_rows(types_rest, bin, [u | row], rows, types)
decode_rows(row_types, bin, count - 1, [u | row], rows, types)

:u256 ->
<<u::256-little, bin::bytes>> = bin
decode_rows(types_rest, bin, [u | row], rows, types)
decode_rows(row_types, bin, count - 1, [u | row], rows, types)

:i8 ->
<<i::signed, bin::bytes>> = bin
decode_rows(types_rest, bin, [i | row], rows, types)
decode_rows(row_types, bin, count - 1, [i | row], rows, types)

:i16 ->
<<i::16-little-signed, bin::bytes>> = bin
decode_rows(types_rest, bin, [i | row], rows, types)
decode_rows(row_types, bin, count - 1, [i | row], rows, types)

:i32 ->
<<i::32-little-signed, bin::bytes>> = bin
decode_rows(types_rest, bin, [i | row], rows, types)
decode_rows(row_types, bin, count - 1, [i | row], rows, types)

:i64 ->
<<i::64-little-signed, bin::bytes>> = bin
decode_rows(types_rest, bin, [i | row], rows, types)
decode_rows(row_types, bin, count - 1, [i | row], rows, types)

:i128 ->
<<i::128-little-signed, bin::bytes>> = bin
decode_rows(types_rest, bin, [i | row], rows, types)
decode_rows(row_types, bin, count - 1, [i | row], rows, types)

:i256 ->
<<i::256-little-signed, bin::bytes>> = bin
decode_rows(types_rest, bin, [i | row], rows, types)
decode_rows(row_types, bin, count - 1, [i | row], rows, types)

:f32 ->
case bin do
<<f::32-little-float, bin::bytes>> ->
decode_rows(types_rest, bin, [f | row], rows, types)
decode_rows(row_types, bin, count - 1, [f | row], rows, types)

<<_nan_or_inf::32, bin::bytes>> ->
decode_rows(types_rest, bin, [nil | row], rows, types)
decode_rows(row_types, bin, count - 1, [nil | row], rows, types)
end

:f64 ->
case bin do
<<f::64-little-float, bin::bytes>> ->
decode_rows(types_rest, bin, [f | row], rows, types)
decode_rows(row_types, bin, count - 1, [f | row], rows, types)

<<_nan_or_inf::64, bin::bytes>> ->
decode_rows(types_rest, bin, [nil | row], rows, types)
decode_rows(row_types, bin, count - 1, [nil | row], rows, types)
end

:string ->
decode_string_decode_rows(bin, types_rest, row, rows, types)
decode_string_decode_rows(bin, row_types, count, row, rows, types)

{:string, size} ->
<<s::size(size)-bytes, bin::bytes>> = bin
decode_rows(types_rest, bin, [s | row], rows, types)
decode_rows(row_types, bin, count - 1, [s | row], rows, types)

:boolean ->
case bin do
<<0, bin::bytes>> -> decode_rows(types_rest, bin, [false | row], rows, types)
<<1, bin::bytes>> -> decode_rows(types_rest, bin, [true | row], rows, types)
<<0, bin::bytes>> ->
decode_rows(row_types, bin, count - 1, [false | row], rows, types)

<<1, bin::bytes>> ->
decode_rows(row_types, bin, count - 1, [true | row], rows, types)
end

:uuid ->
<<u1::64-little, u2::64-little, bin::bytes>> = bin
uuid = <<u1::64, u2::64>>
decode_rows(types_rest, bin, [uuid | row], rows, types)
decode_rows(row_types, bin, count - 1, [uuid | row], rows, types)

:date ->
<<d::16-little, bin::bytes>> = bin
decode_rows(types_rest, bin, [Date.add(@epoch_date, d) | row], rows, types)
d = Date.add(@epoch_date, d)
decode_rows(row_types, bin, count - 1, [d | row], rows, types)

:date32 ->
<<d::32-little-signed, bin::bytes>> = bin
decode_rows(types_rest, bin, [Date.add(@epoch_date, d) | row], rows, types)
d = Date.add(@epoch_date, d)
decode_rows(row_types, bin, count - 1, [d | row], rows, types)

{:datetime, timezone} ->
<<s::32-little, bin::bytes>> = bin
Expand All @@ -437,26 +454,36 @@ defmodule Ch.RowBinary do
_ -> s |> DateTime.from_unix!() |> DateTime.shift_zone!(timezone)
end

decode_rows(types_rest, bin, [dt | row], rows, types)
decode_rows(row_types, bin, count - 1, [dt | row], rows, types)

{:decimal, p, s} ->
size = decimal_size(p)
<<val::size(size)-little, bin::bytes>> = bin
sign = if val < 0, do: -1, else: 1
d = Decimal.new(sign, abs(val), -s)
decode_rows(types_rest, bin, [d | row], rows, types)
decode_rows(row_types, bin, count - 1, [d | row], rows, types)

{:nullable, type} ->
case bin do
<<1, bin::bytes>> -> decode_rows(types_rest, bin, [nil | row], rows, types)
<<0, bin::bytes>> -> decode_rows([type | types_rest], bin, row, rows, types)
<<1, bin::bytes>> ->
decode_rows(row_types, bin, count - 1, [nil | row], rows, types)

<<0, bin::bytes>> ->
decode_rows([type | row_types_rest], bin, count, row, rows, types)
end

{:array, type} ->
decode_array_decode_rows(bin, type, types_rest, row, rows, types)

{:array_over, original_row} ->
decode_rows(types_rest, bin, [:lists.reverse(row) | original_row], rows, types)
decode_array_decode_rows(bin, type, row_types_rest, count, row, rows, types)

{:array_over, type, original_count, original_row} ->
decode_rows(
[{:array, type} | row_types_rest],
bin,
original_count - 1,
[:lists.reverse(row) | original_row],
rows,
types
)

{:datetime64, time_unit, timezone} ->
<<s::64-little-signed, bin::bytes>> = bin
Expand All @@ -475,27 +502,18 @@ defmodule Ch.RowBinary do
|> DateTime.shift_zone!(timezone)
end

decode_rows(types_rest, bin, [dt | row], rows, types)
decode_rows(row_types, bin, count - 1, [dt | row], rows, types)

{:enum8, mapping} ->
<<v, bin::bytes>> = bin
decode_rows(types_rest, bin, [Map.fetch!(mapping, v) | row], rows, types)
decode_rows(row_types, bin, count - 1, [Map.fetch!(mapping, v) | row], rows, types)

{:enum16, mapping} ->
<<v::16-little, bin::bytes>> = bin
decode_rows(types_rest, bin, [Map.fetch!(mapping, v) | row], rows, types)
decode_rows(row_types, bin, count - 1, [Map.fetch!(mapping, v) | row], rows, types)
end
end

defp decode_rows([], <<>>, row, rows, _types) do
:lists.reverse([:lists.reverse(row) | rows])
end

defp decode_rows([], <<bin::bytes>>, row, rows, types) do
row = :lists.reverse(row)
decode_rows(types, bin, [], [row | rows], types)
end

# TODO eval once, at type decoding
defp decimal_size(p) do
cond do
Expand Down