-
Notifications
You must be signed in to change notification settings - Fork 807
/
Copy pathKafkaHealthCheckBuilderExtensions.cs
116 lines (108 loc) · 5.48 KB
/
KafkaHealthCheckBuilderExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
using Confluent.Kafka;
using HealthChecks.Kafka;
using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace Microsoft.Extensions.DependencyInjection;
/// <summary>
/// Extension methods to configure <see cref="KafkaHealthCheck"/>.
/// </summary>
public static class KafkaHealthCheckBuilderExtensions
{
private const string NAME = "kafka";
internal const string DEFAULT_TOPIC = "healthchecks-topic";
/// <summary>
/// Add a health check for Kafka cluster.
/// </summary>
/// <param name="builder">The <see cref="IHealthChecksBuilder"/>.</param>
/// <param name="config">The Kafka connection configuration parameters to be used.</param>
/// <param name="topic">The topic name to produce Kafka messages on.</param>
/// <param name="name">The health check name. Optional. If <c>null</c> the type name 'kafka' will be used for the name.</param>
/// <param name="failureStatus">
/// The <see cref="HealthStatus"/> that should be reported when the health check fails. Optional. If <c>null</c> then
/// the default status of <see cref="HealthStatus.Unhealthy"/> will be reported.
/// </param>
/// <param name="tags">A list of tags that can be used to filter sets of health checks. Optional.</param>
/// <param name="timeout">An optional <see cref="TimeSpan"/> representing the timeout of the check.</param>
/// <returns>The specified <paramref name="builder"/>.</returns>
public static IHealthChecksBuilder AddKafka(
this IHealthChecksBuilder builder,
ProducerConfig config,
string topic = DEFAULT_TOPIC,
string? name = default,
HealthStatus? failureStatus = default,
IEnumerable<string>? tags = default,
TimeSpan? timeout = default)
{
builder.Services.AddSingleton(_ => new KafkaHealthCheck(new KafkaHealthCheckOptions { Configuration = config, Topic = topic }));
return builder.Add(new HealthCheckRegistration(
name ?? NAME,
sp => sp.GetRequiredService<KafkaHealthCheck>(),
failureStatus,
tags,
timeout));
}
/// <summary>
/// Add a health check for Kafka cluster.
/// </summary>
/// <param name="builder">The <see cref="IHealthChecksBuilder"/>.</param>
/// <param name="setup">The action to configure the Kafka connection configuration parameters to be used.</param>
/// <param name="topic">The topic name to produce Kafka messages on.</param>
/// <param name="name">The health check name. Optional. If <c>null</c> the type name 'kafka' will be used for the name.</param>
/// <param name="failureStatus">
/// The <see cref="HealthStatus"/> that should be reported when the health check fails. Optional. If <c>null</c> then
/// the default status of <see cref="HealthStatus.Unhealthy"/> will be reported.
/// </param>
/// <param name="tags">A list of tags that can be used to filter sets of health checks. Optional.</param>
/// <param name="timeout">An optional <see cref="TimeSpan"/> representing the timeout of the check.</param>
/// <returns>The specified <paramref name="builder"/>.</returns>
public static IHealthChecksBuilder AddKafka(
this IHealthChecksBuilder builder,
Action<ProducerConfig> setup,
string topic = DEFAULT_TOPIC,
string? name = default,
HealthStatus? failureStatus = default,
IEnumerable<string>? tags = default,
TimeSpan? timeout = default)
{
builder.Services.AddSingleton(_ =>
{
var config = new ProducerConfig();
setup?.Invoke(config);
return new KafkaHealthCheck(new KafkaHealthCheckOptions { Configuration = config, Topic = topic });
});
return builder.Add(new HealthCheckRegistration(
name ?? NAME,
sp => sp.GetRequiredService<KafkaHealthCheck>(),
failureStatus,
tags,
timeout));
}
/// <summary>
/// Add a health check for Kafka cluster.
/// </summary>
/// <param name="builder">The <see cref="IHealthChecksBuilder"/>.</param>
/// <param name="options">Options to configure Kafka health check.</param>
/// <param name="name">The health check name. Optional. If <c>null</c> the type name 'kafka' will be used for the name.</param>
/// <param name="failureStatus">
/// The <see cref="HealthStatus"/> that should be reported when the health check fails. Optional. If <c>null</c> then
/// the default status of <see cref="HealthStatus.Unhealthy"/> will be reported.
/// </param>
/// <param name="tags">A list of tags that can be used to filter sets of health checks. Optional.</param>
/// <param name="timeout">An optional <see cref="TimeSpan"/> representing the timeout of the check.</param>
/// <returns>The specified <paramref name="builder"/>.</returns>
public static IHealthChecksBuilder AddKafka(
this IHealthChecksBuilder builder,
KafkaHealthCheckOptions options,
string? name = default,
HealthStatus? failureStatus = default,
IEnumerable<string>? tags = default,
TimeSpan? timeout = default)
{
builder.Services.AddSingleton(_ => new KafkaHealthCheck(options));
return builder.Add(new HealthCheckRegistration(
name ?? NAME,
sp => sp.GetRequiredService<KafkaHealthCheck>(),
failureStatus,
tags,
timeout));
}
}