Skip to content

Commit

Permalink
Add unnamed type handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Strech committed Jul 17, 2024
1 parent 99e6942 commit 7340ef8
Show file tree
Hide file tree
Showing 14 changed files with 398 additions and 82 deletions.
12 changes: 6 additions & 6 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ index 50b5b41a..535b7a0e 100755

# Bring up more containers
-docker-compose up --no-recreate -d schemaregistry connect control-center
+# FIXME
+# PATCHED
+# docker-compose up --no-recreate -d schemaregistry connect control-center
+docker-compose up --no-recreate -d schemaregistry

Expand All @@ -59,7 +59,7 @@ index 50b5b41a..535b7a0e 100755
-MAX_WAIT=240
-echo -e "\nWaiting up to $MAX_WAIT seconds for Connect to start"
-retry $MAX_WAIT host_check_up connect || exit 1
+# FIXME
+# PATCHED
+# MAX_WAIT=240
+# echo -e "\nWaiting up to $MAX_WAIT seconds for Connect to start"
+# retry $MAX_WAIT host_check_up connect || exit 1
Expand All @@ -68,7 +68,7 @@ index 50b5b41a..535b7a0e 100755

-echo -e "\nStart streaming from the Wikipedia SSE source connector:"
-${DIR}/connectors/submit_wikipedia_sse_config.sh || exit 1
+# FIXME
+# PATCHED
+# echo -e "\nStart streaming from the Wikipedia SSE source connector:"
+# ${DIR}/connectors/submit_wikipedia_sse_config.sh || exit 1

Expand Down Expand Up @@ -102,7 +102,7 @@ index 50b5b41a..535b7a0e 100755
-echo "Waiting up to $MAX_WAIT seconds for Confluent Control Center to start"
-retry $MAX_WAIT host_check_up control-center || exit 1
+# # Verify Confluent Control Center has started
+# FIXME
+# PATCHED
+# MAX_WAIT=300
+# echo
+# echo "Waiting up to $MAX_WAIT seconds for Confluent Control Center to start"
Expand All @@ -120,7 +120,7 @@ index 50b5b41a..535b7a0e 100755

-# Start more containers
-docker-compose up --no-recreate -d ksqldb-server ksqldb-cli restproxy
+# FIXME
+# PATCHED
+# # Start more containers
+# docker-compose up --no-recreate -d ksqldb-server ksqldb-cli restproxy

