diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 15626bb5..d6a5cec0 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -246,6 +246,7 @@ defmodule KafkaEx do - min_bytes: minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting wait_time to 100 and setting min_bytes 64000 would allow the server to wait up to 100ms to try to accumulate 64k of data before responding). Default is 1 - max_bytes: maximum bytes to include in the message set for this partition. This helps bound the size of the response. Default is 1,000,000 - auto_commit: specifies if the last offset should be commited or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`. + - api_version: Version of the Fetch API message to send (Kayrock client only, default: 0) ## Example @@ -270,6 +271,11 @@ defmodule KafkaEx do max_bytes = Keyword.get(opts, :max_bytes, @max_bytes) auto_commit = Keyword.get(opts, :auto_commit, true) + # NOTE api_version is used by the new client to allow + # compatibility with newer message formats and is ignored by the legacy + # server implementations. 
+ api_version = Keyword.get(opts, :api_version, 0) + retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name) @@ -283,7 +289,8 @@ defmodule KafkaEx do offset: retrieved_offset, wait_time: wait_time, min_bytes: min_bytes, - max_bytes: max_bytes + max_bytes: max_bytes, + api_version: api_version }}, opts ) @@ -339,6 +346,9 @@ defmodule KafkaEx do - required_acks: indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas), default is 0 - timeout: provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks, default is 100 milliseconds - compression: specifies the compression type (:none, :snappy, :gzip) + - api_version: Version of the Produce API message to send (Kayrock client only, default: 0) + - timestamp: unix epoch timestamp in milliseconds for the message + (Kayrock client only, default: nil, requires api_version >= 3) ## Example @@ -363,6 +373,7 @@ defmodule KafkaEx do required_acks = Keyword.get(opts, :required_acks, 0) timeout = Keyword.get(opts, :timeout, 100) compression = Keyword.get(opts, :compression, :none) + timestamp = Keyword.get(opts, :timestamp) produce_request = %ProduceRequest{ topic: topic, @@ -370,7 +381,8 @@ defmodule KafkaEx do required_acks: required_acks, timeout: timeout, compression: compression, - messages: [%Message{key: key, value: value}] + messages: [%Message{key: key, value: value, timestamp: 
timestamp}], + api_version: Keyword.get(opts, :api_version, 0) } produce(produce_request, opts) diff --git a/lib/kafka_ex/exceptions.ex b/lib/kafka_ex/exceptions.ex index e6745721..9efc4e74 100644 --- a/lib/kafka_ex/exceptions.ex +++ b/lib/kafka_ex/exceptions.ex @@ -26,3 +26,7 @@ defmodule KafkaEx.InvalidConsumerGroupError do %__MODULE__{message: message} end end + +defmodule KafkaEx.TimestampNotSupportedError do + defexception message: "Timestamp requires produce api_version >= 3" +end diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index ad8ed671..b520999b 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -27,9 +27,12 @@ defmodule KafkaEx.New.Adapter do alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse alias KafkaEx.Protocol.Fetch.Message, as: FetchMessage + alias KafkaEx.TimestampNotSupportedError alias Kayrock.MessageSet alias Kayrock.MessageSet.Message + alias Kayrock.RecordBatch + alias Kayrock.RecordBatch.Record def list_offsets_request(topic, partition, time) do time = Offset.parse_time(time) @@ -58,41 +61,32 @@ defmodule KafkaEx.New.Adapter do end) end - def produce_request(request) do - topic = request.topic - partition = request.partition + def produce_request(produce_request) do + topic = produce_request.topic + partition = produce_request.partition - message_set = %MessageSet{ - messages: - Enum.map( - request.messages, - fn msg -> - %Message{ - key: msg.key, - value: msg.value, - compression: request.compression - } - end - ) - } + message_set = build_produce_messages(produce_request) - request = %Kayrock.Produce.V0.Request{ - acks: request.required_acks, - timeout: request.timeout, - topic_data: [ - %{ - topic: request.topic, - data: [ - %{partition: request.partition, record_set: message_set} - ] - } - ] + request = Kayrock.Produce.get_request_struct(produce_request.api_version) + + request = %{ + request + | acks: 
produce_request.required_acks, + timeout: produce_request.timeout, + topic_data: [ + %{ + topic: produce_request.topic, + data: [ + %{partition: produce_request.partition, record_set: message_set} + ] + } + ] } {request, topic, partition} end - def produce_response(%Kayrock.Produce.V0.Response{ + def produce_response(%{ responses: [ %{ partition_responses: [ @@ -125,23 +119,49 @@ defmodule KafkaEx.New.Adapter do end def fetch_request(fetch_request) do - {%Kayrock.Fetch.V0.Request{ - max_wait_time: fetch_request.wait_time, - min_bytes: fetch_request.min_bytes, - replica_id: -1, - topics: [ - %{ - topic: fetch_request.topic, - partitions: [ - %{ - partition: fetch_request.partition, - fetch_offset: fetch_request.offset, - max_bytes: fetch_request.max_bytes - } - ] - } - ] - }, fetch_request.topic, fetch_request.partition} + request = Kayrock.Fetch.get_request_struct(fetch_request.api_version) + + partition_request = %{ + partition: fetch_request.partition, + fetch_offset: fetch_request.offset, + max_bytes: fetch_request.max_bytes + } + + partition_request = + if fetch_request.api_version >= 5 do + Map.put(partition_request, :log_start_offset, 0) + else + partition_request + end + + request = %{ + request + | max_wait_time: fetch_request.wait_time, + min_bytes: fetch_request.min_bytes, + replica_id: -1, + topics: [ + %{ + topic: fetch_request.topic, + partitions: [partition_request] + } + ] + } + + request = + if fetch_request.api_version >= 3 do + %{request | max_bytes: fetch_request.max_bytes} + else + request + end + + request = + if fetch_request.api_version >= 4 do + %{request | isolation_level: 0} + else + request + end + + {request, fetch_request.topic, fetch_request.partition} end def fetch_response(fetch_response) do @@ -434,22 +454,26 @@ defmodule KafkaEx.New.Adapter do } end - defp kayrock_message_set_to_kafka_ex( - %Kayrock.RecordBatch{} = record_batch, - topic, - partition - ) do + defp kayrock_message_set_to_kafka_ex(nil, _topic, _partition) do + {[], 
nil} + end + + defp kayrock_message_set_to_kafka_ex(record_batches, topic, partition) + when is_list(record_batches) do messages = - Enum.map(record_batch.records, fn record -> - %FetchMessage{ - attributes: record.attributes, - crc: nil, - key: record.key, - value: record.value, - offset: record.offset, - topic: topic, - partition: partition - } + Enum.flat_map(record_batches, fn record_batch -> + Enum.map(record_batch.records, fn record -> + %FetchMessage{ + attributes: record.attributes, + crc: nil, + key: record.key, + value: record.value, + offset: record.offset, + topic: topic, + partition: partition, + timestamp: record.timestamp + } + end) end) case messages do @@ -476,7 +500,8 @@ defmodule KafkaEx.New.Adapter do value: message.value, offset: message.offset, topic: topic, - partition: partition + partition: partition, + timestamp: message.timestamp } end) @@ -520,4 +545,50 @@ defmodule KafkaEx.New.Adapter do isrs: partition.isr } end + + # NOTE we don't handle any other attributes here + defp produce_attributes(%{compression: :none}), do: 0 + defp produce_attributes(%{compression: :gzip}), do: 1 + defp produce_attributes(%{compression: :snappy}), do: 2 + + defp build_produce_messages(%{api_version: v} = produce_request) + when v <= 2 do + %MessageSet{ + messages: + Enum.map( + produce_request.messages, + fn msg -> + if msg.timestamp do + raise TimestampNotSupportedError + end + + %Message{ + key: msg.key, + value: msg.value, + compression: produce_request.compression + } + end + ) + } + end + + defp build_produce_messages(produce_request) do + %RecordBatch{ + attributes: produce_attributes(produce_request), + records: + Enum.map( + produce_request.messages, + fn msg -> + %Record{ + key: msg.key, + value: msg.value, + timestamp: minus_one_if_nil(msg.timestamp) + } + end + ) + } + end + + defp minus_one_if_nil(nil), do: -1 + defp minus_one_if_nil(x), do: x end diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 
57d435cd..32d44ba9 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -17,7 +17,9 @@ defmodule KafkaEx.Protocol.Fetch do wait_time: nil, min_bytes: nil, max_bytes: nil, - auto_commit: nil + auto_commit: nil, + # NOTE api_version only used in new client + api_version: 0 @type t :: %Request{ correlation_id: integer, @@ -27,7 +29,8 @@ defmodule KafkaEx.Protocol.Fetch do offset: integer, wait_time: integer, min_bytes: integer, - max_bytes: integer + max_bytes: integer, + api_version: integer } end @@ -45,7 +48,14 @@ defmodule KafkaEx.Protocol.Fetch do defmodule Message do @moduledoc false - defstruct attributes: 0, crc: nil, offset: nil, key: nil, value: nil, topic: nil, partition: nil + defstruct attributes: 0, + crc: nil, + offset: nil, + key: nil, + value: nil, + topic: nil, + partition: nil, + timestamp: nil @type t :: %Message{ attributes: integer, @@ -54,7 +64,9 @@ defmodule KafkaEx.Protocol.Fetch do key: binary, value: binary, topic: binary, - partition: integer + partition: integer, + # timestamp supported for `kafka_version: "kayrock"` ONLY + timestamp: integer } end @@ -95,18 +107,24 @@ defmodule KafkaEx.Protocol.Fetch do partitions, topic ) do - {:ok, message_set, last_offset} = parse_message_set([], msg_set_data, topic, partition) - - parse_partitions(partitions_size - 1, rest, [ - %{ - partition: partition, - error_code: Protocol.error(error_code), - hw_mark_offset: hw_mark_offset, - message_set: message_set, - last_offset: last_offset - } - | partitions - ], topic) + {:ok, message_set, last_offset} = + parse_message_set([], msg_set_data, topic, partition) + + parse_partitions( + partitions_size - 1, + rest, + [ + %{ + partition: partition, + error_code: Protocol.error(error_code), + hw_mark_offset: hw_mark_offset, + message_set: message_set, + last_offset: last_offset + } + | partitions + ], + topic + ) end defp parse_message_set([], <<>>, _topic, _partition) do @@ -117,10 +135,15 @@ defmodule KafkaEx.Protocol.Fetch do 
list, <>, - topic, - partition + topic, + partition ) do - {:ok, message} = parse_message(%Message{offset: offset, topic: topic, partition: partition}, msg_data) + {:ok, message} = + parse_message( + %Message{offset: offset, topic: topic, partition: partition}, + msg_data + ) + parse_message_set(append_messages(message, list), rest, topic, partition) end @@ -165,10 +188,16 @@ defmodule KafkaEx.Protocol.Fetch do parse_key(message, rest) end - defp maybe_decompress(%Message{attributes: attributes, topic: topic, partition: partition}, rest) do + defp maybe_decompress( + %Message{attributes: attributes, topic: topic, partition: partition}, + rest + ) do <<-1::32-signed, value_size::32, value::size(value_size)-binary>> = rest decompressed = Compression.decompress(attributes, value) - {:ok, msg_set, _offset} = parse_message_set([], decompressed, topic, partition) + + {:ok, msg_set, _offset} = + parse_message_set([], decompressed, topic, partition) + {:ok, msg_set} end diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 62163c61..3bcbef25 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -27,7 +27,9 @@ defmodule KafkaEx.Protocol.Produce do required_acks: 0, timeout: 0, compression: :none, - messages: [] + messages: [], + # NOTE api_version only used in new client + api_version: 0 @type t :: %Request{ topic: binary, @@ -35,7 +37,8 @@ defmodule KafkaEx.Protocol.Produce do required_acks: integer, timeout: integer, compression: atom, - messages: list + messages: list, + api_version: integer } end @@ -44,9 +47,10 @@ defmodule KafkaEx.Protocol.Produce do - key: is used for partition assignment, can be nil, when none is provided it is defaulted to nil - value: is the message to be written to kafka logs. 
+ - timestamp: timestamp (`kafka_version: "kayrock"` ONLY) """ - defstruct key: nil, value: nil - @type t :: %Message{key: binary, value: binary} + defstruct key: nil, value: nil, timestamp: nil + @type t :: %Message{key: binary, value: binary, timestamp: integer} end defmodule Response do @@ -133,13 +137,18 @@ defmodule KafkaEx.Protocol.Produce do partitions, topic ) do - parse_partitions(partitions_size - 1, rest, [ - %{ - partition: partition, - error_code: Protocol.error(error_code), - offset: offset - } - | partitions - ], topic) + parse_partitions( + partitions_size - 1, + rest, + [ + %{ + partition: partition, + error_code: Protocol.error(error_code), + offset: offset + } + | partitions + ], + topic + ) end end diff --git a/mix.exs b/mix.exs index 6ec62880..430f1bed 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.Mixfile do defp deps do [ - {:kayrock, "~> 0.1.0"}, + {:kayrock, "~> 0.1.8"}, {:credo, "~> 0.8.10", only: :dev}, {:dialyxir, "~> 1.0.0-rc.3", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, diff --git a/mix.lock b/mix.lock index 655acfc6..8839f6a6 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "hackney": {:hex, :hackney, "1.8.6", "21a725db3569b3fb11a6af17d5c5f654052ce9624219f1317e8639183de4a423", [:rebar3], [{:certifi, "1.2.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.0.2", "ac203208ada855d95dc591a764b6e87259cb0e2a364218f215ad662daa8cd6b4", [:rebar3], [{:unicode_util_compat, "0.2.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, - 
"kayrock": {:hex, :kayrock, "0.1.0", "f8c77c0e380579ac7aafd794def1a2e4bdbd62829128061e8c75931517bdf869", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, + "kayrock": {:hex, :kayrock, "0.1.8", "f91dbc324a142e121e3e267d5d83812b6bc3de4a6738496d92149c374e925295", [:mix], [{:connection, "~>1.0.4", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~>0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~>1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs new file mode 100644 index 00000000..9f5398d0 --- /dev/null +++ b/test/integration/kayrock/record_batch_test.exs @@ -0,0 +1,501 @@ +defmodule KafkaEx.KayrockRecordBatchTest do + @moduledoc """ + Tests for producing/fetching messages using the newer RecordBatch format + """ + + use ExUnit.Case + + alias KafkaEx.New.Client + + @moduletag :new_client + + setup do + {:ok, args} = KafkaEx.build_worker_options([]) + + {:ok, pid} = Client.start_link(args, :no_name) + + {:ok, %{client: pid}} + end + + test "can specify protocol version for fetch - v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, 
offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "fetch empty message set - v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset + 5, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + assert partition_response.message_set == [] + end + + # v2 is the highest that will accept the MessageSet format + test "can specify protocol version for produce - v2", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 2 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "can specify protocol version for fetch - v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: 
client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.offset == offset + assert message.value == msg + end + + test "fetch empty message set - v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset + 5, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + assert partition_response.message_set == [] + end + + # v3 is the lowest that requires the RecordBatch format + test "can specify protocol version for produce - v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "gzip compression - produce v0, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + api_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = 
List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "gzip compression - produce v0, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + api_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "gzip compression - produce v3, fetch v0", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "gzip compression - produce v3, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert 
message.value == msg + assert message.offset == offset + end + + test "gzip compression - produce v3, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :gzip, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v0, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + api_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v0, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + api_version: 0 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == 
offset + end + + test "snappy compression - produce v3, fetch v0", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v3, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end + + test "snappy compression - produce v3, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + compression: :snappy, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: max(offset - 2, 0), + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + end +end diff --git 
a/test/integration/kayrock/timestamp_test.exs b/test/integration/kayrock/timestamp_test.exs new file mode 100644 index 00000000..6242f056 --- /dev/null +++ b/test/integration/kayrock/timestamp_test.exs @@ -0,0 +1,387 @@ +defmodule KafkaEx.KayrockTimestampTest do + @moduledoc """ + Tests for the timestamp functionality in messages + """ + + use ExUnit.Case + + alias KafkaEx.New.Client + alias KafkaEx.New.NodeSelector + alias KafkaEx.TimestampNotSupportedError + + require Logger + + @moduletag :new_client + + setup do + {:ok, args} = KafkaEx.build_worker_options([]) + + {:ok, pid} = Client.start_link(args, :no_name) + + {:ok, %{client: pid}} + end + + defp ensure_append_timestamp_topic(client) do + topic_name = "test_log_append_timestamp" + + resp = + Client.send_request( + client, + %Kayrock.CreateTopics.V0.Request{ + create_topic_requests: [ + %{ + topic: topic_name, + num_partitions: 4, + replication_factor: 1, + replica_assignment: [], + config_entries: [ + %{ + config_name: "message.timestamp.type", + config_value: "LogAppendTime" + } + ] + } + ], + timeout: 1000 + }, + NodeSelector.controller() + ) + + {:ok, + %Kayrock.CreateTopics.V0.Response{ + topic_errors: [%{error_code: error_code}] + }} = resp + + unless error_code in [0, 36] do + Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") + assert false + end + + topic_name + end + + test "fetch timestamp is nil by default on v0 messages", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp 
== nil + end + + test "fetch timestamp is -1 by default on v3 messages", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == -1 + end + + test "fetch timestamp is -1 by default on v5 messages", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == -1 + end + + test "log with append time - v0", %{client: client} do + topic = ensure_append_timestamp_topic(client) + + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == nil + end + + test "log with append time - v3", %{client: client} do + 
topic = ensure_append_timestamp_topic(client) + + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + refute is_nil(message.timestamp) + assert message.timestamp > 0 + end + + test "log with append time - v5", %{client: client} do + topic = ensure_append_timestamp_topic(client) + + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + refute is_nil(message.timestamp) + assert message.timestamp > 0 + end + + test "set timestamp with v0 throws an error", %{client: client} do + topic = "food" + + msg = TestHelper.generate_random_string() + + Process.flag(:trap_exit, true) + + catch_exit do + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 0 + ) + end + + assert_received {:EXIT, ^client, {%TimestampNotSupportedError{}, _}} + end + + test "set timestamp with v1 throws an error", %{client: client} do + topic = "food" + + msg = TestHelper.generate_random_string() + + Process.flag(:trap_exit, true) + + catch_exit do + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + 
api_version: 1 + ) + end + + assert_received {:EXIT, ^client, {%TimestampNotSupportedError{}, _}} + end + + test "set timestamp for v3 message, fetch v0", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 0 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == nil + end + + test "set timestamp for v3 message, fetch v3", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 3 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == 12345 + end + + test "set timestamp for v3 message, fetch v5", %{client: client} do + topic = "food" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1, + timestamp: 12345, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = 
List.last(partition_response.message_set) + + assert message.value == msg + assert message.offset == offset + assert message.timestamp == 12345 + end +end