Skip to content

feat: add clickhouse support #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cnblogs.Architecture.sln
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cnblogs.Architecture.Ddd.In
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cnblogs.Architecture.TestShared", "test\Cnblogs.Architecture.TestShared\Cnblogs.Architecture.TestShared.csproj", "{3B22F0CC-9A61-4D95-8ED9-F41B7FCBFC6F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cnblogs.Architecture.Ddd.Cqrs.Dapper.Clickhouse", "src\Cnblogs.Architecture.Ddd.Cqrs.Dapper.Clickhouse\Cnblogs.Architecture.Ddd.Cqrs.Dapper.Clickhouse.csproj", "{73665E32-3D10-4F71-B893-4C65F36332D0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse", "src\Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse\Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse.csproj", "{4BD98FBF-FB98-4172-B352-BB7BF8761FCB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -94,6 +98,8 @@ Global
{E7ABC399-AF71-46A6-A4D4-A38972BC7D50} = {D3A6DF01-017E-4088-936C-B3791F41DF53}
{A6A8FDC5-20E7-4776-9CB6-A2E43DCCBE7B} = {D3A6DF01-017E-4088-936C-B3791F41DF53}
{3B22F0CC-9A61-4D95-8ED9-F41B7FCBFC6F} = {772497F8-2CB1-4EA6-AEB8-482C3ECD0A9D}
{73665E32-3D10-4F71-B893-4C65F36332D0} = {D3A6DF01-017E-4088-936C-B3791F41DF53}
{4BD98FBF-FB98-4172-B352-BB7BF8761FCB} = {D3A6DF01-017E-4088-936C-B3791F41DF53}
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{54D9D850-1CFC-485E-97FE-87F41C220523}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -200,5 +206,13 @@ Global
{3B22F0CC-9A61-4D95-8ED9-F41B7FCBFC6F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3B22F0CC-9A61-4D95-8ED9-F41B7FCBFC6F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3B22F0CC-9A61-4D95-8ED9-F41B7FCBFC6F}.Release|Any CPU.Build.0 = Release|Any CPU
{73665E32-3D10-4F71-B893-4C65F36332D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{73665E32-3D10-4F71-B893-4C65F36332D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{73665E32-3D10-4F71-B893-4C65F36332D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{73665E32-3D10-4F71-B893-4C65F36332D0}.Release|Any CPU.Build.0 = Release|Any CPU
{4BD98FBF-FB98-4172-B352-BB7BF8761FCB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4BD98FBF-FB98-4172-B352-BB7BF8761FCB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4BD98FBF-FB98-4172-B352-BB7BF8761FCB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4BD98FBF-FB98-4172-B352-BB7BF8761FCB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Cnblogs.Architecture.Ddd.Cqrs.Dapper.Clickhouse;

/// <summary>
/// The collection for clickhouse contexts.
/// </summary>
public class ClickhouseContextCollection
{
internal List<Type> ContextTypes { get; } = new();

internal void Add<TContext>()
{
ContextTypes.Add(typeof(TContext));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Data;
using ClickHouse.Client.ADO;
using Cnblogs.Architecture.Ddd.Infrastructure.Dapper;

namespace Cnblogs.Architecture.Ddd.Cqrs.Dapper.Clickhouse;

/// <summary>
/// Clickhouse connection factory.
/// </summary>
public class ClickhouseDbConnectionFactory : IDbConnectionFactory
{
private readonly string _connectionString;

/// <summary>
/// Create a clickhouse connection factory.
/// </summary>
/// <param name="connectionString">The connection string.</param>
public ClickhouseDbConnectionFactory(string connectionString)
{
_connectionString = connectionString;
}

/// <inheritdoc />
public IDbConnection CreateDbConnection()
{
return new ClickHouseConnection(_connectionString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

namespace Cnblogs.Architecture.Ddd.Cqrs.Dapper.Clickhouse;

/// <summary>
/// Hosed service to initialize clickhouse contexts.
/// </summary>
public class ClickhouseInitializeHostedService : IHostedService
{
private readonly ClickhouseContextCollection _collection;
private readonly IServiceProvider _serviceProvider;

/// <summary>
/// Create a <see cref="ClickhouseInitializeHostedService"/>.
/// </summary>
/// <param name="collections">The contexts been registered.</param>
/// <param name="serviceProvider">The provider for contexts.</param>
public ClickhouseInitializeHostedService(
IOptions<ClickhouseContextCollection> collections,
IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
_collection = collections.Value;
}

/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
foreach (var collectionContextType in _collection.ContextTypes)
{
var context = scope.ServiceProvider.GetRequiredService(collectionContextType) as ClickhouseDapperContext;
context?.Init();
}

return Task.CompletedTask;
}

/// <inheritdoc />
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>
Provides clickhouse dapper integration.
</Description>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Cnblogs.Architecture.Ddd.Cqrs.Dapper\Cnblogs.Architecture.Ddd.Cqrs.Dapper.csproj" />
<ProjectReference Include="..\Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse\Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Cnblogs.Architecture.Ddd.Cqrs.Dapper.Clickhouse;
using Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace Cnblogs.Architecture.Ddd.Cqrs.Dapper;

/// <summary>
/// Extension methods for inject clickhouse to dapper context.
/// </summary>
public static class DapperConfigurationBuilderExtension
{
/// <summary>
/// Use clickhouse as the underlying database.
/// </summary>
/// <param name="builder"><see cref="DapperConfigurationBuilder{TContext}"/>.</param>
/// <param name="connectionString">The connection string for clickhouse.</param>
/// <typeparam name="TContext">The context type been used.</typeparam>
public static void UseClickhouse<TContext>(
this DapperConfigurationBuilder<TContext> builder,
string connectionString)
where TContext : ClickhouseDapperContext
{
builder.UseDbConnectionFactory(new ClickhouseDbConnectionFactory(connectionString));
builder.Services.AddSingleton(new ClickhouseContextOptions<TContext>(connectionString));
builder.Services.Configure<ClickhouseContextCollection>(x => x.Add<TContext>());
builder.Services.AddHostedService<ClickhouseInitializeHostedService>();
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
using Cnblogs.Architecture.Ddd.Infrastructure.Dapper;
using Cnblogs.Architecture.Ddd.Cqrs.Dapper.SqlServer;
using Cnblogs.Architecture.Ddd.Infrastructure.Dapper;

namespace Cnblogs.Architecture.Ddd.Cqrs.Dapper.SqlServer;
// ReSharper disable once CheckNamespace
namespace Cnblogs.Architecture.Ddd.Cqrs.Dapper;

/// <summary>
/// 用于配置 Dapper Configuration 的扩展方法。
/// Extension methods to configure dapper context.
/// </summary>
public static class DapperConfigurationBuilderExtension
{
/// <summary>
/// 使用 SqlServer 配置 <see cref="DapperContext"/>
/// Configure <see cref="DapperContext"/> to use sql server as underlying database.
/// </summary>
/// <param name="builder"><see cref="DapperConfigurationBuilder"/></param>
/// <param name="connectionString">连接字符串。</param>
public static void UseSqlServer(this DapperConfigurationBuilder builder, string connectionString)
/// <param name="builder"><see cref="DapperConfigurationBuilder{TContext}"/></param>
/// <param name="connectionString">The connection string for sql server.</param>
/// <typeparam name="TContext">The type of context been configured.</typeparam>
public static void UseSqlServer<TContext>(this DapperConfigurationBuilder<TContext> builder, string connectionString)
where TContext : DapperContext
{
builder.UseDbConnectionFactory(new SqlServerDbConnectionFactory(connectionString));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ namespace Cnblogs.Architecture.Ddd.Cqrs.Dapper;
/// <summary>
/// Dapper 配置类。
/// </summary>
public class DapperConfigurationBuilder
/// <typeparam name="TContext">The context type been configured.</typeparam>
public class DapperConfigurationBuilder<TContext>
where TContext : DapperContext
{
private readonly IServiceCollection _services;
private readonly string _dapperContextTypeName;

/// <summary>
/// 创建一个 DapperConfigurationBuilder。
/// </summary>
/// <param name="dapperContextTypeName">正在配置的 DapperContext 名称。</param>
/// <param name="services"><see cref="ServiceCollection"/></param>
public DapperConfigurationBuilder(string dapperContextTypeName, IServiceCollection services)
public DapperConfigurationBuilder(IServiceCollection services)
{
_dapperContextTypeName = dapperContextTypeName;
_services = services;
_dapperContextTypeName = typeof(TContext).Name;
Services = services;
}

/// <summary>
Expand All @@ -31,7 +31,12 @@ public DapperConfigurationBuilder(string dapperContextTypeName, IServiceCollecti
public void UseDbConnectionFactory<TFactory>(TFactory factory)
where TFactory : IDbConnectionFactory
{
_services.Configure<DbConnectionFactoryCollection>(
Services.Configure<DbConnectionFactoryCollection>(
c => c.AddDbConnectionFactory(_dapperContextTypeName, factory));
}
}

/// <summary>
/// The underlying <see cref="IServiceCollection"/>.
/// </summary>
public IServiceCollection Services { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public static class ServiceCollectionInjector
/// <param name="services"><see cref="ServiceCollection"/>。</param>
/// <typeparam name="TContext"><see cref="DapperContext"/> 的实现类型。</typeparam>
/// <returns></returns>
public static DapperConfigurationBuilder AddDapperContext<TContext>(this IServiceCollection services)
public static DapperConfigurationBuilder<TContext> AddDapperContext<TContext>(this IServiceCollection services)
where TContext : DapperContext
{
services.AddScoped<TContext>();
return new DapperConfigurationBuilder(typeof(TContext).Name, services);
return new DapperConfigurationBuilder<TContext>(services);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse;

/// <summary>
/// The options for clickhouse context.
/// </summary>
/// <typeparam name="TContext">The type of <see cref="ClickhouseDapperContext"/> been configured.</typeparam>
public class ClickhouseContextOptions<TContext> : ClickhouseContextOptions
where TContext : ClickhouseDapperContext
{
/// <summary>
/// Create a <see cref="ClickhouseContextOptions{TContext}"/> with given connection string.
/// </summary>
/// <param name="connectionString">The connection string for clickhouse.</param>
public ClickhouseContextOptions(string connectionString)
: base(connectionString)
{
}
}

/// <summary>
/// The options for <see cref="ClickhouseDapperContext"/>.
/// </summary>
public class ClickhouseContextOptions
{
private readonly Dictionary<Type, ClickhouseEntityConfiguration> _entityConfigurations = new();

internal ClickhouseContextOptions(string connectionString)
{
ConnectionString = connectionString;
}

internal string ConnectionString { get; }

internal void Add(Type type, ClickhouseEntityConfiguration configuration)
{
_entityConfigurations.Add(type, configuration);
}

internal ClickhouseEntityConfiguration? GetConfiguration<T>()
{
return _entityConfigurations.GetValueOrDefault(typeof(T));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using ClickHouse.Client.Copy;
using Microsoft.Extensions.Options;

namespace Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse;

/// <summary>
/// DapperContext that specialized for clickhouse.
/// </summary>
public abstract class ClickhouseDapperContext : DapperContext
{
private readonly ClickhouseContextOptions _options;

/// <summary>
/// Create a <see cref="ClickhouseDapperContext"/>.
/// </summary>
/// <param name="dbConnectionFactoryCollection">The underlying <see cref="IDbConnectionFactory"/> collection.</param>
/// <param name="options">The options used for this context.</param>
protected ClickhouseDapperContext(
IOptions<DbConnectionFactoryCollection> dbConnectionFactoryCollection,
ClickhouseContextOptions options)
: base(dbConnectionFactoryCollection)
{
_options = options;
}

/// <summary>
/// Init context, register models, etc.
/// </summary>
public void Init()
{
var builder = new ClickhouseModelCollectionBuilder();
ConfigureModels(builder);
builder.Build(_options);
}

/// <summary>
/// Configure models that related to this context.
/// </summary>
/// <param name="builder"><see cref="ClickhouseModelCollectionBuilder"/>.</param>
protected abstract void ConfigureModels(ClickhouseModelCollectionBuilder builder);

/// <summary>
/// Bulk write entities to clickhouse.
/// </summary>
/// <param name="entities">The entity to be written.</param>
/// <typeparam name="T">The type of entity.</typeparam>
/// <exception cref="InvalidOperationException">Throw when <typeparamref name="T"/> is not registered.</exception>
public async Task BulkWriteAsync<T>(IEnumerable<T> entities)
where T : class
{
var configuration = _options.GetConfiguration<T>();
if (configuration is null)
{
throw new InvalidOperationException(
$"The model type {typeof(T).Name} is not registered, make sure you have called builder.Entity<T>() at ConfigureModels()");
}

using var bulkCopyInterface = new ClickHouseBulkCopy(_options.ConnectionString)
{
DestinationTableName = configuration.TableName
};

var objs = entities.Select(x => configuration.ToObjectArray(x));
await bulkCopyInterface.WriteToServerAsync(objs, configuration.ColumnNames);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Reflection;

namespace Cnblogs.Architecture.Ddd.Infrastructure.Dapper.Clickhouse;

internal record ClickhouseEntityConfiguration(string TableName, PropertyInfo[] Properties, string[] ColumnNames)
{
internal object?[] ToObjectArray(object entity)
{
return Properties.Select(x => x.GetValue(entity)).ToArray();
}
}
Loading