Expand Down Expand Up @@ -149,7 +149,7 @@ index 50b5b41a..535b7a0e 100755
-echo -e "\nStart additional consumers to read from topics WIKIPEDIANOBOT, WIKIPEDIA_COUNT_GT_1"
-${DIR}/consumers/listen_WIKIPEDIANOBOT.sh
-${DIR}/consumers/listen_WIKIPEDIA_COUNT_GT_1.sh
+# FIXME
+# PATCHED
+# echo -e "\nStart additional consumers to read from topics WIKIPEDIANOBOT, WIKIPEDIA_COUNT_GT_1"
+# ${DIR}/consumers/listen_WIKIPEDIANOBOT.sh
+# ${DIR}/consumers/listen_WIKIPEDIA_COUNT_GT_1.sh
Expand Down
2 changes: 1 addition & 1 deletion lib/avrora/codec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ defmodule Avrora.Codec do
<<72, 48, 48, 48, 48, 48, 48, 48, 48, 45, 48, 48, 48, 48, 45, 48, 48, 48, 48, 45, 48, 48, 48,
48, 45, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 123, 20, 174, 71, 225, 250, 47, 64>>
"""
@callback encode(payload :: binary() | map(), options :: keyword(Avrora.Schema.t())) ::
@callback encode(payload :: term(), options :: keyword(Avrora.Schema.t())) ::
{:ok, result :: binary()} | {:error, reason :: term()}
end
2 changes: 1 addition & 1 deletion lib/avrora/codec/object_container_file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Avrora.Codec.ObjectContainerFile do
end

@impl true
def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do
def encode(payload, schema: schema) do
with {:ok, schema} <- resolve(schema),
{:ok, body} <- Codec.Plain.encode(payload, schema: schema),
{:ok, schema} <- SchemaEncoder.to_erlavro(schema) do
Expand Down
2 changes: 1 addition & 1 deletion lib/avrora/codec/plain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Avrora.Codec.Plain do
end

@impl true
def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do
def encode(payload, schema: schema) do
with {:ok, schema} <- resolve(schema), do: do_encode(payload, schema)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/avrora/codec/schema_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule Avrora.Codec.SchemaRegistry do
end

@impl true
def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do
def encode(payload, schema: schema) do
with {:ok, schema} <- resolve(schema) do
schema = if is_nil(schema.id), do: {:error, :invalid_schema_id}, else: {:ok, schema}

Expand Down
118 changes: 59 additions & 59 deletions lib/avrora/schema/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Avrora.Schema.Encoder do
alias Avrora.Schema.ReferenceCollector

@type reference_lookup_fun :: (String.t() -> {:ok, String.t()} | {:error, term()})
@undefined_name :undefined
@reference_lookup_fun &__MODULE__.reference_lookup/1

@doc """
Expand All @@ -20,23 +21,27 @@ defmodule Avrora.Schema.Encoder do
iex> schema.full_name
"io.acme.Payment"
"""
@spec from_json(String.t(), reference_lookup_fun) :: {:ok, Schema.t()} | {:error, term()}
def from_json(payload, reference_lookup_fun \\ @reference_lookup_fun) when is_binary(payload) do
@spec from_json(String.t()) :: {:ok, Schema.t()} | {:error, term()}
def from_json(definition),
do: from_json(definition, name: @undefined_name, reference_lookup_fun: @reference_lookup_fun)

def from_json(definition, name: name),
do: from_json(definition, name: name, reference_lookup_fun: @reference_lookup_fun)

def from_json(definition, reference_lookup_fun: reference_lookup_fun),
do: from_json(definition, name: @undefined_name, reference_lookup_fun: reference_lookup_fun)

@spec from_json(String.t(), name: :undefined | String.t(), reference_lookup_fun: reference_lookup_fun()) ::
{:ok, Schema.t()} | {:error, term()}
def from_json(definition, name: name, reference_lookup_fun: reference_lookup_fun) do
lookup_table = ets().new()

with {:ok, [schema | _]} <- parse_recursive(payload, lookup_table, reference_lookup_fun),
{:ok, full_name} <- extract_full_name(schema),
{:ok, schema} <- do_compile(full_name, lookup_table) do
{
:ok,
%Schema{
id: nil,
version: nil,
full_name: full_name,
lookup_table: lookup_table,
json: to_json(schema)
}
}
with {:ok, full_name} <- parse_recursive(definition, name, lookup_table, reference_lookup_fun),
{:ok, erlavro} <- do_expand(full_name, lookup_table) do
# NOTE: It could be that json field will be moved to be a method because of
# schema registry support of references. OR we should care about how
# to calculate it
{:ok, %Schema{full_name: full_name, lookup_table: lookup_table, json: to_json(erlavro)}}
else
{:error, reason} ->
true = :ets.delete(lookup_table)
Expand Down Expand Up @@ -70,22 +75,13 @@ defmodule Avrora.Schema.Encoder do
"io.acme.Payment"
"""
@spec from_erlavro(term(), keyword()) :: {:ok, Schema.t()} | {:error, term()}
def from_erlavro(schema, attributes \\ []) do
def from_erlavro(erlavro, attributes \\ []) do
lookup_table = ets().new()

