Skip to content

Commit

Permalink
feat: changes how to serve client app files
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch authored and dougolima committed Jun 14, 2021
1 parent f13157a commit b9d4e0a
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 140 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ jobs:
with:
dotnet-version: 3.1.403

- name: Use Node 14.x
uses: actions/setup-node@v1
with:
node-version: '14.x'

- name: Build
run: dotnet build ./src/KafkaFlow.sln -c Release

Expand Down
1 change: 0 additions & 1 deletion samples/KafkaFlow.Sample.Dashboard/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public void ConfigureServices(IServiceCollection services)
);

services
.AddKafkaFlowDashboard()
.AddControllers();
}

Expand Down
24 changes: 22 additions & 2 deletions src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
namespace KafkaFlow.Admin.Dashboard
{
using System.Reflection;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.FileProviders;

/// <summary>
/// Extension methods over IApplicationBuilder
Expand All @@ -14,8 +17,25 @@ public static class ApplicationBuilderExtensions
/// <returns></returns>
public static IApplicationBuilder UseKafkaFlowDashboard(this IApplicationBuilder app)
{
app
.UseMiddleware<ServeClientFilesMiddleware>();
app.Map(
"/kafka-flow",
builder =>
{
var provider = new ManifestEmbeddedFileProvider(
Assembly.GetAssembly(typeof(ApplicationBuilderExtensions)),
"ClientApp/dist");

builder.UseStaticFiles(new StaticFileOptions { FileProvider = provider });

builder.Run(
async context =>
{
if (context.Request.Path == "/")
{
await context.Response.SendFileAsync(provider.GetFileInfo("index.html"));
}
});
});

return app;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
<TypeScriptCompileBlocked>true</TypeScriptCompileBlocked>
<TypeScriptToolsVersion>Latest</TypeScriptToolsVersion>
<IsPackable>true</IsPackable>
<OutputType>Library</OutputType>
<OutputType>Library</OutputType>
<GenerateEmbeddedFilesManifest>true</GenerateEmbeddedFilesManifest>
</PropertyGroup>

<ItemGroup>
Expand All @@ -14,4 +15,8 @@
<EmbeddedResource Include="ClientApp\dist\*.js" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.FileProviders.Embedded" Version="3.1.16" />
</ItemGroup>

</Project>
74 changes: 0 additions & 74 deletions src/KafkaFlow.Admin.Dashboard/ServeClientFilesMiddleware.cs

This file was deleted.

21 changes: 0 additions & 21 deletions src/KafkaFlow.Admin.Dashboard/ServiceCollectionExtensions.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
namespace KafkaFlow
{
using System;
using System.Linq;
using System.Reflection;
using KafkaFlow.Admin;
using KafkaFlow.Admin.Handlers;
using KafkaFlow.Admin.Messages;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Producers;
using KafkaFlow.Serializer;
using KafkaFlow.TypedHandler;
using Microsoft.Extensions.Caching.Memory;
Expand All @@ -30,10 +26,8 @@ public static IClusterConfigurationBuilder EnableAdminMessages(
string adminTopic,
string adminConsumerGroup)
{
cluster.DependencyConfigurator.AddSingleton<IAdminProducer, AdminProducer>();

cluster.DependencyConfigurator.AddSingleton<IMemoryCache, MemoryCache>();
cluster.DependencyConfigurator.AddSingleton<ITelemetryStorage, MemoryCacheTelemetryStorage>();
cluster.DependencyConfigurator
.AddSingleton<IAdminProducer, AdminProducer>();

return cluster
.AddProducer<AdminProducer>(
Expand Down Expand Up @@ -80,8 +74,10 @@ public static IClusterConfigurationBuilder EnableTelemetry(
string topicName,
string consumerGroup)
{
cluster.DependencyConfigurator.AddSingleton<IMemoryCache, MemoryCache>();
cluster.DependencyConfigurator.AddSingleton<ITelemetryStorage, MemoryCacheTelemetryStorage>();
cluster.DependencyConfigurator
.AddSingleton<IMemoryCache, MemoryCache>()
.AddSingleton<ITelemetryStorage, MemoryCacheTelemetryStorage>()
.AddSingleton<ITelemetryScheduler, TelemetryScheduler>();

var groupId = $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}";
var name = $"telemetry-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}";
Expand Down
68 changes: 37 additions & 31 deletions src/KafkaFlow.Admin/TelemetryScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,18 @@ public void Start(string key, string topicName)
var consumers = this.dependencyResolver
.Resolve<IConsumerAccessor>()
.All
.Where(c => !c.ManagementDisabled &&
c.ClusterName.Equals(this.dependencyResolver
.Resolve<IConsumerAccessor>()[key]
.ClusterName));
.Where(
c => !c.ManagementDisabled &&
c.ClusterName.Equals(
this.dependencyResolver
.Resolve<IConsumerAccessor>()[key]
.ClusterName))
.ToList();

var producer = this.dependencyResolver.Resolve<IProducerAccessor>().GetProducer(key);

this.timers[key] = new Timer(
_ =>
{
producer.BatchProduceAsync(
consumers.SelectMany(c => c.Assignment.Select(a =>
new BatchProduceItem(
topicName,
Guid.NewGuid(),
new ConsumerMetric()
{
ConsumerName = c.ConsumerName,
Topic = a.Topic,
GroupId = c.GroupId,
InstanceName = $"{Environment.MachineName}-{c.MemberId}",
PausedPartitions = c.PausedPartitions
.Where(p => p.Topic == a.Topic)
.Select(p => p.Partition.Value),
RunningPartitions = c.RunningPartitions
.Where(p => p.Topic == a.Topic)
.Select(p => p.Partition.Value),
SentAt = DateTime.Now,
},
null)))
.ToList())
.GetAwaiter()
.GetResult();
},
_ => ProduceTelemetry(topicName, consumers, producer),
null,
TimeSpan.Zero,
TimeSpan.FromSeconds(1));
Expand All @@ -72,5 +50,33 @@ public void Stop(string key)
this.timers.Remove(key);
}
}

private static void ProduceTelemetry(
string topicName,
IReadOnlyCollection<IMessageConsumer> consumers,
IMessageProducer producer)
{
var items = consumers.SelectMany(
c => c.Assignment.Select(
a => new ConsumerMetric()
{
ConsumerName = c.ConsumerName,
Topic = a.Topic,
GroupId = c.GroupId,
InstanceName = $"{Environment.MachineName}-{c.MemberId}",
PausedPartitions = c.PausedPartitions
.Where(p => p.Topic == a.Topic)
.Select(p => p.Partition.Value),
RunningPartitions = c.RunningPartitions
.Where(p => p.Topic == a.Topic)
.Select(p => p.Partition.Value),
SentAt = DateTime.Now,
}));

foreach (var item in items)
{
producer.Produce(topicName, Guid.NewGuid().ToByteArray(), item);
}
}
}
}
}

0 comments on commit b9d4e0a

Please sign in to comment.