Skip to content

Commit

Permalink
Add Kafka consumer metrics handler #94
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelliao5 authored and bartelink committed Feb 13, 2019
1 parent a97c8f7 commit 2d505f3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" />
<PackageReference Include="Serilog" Version="2.7.1" />
</ItemGroup>
Expand Down
37 changes: 37 additions & 0 deletions src/Equinox.Projection.Kafka/Kafka.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ open System.Collections.Concurrent
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
open Newtonsoft.Json.Linq
open Newtonsoft.Json

module private Config =
let validateBrokerUri (u:Uri) =
Expand Down Expand Up @@ -256,6 +258,7 @@ type KafkaConsumerConfig = private { conf: ConsumerConfig; custom: seq<KeyValueP
?fetchMaxBytes,
/// Default 10B.
?fetchMinBytes,
/// Stats reporting interval for the consumer in ms. By default, the reporting is turned off.
?statisticsInterval,
/// Consumed offsets commit interval. Default 10s. (WAS 1s)
?offsetCommitInterval,
Expand Down Expand Up @@ -291,6 +294,25 @@ type KafkaConsumerConfig = private { conf: ConsumerConfig; custom: seq<KeyValueP
maxBatchDelay = defaultArg maxBatchDelay (TimeSpan.FromMilliseconds 500.)
minInFlightBytes = defaultArg minInFlightBytes (16L * 1024L * 1024L)
maxInFlightBytes = defaultArg maxInFlightBytes (24L * 1024L * 1024L) } }

// Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
type KafkaPartitionMetrics =
{
partition: int
[<JsonProperty("fetch_state")>]
fetchState: string
[<JsonProperty("next_offset")>]
nextOffset: int64
[<JsonProperty("stored_offset")>]
storedOffset: int64
[<JsonProperty("committed_offset")>]
committedOffset: int64
[<JsonProperty("lo_offset")>]
loOffset: int64
[<JsonProperty("hi_offset")>]
hiOffset: int64
[<JsonProperty("consumer_lag")>]
consumerLag: int64 }

type KafkaConsumer private (log : ILogger, consumer : Consumer<string, string>, task : Task<unit>, cts : CancellationTokenSource) =

Expand All @@ -317,6 +339,21 @@ type KafkaConsumer private (log : ILogger, consumer : Consumer<string, string>,
ConsumerBuilder<_,_>(config.Kvps)
.SetLogHandler(fun _c m -> log.Information("consumer_info|{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
.SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
.SetStatisticsHandler(fun _c json ->
// Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
let stats = JToken.Parse json
(stats.Item "topics").Children()
|> Seq.filter(fun t ->
t.HasValues && config.topics |> Seq.exists(fun top -> top = ((t.First.Item "topic").ToString())))
|> Seq.iter (fun topicMetric ->
let topicMetric = topicMetric.First
let topic =( topicMetric.Item "topic").ToString()
let metrics =
(topicMetric.Item "partitions").Children()
|> Seq.choose(fun t ->
if t.HasValues then Some (t.First.ToObject<KafkaPartitionMetrics>()) else None)
|> Seq.filter(fun m -> m.partition <> -1)
log.Information("consumer stats reporting topic:{topic} | {@stats}", topic, metrics)))
.SetRebalanceHandler(fun _c m ->
for topic,partitions in m.Partitions |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> let p = p.Partition in p.Value |]) do
if m.IsAssignment then log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type T1(testOutputHelper) =
// Section: run the test
let producers = runProducers log broker topic numProducers messagesPerProducer |> Async.Ignore

let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId)
let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId, statisticsInterval=(TimeSpan.FromSeconds 1.))
let consumers = runConsumers log config numConsumers None consumerCallback

do! [ producers ; consumers ]
Expand Down

0 comments on commit 2d505f3

Please sign in to comment.