with {:ok, full_name} <- extract_full_name(schema),
lookup_table <- :avro_schema_store.add_type(schema, lookup_table),
json <- Keyword.get_lazy(attributes, :json, fn -> to_json(schema) end) do
{
:ok,
%Schema{
id: nil,
version: nil,
full_name: full_name,
lookup_table: lookup_table,
json: json
}
}
with {:ok, full_name} <- extract_full_name(erlavro),
lookup_table <- :avro_schema_store.add_type(erlavro, lookup_table),
json <- Keyword.get_lazy(attributes, :json, fn -> to_json(erlavro) end) do
{:ok, %Schema{full_name: full_name, lookup_table: lookup_table, json: json}}
else
{:error, reason} ->
true = :ets.delete(lookup_table)
Expand All @@ -108,48 +104,42 @@ defmodule Avrora.Schema.Encoder do
"""
@spec to_erlavro(Schema.t()) :: {:ok, term()} | {:error, term()}
def to_erlavro(%Schema{} = schema),
do: do_compile(schema.full_name, schema.lookup_table)

defp to_json(schema), do: :avro_json_encoder.encode_type(schema)

defp parse_recursive(payload, lookup_table, reference_lookup_fun) do
with {:ok, schema} <- do_parse(payload),
{:ok, _} <- extract_full_name(schema),
{:ok, references} <- ReferenceCollector.collect(schema),
lookup_table <- :avro_schema_store.add_type(schema, lookup_table) do
payloads =
references
|> Enum.reject(&:avro_schema_store.lookup_type(&1, lookup_table))
|> Enum.map(fn reference ->
reference |> reference_lookup_fun.() |> unwrap!()
end)

schemas =
Enum.flat_map(payloads, fn payload ->
payload |> parse_recursive(lookup_table, reference_lookup_fun) |> unwrap!()
end)

{:ok, [schema | schemas]}
do: do_expand(schema.full_name, schema.lookup_table)

defp to_json(erlavro), do: :avro_json_encoder.encode_type(erlavro)

defp unwrap!({:ok, result}), do: result
defp unwrap!({:error, error}), do: throw(error)

defp parse_recursive(definition, name, lookup_table, reference_lookup_fun) do
with {:ok, erlavro} <- do_parse(definition),
{:ok, references} <- ReferenceCollector.collect(erlavro),
{:ok, full_name} <- do_add_type(name, erlavro, lookup_table) do
references
|> Enum.reject(&:avro_schema_store.lookup_type(&1, lookup_table))
|> Enum.each(fn reference_name ->
reference_lookup_fun.(reference_name)
|> unwrap!()
|> parse_recursive(reference_name, lookup_table, reference_lookup_fun)
|> unwrap!()
end)

{:ok, full_name}
end
catch
error -> {:error, error}
end

defp unwrap!({:ok, result}), do: result
defp unwrap!({:error, error}), do: throw(error)

defp extract_full_name(schema) do
case schema do
defp extract_full_name(erlavro) do
case erlavro do
{:avro_fixed_type, _, _, _, _, full_name, _} -> {:ok, full_name}
{:avro_enum_type, _, _, _, _, _, full_name, _} -> {:ok, full_name}
{:avro_record_type, _, _, _, _, _, full_name, _} -> {:ok, full_name}
_ -> {:error, :unnamed_type}
end
end

# Compile complete version of the `erlavro` format with all references
# being resolved, converting errors to error return
defp do_compile(full_name, lookup_table) do
defp do_expand(full_name, lookup_table) do
{:ok, :avro_util.expand_type(full_name, lookup_table)}
rescue
_ in MatchError -> {:error, :bad_reference}
Expand All @@ -165,5 +155,15 @@ defmodule Avrora.Schema.Encoder do
error in ErlangError -> {:error, error.original}
end

defp do_add_type(name, erlavro, lookup_table) do
name = if :avro.is_named_type(erlavro), do: :undefined, else: name
full_name = if :avro.is_named_type(erlavro), do: :avro.get_type_fullname(erlavro), else: name

:avro_schema_store.add_type(name, erlavro, lookup_table)
{:ok, full_name}
rescue
error in ErlangError -> {:error, error.original}
end

defp ets, do: Config.self().ets_lib()
end
7 changes: 5 additions & 2 deletions lib/avrora/storage/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ defmodule Avrora.Storage.File do
"""
@impl true
def get(key) when is_binary(key) do
with {:ok, body} <- read_schema_file_by_name(key),
do: SchemaEncoder.from_json(body, &read_schema_file_by_name/1)
with {:ok, schema_name} <- Name.parse(key),
{:ok, body} <- read_schema_file_by_name(key) do
SchemaEncoder.from_json(body, name: schema_name.name, reference_lookup_fun: &read_schema_file_by_name/1)
end
end

