From 446fef27fb1cdf44561695ae90fb1d91498f903e Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Fri, 30 Aug 2019 21:30:05 -0400 Subject: [PATCH 01/11] Allow specifying protocol for fetch This allows us to specify what format messages we want to receive --- lib/kafka_ex.ex | 8 +- lib/kafka_ex/new/adapter.ex | 89 ++++++++++++------- lib/kafka_ex/protocol/fetch.ex | 6 +- mix.exs | 2 +- mix.lock | 2 +- .../integration/kayrock/record_batch_test.exs | 77 ++++++++++++++++ 6 files changed, 147 insertions(+), 37 deletions(-) create mode 100644 test/integration/kayrock/record_batch_test.exs diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 79af754e..b220684f 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -270,6 +270,11 @@ defmodule KafkaEx do max_bytes = Keyword.get(opts, :max_bytes, @max_bytes) auto_commit = Keyword.get(opts, :auto_commit, true) + # NOTE protocol_version is used by the new client to allow + # compatibility with newer message formats and is ignored by the legacy + # server implementations. + protocol_version = Keyword.get(opts, :protocol_version, 0) + retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name) @@ -283,7 +288,8 @@ defmodule KafkaEx do offset: retrieved_offset, wait_time: wait_time, min_bytes: min_bytes, - max_bytes: max_bytes + max_bytes: max_bytes, + protocol_version: protocol_version }}, opts ) diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index ad8ed671..f5a2643c 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -125,23 +125,49 @@ defmodule KafkaEx.New.Adapter do end def fetch_request(fetch_request) do - {%Kayrock.Fetch.V0.Request{ - max_wait_time: fetch_request.wait_time, - min_bytes: fetch_request.min_bytes, - replica_id: -1, - topics: [ - %{ - topic: fetch_request.topic, - partitions: [ - %{ - partition: fetch_request.partition, - fetch_offset: fetch_request.offset, - max_bytes: fetch_request.max_bytes - } - ] - } - ] - }, fetch_request.topic, fetch_request.partition} + request = Kayrock.Fetch.get_request_struct(fetch_request.protocol_version) + + partition_request = %{ + partition: fetch_request.partition, + fetch_offset: fetch_request.offset, + max_bytes: fetch_request.max_bytes + } + + partition_request = + if fetch_request.protocol_version >= 5 do + Map.put(partition_request, :log_start_offset, 0) + else + partition_request + end + + request = %{ + request + | max_wait_time: fetch_request.wait_time, + min_bytes: fetch_request.min_bytes, + replica_id: -1, + topics: [ + %{ + topic: fetch_request.topic, + partitions: [partition_request] + } + ] + } + + request = + if fetch_request.protocol_version >= 3 do + %{request | max_bytes: fetch_request.max_bytes} + else + request + end + + request = + if fetch_request.protocol_version >= 4 do + %{request | isolation_level: 0} + else + request + end + + {request, fetch_request.topic, fetch_request.partition} end def fetch_response(fetch_response) do @@ -434,22 +460,21 @@ defmodule KafkaEx.New.Adapter do } end - defp kayrock_message_set_to_kafka_ex( - %Kayrock.RecordBatch{} = record_batch, - topic, - partition - ) do + defp kayrock_message_set_to_kafka_ex(record_batches, topic, partition) + when is_list(record_batches) do messages = - Enum.map(record_batch.records, fn record -> - %FetchMessage{ - attributes: record.attributes, - crc: nil, - key: record.key, - value: record.value, - offset: record.offset, - topic: topic, - partition: partition - } + Enum.flat_map(record_batches, fn record_batch -> + Enum.map(record_batch.records, fn record -> + %FetchMessage{ + attributes: record.attributes, + crc: nil, + key: record.key, + value: record.value, + offset: record.offset, + topic: topic, + partition: partition + } + end) end) case messages do diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 57d435cd..5185acf8 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -17,7 +17,8 @@ defmodule KafkaEx.Protocol.Fetch do wait_time: nil, min_bytes: nil, max_bytes: nil, - auto_commit: nil + auto_commit: nil, + protocol_version: 0 @type t :: %Request{ correlation_id: integer, @@ -27,7 +28,8 @@ defmodule KafkaEx.Protocol.Fetch do offset: integer, wait_time: integer, min_bytes: integer, - max_bytes: integer + max_bytes: integer, + protocol_version: integer } end diff --git a/mix.exs b/mix.exs index 6ec62880..d8f514da 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.Mixfile do defp deps do [ - {:kayrock, "~> 0.1.0"}, + {:kayrock, "~> 0.1.2"}, {:credo, "~> 0.8.10", only: :dev}, {:dialyxir, "~> 1.0.0-rc.3", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, diff --git a/mix.lock b/mix.lock index 655acfc6..47395f79 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.0.2", "ac203208ada855d95dc591a764b6e87259cb0e2a364218f215ad662daa8cd6b4", [:rebar3], [{:unicode_util_compat, "0.2.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, - "kayrock": {:hex, :kayrock, "0.1.0", "f8c77c0e380579ac7aafd794def1a2e4bdbd62829128061e8c75931517bdf869", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, + "kayrock": {:hex, :kayrock, "0.1.2", "bdbd4dbf2ccf707780a19322a723aa6bfbe43687ac7039836629842eb3af320a", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs new file mode 100644 index 00000000..fb13eb24 --- /dev/null +++ b/test/integration/kayrock/record_batch_test.exs @@ -0,0 +1,77 @@ +defmodule KafkaEx.KayrockRecordBatchTest do + @moduledoc """ + Tests for producing/fetching messages using the newer RecordBatch format + """ + + use ExUnit.Case + + alias KafkaEx.New.Client + + @moduletag :new_client + + setup do + {:ok, args} = KafkaEx.build_worker_options([]) + + {:ok, pid} = Client.start_link(args, :no_name) + + {:ok, %{client: pid}} + end + + test "can specify protocol version for fetch - v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: 0, + auto_commit: false, + worker_name: client, + protocol_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "can specify protocol version for fetch - v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: 0, + auto_commit: false, + worker_name: client, + protocol_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.offset == offset + assert message.value == msg + end +end From 6e917406ed0fff5aaa240695c6ffab7b57922308 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sun, 1 Sep 2019 21:13:58 -0400 Subject: [PATCH 02/11] Fix handling empty message sets --- lib/kafka_ex/new/adapter.ex | 4 ++++ mix.exs | 2 +- mix.lock | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index f5a2643c..b5eb1b42 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -460,6 +460,10 @@ defmodule KafkaEx.New.Adapter do } end + defp kayrock_message_set_to_kafka_ex(nil, _topic, _partition) do + {[], nil} + end + defp kayrock_message_set_to_kafka_ex(record_batches, topic, partition) when is_list(record_batches) do messages = diff --git a/mix.exs b/mix.exs index d8f514da..2762c7cd 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.Mixfile do defp deps do [ - {:kayrock, "~> 0.1.2"}, + {:kayrock, "~> 0.1.3"}, {:credo, "~> 0.8.10", only: :dev}, {:dialyxir, "~> 1.0.0-rc.3", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, diff --git a/mix.lock b/mix.lock index 47395f79..ead002b4 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.0.2", "ac203208ada855d95dc591a764b6e87259cb0e2a364218f215ad662daa8cd6b4", [:rebar3], [{:unicode_util_compat, "0.2.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, - "kayrock": {:hex, :kayrock, "0.1.2", "bdbd4dbf2ccf707780a19322a723aa6bfbe43687ac7039836629842eb3af320a", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, + "kayrock": {:hex, :kayrock, "0.1.3", "22e60cf65cfdcae556c4317c4c709c0b0720a252b477bb882e31dd840e395050", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, From 0f6f04e2896876e1c14dac87497a692dadfd0bed Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sun, 1 Sep 2019 21:17:20 -0400 Subject: [PATCH 03/11] Test to handle empty batches --- .../integration/kayrock/record_batch_test.exs | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index fb13eb24..1bf7ff4c 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -46,6 +46,32 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.offset == offset end + test "empty message set - v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset + 5, + auto_commit: false, + worker_name: client, + protocol_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + assert partition_response.message_set == [] + end + test "can specify protocol version for fetch - v5", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() @@ -74,4 +100,30 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.offset == offset assert message.value == msg end + + test "empty message set - v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset + 5, + auto_commit: false, + worker_name: client, + protocol_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + assert partition_response.message_set == [] + end end From ab0bc886580d15b0bca6c7ca1a72a44b9179851e Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sun, 1 Sep 2019 21:49:19 -0400 Subject: [PATCH 04/11] Allow setting protocol version for produce requests --- lib/kafka_ex.ex | 3 +- lib/kafka_ex/new/adapter.ex | 88 +++++++++++++------ lib/kafka_ex/protocol/fetch.ex | 1 + lib/kafka_ex/protocol/produce.ex | 7 +- .../integration/kayrock/record_batch_test.exs | 66 +++++++++++++- 5 files changed, 132 insertions(+), 33 deletions(-) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index b220684f..0c95bc7c 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -375,7 +375,8 @@ defmodule KafkaEx do required_acks: required_acks, timeout: timeout, compression: :none, - messages: [%Message{key: key, value: value}] + messages: [%Message{key: key, value: value}], + protocol_version: Keyword.get(opts, :protocol_version, 0) } produce(produce_request, opts) diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index b5eb1b42..80985b01 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -30,6 +30,8 @@ defmodule KafkaEx.New.Adapter do alias Kayrock.MessageSet alias Kayrock.MessageSet.Message + alias Kayrock.RecordBatch + alias Kayrock.RecordBatch.Record def list_offsets_request(topic, partition, time) do time = Offset.parse_time(time) @@ -58,41 +60,66 @@ defmodule KafkaEx.New.Adapter do end) end - def produce_request(request) do - topic = request.topic - partition = request.partition + def produce_request(produce_request) do + topic = produce_request.topic + partition = produce_request.partition + + # TODO HERE need to write tests and probably change how messages are created + # to be per version + message_set = + case produce_request.protocol_version do + v when v <= 2 -> + %MessageSet{ + messages: + Enum.map( + produce_request.messages, + fn msg -> + %Message{ + key: msg.key, + value: msg.value, + compression: produce_request.compression + } + end + ) + } - message_set = %MessageSet{ - messages: - Enum.map( - request.messages, - fn msg -> - %Message{ - key: msg.key, - value: msg.value, - compression: request.compression - } - end - ) - } + _ -> + %RecordBatch{ + attributes: produce_attributes(produce_request), + records: + Enum.map( + produce_request.messages, + fn msg -> + %Record{ + key: msg.key, + value: msg.value + } + end + ) + } + end - request = %Kayrock.Produce.V0.Request{ - acks: request.required_acks, - timeout: request.timeout, - topic_data: [ - %{ - topic: request.topic, - data: [ - %{partition: request.partition, record_set: message_set} - ] - } - ] + request = + Kayrock.Produce.get_request_struct(produce_request.protocol_version) + + request = %{ + request + | acks: produce_request.required_acks, + timeout: produce_request.timeout, + topic_data: [ + %{ + topic: produce_request.topic, + data: [ + %{partition: produce_request.partition, record_set: message_set} + ] + } + ] } {request, topic, partition} end - def produce_response(%Kayrock.Produce.V0.Response{ + def produce_response(%{ responses: [ %{ partition_responses: [ @@ -549,4 +576,9 @@ defmodule KafkaEx.New.Adapter do isrs: partition.isr } end + + # NOTE we don't handle any other attributes here + defp produce_attributes(%{compression: :none}), do: 0 + defp produce_attributes(%{compression: :gzip}), do: 1 + defp produce_attributes(%{compression: :snappy}), do: 2 end diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 5185acf8..f1461899 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -18,6 +18,7 @@ defmodule KafkaEx.Protocol.Fetch do min_bytes: nil, max_bytes: nil, auto_commit: nil, + # NOTE protocol_version only used in new client protocol_version: 0 @type t :: %Request{ diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 62163c61..1eb6400c 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -27,7 +27,9 @@ defmodule KafkaEx.Protocol.Produce do required_acks: 0, timeout: 0, compression: :none, - messages: [] + messages: [], + # NOTE protocol_version only used in new client + protocol_version: 0 @type t :: %Request{ topic: binary, @@ -35,7 +37,8 @@ defmodule KafkaEx.Protocol.Produce do required_acks: integer, timeout: integer, compression: atom, - messages: list + messages: list, + protocol_version: integer } end diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index 1bf7ff4c..df0bfc3a 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -46,7 +46,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.offset == offset end - test "empty message set - v3", %{client: client} do + test "fetch empty message set - v3", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() @@ -72,6 +72,37 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert partition_response.message_set == [] end + # v2 is the highest that will accept the MessageSet format + test "can specify protocol version for produce - v2", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: 0, + auto_commit: false, + worker_name: client, + protocol_version: 2 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + test "can specify protocol version for fetch - v5", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() @@ -101,7 +132,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.value == msg end - test "empty message set - v5", %{client: client} do + test "fetch empty message set - v5", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() @@ -126,4 +157,35 @@ defmodule KafkaEx.KayrockRecordBatchTest do [partition_response | _] = fetch_response.partitions assert partition_response.message_set == [] end + + # v3 is the lowest that requires the RecordBatch format + test "can specify protocol version for produce - v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: 0, + auto_commit: false, + worker_name: client, + protocol_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end end From d29e912a031abb45cb28983d40ccf8113b0020b5 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 3 Sep 2019 21:29:37 -0400 Subject: [PATCH 05/11] Tests for decompression --- lib/kafka_ex.ex | 3 +- mix.exs | 2 +- mix.lock | 2 +- .../integration/kayrock/record_batch_test.exs | 70 +++++++++++++++++-- 4 files changed, 70 insertions(+), 7 deletions(-) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 0c95bc7c..85ba5a1a 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -368,13 +368,14 @@ defmodule KafkaEx do key = Keyword.get(opts, :key, "") required_acks = Keyword.get(opts, :required_acks, 0) timeout = Keyword.get(opts, :timeout, 100) + compression = Keyword.get(opts, :compression, :none) produce_request = %ProduceRequest{ topic: topic, partition: partition, required_acks: required_acks, timeout: timeout, - compression: :none, + compression: compression, messages: [%Message{key: key, value: value}], protocol_version: Keyword.get(opts, :protocol_version, 0) } diff --git a/mix.exs b/mix.exs index 2762c7cd..d545a498 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.Mixfile do defp deps do [ - {:kayrock, "~> 0.1.3"}, + {:kayrock, "~> 0.1.5"}, {:credo, "~> 0.8.10", only: :dev}, {:dialyxir, "~> 1.0.0-rc.3", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, diff --git a/mix.lock b/mix.lock index ead002b4..ffee3bba 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.0.2", "ac203208ada855d95dc591a764b6e87259cb0e2a364218f215ad662daa8cd6b4", [:rebar3], [{:unicode_util_compat, "0.2.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, - "kayrock": {:hex, :kayrock, "0.1.3", "22e60cf65cfdcae556c4317c4c709c0b0720a252b477bb882e31dd840e395050", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, + "kayrock": {:hex, :kayrock, "0.1.5", "df4159d2be8a89c0aabc6d65d2c87a711d3c857fc2d09cafe80755b32f02d15a", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index df0bfc3a..97cb0492 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -32,7 +32,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: 0, + offset: offset, auto_commit: false, worker_name: client, protocol_version: 3 @@ -89,7 +89,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: 0, + offset: offset, auto_commit: false, worker_name: client, protocol_version: 2 @@ -118,7 +118,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: 0, + offset: offset, auto_commit: false, worker_name: client, protocol_version: 5 @@ -175,7 +175,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: 0, + offset: offset, auto_commit: false, worker_name: client, protocol_version: 3 @@ -188,4 +188,66 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.value == msg assert message.offset == offset end + + test "compression - produce v0, read v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + protocol_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset - 2, + auto_commit: false, + worker_name: client, + protocol_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "compression - produce v0, read v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + protocol_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset - 2, + auto_commit: false, + worker_name: client, + protocol_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end end From 7197a909922ff5a2f75de4ee445ea9b9099395ac Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Mon, 9 Sep 2019 22:05:57 -0400 Subject: [PATCH 06/11] Update kayrock, more compression tests --- mix.exs | 2 +- mix.lock | 2 +- .../integration/kayrock/record_batch_test.exs | 256 +++++++++++++++++- 3 files changed, 254 insertions(+), 6 deletions(-) diff --git a/mix.exs b/mix.exs index d545a498..d35b88a0 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.Mixfile do defp deps do [ - {:kayrock, "~> 0.1.5"}, + {:kayrock, "~> 0.1.7"}, {:credo, "~> 0.8.10", only: :dev}, {:dialyxir, "~> 1.0.0-rc.3", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, diff --git a/mix.lock b/mix.lock index ffee3bba..bfa392b6 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.0.2", "ac203208ada855d95dc591a764b6e87259cb0e2a364218f215ad662daa8cd6b4", [:rebar3], [{:unicode_util_compat, "0.2.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, - "kayrock": {:hex, :kayrock, "0.1.5", "df4159d2be8a89c0aabc6d65d2c87a711d3c857fc2d09cafe80755b32f02d15a", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, + "kayrock": {:hex, :kayrock, "0.1.7", "6b04b5181cf358c31ba16807dc2dd8b25e5df442b9190e6762c03f42c6ab545f", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index 97cb0492..0c60893a 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -189,7 +189,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.offset == offset end - test "compression - produce v0, read v3", %{client: client} do + test "gzip compression - produce v0, fetch v3", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() @@ -206,7 +206,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: offset - 2, + offset: max(offset - 2, 0), auto_commit: false, worker_name: client, protocol_version: 3 @@ -220,7 +220,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.offset == offset end - test "compression - produce v0, read v5", %{client: client} do + test "gzip compression - produce v0, fetch v5", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() @@ -237,7 +237,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: offset - 2, + offset: max(offset - 2, 0), auto_commit: false, worker_name: client, protocol_version: 5 @@ -250,4 +250,252 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.value == msg assert message.offset == offset end + + test "gzip compression - produce v3, fetch v0", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "gzip compression - produce v3, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "gzip compression - produce v3, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v0, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + protocol_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v0, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + protocol_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v3, fetch v0", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v3, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v3, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + protocol_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + protocol_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end end From 5cca4eb1ada1e999b59209b28b7f1f88efc906da Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Mon, 9 Sep 2019 22:10:49 -0400 Subject: [PATCH 07/11] Rename 'protocol_version' to 'api_version' --- lib/kafka_ex.ex | 8 +-- lib/kafka_ex/new/adapter.ex | 16 ++--- lib/kafka_ex/protocol/fetch.ex | 65 +++++++++++++------ lib/kafka_ex/protocol/produce.ex | 27 ++++---- .../integration/kayrock/record_batch_test.exs | 56 ++++++++-------- 5 files changed, 99 insertions(+), 73 deletions(-) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 85ba5a1a..f81da6cc 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -270,10 +270,10 @@ defmodule KafkaEx do max_bytes = Keyword.get(opts, :max_bytes, @max_bytes) auto_commit = Keyword.get(opts, :auto_commit, true) - # NOTE protocol_version is used by the new client to allow + # NOTE api_version is used by the new client to allow # compatibility with newer message formats and is ignored by the legacy # server implementations. - protocol_version = Keyword.get(opts, :protocol_version, 0) + api_version = Keyword.get(opts, :api_version, 0) retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name) @@ -289,7 +289,7 @@ defmodule KafkaEx do wait_time: wait_time, min_bytes: min_bytes, max_bytes: max_bytes, - protocol_version: protocol_version + api_version: api_version }}, opts ) @@ -377,7 +377,7 @@ defmodule KafkaEx do timeout: timeout, compression: compression, messages: [%Message{key: key, value: value}], - protocol_version: Keyword.get(opts, :protocol_version, 0) + api_version: Keyword.get(opts, :api_version, 0) } produce(produce_request, opts) diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index 80985b01..2490095e 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -64,10 +64,9 @@ defmodule KafkaEx.New.Adapter do topic = produce_request.topic partition = produce_request.partition - # TODO HERE need to write tests and probably change how messages are created - # to be per version + # TODO refactor, add timestamp tests message_set = - case produce_request.protocol_version do + case produce_request.api_version do v when v <= 2 -> %MessageSet{ messages: @@ -99,8 +98,7 @@ defmodule KafkaEx.New.Adapter do } end - request = - Kayrock.Produce.get_request_struct(produce_request.protocol_version) + request = Kayrock.Produce.get_request_struct(produce_request.api_version) request = %{ request @@ -152,7 +150,7 @@ defmodule KafkaEx.New.Adapter do end def fetch_request(fetch_request) do - request = Kayrock.Fetch.get_request_struct(fetch_request.protocol_version) + request = Kayrock.Fetch.get_request_struct(fetch_request.api_version) partition_request = %{ partition: fetch_request.partition, @@ -161,7 +159,7 @@ defmodule KafkaEx.New.Adapter do } partition_request = - if fetch_request.protocol_version >= 5 do + if fetch_request.api_version >= 5 do Map.put(partition_request, :log_start_offset, 0) else partition_request @@ -181,14 +179,14 @@ defmodule KafkaEx.New.Adapter do } request = - if fetch_request.protocol_version >= 3 do + if fetch_request.api_version >= 3 do %{request | max_bytes: fetch_request.max_bytes} else request end request = - if fetch_request.protocol_version >= 4 do + if fetch_request.api_version >= 4 do %{request | isolation_level: 0} else request diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index f1461899..24731c35 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -18,8 +18,8 @@ defmodule KafkaEx.Protocol.Fetch do min_bytes: nil, max_bytes: nil, auto_commit: nil, - # NOTE protocol_version only used in new client - protocol_version: 0 + # NOTE api_version only used in new client + api_version: 0 @type t :: %Request{ correlation_id: integer, @@ -30,7 +30,7 @@ defmodule KafkaEx.Protocol.Fetch do wait_time: integer, min_bytes: integer, max_bytes: integer, - protocol_version: integer + api_version: integer } end @@ -48,7 +48,13 @@ defmodule KafkaEx.Protocol.Fetch do defmodule Message do @moduledoc false - defstruct attributes: 0, crc: nil, offset: nil, key: nil, value: nil, topic: nil, partition: nil + defstruct attributes: 0, + crc: nil, + offset: nil, + key: nil, + value: nil, + topic: nil, + partition: nil @type t :: %Message{ attributes: integer, @@ -98,18 +104,24 @@ defmodule KafkaEx.Protocol.Fetch do partitions, topic ) do - {:ok, message_set, last_offset} = parse_message_set([], msg_set_data, topic, partition) - - parse_partitions(partitions_size - 1, rest, [ - %{ - partition: partition, - error_code: Protocol.error(error_code), - hw_mark_offset: hw_mark_offset, - message_set: message_set, - last_offset: last_offset - } - | partitions - ], topic) + {:ok, message_set, last_offset} = + parse_message_set([], msg_set_data, topic, partition) + + parse_partitions( + partitions_size - 1, + rest, + [ + %{ + partition: partition, + error_code: Protocol.error(error_code), + hw_mark_offset: hw_mark_offset, + message_set: message_set, + last_offset: last_offset + } + | partitions + ], + topic + ) end defp parse_message_set([], <<>>, _topic, _partition) do @@ -120,10 +132,15 @@ defmodule KafkaEx.Protocol.Fetch do list, <>, - topic, - partition + topic, + partition ) do - {:ok, message} = parse_message(%Message{offset: offset, topic: topic, partition: partition}, msg_data) + {:ok, message} = + parse_message( + %Message{offset: offset, topic: topic, partition: partition}, + msg_data + ) + parse_message_set(append_messages(message, list), rest, topic, partition) end @@ -168,10 +185,16 @@ defmodule KafkaEx.Protocol.Fetch do parse_key(message, rest) end - defp maybe_decompress(%Message{attributes: attributes, topic: topic, partition: partition}, rest) do + defp maybe_decompress( + %Message{attributes: attributes, topic: topic, partition: partition}, + rest + ) do <<-1::32-signed, value_size::32, value::size(value_size)-binary>> = rest decompressed = Compression.decompress(attributes, value) - {:ok, msg_set, _offset} = parse_message_set([], decompressed, topic, partition) + + {:ok, msg_set, _offset} = + parse_message_set([], decompressed, topic, partition) + {:ok, msg_set} end diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 1eb6400c..6c27354d 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -28,8 +28,8 @@ defmodule KafkaEx.Protocol.Produce do timeout: 0, compression: :none, messages: [], - # NOTE protocol_version only used in new client - protocol_version: 0 + # NOTE api_version only used in new client + api_version: 0 @type t :: %Request{ topic: binary, @@ -38,7 +38,7 @@ defmodule KafkaEx.Protocol.Produce do timeout: integer, compression: atom, messages: list, - protocol_version: integer + api_version: integer } end @@ -136,13 +136,18 @@ defmodule KafkaEx.Protocol.Produce do partitions, topic ) do - parse_partitions(partitions_size - 1, rest, [ - %{ - partition: partition, - error_code: Protocol.error(error_code), - offset: offset - } - | partitions - ], topic) + parse_partitions( + partitions_size - 1, + rest, + [ + %{ + partition: partition, + error_code: Protocol.error(error_code), + offset: offset + } + | partitions + ], + topic + ) end end diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index 0c60893a..9f5398d0 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: offset, auto_commit: false, worker_name: client, - protocol_version: 3 + api_version: 3 ) [fetch_response | _] = fetch_responses @@ -64,7 +64,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: offset + 5, auto_commit: false, worker_name: client, - protocol_version: 3 + api_version: 3 ) [fetch_response | _] = fetch_responses @@ -84,7 +84,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do msg, worker_name: client, required_acks: 1, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -92,7 +92,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: offset, auto_commit: false, worker_name: client, - protocol_version: 2 + api_version: 2 ) [fetch_response | _] = fetch_responses @@ -121,7 +121,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: offset, auto_commit: false, worker_name: client, - protocol_version: 5 + api_version: 5 ) [fetch_response | _] = fetch_responses @@ -150,7 +150,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: offset + 5, auto_commit: false, worker_name: client, - protocol_version: 5 + api_version: 5 ) [fetch_response | _] = fetch_responses @@ -170,7 +170,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do msg, worker_name: client, required_acks: 1, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -178,7 +178,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: offset, auto_commit: false, worker_name: client, - protocol_version: 3 + api_version: 3 ) [fetch_response | _] = fetch_responses @@ -201,7 +201,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :gzip, - protocol_version: 0 + api_version: 0 ) fetch_responses = @@ -209,7 +209,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 3 + api_version: 3 ) [fetch_response | _] = fetch_responses @@ -232,7 +232,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :gzip, - protocol_version: 0 + api_version: 0 ) fetch_responses = @@ -240,7 +240,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 5 + api_version: 5 ) [fetch_response | _] = fetch_responses @@ -263,7 +263,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :gzip, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -271,7 +271,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 0 + api_version: 0 ) [fetch_response | _] = fetch_responses @@ -294,7 +294,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :gzip, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -302,7 +302,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 0 + api_version: 0 ) [fetch_response | _] = fetch_responses @@ -325,7 +325,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :gzip, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -333,7 +333,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 0 + api_version: 0 ) [fetch_response | _] = fetch_responses @@ -356,7 +356,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :snappy, - protocol_version: 0 + api_version: 0 ) fetch_responses = @@ -364,7 +364,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 3 + api_version: 3 ) [fetch_response | _] = fetch_responses @@ -387,7 +387,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :snappy, - protocol_version: 0 + api_version: 0 ) fetch_responses = @@ -395,7 +395,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 5 + api_version: 5 ) [fetch_response | _] = fetch_responses @@ -418,7 +418,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :snappy, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -426,7 +426,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 0 + api_version: 0 ) [fetch_response | _] = fetch_responses @@ -449,7 +449,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :snappy, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -457,7 +457,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 0 + api_version: 0 ) [fetch_response | _] = fetch_responses @@ -480,7 +480,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do worker_name: client, required_acks: 1, compression: :snappy, - protocol_version: 3 + api_version: 3 ) fetch_responses = @@ -488,7 +488,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do offset: max(offset - 2, 0), auto_commit: false, worker_name: client, - protocol_version: 0 + api_version: 0 ) [fetch_response | _] = fetch_responses From fdd3b5f4e5fb22dff235f8344fefccd887a07900 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Mon, 9 Sep 2019 22:13:12 -0400 Subject: [PATCH 08/11] Document api_version option --- lib/kafka_ex.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index f81da6cc..aaede415 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -246,6 +246,7 @@ defmodule KafkaEx do - min_bytes: minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting wait_time to 100 and setting min_bytes 64000 would allow the server to wait up to 100ms to try to accumulate 64k of data before responding). Default is 1 - max_bytes: maximum bytes to include in the message set for this partition. This helps bound the size of the response. Default is 1,000,000 - auto_commit: specifies if the last offset should be commited or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`. + - api_version: Version of the Fetch API message to send (Kayrock client only, default: 0) ## Example @@ -345,6 +346,7 @@ defmodule KafkaEx do - required_acks: indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas), default is 0 - timeout: provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks, default is 100 milliseconds - compression: specifies the compression type (:none, :snappy, :gzip) + - api_version: Version of the Fetch API message to send (Kayrock client only, default: 0) ## Example From be059353b29cc5ead1e9a0a07bbe09407429b04b Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 10 Sep 2019 21:45:51 -0400 Subject: [PATCH 09/11] Support timestamps in new client compatibility mode --- test/integration/kayrock/timestamp_test.exs | 387 ++++++++++++++++++++ 1 file changed, 387 insertions(+) create mode 100644 test/integration/kayrock/timestamp_test.exs diff --git a/test/integration/kayrock/timestamp_test.exs b/test/integration/kayrock/timestamp_test.exs new file mode 100644 index 00000000..6242f056 --- /dev/null +++ b/test/integration/kayrock/timestamp_test.exs @@ -0,0 +1,387 @@ +defmodule KafkaEx.KayrockTimestampTest do + @moduledoc """ + Tests for the timestamp functionality in messages + """ + + use ExUnit.Case + + alias KafkaEx.New.Client + alias KafkaEx.New.NodeSelector + alias KafkaEx.TimestampNotSupportedError + + require Logger + + @moduletag :new_client + + setup do + {:ok, args} = KafkaEx.build_worker_options([]) + + {:ok, pid} = Client.start_link(args, :no_name) + + {:ok, %{client: pid}} + end + + defp ensure_append_timestamp_topic(client) do + topic_name = "test_log_append_timestamp" + + resp = + Client.send_request( + client, + %Kayrock.CreateTopics.V0.Request{ + create_topic_requests: [ + %{ + topic: topic_name, + num_partitions: 4, + replication_factor: 1, + replica_assignment: [], + config_entries: [ + %{ + config_name: "message.timestamp.type", + config_value: "LogAppendTime" + } + ] + } + ], + timeout: 1000 + }, + NodeSelector.controller() + ) + + {:ok, + %Kayrock.CreateTopics.V0.Response{ + topic_errors: [%{error_code: error_code}] + }} = resp + + unless error_code in [0, 36] do + Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") + assert false + end + + topic_name + end + + test "fetch timestamp is nil by default on v0 messages", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == nil + end + + test "fetch timestamp is -1 by default on v3 messages", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == -1 + end + + test "fetch timestamp is -1 by default on v5 messages", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == -1 + end + + test "log with append time - v0", %{client: client} do + topic = ensure_append_timestamp_topic(client) + + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == nil + end + + test "log with append time - v3", %{client: client} do + topic = ensure_append_timestamp_topic(client) + + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + refute is_nil(message.timestamp) + assert message.timestamp > 0 + end + + test "log with append time - v5", %{client: client} do + topic = ensure_append_timestamp_topic(client) + + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + refute is_nil(message.timestamp) + assert message.timestamp > 0 + end + + test "set timestamp with v0 throws an error", %{client: client} do + topic = "food" + + msg = TestHelper.generate_random_string() + + Process.flag(:trap_exit, true) + + catch_exit do + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 0 + ) + end + + assert_received {:EXIT, ^client, {%TimestampNotSupportedError{}, _}} + end + + test "set timestamp with v1 throws an error", %{client: client} do + topic = "food" + + msg = TestHelper.generate_random_string() + + Process.flag(:trap_exit, true) + + catch_exit do + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 1 + ) + end + + assert_received {:EXIT, ^client, {%TimestampNotSupportedError{}, _}} + end + + test "set timestamp for v3 message, fetch v0", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == nil + end + + test "set timestamp for v3 message, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == 12345 + end + + test "set timestamp for v3 message, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == 12345 + end +end From 6edad437f1144f75fc11811fd66d05b1356a3818 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 10 Sep 2019 21:46:50 -0400 Subject: [PATCH 10/11] Support timestamps in new client compatibility mode I borked the previous commit --- lib/kafka_ex.ex | 5 +- lib/kafka_ex/exceptions.ex | 4 ++ lib/kafka_ex/new/adapter.ex | 82 ++++++++++++++++++-------------- lib/kafka_ex/protocol/fetch.ex | 7 ++- lib/kafka_ex/protocol/produce.ex | 5 +- mix.exs | 2 +- mix.lock | 2 +- 7 files changed, 65 insertions(+), 42 deletions(-) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index aaede415..d6a5cec0 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -347,6 +347,8 @@ defmodule KafkaEx do - timeout: provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks, default is 100 milliseconds - compression: specifies the compression type (:none, :snappy, :gzip) - api_version: Version of the Fetch API message to send (Kayrock client only, default: 0) + - timestamp: unix epoch timestamp in milliseconds for the message + (Kayrock client only, default: nil, must be using api_version >= 3) ## Example @@ -371,6 +373,7 @@ defmodule KafkaEx do required_acks = Keyword.get(opts, :required_acks, 0) timeout = Keyword.get(opts, :timeout, 100) compression = Keyword.get(opts, :compression, :none) + timestamp = Keyword.get(opts, :timestamp) produce_request = %ProduceRequest{ topic: topic, @@ -378,7 +381,7 @@ defmodule KafkaEx do required_acks: required_acks, timeout: timeout, compression: compression, - messages: [%Message{key: key, value: value}], + messages: [%Message{key: key, value: value, timestamp: timestamp}], api_version: Keyword.get(opts, :api_version, 0) } diff --git a/lib/kafka_ex/exceptions.ex b/lib/kafka_ex/exceptions.ex index e6745721..9efc4e74 100644 --- a/lib/kafka_ex/exceptions.ex +++ b/lib/kafka_ex/exceptions.ex @@ -26,3 +26,7 @@ defmodule KafkaEx.InvalidConsumerGroupError do %__MODULE__{message: message} end end + +defmodule KafkaEx.TimestampNotSupportedError do + defexception message: "Timestamp requires produce api_version >= 3" +end diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index 2490095e..b520999b 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -27,6 +27,7 @@ defmodule KafkaEx.New.Adapter do alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse alias KafkaEx.Protocol.Fetch.Message, as: FetchMessage + alias KafkaEx.TimestampNotSupportedError alias Kayrock.MessageSet alias Kayrock.MessageSet.Message @@ -64,39 +65,7 @@ defmodule KafkaEx.New.Adapter do topic = produce_request.topic partition = produce_request.partition - # TODO refactor, add timestamp tests - message_set = - case produce_request.api_version do - v when v <= 2 -> - %MessageSet{ - messages: - Enum.map( - produce_request.messages, - fn msg -> - %Message{ - key: msg.key, - value: msg.value, - compression: produce_request.compression - } - end - ) - } - - _ -> - %RecordBatch{ - attributes: produce_attributes(produce_request), - records: - Enum.map( - produce_request.messages, - fn msg -> - %Record{ - key: msg.key, - value: msg.value - } - end - ) - } - end + message_set = build_produce_messages(produce_request) request = Kayrock.Produce.get_request_struct(produce_request.api_version) @@ -501,7 +470,8 @@ defmodule KafkaEx.New.Adapter do value: record.value, offset: record.offset, topic: topic, - partition: partition + partition: partition, + timestamp: record.timestamp } end) end) @@ -530,7 +500,8 @@ defmodule KafkaEx.New.Adapter do value: message.value, offset: message.offset, topic: topic, - partition: partition + partition: partition, + timestamp: message.timestamp } end) @@ -579,4 +550,45 @@ defmodule KafkaEx.New.Adapter do defp produce_attributes(%{compression: :none}), do: 0 defp produce_attributes(%{compression: :gzip}), do: 1 defp produce_attributes(%{compression: :snappy}), do: 2 + + defp build_produce_messages(%{api_version: v} = produce_request) + when v <= 2 do + %MessageSet{ + messages: + Enum.map( + produce_request.messages, + fn msg -> + if msg.timestamp do + raise TimestampNotSupportedError + end + + %Message{ + key: msg.key, + value: msg.value, + compression: produce_request.compression + } + end + ) + } + end + + defp build_produce_messages(produce_request) do + %RecordBatch{ + attributes: produce_attributes(produce_request), + records: + Enum.map( + produce_request.messages, + fn msg -> + %Record{ + key: msg.key, + value: msg.value, + timestamp: minus_one_if_nil(msg.timestamp) + } + end + ) + } + end + + defp minus_one_if_nil(nil), do: -1 + defp minus_one_if_nil(x), do: x end diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 24731c35..32d44ba9 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -54,7 +54,8 @@ defmodule KafkaEx.Protocol.Fetch do key: nil, value: nil, topic: nil, - partition: nil + partition: nil, + timestamp: nil @type t :: %Message{ attributes: integer, @@ -63,7 +64,9 @@ defmodule KafkaEx.Protocol.Fetch do key: binary, value: binary, topic: binary, - partition: integer + partition: integer, + # timestamp supported for `kafka_version: "kayrock"` ONLY + timestamp: integer } end diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 6c27354d..3bcbef25 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -47,9 +47,10 @@ defmodule KafkaEx.Protocol.Produce do - key: is used for partition assignment, can be nil, when none is provided it is defaulted to nil - value: is the message to be written to kafka logs. + - timestamp: timestamp (`kafka_version: "kayrock"` ONLY) """ - defstruct key: nil, value: nil - @type t :: %Message{key: binary, value: binary} + defstruct key: nil, value: nil, timestamp: nil + @type t :: %Message{key: binary, value: binary, timestamp: integer} end defmodule Response do diff --git a/mix.exs b/mix.exs index d35b88a0..430f1bed 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.Mixfile do defp deps do [ - {:kayrock, "~> 0.1.7"}, + {:kayrock, "~> 0.1.8"}, {:credo, "~> 0.8.10", only: :dev}, {:dialyxir, "~> 1.0.0-rc.3", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, diff --git a/mix.lock b/mix.lock index bfa392b6..8839f6a6 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.0.2", "ac203208ada855d95dc591a764b6e87259cb0e2a364218f215ad662daa8cd6b4", [:rebar3], [{:unicode_util_compat, "0.2.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, - "kayrock": {:hex, :kayrock, "0.1.7", "6b04b5181cf358c31ba16807dc2dd8b25e5df442b9190e6762c03f42c6ab545f", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, + "kayrock": {:hex, :kayrock, "0.1.8", "f91dbc324a142e121e3e267d5d83812b6bc3de4a6738496d92149c374e925295", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, From becfbf40def7956e0a55e04d9344c7a9824c8a58 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 10 Sep 2019 21:54:11 -0400 Subject: [PATCH 11/11] Fix merge conflicts I r bad at git --- lib/kafka_ex.ex | 7 ------- 1 file changed, 7 deletions(-) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 3173cc95..d6a5cec0 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -373,10 +373,7 @@ defmodule KafkaEx do required_acks = Keyword.get(opts, :required_acks, 0) timeout = Keyword.get(opts, :timeout, 100) compression = Keyword.get(opts, :compression, :none) -<<<<<<< HEAD timestamp = Keyword.get(opts, :timestamp) -======= ->>>>>>> master produce_request = %ProduceRequest{ topic: topic, @@ -384,12 +381,8 @@ defmodule KafkaEx do required_acks: required_acks, timeout: timeout, compression: compression, -<<<<<<< HEAD messages: [%Message{key: key, value: value, timestamp: timestamp}], api_version: Keyword.get(opts, :api_version, 0) -======= - messages: [%Message{key: key, value: value}] ->>>>>>> master } produce(produce_request, opts)