Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dump kafka sample source from aspire repo #85

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions samples/KafkaBasic/Consumer/Consumer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj" />
<ProjectReference Include="..\KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj" />
</ItemGroup>
</Project>
37 changes: 37 additions & 0 deletions samples/KafkaBasic/Consumer/ConsumerWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Confluent.Kafka;

namespace Consumer;

internal sealed class ConsumerWorker(IConsumer<Ignore, string> consumer, ILogger<ConsumerWorker> logger) : BackgroundService
{
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
long i = 0;
return Task.Factory.StartNew(async () =>
{
consumer.Subscribe("topic");
while (!stoppingToken.IsCancellationRequested)
{
ConsumeResult<Ignore, string>? result = default;
try
{
result = consumer.Consume(TimeSpan.FromSeconds(1));
}
catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.UnknownTopicOrPart)
{
await Task.Delay(100);
continue;
}

i++;
if (i % 1000 == 0)
{
logger.LogInformation($"Received {i} messages. current offset is '{result!.Offset}'");
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
}
15 changes: 15 additions & 0 deletions samples/KafkaBasic/Consumer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Confluent.Kafka;
using Consumer;

var builder = Host.CreateApplicationBuilder(args);

builder.AddServiceDefaults();

builder.AddKafkaConsumer<Ignore, string>("kafka");

builder.Services.AddHostedService<ConsumerWorker>();

builder.Build().Run();
8 changes: 8 additions & 0 deletions samples/KafkaBasic/Consumer/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
21 changes: 21 additions & 0 deletions samples/KafkaBasic/Consumer/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information",
"Azure": "Warning"
}
},
"Aspire": {
"Confluent": {
"Kafka": {
"Consumer": {
"Config": {
"AutoOffsetReset": "Earliest",
"GroupId": "aspire"
}
}
}
}
}
}
8 changes: 8 additions & 0 deletions samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<Project>

<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

<!-- NOTE: This line is only required because we are using P2P references, not NuGet. It will not exist in real apps. -->
<Import Project="../../../src/Aspire.Hosting/build/Aspire.Hosting.props" />

</Project>
8 changes: 8 additions & 0 deletions samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<Project>

<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.targets', '$(MSBuildThisFileDirectory)../'))" />

<!-- NOTE: This line is only required because we are using P2P references, not NuGet. It will not exist in real apps. -->
<Import Project="../../../src/Aspire.Hosting/build/Aspire.Hosting.targets" />

</Project>
17 changes: 17 additions & 0 deletions samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsAspireHost>true</IsAspireHost>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Aspire.Hosting\Aspire.Hosting.csproj" />
<ProjectReference Include="..\Consumer\Consumer.csproj" />
<ProjectReference Include="..\Producer\Producer.csproj" />
</ItemGroup>

</Project>
14 changes: 14 additions & 0 deletions samples/KafkaBasic/KafkaBasic.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

var builder = DistributedApplication.CreateBuilder(args);

var containerResource = builder.AddKafkaContainer("kafka");

builder.AddProject<Projects.Producer>("producer")
.WithReference(containerResource);

builder.AddProject<Projects.Consumer>("consumer")
.WithReference(containerResource);

builder.Build().Run();
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"applicationUrl": "http://localhost:15024",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16132"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
9 changes: 9 additions & 0 deletions samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"Aspire.Hosting.Dcp": "Warning"
}
}
}
119 changes: 119 additions & 0 deletions samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;

namespace Microsoft.Extensions.Hosting;

public static class Extensions
{
public static IHostApplicationBuilder AddServiceDefaults(this IHostApplicationBuilder builder)
{
builder.ConfigureOpenTelemetry();

builder.AddDefaultHealthChecks();

builder.Services.AddServiceDiscovery();

builder.Services.ConfigureHttpClientDefaults(http =>
{
// Turn on resilience by default
http.AddStandardResilienceHandler();

// Turn on service discovery by default
http.UseServiceDiscovery();
});

return builder;
}

public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicationBuilder builder)
{
builder.Logging.AddOpenTelemetry(logging =>
{
logging.IncludeFormattedMessage = true;
logging.IncludeScopes = true;
});

builder.Services.AddOpenTelemetry()
.WithMetrics(metrics =>
{
metrics.AddRuntimeInstrumentation()
.AddBuiltInMeters();
})
.WithTracing(tracing =>
{
if (builder.Environment.IsDevelopment())
{
// We want to view all traces in development
tracing.SetSampler(new AlwaysOnSampler());
}

tracing.AddAspNetCoreInstrumentation()
.AddGrpcClientInstrumentation()
.AddHttpClientInstrumentation();
});

builder.AddOpenTelemetryExporters();

return builder;
}

