From 52f16d66185fe2516a160ad30346a9b076a1087f Mon Sep 17 00:00:00 2001 From: Taavi Burns Date: Wed, 27 Dec 2017 16:54:18 -0500 Subject: [PATCH 1/3] Adds a clause to match when a partial messageSet is received. --- lib/kafka_ex/protocol/fetch.ex | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 3740e1e3..f999c2c1 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -73,6 +73,13 @@ defmodule KafkaEx.Protocol.Fetch do defp parse_message_set([last|_] = list, _) do {:ok, Enum.reverse(list), last.offset} end + defp parse_message_set(_, << offset :: 64, msg_size :: 32, rest :: binary >>) do + if byte_size(rest) < msg_size do + raise RuntimeError, "Too little data fetched at offset #{offset}. Message size #{msg_size} but only got another #{byte_size rest} bytes! Try increasing max_bytes to at least #{msg_size + 12}." + else + raise FunctionClauseError, "no function clause matching in #{__MODULE__}.parse_message_set/2" + end + end # handles the single message case and the batch (compression) case defp append_messages([], list) do From cbbcde063f3b43a754adc0ba34d3bab905568738 Mon Sep 17 00:00:00 2001 From: Taavi Burns Date: Fri, 29 Dec 2017 11:35:22 -0500 Subject: [PATCH 2/3] Uses a guard to match a message larger than the bytes fetched. TIL you can use integers matched out of a binary in a guard clause! --- lib/kafka_ex/protocol/fetch.ex | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index f999c2c1..ed415050 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -73,12 +73,8 @@ defmodule KafkaEx.Protocol.Fetch do defp parse_message_set([last|_] = list, _) do {:ok, Enum.reverse(list), last.offset} end - defp parse_message_set(_, << offset :: 64, msg_size :: 32, rest :: binary >>) do - if byte_size(rest) < msg_size do - raise RuntimeError, "Too little data fetched at offset #{offset}. Message size #{msg_size} but only got another #{byte_size rest} bytes! Try increasing max_bytes to at least #{msg_size + 12}." - else - raise FunctionClauseError, "no function clause matching in #{__MODULE__}.parse_message_set/2" - end + defp parse_message_set(_, << offset :: 64, msg_size :: 32, partial_msg_data :: binary >>) when byte_size(partial_message_data> < msg_size do + raise RuntimeError, "Insufficient data fetched at offset #{offset}. Message size is #{msg_size} but only received #{byte_size(partial_message_data)} bytes. Try increasing max_bytes." end # handles the single message case and the batch (compression) case From 1f96b22c9fdef1b7b3ffd5c87bb156d908406fe0 Mon Sep 17 00:00:00 2001 From: Taavi Burns Date: Wed, 3 Jan 2018 10:50:58 -0500 Subject: [PATCH 3/3] Fix some typos --- lib/kafka_ex/protocol/fetch.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index ed415050..0d662c26 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -73,7 +73,7 @@ defmodule KafkaEx.Protocol.Fetch do defp parse_message_set([last|_] = list, _) do {:ok, Enum.reverse(list), last.offset} end - defp parse_message_set(_, << offset :: 64, msg_size :: 32, partial_msg_data :: binary >>) when byte_size(partial_message_data> < msg_size do + defp parse_message_set(_, << offset :: 64, msg_size :: 32, partial_message_data :: binary >>) when byte_size(partial_message_data) < msg_size do raise RuntimeError, "Insufficient data fetched at offset #{offset}. Message size is #{msg_size} but only received #{byte_size(partial_message_data)} bytes. Try increasing max_bytes." end