Skip to content

Commit

Permalink
Merge pull request #1163 from stu-mck/allowPreProvisionedAWSClients
Browse files Browse the repository at this point in the history
Enable supply of pre-provisoned AWS clients
  • Loading branch information
danielgerlag authored May 9, 2023
2 parents 9cb9cbe + e1829f8 commit b4e06ac
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 23 deletions.
12 changes: 12 additions & 0 deletions src/providers/WorkflowCore.Providers.AWS/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ services.AddWorkflow(cfg =>
If any AWS resources do not exists, they will be automatcially created. By default, all DynamoDB tables and indexes will be provisioned with a throughput of 1, you can modify these values from the AWS console.
You may also specify a prefix for the dynamo table names.

If you have a preconfigured dynamoClient, you can pass this in instead of the credentials and config
```C#
var client = new AmazonDynamoDBClient();
var sqsClient = new AmazonSQSClient();
services.AddWorkflow(cfg =>
{
cfg.UseAwsDynamoPersistenceWithProvisionedClient(client, "table-prefix");
cfg.UseAwsDynamoLockingWithProvisionedClient(client, "workflow-core-locks");
cfg.UseAwsSimpleQueueServiceWithProvisionedClient(sqsClient, "queues-prefix");
});
```


## Usage (Kinesis)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using Amazon;
using Amazon.DynamoDBv2;
using Amazon.Kinesis;
using Amazon.Runtime;
using Amazon.SQS;
using Microsoft.Extensions.Logging;
Expand All @@ -15,28 +16,55 @@ public static class ServiceCollectionExtensions
{
public static WorkflowOptions UseAwsSimpleQueueService(this WorkflowOptions options, AWSCredentials credentials, AmazonSQSConfig config, string queuesPrefix = "workflowcore")
{
options.UseQueueProvider(sp => new SQSQueueProvider(credentials, config, sp.GetService<ILoggerFactory>(), queuesPrefix));
var sqsClient = new AmazonSQSClient(credentials, config);
return options.UseAwsSimpleQueueServiceWithProvisionedClient(sqsClient, queuesPrefix);
}

public static WorkflowOptions UseAwsSimpleQueueServiceWithProvisionedClient(this WorkflowOptions options, AmazonSQSClient sqsClient, string queuesPrefix = "workflowcore")
{
options.UseQueueProvider(sp => new SQSQueueProvider(sqsClient, sp.GetService<ILoggerFactory>(), queuesPrefix));
return options;
}

public static WorkflowOptions UseAwsDynamoLocking(this WorkflowOptions options, AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName)
{
options.UseDistributedLockManager(sp => new DynamoLockProvider(credentials, config, tableName, sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
var dbClient = new AmazonDynamoDBClient(credentials, config);
return options.UseAwsDynamoLockingWithProvisionedClient(dbClient, tableName);
}

public static WorkflowOptions UseAwsDynamoLockingWithProvisionedClient (this WorkflowOptions options, AmazonDynamoDBClient dynamoClient, string tableName)
{
options.UseDistributedLockManager(sp => new DynamoLockProvider(dynamoClient, tableName, sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
return options;
}

public static WorkflowOptions UseAwsDynamoPersistence(this WorkflowOptions options, AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix)
{
options.Services.AddTransient<IDynamoDbProvisioner>(sp => new DynamoDbProvisioner(credentials, config, tablePrefix, sp.GetService<ILoggerFactory>()));
options.UsePersistence(sp => new DynamoPersistenceProvider(credentials, config, sp.GetService<IDynamoDbProvisioner>(), tablePrefix, sp.GetService<ILoggerFactory>()));
var dbClient = new AmazonDynamoDBClient(credentials, config);
return options.UseAwsDynamoPersistenceWithProvisionedClient(dbClient, tablePrefix);
}

public static WorkflowOptions UseAwsDynamoPersistenceWithProvisionedClient(this WorkflowOptions options, AmazonDynamoDBClient dynamoClient, string tablePrefix)
{
options.Services.AddTransient<IDynamoDbProvisioner>(sp => new DynamoDbProvisioner(dynamoClient, tablePrefix, sp.GetService<ILoggerFactory>()));
options.UsePersistence(sp => new DynamoPersistenceProvider(dynamoClient, sp.GetService<IDynamoDbProvisioner>(), tablePrefix, sp.GetService<ILoggerFactory>()));
return options;
}

public static WorkflowOptions UseAwsKinesis(this WorkflowOptions options, AWSCredentials credentials, RegionEndpoint region, string appName, string streamName)
{
options.Services.AddTransient<IKinesisTracker>(sp => new KinesisTracker(credentials, region, "workflowcore_kinesis", sp.GetService<ILoggerFactory>()));
options.Services.AddTransient<IKinesisStreamConsumer>(sp => new KinesisStreamConsumer(credentials, region, sp.GetService<IKinesisTracker>(), sp.GetService<IDistributedLockProvider>(), sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
options.UseEventHub(sp => new KinesisProvider(credentials, region, appName, streamName, sp.GetService<IKinesisStreamConsumer>(), sp.GetService<ILoggerFactory>()));
var kinesisClient = new AmazonKinesisClient(credentials, region);
var dynamoClient = new AmazonDynamoDBClient(credentials, region);

return options.UseAwsKinesisWithProvisionedClients(kinesisClient, dynamoClient,appName, streamName);

}

public static WorkflowOptions UseAwsKinesisWithProvisionedClients(this WorkflowOptions options, AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDbClient, string appName, string streamName)
{
options.Services.AddTransient<IKinesisTracker>(sp => new KinesisTracker(dynamoDbClient, "workflowcore_kinesis", sp.GetService<ILoggerFactory>()));
options.Services.AddTransient<IKinesisStreamConsumer>(sp => new KinesisStreamConsumer(kinesisClient, sp.GetService<IKinesisTracker>(), sp.GetService<IDistributedLockProvider>(), sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
options.UseEventHub(sp => new KinesisProvider(kinesisClient, appName, streamName, sp.GetService<IKinesisStreamConsumer>(), sp.GetService<ILoggerFactory>()));
return options;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public class DynamoDbProvisioner : IDynamoDbProvisioner
private readonly IAmazonDynamoDB _client;
private readonly string _tablePrefix;

public DynamoDbProvisioner(AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix, ILoggerFactory logFactory)
public DynamoDbProvisioner(AmazonDynamoDBClient dynamoDBClient, string tablePrefix, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger<DynamoDbProvisioner>();
_client = new AmazonDynamoDBClient(credentials, config);
_client = dynamoDBClient;
_tablePrefix = tablePrefix;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class DynamoLockProvider : IDistributedLockProvider
private readonly AutoResetEvent _mutex = new AutoResetEvent(true);
private readonly IDateTimeProvider _dateTimeProvider;

public DynamoLockProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
public DynamoLockProvider(AmazonDynamoDBClient dynamoDBClient, string tableName, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
{
_logger = logFactory.CreateLogger<DynamoLockProvider>();
_client = new AmazonDynamoDBClient(credentials, config);
_client = dynamoDBClient;
_localLocks = new List<string>();
_tableName = tableName;
_nodeId = Guid.NewGuid().ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ public class DynamoPersistenceProvider : IPersistenceProvider

public bool SupportsScheduledCommands => false;

public DynamoPersistenceProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, IDynamoDbProvisioner provisioner, string tablePrefix, ILoggerFactory logFactory)
public DynamoPersistenceProvider(AmazonDynamoDBClient dynamoDBClient, IDynamoDbProvisioner provisioner, string tablePrefix, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger<DynamoPersistenceProvider>();
_client = new AmazonDynamoDBClient(credentials, config);
_client = dynamoDBClient;
_tablePrefix = tablePrefix;
_provisioner = provisioner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ public class KinesisProvider : ILifeCycleEventHub
private readonly int _defaultShardCount = 1;
private bool _started = false;

public KinesisProvider(AWSCredentials credentials, RegionEndpoint region, string appName, string streamName, IKinesisStreamConsumer consumer, ILoggerFactory logFactory)
public KinesisProvider(AmazonKinesisClient kinesisClient, string appName, string streamName, IKinesisStreamConsumer consumer, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger(GetType());
_appName = appName;
_streamName = streamName;
_consumer = consumer;
_serializer = new JsonSerializer();
_serializer.TypeNameHandling = TypeNameHandling.All;
_client = new AmazonKinesisClient(credentials, region);
_client = kinesisClient;
}

public async Task PublishNotification(LifeCycleEvent evt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public class KinesisStreamConsumer : IKinesisStreamConsumer, IDisposable
private ICollection<ShardSubscription> _subscribers = new HashSet<ShardSubscription>();
private readonly IDateTimeProvider _dateTimeProvider;

public KinesisStreamConsumer(AWSCredentials credentials, RegionEndpoint region, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
public KinesisStreamConsumer(AmazonKinesisClient kinesisClient, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
{
_logger = logFactory.CreateLogger(GetType());
_tracker = tracker;
_lockManager = lockManager;
_client = new AmazonKinesisClient(credentials, region);
_client = kinesisClient;
_processTask = new Task(Process);
_processTask.Start();
_dateTimeProvider = dateTimeProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ public class KinesisTracker : IKinesisTracker
private readonly string _tableName;
private bool _tableConfirmed = false;

public KinesisTracker(AWSCredentials credentials, RegionEndpoint region, string tableName, ILoggerFactory logFactory)
public KinesisTracker(AmazonDynamoDBClient client, string tableName, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger(GetType());
_client = new AmazonDynamoDBClient(credentials, region);
_client = client;
_tableName = tableName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class SQSQueueProvider : IQueueProvider

public bool IsDequeueBlocking => true;

public SQSQueueProvider(AWSCredentials credentials, AmazonSQSConfig config, ILoggerFactory logFactory, string queuesPrefix)
public SQSQueueProvider(AmazonSQSClient sqsClient, ILoggerFactory logFactory, string queuesPrefix)
{
_logger = logFactory.CreateLogger<SQSQueueProvider>();
_client = new AmazonSQSClient(credentials, config);
_client = sqsClient;
_queuesPrefix = queuesPrefix;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ protected override IPersistenceProvider Subject
if (_subject == null)
{
var cfg = new AmazonDynamoDBConfig { ServiceURL = DynamoDbDockerSetup.ConnectionString };
var provisioner = new DynamoDbProvisioner(DynamoDbDockerSetup.Credentials, cfg, "unittests", new LoggerFactory());
var client = new DynamoPersistenceProvider(DynamoDbDockerSetup.Credentials, cfg, provisioner, "unittests", new LoggerFactory());
var dbClient = new AmazonDynamoDBClient(DynamoDbDockerSetup.Credentials, cfg);
var provisioner = new DynamoDbProvisioner(dbClient, "unittests", new LoggerFactory());
var client = new DynamoPersistenceProvider(dbClient, provisioner, "unittests", new LoggerFactory());
client.EnsureStoreExists();
_subject = client;
}
Expand Down

0 comments on commit b4e06ac

Please sign in to comment.