private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostApplicationBuilder builder)
{
var useOtlpExporter = !string.IsNullOrWhiteSpace(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]);

if (useOtlpExporter)
{
builder.Services.Configure<OpenTelemetryLoggerOptions>(logging => logging.AddOtlpExporter());
builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddOtlpExporter());
builder.Services.ConfigureOpenTelemetryTracerProvider(tracing => tracing.AddOtlpExporter());
}

// Uncomment the following lines to enable the Prometheus exporter (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package)
// builder.Services.AddOpenTelemetry()
// .WithMetrics(metrics => metrics.AddPrometheusExporter());

// Uncomment the following lines to enable the Azure Monitor exporter (requires the Azure.Monitor.OpenTelemetry.Exporter package)
// builder.Services.AddOpenTelemetry()
// .UseAzureMonitor();

return builder;
}

public static IHostApplicationBuilder AddDefaultHealthChecks(this IHostApplicationBuilder builder)
{
builder.Services.AddHealthChecks()
// Add a default liveness check to ensure app is responsive
.AddCheck("self", () => HealthCheckResult.Healthy(), ["live"]);

return builder;
}

public static WebApplication MapDefaultEndpoints(this WebApplication app)
{
// Uncomment the following line to enable the Prometheus endpoint (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package)
// app.MapPrometheusScrapingEndpoint();

// All health checks must pass for app to be considered ready to accept traffic after starting
app.MapHealthChecks("/health");

// Only health checks tagged with the "live" tag must pass for app to be considered alive
app.MapHealthChecks("/alive", new HealthCheckOptions
{
Predicate = r => r.Tags.Contains("live")
});

return app;
}

private static MeterProviderBuilder AddBuiltInMeters(this MeterProviderBuilder meterProviderBuilder) =>
meterProviderBuilder.AddMeter(
"Microsoft.AspNetCore.Hosting",
"Microsoft.AspNetCore.Server.Kestrel",
"System.Net.Http");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Library</OutputType>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />

<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" />
<PackageReference Include="OpenTelemetry.Instrumentation.GrpcNetClient" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" />
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" />

<PackageReference Include="Microsoft.Extensions.Http.Resilience" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.Extensions.ServiceDiscovery.Dns\Microsoft.Extensions.ServiceDiscovery.Dns.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.Extensions.ServiceDiscovery\Microsoft.Extensions.ServiceDiscovery.csproj" />
</ItemGroup>


</Project>
42 changes: 42 additions & 0 deletions samples/KafkaBasic/KafkaBasic.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.9.34310.174
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.AppHost", "KafkaBasic.AppHost\KafkaBasic.AppHost.csproj", "{C0E6A5CB-D61D-4091-9F5E-81562E480C40}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.ServiceDefaults", "KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj", "{DE933720-1947-4920-A2E8-CB943D381634}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "Producer\Producer.csproj", "{45316E78-FF0A-4984-B303-F292BB3340C7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "Consumer\Consumer.csproj", "{6612601B-5912-4858-B23F-A2CC061C2918}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Release|Any CPU.Build.0 = Release|Any CPU
{DE933720-1947-4920-A2E8-CB943D381634}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DE933720-1947-4920-A2E8-CB943D381634}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DE933720-1947-4920-A2E8-CB943D381634}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DE933720-1947-4920-A2E8-CB943D381634}.Release|Any CPU.Build.0 = Release|Any CPU
{45316E78-FF0A-4984-B303-F292BB3340C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{45316E78-FF0A-4984-B303-F292BB3340C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{45316E78-FF0A-4984-B303-F292BB3340C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{45316E78-FF0A-4984-B303-F292BB3340C7}.Release|Any CPU.Build.0 = Release|Any CPU
{6612601B-5912-4858-B23F-A2CC061C2918}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6612601B-5912-4858-B23F-A2CC061C2918}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6612601B-5912-4858-B23F-A2CC061C2918}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6612601B-5912-4858-B23F-A2CC061C2918}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3683F1C2-032E-43A3-93C0-3F79606377E4}
EndGlobalSection
EndGlobal
Loading