Allow specifying API version for produce/fetch with new client, support timestamps #364

Merged · 12 commits · Sep 12, 2019
16 changes: 14 additions & 2 deletions lib/kafka_ex.ex
@@ -246,6 +246,7 @@ defmodule KafkaEx do
- min_bytes: minimum number of bytes of messages that must be available to give a response. If the client sets this to 0, the server will always respond immediately; however, if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting wait_time to 100 and setting min_bytes to 64000 would allow the server to wait up to 100ms to try to accumulate 64k of data before responding). Default is 1
- max_bytes: maximum bytes to include in the message set for this partition. This helps bound the size of the response. Default is 1,000,000
- auto_commit: specifies if the last offset should be committed or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
- api_version: Version of the Fetch API message to send (Kayrock client only, default: 0)

## Example
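
The example body is collapsed in this view. As a stand-in, a minimal sketch of a fetch that opts into a newer Fetch API version via the new option; the topic, offset, and version number are hypothetical:

```elixir
# Sketch: fetch from topic "food", partition 0, asking the Kayrock client
# to speak Fetch v3; legacy server implementations ignore :api_version.
KafkaEx.fetch("food", 0,
  offset: 5,
  auto_commit: false,
  api_version: 3
)
```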

@@ -270,6 +271,11 @@ defmodule KafkaEx do
max_bytes = Keyword.get(opts, :max_bytes, @max_bytes)
auto_commit = Keyword.get(opts, :auto_commit, true)

# NOTE api_version is used by the new client to allow
# compatibility with newer message formats and is ignored by the legacy
# server implementations.
api_version = Keyword.get(opts, :api_version, 0)

retrieved_offset =
current_offset(supplied_offset, partition, topic, worker_name)

@@ -283,7 +289,8 @@ defmodule KafkaEx do
offset: retrieved_offset,
wait_time: wait_time,
min_bytes: min_bytes,
-max_bytes: max_bytes
+max_bytes: max_bytes,
+api_version: api_version
}},
opts
)
@@ -339,6 +346,9 @@ defmodule KafkaEx do
- required_acks: indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas). Default is 0
- timeout: provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks, default is 100 milliseconds
- compression: specifies the compression type (:none, :snappy, :gzip)
- api_version: Version of the Produce API message to send (Kayrock client only, default: 0)
- timestamp: unix epoch timestamp in milliseconds for the message
(Kayrock client only, default: nil, must be using api_version >= 3)

## Example
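
The example body is collapsed here as well. A sketch of the new timestamp support, which per the options above requires the Kayrock client and api_version >= 3; topic, partition, and payload are hypothetical:

```elixir
# Sketch: produce one timestamped message. With api_version <= 2 and a
# timestamp set, the new client raises KafkaEx.TimestampNotSupportedError.
KafkaEx.produce("food", 0, "hello",
  key: "key-1",
  api_version: 3,
  timestamp: System.os_time(:millisecond)
)
```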

@@ -363,14 +373,16 @@ defmodule KafkaEx do
required_acks = Keyword.get(opts, :required_acks, 0)
timeout = Keyword.get(opts, :timeout, 100)
compression = Keyword.get(opts, :compression, :none)
timestamp = Keyword.get(opts, :timestamp)

produce_request = %ProduceRequest{
topic: topic,
partition: partition,
required_acks: required_acks,
timeout: timeout,
compression: compression,
-messages: [%Message{key: key, value: value}]
+messages: [%Message{key: key, value: value, timestamp: timestamp}],
+api_version: Keyword.get(opts, :api_version, 0)
}

produce(produce_request, opts)
4 changes: 4 additions & 0 deletions lib/kafka_ex/exceptions.ex
@@ -26,3 +26,7 @@ defmodule KafkaEx.InvalidConsumerGroupError do
%__MODULE__{message: message}
end
end

defmodule KafkaEx.TimestampNotSupportedError do
defexception message: "Timestamp requires produce api_version >= 3"
end
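
A caller-side sketch of handling the new error when timestamps might flow through a produce path still pinned to an old API version; topic and values are hypothetical:

```elixir
# Sketch: the Kayrock adapter raises TimestampNotSupportedError when a
# message carries a timestamp but the Produce api_version is <= 2.
try do
  KafkaEx.produce("food", 0, "hello",
    timestamp: System.os_time(:millisecond),
    api_version: 0
  )
rescue
  e in KafkaEx.TimestampNotSupportedError -> {:error, e.message}
end
```
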
193 changes: 132 additions & 61 deletions lib/kafka_ex/new/adapter.ex
@@ -27,9 +27,12 @@ defmodule KafkaEx.New.Adapter do
alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse
alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse
alias KafkaEx.Protocol.Fetch.Message, as: FetchMessage
alias KafkaEx.TimestampNotSupportedError

alias Kayrock.MessageSet
alias Kayrock.MessageSet.Message
alias Kayrock.RecordBatch
alias Kayrock.RecordBatch.Record

def list_offsets_request(topic, partition, time) do
time = Offset.parse_time(time)
@@ -58,41 +61,32 @@ defmodule KafkaEx.New.Adapter do
end)
end

-def produce_request(request) do
-topic = request.topic
-partition = request.partition
+def produce_request(produce_request) do
+topic = produce_request.topic
+partition = produce_request.partition

-message_set = %MessageSet{
-messages:
-Enum.map(
-request.messages,
-fn msg ->
-%Message{
-key: msg.key,
-value: msg.value,
-compression: request.compression
-}
-end
-)
-}
+message_set = build_produce_messages(produce_request)

-request = %Kayrock.Produce.V0.Request{
-acks: request.required_acks,
-timeout: request.timeout,
-topic_data: [
-%{
-topic: request.topic,
-data: [
-%{partition: request.partition, record_set: message_set}
-]
-}
-]
+request = Kayrock.Produce.get_request_struct(produce_request.api_version)
+
+request = %{
+request
+| acks: produce_request.required_acks,
+timeout: produce_request.timeout,
+topic_data: [
+%{
+topic: produce_request.topic,
+data: [
+%{partition: produce_request.partition, record_set: message_set}
+]
+}
+]
}

{request, topic, partition}
end
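
The notable change above: the adapter no longer hardcodes %Kayrock.Produce.V0.Request{}; Kayrock.Produce.get_request_struct/1 selects the versioned struct at runtime. A rough sketch of the idea, with struct fields elided:

```elixir
# Sketch: Kayrock generates one request struct per Produce API version,
# so the caller's api_version selects the wire format.
Kayrock.Produce.get_request_struct(0)
# => %Kayrock.Produce.V0.Request{...}
Kayrock.Produce.get_request_struct(3)
# => %Kayrock.Produce.V3.Request{...}
```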

-def produce_response(%Kayrock.Produce.V0.Response{
+def produce_response(%{
responses: [
%{
partition_responses: [
@@ -125,23 +119,49 @@ defmodule KafkaEx.New.Adapter do
end

def fetch_request(fetch_request) do
-{%Kayrock.Fetch.V0.Request{
-max_wait_time: fetch_request.wait_time,
-min_bytes: fetch_request.min_bytes,
-replica_id: -1,
-topics: [
-%{
-topic: fetch_request.topic,
-partitions: [
-%{
-partition: fetch_request.partition,
-fetch_offset: fetch_request.offset,
-max_bytes: fetch_request.max_bytes
-}
-]
-}
-]
-}, fetch_request.topic, fetch_request.partition}
+request = Kayrock.Fetch.get_request_struct(fetch_request.api_version)
+
+partition_request = %{
+partition: fetch_request.partition,
+fetch_offset: fetch_request.offset,
+max_bytes: fetch_request.max_bytes
+}
+
+partition_request =
+if fetch_request.api_version >= 5 do
+Map.put(partition_request, :log_start_offset, 0)
+else
+partition_request
+end
+
+request = %{
+request
+| max_wait_time: fetch_request.wait_time,
+min_bytes: fetch_request.min_bytes,
+replica_id: -1,
+topics: [
+%{
+topic: fetch_request.topic,
+partitions: [partition_request]
+}
+]
+}
+
+request =
+if fetch_request.api_version >= 3 do
+%{request | max_bytes: fetch_request.max_bytes}
+else
+request
+end
+
+request =
+if fetch_request.api_version >= 4 do
+%{request | isolation_level: 0}
+else
+request
+end
+
+{request, fetch_request.topic, fetch_request.partition}
end
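
To make the version plumbing concrete, a sketch of driving the adapter directly; the KafkaEx.Protocol.Fetch.Request fields shown are assumptions based on the options handled above:

```elixir
# Sketch (hypothetical values): the same call yields different Kayrock
# structs depending on api_version.
fetch = %KafkaEx.Protocol.Fetch.Request{
  topic: "food",
  partition: 0,
  offset: 0,
  wait_time: 10,
  min_bytes: 1,
  max_bytes: 1_000_000,
  api_version: 5
}

{request, _topic, _partition} = KafkaEx.New.Adapter.fetch_request(fetch)
# At v0 this is a bare %Kayrock.Fetch.V0.Request{}; at v5 the request also
# carries top-level max_bytes (v3+), isolation_level (v4+), and a
# per-partition log_start_offset (v5+), per the guards above.
```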

def fetch_response(fetch_response) do
@@ -434,22 +454,26 @@ defmodule KafkaEx.New.Adapter do
}
end

-defp kayrock_message_set_to_kafka_ex(
-%Kayrock.RecordBatch{} = record_batch,
-topic,
-partition
-) do
+defp kayrock_message_set_to_kafka_ex(nil, _topic, _partition) do
+{[], nil}
+end
+
+defp kayrock_message_set_to_kafka_ex(record_batches, topic, partition)
+when is_list(record_batches) do
messages =
-Enum.map(record_batch.records, fn record ->
-%FetchMessage{
-attributes: record.attributes,
-crc: nil,
-key: record.key,
-value: record.value,
-offset: record.offset,
-topic: topic,
-partition: partition
-}
+Enum.flat_map(record_batches, fn record_batch ->
+Enum.map(record_batch.records, fn record ->
+%FetchMessage{
+attributes: record.attributes,
+crc: nil,
+key: record.key,
+value: record.value,
+offset: record.offset,
+topic: topic,
+partition: partition,
+timestamp: record.timestamp
+}
+end)
+end)

case messages do
@@ -476,7 +500,8 @@ defmodule KafkaEx.New.Adapter do
value: message.value,
offset: message.offset,
topic: topic,
-partition: partition
+partition: partition,
+timestamp: message.timestamp
}
end)

@@ -520,4 +545,50 @@ defmodule KafkaEx.New.Adapter do
isrs: partition.isr
}
end

# NOTE we don't handle any other attributes here
defp produce_attributes(%{compression: :none}), do: 0
defp produce_attributes(%{compression: :gzip}), do: 1
defp produce_attributes(%{compression: :snappy}), do: 2
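
# (For reference: the low bits of the record batch attributes field select
# the compression codec, 0 = none, 1 = gzip, 2 = snappy; the remaining
# bits, e.g. timestamp type and transactional flags, are left at 0 here.)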

defp build_produce_messages(%{api_version: v} = produce_request)
when v <= 2 do
%MessageSet{
messages:
Enum.map(
produce_request.messages,
fn msg ->
if msg.timestamp do
raise TimestampNotSupportedError
end

%Message{
key: msg.key,
value: msg.value,
compression: produce_request.compression
}
end
)
}
end

defp build_produce_messages(produce_request) do
%RecordBatch{
attributes: produce_attributes(produce_request),
records:
Enum.map(
produce_request.messages,
fn msg ->
%Record{
key: msg.key,
value: msg.value,
timestamp: minus_one_if_nil(msg.timestamp)
}
end
)
}
end

# Kafka encodes "no timestamp" on the wire as -1.
defp minus_one_if_nil(nil), do: -1
defp minus_one_if_nil(x), do: x
end
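
Tying the produce pieces together, a hedged end-to-end sketch of the adapter path for a record with no timestamp; the KafkaEx.Protocol.Produce struct names are assumptions based on the aliases in lib/kafka_ex.ex:

```elixir
# Sketch: a v3 produce request becomes a RecordBatch, and a nil timestamp
# is normalized to -1 by minus_one_if_nil/1.
alias KafkaEx.Protocol.Produce.{Message, Request}

req = %Request{
  topic: "food",
  partition: 0,
  required_acks: 1,
  timeout: 100,
  compression: :none,
  messages: [%Message{key: "k", value: "v", timestamp: nil}],
  api_version: 3
}

{kayrock_req, "food", 0} = KafkaEx.New.Adapter.produce_request(req)

[%{data: [%{record_set: %Kayrock.RecordBatch{records: records}}]}] =
  kayrock_req.topic_data

[%Kayrock.RecordBatch.Record{timestamp: -1}] = records
```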