diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index d4ea7bd..a1fd5c1 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -47,7 +47,7 @@ index 50b5b41a..535b7a0e 100755 # Bring up more containers -docker-compose up --no-recreate -d schemaregistry connect control-center -+# FIXME ++# PATCHED +# docker-compose up --no-recreate -d schemaregistry connect control-center +docker-compose up --no-recreate -d schemaregistry @@ -59,7 +59,7 @@ index 50b5b41a..535b7a0e 100755 -MAX_WAIT=240 -echo -e "\nWaiting up to $MAX_WAIT seconds for Connect to start" -retry $MAX_WAIT host_check_up connect || exit 1 -+# FIXME ++# PATCHED +# MAX_WAIT=240 +# echo -e "\nWaiting up to $MAX_WAIT seconds for Connect to start" +# retry $MAX_WAIT host_check_up connect || exit 1 @@ -68,7 +68,7 @@ index 50b5b41a..535b7a0e 100755 -echo -e "\nStart streaming from the Wikipedia SSE source connector:" -${DIR}/connectors/submit_wikipedia_sse_config.sh || exit 1 -+# FIXME ++# PATCHED +# echo -e "\nStart streaming from the Wikipedia SSE source connector:" +# ${DIR}/connectors/submit_wikipedia_sse_config.sh || exit 1 @@ -102,7 +102,7 @@ index 50b5b41a..535b7a0e 100755 -echo "Waiting up to $MAX_WAIT seconds for Confluent Control Center to start" -retry $MAX_WAIT host_check_up control-center || exit 1 +# # Verify Confluent Control Center has started -+# FIXME ++# PATCHED +# MAX_WAIT=300 +# echo +# echo "Waiting up to $MAX_WAIT seconds for Confluent Control Center to start" @@ -120,7 +120,7 @@ index 50b5b41a..535b7a0e 100755 -# Start more containers -docker-compose up --no-recreate -d ksqldb-server ksqldb-cli restproxy -+# FIXME ++# PATCHED +# # Start more containers +# docker-compose up --no-recreate -d ksqldb-server ksqldb-cli restproxy @@ -149,7 +149,7 @@ index 50b5b41a..535b7a0e 100755 -echo -e "\nStart additional consumers to read from topics WIKIPEDIANOBOT, WIKIPEDIA_COUNT_GT_1" -${DIR}/consumers/listen_WIKIPEDIANOBOT.sh -${DIR}/consumers/listen_WIKIPEDIA_COUNT_GT_1.sh -+# FIXME ++# PATCHED +# echo -e "\nStart additional consumers to read from topics WIKIPEDIANOBOT, WIKIPEDIA_COUNT_GT_1" +# ${DIR}/consumers/listen_WIKIPEDIANOBOT.sh +# ${DIR}/consumers/listen_WIKIPEDIA_COUNT_GT_1.sh diff --git a/lib/avrora/codec.ex b/lib/avrora/codec.ex index 48b8999..1d8c288 100644 --- a/lib/avrora/codec.ex +++ b/lib/avrora/codec.ex @@ -74,6 +74,6 @@ defmodule Avrora.Codec do <<72, 48, 48, 48, 48, 48, 48, 48, 48, 45, 48, 48, 48, 48, 45, 48, 48, 48, 48, 45, 48, 48, 48, 48, 45, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 123, 20, 174, 71, 225, 250, 47, 64>> """ - @callback encode(payload :: binary() | map(), options :: keyword(Avrora.Schema.t())) :: + @callback encode(payload :: term(), options :: keyword(Avrora.Schema.t())) :: {:ok, result :: binary()} | {:error, reason :: term()} end diff --git a/lib/avrora/codec/object_container_file.ex b/lib/avrora/codec/object_container_file.ex index 6c68f8e..764476e 100644 --- a/lib/avrora/codec/object_container_file.ex +++ b/lib/avrora/codec/object_container_file.ex @@ -49,7 +49,7 @@ defmodule Avrora.Codec.ObjectContainerFile do end @impl true - def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do + def encode(payload, schema: schema) do with {:ok, schema} <- resolve(schema), {:ok, body} <- Codec.Plain.encode(payload, schema: schema), {:ok, schema} <- SchemaEncoder.to_erlavro(schema) do diff --git a/lib/avrora/codec/plain.ex b/lib/avrora/codec/plain.ex index a2f9140..748fc3a 100644 --- a/lib/avrora/codec/plain.ex +++ b/lib/avrora/codec/plain.ex @@ -26,7 +26,7 @@ defmodule Avrora.Codec.Plain do end @impl true - def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do + def encode(payload, schema: schema) do with {:ok, schema} <- resolve(schema), do: do_encode(payload, schema) end diff --git a/lib/avrora/codec/schema_registry.ex b/lib/avrora/codec/schema_registry.ex index 6fcb48f..b3cf8eb 100644 --- a/lib/avrora/codec/schema_registry.ex +++ b/lib/avrora/codec/schema_registry.ex @@ -66,7 +66,7 @@ defmodule Avrora.Codec.SchemaRegistry do end @impl true - def encode(payload, schema: schema) when is_binary(payload) or is_map(payload) do + def encode(payload, schema: schema) do with {:ok, schema} <- resolve(schema) do schema = if is_nil(schema.id), do: {:error, :invalid_schema_id}, else: {:ok, schema} diff --git a/lib/avrora/schema/encoder.ex b/lib/avrora/schema/encoder.ex index 8963ffd..f4226c5 100644 --- a/lib/avrora/schema/encoder.ex +++ b/lib/avrora/schema/encoder.ex @@ -8,6 +8,7 @@ defmodule Avrora.Schema.Encoder do alias Avrora.Schema.ReferenceCollector @type reference_lookup_fun :: (String.t() -> {:ok, String.t()} | {:error, term()}) + @undefined_name :undefined @reference_lookup_fun &__MODULE__.reference_lookup/1 @doc """ @@ -20,23 +21,27 @@ defmodule Avrora.Schema.Encoder do iex> schema.full_name "io.acme.Payment" """ - @spec from_json(String.t(), reference_lookup_fun) :: {:ok, Schema.t()} | {:error, term()} - def from_json(payload, reference_lookup_fun \\ @reference_lookup_fun) when is_binary(payload) do + @spec from_json(String.t()) :: {:ok, Schema.t()} | {:error, term()} + def from_json(definition), + do: from_json(definition, name: @undefined_name, reference_lookup_fun: @reference_lookup_fun) + + def from_json(definition, name: name), + do: from_json(definition, name: name, reference_lookup_fun: @reference_lookup_fun) + + def from_json(definition, reference_lookup_fun: reference_lookup_fun), + do: from_json(definition, name: @undefined_name, reference_lookup_fun: reference_lookup_fun) + + @spec from_json(String.t(), name: :undefined | String.t(), reference_lookup_fun: reference_lookup_fun()) :: + {:ok, Schema.t()} | {:error, term()} + def from_json(definition, name: name, reference_lookup_fun: reference_lookup_fun) do lookup_table = ets().new() - with {:ok, [schema | _]} <- parse_recursive(payload, lookup_table, reference_lookup_fun), - {:ok, full_name} <- extract_full_name(schema), - {:ok, schema} <- do_compile(full_name, lookup_table) do - { - :ok, - %Schema{ - id: nil, - version: nil, - full_name: full_name, - lookup_table: lookup_table, - json: to_json(schema) - } - } + with {:ok, full_name} <- parse_recursive(definition, name, lookup_table, reference_lookup_fun), + {:ok, erlavro} <- do_expand(full_name, lookup_table) do + # NOTE: It could be that json field will be moved to be a method because of + # schema registry support of references. OR we should care about how + # to calculate it + {:ok, %Schema{full_name: full_name, lookup_table: lookup_table, json: to_json(erlavro)}} else {:error, reason} -> true = :ets.delete(lookup_table) @@ -70,22 +75,13 @@ defmodule Avrora.Schema.Encoder do "io.acme.Payment" """ @spec from_erlavro(term(), keyword()) :: {:ok, Schema.t()} | {:error, term()} - def from_erlavro(schema, attributes \\ []) do + def from_erlavro(erlavro, attributes \\ []) do lookup_table = ets().new() - with {:ok, full_name} <- extract_full_name(schema), - lookup_table <- :avro_schema_store.add_type(schema, lookup_table), - json <- Keyword.get_lazy(attributes, :json, fn -> to_json(schema) end) do - { - :ok, - %Schema{ - id: nil, - version: nil, - full_name: full_name, - lookup_table: lookup_table, - json: json - } - } + with {:ok, full_name} <- extract_full_name(erlavro), + lookup_table <- :avro_schema_store.add_type(erlavro, lookup_table), + json <- Keyword.get_lazy(attributes, :json, fn -> to_json(erlavro) end) do + {:ok, %Schema{full_name: full_name, lookup_table: lookup_table, json: json}} else {:error, reason} -> true = :ets.delete(lookup_table) @@ -108,38 +104,34 @@ defmodule Avrora.Schema.Encoder do """ @spec to_erlavro(Schema.t()) :: {:ok, term()} | {:error, term()} def to_erlavro(%Schema{} = schema), - do: do_compile(schema.full_name, schema.lookup_table) - - defp to_json(schema), do: :avro_json_encoder.encode_type(schema) - - defp parse_recursive(payload, lookup_table, reference_lookup_fun) do - with {:ok, schema} <- do_parse(payload), - {:ok, _} <- extract_full_name(schema), - {:ok, references} <- ReferenceCollector.collect(schema), - lookup_table <- :avro_schema_store.add_type(schema, lookup_table) do - payloads = - references - |> Enum.reject(&:avro_schema_store.lookup_type(&1, lookup_table)) - |> Enum.map(fn reference -> - reference |> reference_lookup_fun.() |> unwrap!() - end) - - schemas = - Enum.flat_map(payloads, fn payload -> - payload |> parse_recursive(lookup_table, reference_lookup_fun) |> unwrap!() - end) - - {:ok, [schema | schemas]} + do: do_expand(schema.full_name, schema.lookup_table) + + defp to_json(erlavro), do: :avro_json_encoder.encode_type(erlavro) + + defp unwrap!({:ok, result}), do: result + defp unwrap!({:error, error}), do: throw(error) + + defp parse_recursive(definition, name, lookup_table, reference_lookup_fun) do + with {:ok, erlavro} <- do_parse(definition), + {:ok, references} <- ReferenceCollector.collect(erlavro), + {:ok, full_name} <- do_add_type(name, erlavro, lookup_table) do + references + |> Enum.reject(&:avro_schema_store.lookup_type(&1, lookup_table)) + |> Enum.each(fn reference_name -> + reference_lookup_fun.(reference_name) + |> unwrap!() + |> parse_recursive(reference_name, lookup_table, reference_lookup_fun) + |> unwrap!() + end) + + {:ok, full_name} end catch error -> {:error, error} end - defp unwrap!({:ok, result}), do: result - defp unwrap!({:error, error}), do: throw(error) - - defp extract_full_name(schema) do - case schema do + defp extract_full_name(erlavro) do + case erlavro do {:avro_fixed_type, _, _, _, _, full_name, _} -> {:ok, full_name} {:avro_enum_type, _, _, _, _, _, full_name, _} -> {:ok, full_name} {:avro_record_type, _, _, _, _, _, full_name, _} -> {:ok, full_name} @@ -147,9 +139,7 @@ defmodule Avrora.Schema.Encoder do end end - # Compile complete version of the `erlavro` format with all references - # being resolved, converting errors to error return - defp do_compile(full_name, lookup_table) do + defp do_expand(full_name, lookup_table) do {:ok, :avro_util.expand_type(full_name, lookup_table)} rescue _ in MatchError -> {:error, :bad_reference} @@ -165,5 +155,15 @@ defmodule Avrora.Schema.Encoder do error in ErlangError -> {:error, error.original} end + defp do_add_type(name, erlavro, lookup_table) do + name = if :avro.is_named_type(erlavro), do: :undefined, else: name + full_name = if :avro.is_named_type(erlavro), do: :avro.get_type_fullname(erlavro), else: name + + :avro_schema_store.add_type(name, erlavro, lookup_table) + {:ok, full_name} + rescue + error in ErlangError -> {:error, error.original} + end + defp ets, do: Config.self().ets_lib() end diff --git a/lib/avrora/storage/file.ex b/lib/avrora/storage/file.ex index c63ff52..1634e9a 100644 --- a/lib/avrora/storage/file.ex +++ b/lib/avrora/storage/file.ex @@ -38,10 +38,13 @@ defmodule Avrora.Storage.File do """ @impl true def get(key) when is_binary(key) do - with {:ok, body} <- read_schema_file_by_name(key), - do: SchemaEncoder.from_json(body, &read_schema_file_by_name/1) + with {:ok, schema_name} <- Name.parse(key), + {:ok, body} <- read_schema_file_by_name(key) do + SchemaEncoder.from_json(body, name: schema_name.name, reference_lookup_fun: &read_schema_file_by_name/1) + end end + @impl true def get(key) when is_integer(key), do: {:error, :unsupported} @impl true diff --git a/lib/avrora/storage/registry.ex b/lib/avrora/storage/registry.ex index 86ec370..66c0a3c 100644 --- a/lib/avrora/storage/registry.ex +++ b/lib/avrora/storage/registry.ex @@ -40,7 +40,8 @@ defmodule Avrora.Storage.Registry do {:ok, version} <- Map.fetch(response, "version"), {:ok, schema} <- Map.fetch(response, "schema"), {:ok, references} <- extract_references(response), - {:ok, schema} <- SchemaEncoder.from_json(schema, make_reference_lookup_fun(references)) do + {:ok, schema} <- + SchemaEncoder.from_json(schema, name: name, reference_lookup_fun: make_reference_lookup_fun(references)) do Logger.debug("obtaining schema `#{schema_name.name}` with version `#{version}`") {:ok, %{schema | id: id, version: version}} @@ -51,10 +52,18 @@ defmodule Avrora.Storage.Registry do with {:ok, response} <- http_client_get("schemas/ids/#{key}"), {:ok, schema} <- Map.fetch(response, "schema"), {:ok, references} <- extract_references(response), - {:ok, schema} <- SchemaEncoder.from_json(schema, make_reference_lookup_fun(references)) do - Logger.debug("obtaining schema with global id `#{key}`") - - {:ok, %{schema | id: key}} + # TODO: Add note in the readme that: + # There is no such endpoint in registry versions < 5.5.0 + {:ok, response} <- http_client_get("schemas/ids/#{key}/versions"), + {:ok, schema_name} <- extract_name(response), + {:ok, schema} <- + SchemaEncoder.from_json(schema, + name: schema_name.name, + reference_lookup_fun: make_reference_lookup_fun(references) + ) do + Logger.debug("obtaining schema and version with global id `#{key}`") + + {:ok, %{schema | id: key, version: schema_name.version}} end end @@ -110,6 +119,13 @@ defmodule Avrora.Storage.Registry do error -> {:error, error} end + defp extract_name(response) do + case response do + [%{"subject" => name, "version" => version}] -> {:ok, %Name{name: name, version: version}} + _ -> {:error, :invalid_versions} + end + end + defp make_reference_lookup_fun(map) when map_size(map) == 0, do: &SchemaEncoder.reference_lookup/1 diff --git a/test/avrora/codec/plain_test.exs b/test/avrora/codec/plain_test.exs index 0afe8b0..e8068c2 100644 --- a/test/avrora/codec/plain_test.exs +++ b/test/avrora/codec/plain_test.exs @@ -259,6 +259,42 @@ defmodule Avrora.Codec.PlainTest do assert encoded == "59B02128" end + + test "when payload is matching the Union schema and schema is resolvable" do + union_schema = union_schema() + + Avrora.Storage.MemoryMock + |> expect(:get, fn key -> + assert key == "io.acme.Union" + + {:ok, nil} + end) + |> expect(:put, fn key, value -> + assert key == "io.acme.Union" + assert value == union_schema + + {:ok, value} + end) + + Avrora.Storage.RegistryMock + |> expect(:put, fn key, value -> + assert key == "io.acme.Union" + assert value == union_json() + + {:error, :unconfigured_registry_url} + end) + + Avrora.Storage.FileMock + |> expect(:get, fn key -> + assert key == "io.acme.Union" + + {:ok, union_schema} + end) + + {:ok, encoded} = Codec.Plain.encode(123, schema: %Schema{full_name: "io.acme.Union"}) + + assert encoded == <<0, 246, 1>> + end end defp missing_field_error do @@ -306,6 +342,11 @@ defmodule Avrora.Codec.PlainTest do %{schema | id: nil, version: nil} end + defp union_schema do + {:ok, schema} = Schema.Encoder.from_json(union_json(), name: "io.acme.Union") + %{schema | id: nil, version: nil} + end + defp payment_json do ~s({"namespace":"io.acme","name":"Payment","type":"record","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}) end @@ -329,4 +370,6 @@ defmodule Avrora.Codec.PlainTest do defp fixed_json do ~s({"namespace":"io.acme","name":"CRC32","type":"fixed","size":8}) end + + defp union_json, do: ~s(["int","string"]) end diff --git a/test/avrora/schema/encoder_test.exs b/test/avrora/schema/encoder_test.exs index 60562f8..1f03fc5 100644 --- a/test/avrora/schema/encoder_test.exs +++ b/test/avrora/schema/encoder_test.exs @@ -7,7 +7,112 @@ defmodule Avrora.Schema.EncoderTest do setup :support_config + describe "xxx/2" do + test "when schema is Enum type" do + {:ok, schema} = Schema.Encoder.xxx(card_type_schema()) + {:ok, {type, _, _, _, _, fields, full_name, _}} = Schema.Encoder.to_erlavro(schema) + + assert type == :avro_enum_type + assert full_name == "io.confluent.CardType" + assert length(fields) == 3 + + assert schema.full_name == "io.confluent.CardType" + assert schema.json == card_type_json() + end + + test "when payload is Fixed type" do + {:ok, schema} = Schema.Encoder.xxx(crc32_schema()) + {:ok, {type, _, _, _, value, full_name, _}} = Schema.Encoder.to_erlavro(schema) + + assert type == :avro_fixed_type + assert full_name == "io.confluent.CRC32" + assert value == 8 + + assert schema.full_name == "io.confluent.CRC32" + assert schema.json == crc32_json() + end + + test "when schema is Record type with primitive fields types" do + {:ok, schema} = Schema.Encoder.xxx(payment_schema()) + {:ok, {type, _, _, _, _, fields, full_name, _}} = Schema.Encoder.to_erlavro(schema) + + assert type == :avro_record_type + assert full_name == "io.confluent.Payment" + assert length(fields) == 2 + + assert schema.full_name == "io.confluent.Payment" + assert schema.json == payment_json() + end + + test "when schema is Record type with nested type ref" do + {:ok, schema} = + Schema.Encoder.xxx(message_with_reference_schema(), fn name -> + case name do + "io.confluent.Attachment" -> {:ok, attachment_schema()} + "io.confluent.Signature" -> {:ok, signature_schema()} + _ -> raise "unknown reference name!" + end + end) + + {:ok, {type, _, _, _, _, fields, full_name, _}} = Schema.Encoder.to_erlavro(schema) + + assert type == :avro_record_type + assert full_name == "io.confluent.Message" + assert length(fields) == 2 + + assert schema.full_name == "io.confluent.Message" + assert schema.json == message_json() + + {:avro_record_field, _, _, body_type, _, _, _} = List.first(fields) + assert body_type == {:avro_primitive_type, "string", []} + + {:avro_record_field, _, _, attachments_type, _, _, _} = List.last(fields) + {:avro_array_type, {type, _, _, _, _, fields, full_name, _}, []} = attachments_type + + assert type == :avro_record_type + assert full_name == "io.confluent.Attachment" + assert length(fields) == 2 + + {:avro_record_field, _, _, signature_type, _, _, _} = List.last(fields) + {type, _, _, _, _, fields, full_name, _} = signature_type + + assert type == :avro_record_type + assert full_name == "io.confluent.Signature" + assert length(fields) == 1 + end + + test "when schema is Record type with type ref of invalid schema" do + result = + Schema.Encoder.xxx(message_with_reference_schema(), fn name -> + assert name == "io.confluent.Attachment" + {:ok, %Schema{full_name: "io.confluent.Attachment", source: "{}"}} + end) + + assert {:error, {:not_found, "type"}} == result + end + + test "when schema is Record type with type ref and resolution failed" do + result = + Schema.Encoder.xxx(message_with_reference_schema(), fn name -> + assert name == "io.confluent.Attachment" + {:error, :bad_thing_happen} + end) + + assert {:error, :bad_thing_happen} == result + end + + test "when schema is Record type with type ref and lookup function given" do + assert {:error, {:not_found, "type"}} == Schema.Encoder.xxx(message_with_reference_schema()) + end + + test "when schema is an invalid" do + assert Schema.Encoder.xxx(%Schema{full_name: "", source: "a:b"}) == {:error, "argument error"} + assert Schema.Encoder.xxx(%Schema{full_name: "", source: "{}"}) == {:error, {:not_found, "type"}} + end + end + describe "from_json/2" do + @tag :skip test "when payload is a valid Record json schema" do {:ok, schema} = Schema.Encoder.from_json(payment_json()) {:ok, {type, _, _, _, _, fields, full_name, _}} = Schema.Encoder.to_erlavro(schema) @@ -162,6 +267,30 @@ defmodule Avrora.Schema.EncoderTest do end end + defp payment_schema do + %Schema{full_name: "io.confluent.Payment", source: payment_json()} + end + + defp message_with_reference_schema do + %Schema{full_name: "io.confluent.Message", source: message_with_reference_json()} + end + + defp attachment_schema do + %Schema{full_name: "io.confluent.Attachment", source: attachment_json()} + end + + defp signature_schema do + %Schema{full_name: "io.confluent.Signature", source: signature_json()} + end + + defp card_type_schema do + %Schema{full_name: "io.confluent.CardType", source: card_type_json()} + end + + defp crc32_schema do + %Schema{full_name: "io.confluent.CRC32", source: crc32_json()} + end + defp payment_erlavro do {:avro_record_type, "Payment", "io.acme", "", [], [ diff --git a/test/avrora/storage/file_test.exs b/test/avrora/storage/file_test.exs index 4f25a39..6519a1b 100644 --- a/test/avrora/storage/file_test.exs +++ b/test/avrora/storage/file_test.exs @@ -15,6 +15,14 @@ defmodule Avrora.Storage.FileTest do assert schema.full_name == "io.acme.Payment" end + test "when schema file contains unnamed type" do + {:ok, schema} = File.get("io.acme.Primitive") + assert schema.full_name == "io.acme.Primitive" + + {:ok, schema} = File.get("io.acme.Union") + assert schema.full_name == "io.acme.Union" + end + test "when schema file contains named type with nested references" do output = capture_log(fn -> @@ -34,7 +42,7 @@ defmodule Avrora.Storage.FileTest do assert length(Regex.scan(~r/reading schema .*`/, output)) == 7 end - test "when schema name contains version and when schema file was found" do + test "when schema name contains version" do output = capture_log(fn -> {:ok, schema} = File.get("io.acme.Payment:42") diff --git a/test/avrora/storage/registry_test.exs b/test/avrora/storage/registry_test.exs index 982882e..8807ee9 100644 --- a/test/avrora/storage/registry_test.exs +++ b/test/avrora/storage/registry_test.exs @@ -136,6 +136,29 @@ defmodule Avrora.Storage.RegistryTest do assert schema.full_name == "io.acme.Payment" end + test "when requesting by subject name and schema is unnamed type" do + Avrora.HTTPClientMock + |> expect(:get, fn url, _ -> + assert url == "http://reg.loc/subjects/io.acme.Primitive/versions/latest" + + { + :ok, + %{ + "id" => 42, + "version" => 1, + "schema" => json_schema_unnamed(), + "subject" => "io.acme.Primitive" + } + } + end) + + {:ok, schema} = Registry.get("io.acme.Primitive") + + assert schema.id == 42 + assert schema.version == 1 + assert schema.full_name == "io.acme.Primitive" + end + test "when requesting by subject name failed" do Avrora.HTTPClientMock |> expect(:get, fn url, _ -> @@ -155,10 +178,17 @@ defmodule Avrora.Storage.RegistryTest do {:ok, %{"schema" => json_schema()}} end) + Avrora.HTTPClientMock + |> expect(:get, fn url, _ -> + assert url == "http://reg.loc/schemas/ids/1/versions" + + {:ok, [%{"version" => 3, "subject" => "io.acme.Payment"}]} + end) + {:ok, schema} = Registry.get(1) assert schema.id == 1 - assert is_nil(schema.version) + assert schema.version == 3 assert schema.full_name == "io.acme.Payment" end @@ -173,14 +203,22 @@ defmodule Avrora.Storage.RegistryTest do {:ok, %{"schema" => json_schema()}} end) + Avrora.HTTPClientMock + |> expect(:get, fn url, options -> + assert url == "http://reg.loc/schemas/ids/1/versions" + assert Keyword.fetch!(options, :authorization) == "Basic bG9naW46cGFzc3dvcmQ=" + + {:ok, [%{"version" => 3, "subject" => "io.acme.Payment"}]} + end) + {:ok, schema} = Registry.get(1) assert schema.id == 1 - assert is_nil(schema.version) + assert schema.version == 3 assert schema.full_name == "io.acme.Payment" end - test "when requesting by global ID was failed" do + test "when requesting by global ID failed" do Avrora.HTTPClientMock |> expect(:get, fn url, _ -> assert url == "http://reg.loc/schemas/ids/1" @@ -225,14 +263,61 @@ defmodule Avrora.Storage.RegistryTest do } end) + Avrora.HTTPClientMock + |> expect(:get, fn url, _ -> + assert url == "http://reg.loc/schemas/ids/43/versions" + + {:ok, [%{"version" => 13, "subject" => "io.acme.Account"}]} + end) + {:ok, schema} = Registry.get(43) assert schema.id == 43 - assert is_nil(schema.version) + assert schema.version == 13 assert schema.full_name == "io.acme.Account" assert schema.json == json_schema_with_reference_denormalized() end + test "when requesting by global ID and schema is unnamed type" do + Avrora.HTTPClientMock + |> expect(:get, fn url, _ -> + assert url == "http://reg.loc/schemas/ids/42" + + {:ok, %{"schema" => json_schema_unnamed(), "subject" => "io.acme.Primitive"}} + end) + + Avrora.HTTPClientMock + |> expect(:get, fn url, _ -> + assert url == "http://reg.loc/schemas/ids/42/versions" + + {:ok, [%{"version" => 1, "subject" => "io.acme.Primitive"}]} + end) + + {:ok, schema} = Registry.get(42) + + assert schema.id == 42 + assert schema.version == 1 + assert schema.full_name == "io.acme.Primitive" + end + + test "when requesting by global ID and versions response invalid" do + Avrora.HTTPClientMock + |> expect(:get, fn url, _ -> + assert url == "http://reg.loc/schemas/ids/1" + + {:ok, %{"schema" => json_schema()}} + end) + + Avrora.HTTPClientMock + |> expect(:get, fn url, _ -> + assert url == "http://reg.loc/schemas/ids/1/versions" + + {:ok, []} + end) + + assert {:error, :invalid_versions} = Registry.get(1) + end + test "when request should not perform SSL verification" do Avrora.HTTPClientMock |> expect(:get, fn url, options -> @@ -242,10 +327,18 @@ defmodule Avrora.Storage.RegistryTest do {:ok, %{"schema" => json_schema()}} end) + Avrora.HTTPClientMock + |> expect(:get, fn url, options -> + assert url == "http://reg.loc/schemas/ids/1/versions" + assert Keyword.fetch!(options, :ssl_options) == [verify: :verify_none] + + {:ok, [%{"version" => 3, "subject" => "io.acme.Payment"}]} + end) + {:ok, schema} = Registry.get(1) assert schema.id == 1 - assert is_nil(schema.version) + assert schema.version == 3 assert schema.full_name == "io.acme.Payment" end @@ -261,6 +354,14 @@ defmodule Avrora.Storage.RegistryTest do {:ok, %{"schema" => json_schema()}} end) + Avrora.HTTPClientMock + |> expect(:get, fn url, options -> + assert url == "http://reg.loc/schemas/ids/1/versions" + assert Keyword.fetch!(options, :ssl_options) == [verify: :verify_peer, cacerts: [<<48, 130, 3, 201>>]] + + {:ok, [%{"version" => 3, "subject" => "io.acme.Payment"}]} + end) + assert :ok == Registry.get(1) |> elem(0) end @@ -275,6 +376,14 @@ defmodule Avrora.Storage.RegistryTest do {:ok, %{"schema" => json_schema()}} end) + Avrora.HTTPClientMock + |> expect(:get, fn url, options -> + assert url == "http://reg.loc/schemas/ids/1/versions" + assert Keyword.fetch!(options, :ssl_options) == [verify: :verify_peer, cacertfile: "path/to/file"] + + {:ok, [%{"version" => 3, "subject" => "io.acme.Payment"}]} + end) + assert :ok == Registry.get(1) |> elem(0) end @@ -417,6 +526,8 @@ defmodule Avrora.Storage.RegistryTest do %{"error_code" => 42201, "message" => "Invalid schema!"} end + defp json_schema_unnamed, do: ~s({"type": "string"}) + defp json_schema do ~s({"namespace":"io.acme","type":"record","name":"Payment","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}) end diff --git a/test/fixtures/schemas/io/acme/Primitive.avsc b/test/fixtures/schemas/io/acme/Primitive.avsc new file mode 100644 index 0000000..c2d48c0 --- /dev/null +++ b/test/fixtures/schemas/io/acme/Primitive.avsc @@ -0,0 +1,3 @@ +{ + "type": "string" +} diff --git a/test/fixtures/schemas/io/acme/Union.avsc b/test/fixtures/schemas/io/acme/Union.avsc new file mode 100644 index 0000000..3e7acfd --- /dev/null +++ b/test/fixtures/schemas/io/acme/Union.avsc @@ -0,0 +1,3 @@ +[ + "null", "int", "string" +]