From 2d505f3775edc99a62552551057e97c6658ebb5f Mon Sep 17 00:00:00 2001 From: Michael Liao Date: Mon, 11 Feb 2019 09:12:00 -0500 Subject: [PATCH] Add Kafka consumer metrics handler #94 --- .../Equinox.Projection.Kafka.fsproj | 1 + src/Equinox.Projection.Kafka/Kafka.fs | 37 +++++++++++++++++++ .../KafkaIntegration.fs | 2 +- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj b/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj index 46637e760..a5c11fb1b 100644 --- a/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj +++ b/src/Equinox.Projection.Kafka/Equinox.Projection.Kafka.fsproj @@ -21,6 +21,7 @@ + diff --git a/src/Equinox.Projection.Kafka/Kafka.fs b/src/Equinox.Projection.Kafka/Kafka.fs index 30137bb47..257d1924a 100644 --- a/src/Equinox.Projection.Kafka/Kafka.fs +++ b/src/Equinox.Projection.Kafka/Kafka.fs @@ -7,6 +7,8 @@ open System.Collections.Concurrent open System.Collections.Generic open System.Threading open System.Threading.Tasks +open Newtonsoft.Json.Linq +open Newtonsoft.Json module private Config = let validateBrokerUri (u:Uri) = @@ -256,6 +258,7 @@ type KafkaConsumerConfig = private { conf: ConsumerConfig; custom: seq] + fetchState: string + [] + nextOffset: int64 + [] + storedOffset: int64 + [] + committedOffset: int64 + [] + loOffset: int64 + [] + hiOffset: int64 + [] + consumerLag: int64 } type KafkaConsumer private (log : ILogger, consumer : Consumer, task : Task, cts : CancellationTokenSource) = @@ -317,6 +339,21 @@ type KafkaConsumer private (log : ILogger, consumer : Consumer, ConsumerBuilder<_,_>(config.Kvps) .SetLogHandler(fun _c m -> log.Information("consumer_info|{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility)) .SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError)) + .SetStatisticsHandler(fun _c json -> + // Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md + let stats = JToken.Parse json + (stats.Item "topics").Children() + |> Seq.filter(fun t -> + t.HasValues && config.topics |> Seq.exists(fun top -> top = ((t.First.Item "topic").ToString()))) + |> Seq.iter (fun topicMetric -> + let topicMetric = topicMetric.First + let topic =( topicMetric.Item "topic").ToString() + let metrics = + (topicMetric.Item "partitions").Children() + |> Seq.choose(fun t -> + if t.HasValues then Some (t.First.ToObject()) else None) + |> Seq.filter(fun m -> m.partition <> -1) + log.Information("consumer stats reporting topic:{topic} | {@stats}", topic, metrics))) .SetRebalanceHandler(fun _c m -> for topic,partitions in m.Partitions |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> let p = p.Partition in p.Value |]) do if m.IsAssignment then log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions) diff --git a/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs b/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs index 96d2d3967..f32a175bb 100644 --- a/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs +++ b/tests/Equinox.Projection.Kafka.Integration/KafkaIntegration.fs @@ -135,7 +135,7 @@ type T1(testOutputHelper) = // Section: run the test let producers = runProducers log broker topic numProducers messagesPerProducer |> Async.Ignore - let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId) + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId, statisticsInterval=(TimeSpan.FromSeconds 1.)) let consumers = runConsumers log config numConsumers None consumerCallback do! [ producers ; consumers ]