Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a blob container client factory for customisation
Browse files Browse the repository at this point in the history
Allows blob containers to be customised based on grainType and grain id. This is wrapped in an interface to allow injection of dependencies and state to be maintained.
Alex McAuliffe authored and Romanx committed Aug 11, 2022
1 parent c6733ec commit 04a705c
Showing 3 changed files with 86 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -23,10 +23,10 @@ namespace Orleans.Storage
/// </summary>
public class AzureBlobGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifecycle>
{
private BlobContainerClient container;
private ILogger logger;
private readonly string name;
private AzureBlobStorageOptions options;
private readonly IBlobContainerFactory blobContainerFactory;
private IGrainStorageSerializer grainStorageSerializer;
private readonly IServiceProvider services;

@@ -35,11 +35,13 @@ public AzureBlobGrainStorage(
string name,
AzureBlobStorageOptions options,
IGrainStorageSerializer grainStorageSerializer,
IBlobContainerFactory blobContainerFactory,
IServiceProvider services,
ILogger<AzureBlobGrainStorage> logger)
{
this.name = name;
this.options = options;
this.blobContainerFactory = blobContainerFactory;
this.grainStorageSerializer = options.GrainStorageSerializer;
this.services = services;
this.logger = logger;
@@ -50,6 +52,8 @@ public AzureBlobGrainStorage(
public async Task ReadStateAsync<T>(string grainType, GrainReference grainId, IGrainState<T> grainState)
{
var blobName = GetBlobName(grainType, grainId);
var container = this.blobContainerFactory.BuildContainerClient(grainType, grainId);

if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace((int)AzureProviderErrorCode.AzureBlobProvider_Storage_Reading,
"Reading: GrainType={GrainType} Grainid={GrainId} ETag={ETag} from BlobName={BlobName} in Container={ContainerName}",
grainType,
@@ -145,6 +149,8 @@ private static string GetBlobName(string grainType, GrainReference grainId)
public async Task WriteStateAsync<T>(string grainType, GrainReference grainId, IGrainState<T> grainState)
{
var blobName = GetBlobName(grainType, grainId);
var container = this.blobContainerFactory.BuildContainerClient(grainType, grainId);

try
{
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace((int)AzureProviderErrorCode.AzureBlobProvider_Storage_Writing,
@@ -189,6 +195,8 @@ public async Task WriteStateAsync<T>(string grainType, GrainReference grainId, I
public async Task ClearStateAsync<T>(string grainType, GrainReference grainId, IGrainState<T> grainState)
{
var blobName = GetBlobName(grainType, grainId);
var container = this.blobContainerFactory.BuildContainerClient(grainType, grainId);

try
{
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace((int)AzureProviderErrorCode.AzureBlobProvider_ClearingData,
@@ -240,6 +248,8 @@ await DoOptimisticUpdate(() => blob.DeleteIfExistsAsync(DeleteSnapshotsOption.No

private async Task WriteStateAndCreateContainerIfNotExists<T>(string grainType, GrainReference grainId, IGrainState<T> grainState, BinaryData contents, string mimeType, BlobClient blob)
{
var container = this.blobContainerFactory.BuildContainerClient(grainType, grainId);

try
{
var conditions = string.IsNullOrEmpty(grainState.ETag)
@@ -309,8 +319,7 @@ private async Task Init(CancellationToken ct)
}

var client = await createClient();
container = client.GetBlobContainerClient(this.options.ContainerName);
await container.CreateIfNotExistsAsync().ConfigureAwait(false);
await this.blobContainerFactory.Init(client);
stopWatch.Stop();
this.logger.LogInformation((int)AzureProviderErrorCode.AzureBlobProvider_InitProvider,
"Initializing provider {ProviderName} of type {ProviderType} in stage {Stage} took {ElapsedMilliseconds} Milliseconds.",
@@ -351,7 +360,11 @@ public static class AzureBlobGrainStorageFactory
public static IGrainStorage Create(IServiceProvider services, string name)
{
var optionsMonitor = services.GetRequiredService<IOptionsMonitor<AzureBlobStorageOptions>>();
return ActivatorUtilities.CreateInstance<AzureBlobGrainStorage>(services, name, optionsMonitor.Get(name));
var options = optionsMonitor.Get(name);

var containerFactory = options.BuildContainerFactory(services, options);

return ActivatorUtilities.CreateInstance<AzureBlobGrainStorage>(services, name, containerFactory);
}
}
}
Original file line number Diff line number Diff line change
@@ -4,6 +4,8 @@
using Azure.Core;
using Azure.Storage;
using Azure.Storage.Blobs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Orleans.Persistence.AzureStorage;
using Orleans.Runtime;
@@ -36,7 +38,13 @@ public class AzureBlobStorageOptions : IStorageProviderSerializerOptions
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;

/// <inheritdoc/>
public IGrainStorageSerializer GrainStorageSerializer { get; set;}
public IGrainStorageSerializer GrainStorageSerializer { get; set; }

/// <summary>
/// A function for building container factory instances
/// </summary>
public Func<IServiceProvider, AzureBlobStorageOptions, IBlobContainerFactory> BuildContainerFactory { get; set; }
= static (provider, options) => ActivatorUtilities.CreateInstance<DefaultBlobContainerFactory>(provider, options);

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using a connection string.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Orleans.Configuration;
using Orleans.Runtime;

namespace Orleans.Storage;

/// <summary>
/// A factory for building container clients for blob storage using grainType and grainId
/// </summary>
public interface IBlobContainerFactory
{
/// <summary>
/// Build a container for the specific grain type and grain id
/// </summary>
/// <param name="grainType">The grain type</param>
/// <param name="grainId">The grain id</param>
/// <returns>A configured blob client</returns>
public BlobContainerClient BuildContainerClient(string grainType, GrainReference grainId);

/// <summary>
/// Initialize any required dependenceis using the provided client and options
/// </summary>
/// <param name="client">The connected blob client</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public Task Init(BlobServiceClient client);
}

/// <summary>
/// A default blob container factory that uses the default container name
/// </summary>
internal class DefaultBlobContainerFactory : IBlobContainerFactory
{
private readonly AzureBlobStorageOptions _options;
private BlobContainerClient _defaultContainer = null!;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultBlobContainerFactory"/> class.
/// </summary>
/// <param name="options">The blob storage options</param>
public DefaultBlobContainerFactory(AzureBlobStorageOptions options)
{
_options = options;
}

/// <inheritdoc/>
public BlobContainerClient BuildContainerClient(string grainType, GrainReference grainId)
=> _defaultContainer;

/// <inheritdoc/>
public async Task Init(BlobServiceClient client)
{
_defaultContainer = client.GetBlobContainerClient(_options.ContainerName);
await _defaultContainer.CreateIfNotExistsAsync();
}
}

0 comments on commit 04a705c

Please sign in to comment.