@impl true
def get(key) when is_integer(key), do: {:error, :unsupported}

@impl true
Expand Down
26 changes: 21 additions & 5 deletions lib/avrora/storage/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ defmodule Avrora.Storage.Registry do
{:ok, version} <- Map.fetch(response, "version"),
{:ok, schema} <- Map.fetch(response, "schema"),
{:ok, references} <- extract_references(response),
{:ok, schema} <- SchemaEncoder.from_json(schema, make_reference_lookup_fun(references)) do
{:ok, schema} <-
SchemaEncoder.from_json(schema, name: name, reference_lookup_fun: make_reference_lookup_fun(references)) do
Logger.debug("obtaining schema `#{schema_name.name}` with version `#{version}`")

{:ok, %{schema | id: id, version: version}}
Expand All @@ -51,10 +52,18 @@ defmodule Avrora.Storage.Registry do
with {:ok, response} <- http_client_get("schemas/ids/#{key}"),
{:ok, schema} <- Map.fetch(response, "schema"),
{:ok, references} <- extract_references(response),
{:ok, schema} <- SchemaEncoder.from_json(schema, make_reference_lookup_fun(references)) do
Logger.debug("obtaining schema with global id `#{key}`")

{:ok, %{schema | id: key}}
# TODO: Add note in the readme that:
# There is no such endpoint in registry versions < 5.5.0
{:ok, response} <- http_client_get("schemas/ids/#{key}/versions"),
{:ok, schema_name} <- extract_name(response),
{:ok, schema} <-
SchemaEncoder.from_json(schema,
name: schema_name.name,
reference_lookup_fun: make_reference_lookup_fun(references)
) do
Logger.debug("obtaining schema and version with global id `#{key}`")

{:ok, %{schema | id: key, version: schema_name.version}}
end
end

Expand Down Expand Up @@ -110,6 +119,13 @@ defmodule Avrora.Storage.Registry do
error -> {:error, error}
end

defp extract_name(response) do
case response do
[%{"subject" => name, "version" => version}] -> {:ok, %Name{name: name, version: version}}
_ -> {:error, :invalid_versions}
end
end

defp make_reference_lookup_fun(map) when map_size(map) == 0,
do: &SchemaEncoder.reference_lookup/1

Expand Down
43 changes: 43 additions & 0 deletions test/avrora/codec/plain_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,42 @@ defmodule Avrora.Codec.PlainTest do

assert encoded == "59B02128"
end

test "when payload is matching the Union schema and schema is resolvable" do
union_schema = union_schema()

Avrora.Storage.MemoryMock
|> expect(:get, fn key ->
assert key == "io.acme.Union"

{:ok, nil}
end)
|> expect(:put, fn key, value ->
assert key == "io.acme.Union"
assert value == union_schema

{:ok, value}
end)

Avrora.Storage.RegistryMock
|> expect(:put, fn key, value ->
assert key == "io.acme.Union"
assert value == union_json()

{:error, :unconfigured_registry_url}
end)

Avrora.Storage.FileMock
|> expect(:get, fn key ->
assert key == "io.acme.Union"

{:ok, union_schema}
end)

{:ok, encoded} = Codec.Plain.encode(123, schema: %Schema{full_name: "io.acme.Union"})

assert encoded == <<0, 246, 1>>
end
end

defp missing_field_error do
Expand Down Expand Up @@ -306,6 +342,11 @@ defmodule Avrora.Codec.PlainTest do
%{schema | id: nil, version: nil}
end

defp union_schema do
{:ok, schema} = Schema.Encoder.from_json(union_json(), name: "io.acme.Union")
%{schema | id: nil, version: nil}
end

defp payment_json do
~s({"namespace":"io.acme","name":"Payment","type":"record","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]})
end
Expand All @@ -329,4 +370,6 @@ defmodule Avrora.Codec.PlainTest do
defp fixed_json do
~s({"namespace":"io.acme","name":"CRC32","type":"fixed","size":8})
end

defp union_json, do: ~s(["int","string"])
end
Loading

0 comments on commit 7340ef8

Please sign in to comment.