diff --git a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs index c7a72f0..8864629 100644 --- a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs +++ b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs @@ -19,32 +19,58 @@ public class FetchResponseTest { [TestMethod] [TestCategory(TestCategories.BVT)] - public void ShouldAbleToParseFetchResponse() + public void ShouldAbleToParseV0FetchResponse() { var stream = new MemoryStream(); + WriteTestFetchResponse(stream, 0); + var reader = new KafkaBinaryReader(stream); + var response = new FetchResponse.Parser(0).ParseFrom(reader); + response.ThrottleTime.ShouldBeEquivalentTo(0); + var set = response.MessageSet("topic1", 111); + set.Should().NotBeNull(); + var messages = set.Messages.ToList(); + messages.Count().Should().Be(1); + messages.First().Payload.Length.Should().Be(100); + } + + [TestMethod] + [TestCategory(TestCategories.BVT)] + public void ShouldAbleToParseV1FetchResponse() + { + var stream = new MemoryStream(); + WriteTestFetchResponse(stream, 1); + var reader = new KafkaBinaryReader(stream); + var response = new FetchResponse.Parser(1).ParseFrom(reader); + response.ThrottleTime.ShouldBeEquivalentTo(456); + var set = response.MessageSet("topic1", 111); + set.Should().NotBeNull(); + var messages = set.Messages.ToList(); + messages.Count().Should().Be(1); + messages.First().Payload.Length.Should().Be(100); + } + + private static void WriteTestFetchResponse(MemoryStream stream, int versionId) + { var writer = new KafkaBinaryWriter(stream); writer.Write(1); writer.Write(123); // correlation id + if (versionId > 0) + { + writer.Write(456); // throttle time + } writer.Write(1); // data count writer.WriteShortString("topic1"); writer.Write(1); // partition count writer.Write(111); //partition id - writer.Write((short)ErrorMapping.NoError); + writer.Write((short) ErrorMapping.NoError); writer.Write(1011L); // hw var messageStream = new MemoryStream(); 
var messageWriter = new KafkaBinaryWriter(messageStream); - new BufferedMessageSet(new List<Message>() { new Message(new byte[100]) }, 0).WriteTo(messageWriter); - writer.Write((int)messageStream.Length); - writer.Write(messageStream.GetBuffer(), 0, (int)messageStream.Length); + new BufferedMessageSet(new List<Message>() {new Message(new byte[100])}, 0).WriteTo(messageWriter); + writer.Write((int) messageStream.Length); + writer.Write(messageStream.GetBuffer(), 0, (int) messageStream.Length); stream.Seek(0, SeekOrigin.Begin); - var reader = new KafkaBinaryReader(stream); - var response = new FetchResponse.Parser().ParseFrom(reader); - var set = response.MessageSet("topic1", 111); - set.Should().NotBeNull(); - var messages = set.Messages.ToList(); - messages.Count().Should().Be(1); - messages.First().Payload.Length.Should().Be(100); } } } diff --git a/src/KafkaNET.Library/KafkaConnection.cs b/src/KafkaNET.Library/KafkaConnection.cs index 475ba7d..18b9cf7 100644 --- a/src/KafkaNET.Library/KafkaConnection.cs +++ b/src/KafkaNET.Library/KafkaConnection.cs @@ -139,7 +139,7 @@ public FetchResponse Send(FetchRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); - return this.Handle(request.RequestBuffer.GetBuffer(), new FetchResponse.Parser()); + return this.Handle(request.RequestBuffer.GetBuffer(), new FetchResponse.Parser(request.VersionId)); } /// <summary> diff --git a/src/KafkaNET.Library/Responses/FetchResponse.cs b/src/KafkaNET.Library/Responses/FetchResponse.cs index 0e6e0f8..405c1fa 100644 --- a/src/KafkaNET.Library/Responses/FetchResponse.cs +++ b/src/KafkaNET.Library/Responses/FetchResponse.cs @@ -39,10 +39,11 @@ public FetchResponse(int correlationId, IEnumerable<TopicData> data) this.TopicDataDict = data.GroupBy(x => x.Topic, x => x) .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault()); } - public FetchResponse(int correlationId, IEnumerable<TopicData> data, int size) + public FetchResponse(int correlationId, int throttleTime, IEnumerable<TopicData> data, int size) { Guard.NotNull(data, 
"data"); this.CorrelationId = correlationId; + this.ThrottleTime = throttleTime; this.TopicDataDict = data.GroupBy(x => x.Topic, x => x) .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault()); this.Size = size; @@ -50,6 +51,7 @@ public FetchResponse(int correlationId, IEnumerable data, int size) public int Size { get; private set; } public int CorrelationId { get; private set; } + public int ThrottleTime { get; private set; } public Dictionary TopicDataDict { get; private set; } public BufferedMessageSet MessageSet(string topic, int partition) @@ -92,13 +94,22 @@ public PartitionData PartitionData(string topic, int partition) public class Parser : IResponseParser { + private int versionId; + + public Parser(int versionId) + { + this.versionId = versionId; + } + public FetchResponse ParseFrom(KafkaBinaryReader reader) { - int size = 0, correlationId = 0, dataCount = 0; + int size = 0, correlationId = 0, dataCount = 0, throttleTime = 0; try { size = reader.ReadInt32(); correlationId = reader.ReadInt32(); + if (versionId > 0) + throttleTime = reader.ReadInt32(); dataCount = reader.ReadInt32(); var data = new TopicData[dataCount]; for (int i = 0; i < dataCount; i++) @@ -106,7 +117,7 @@ public FetchResponse ParseFrom(KafkaBinaryReader reader) data[i] = TopicData.ParseFrom(reader); } - return new FetchResponse(correlationId, data, size); + return new FetchResponse(correlationId, throttleTime, data, size); } catch (OutOfMemoryException mex) {