diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index bd27872e27..871957cf5e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
os: [windows-2022, ubuntu-22.04]
- test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence ]
+ test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence, PostgreSQL ]
include:
- os: windows-2022
os-name: Windows
@@ -65,7 +65,7 @@ jobs:
run: Import-Module ./deploy/PowerShellModules/Particular.ServiceControl.Management
- name: Azure login
uses: azure/login@v2.2.0
- if: matrix.test-category == 'AzureServiceBus' || matrix.test-category == 'AzureStorageQueues' || matrix.test-category == 'RabbitMQ'
+ if: matrix.test-category == 'AzureServiceBus' || matrix.test-category == 'AzureStorageQueues' || matrix.test-category == 'RabbitMQ' || matrix.test-category == 'PostgreSQL'
with:
creds: ${{ secrets.AZURE_ACI_CREDENTIALS }}
- name: Setup SQL Server
@@ -74,6 +74,14 @@ jobs:
with:
connection-string-env-var: ServiceControl_TransportTests_SQL_ConnectionString
catalog: nservicebus
+ - name: Setup PostgreSQL
+ uses: Particular/setup-postgres-action@v1.0.0
+ if: matrix.test-category == 'PostgreSQL'
+ with:
+ connection-string-name: ServiceControl_TransportTests_PostgreSQL_ConnectionString
+ tag: ServiceControl
+ registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
+ registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Setup RabbitMQ
uses: Particular/setup-rabbitmq-action@v1.7.0
if: matrix.test-category == 'RabbitMQ'
diff --git a/.github/workflows/container-integration-test.yml b/.github/workflows/container-integration-test.yml
index fe8584bda7..fb9a48fc30 100644
--- a/.github/workflows/container-integration-test.yml
+++ b/.github/workflows/container-integration-test.yml
@@ -22,6 +22,11 @@ jobs:
connection-string: 'Server=mssql;Database=master;User=sa;Password=ServiceControl1!;Encrypt=False;'
compose-cmd: docker compose -f servicecontrol.yml -f mssql.yml up -d
expected-healthy-containers: 5
+ - name: postgresql
+ transport: PostgreSQL
+ connection-string: 'Host=postgres;Port=5432;Database=postgres;User ID=postgres;Password=ServiceControl1!;'
+ compose-cmd: docker compose -f servicecontrol.yml -f postgres.yml up -d
+ expected-healthy-containers: 5
- name: asb
transport: NetStandardAzureServiceBus
compose-cmd: docker compose -f servicecontrol.yml up -d
diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index ed23d2fc40..528f39f947 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -46,8 +46,10 @@
-
-
+
+
+
+
diff --git a/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs b/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs
index b883fbac49..816dd605b2 100644
--- a/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs
+++ b/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs
@@ -117,7 +117,13 @@ public async Task UpdateUserIndicatorOnEndpoints(List userI
//if there are multiple sources of throughput for the endpoint, update them all
var existingEndpoints = GetAllConnectedEndpoints(e.Name);
- existingEndpoints.ForEach(u => u.UserIndicator = e.UserIndicator);
+ existingEndpoints.ForEach(u =>
+ {
+ u.UserIndicator = e.UserIndicator;
+ //for ones that matched on endpoint name, update matching on sanitizedName
+ var sanitizedMAtchingEndpoints = GetAllConnectedEndpoints(u.SanitizedName);
+ sanitizedMAtchingEndpoints.ForEach(s => s.UserIndicator = e.UserIndicator);
+ });
});
await Task.CompletedTask;
@@ -138,7 +144,7 @@ public async Task IsThereThroughputForLastXDaysForSource(int days, Through
endpointThroughput.Value.Any(t => t.Key >= DateOnly.FromDateTime(DateTime.UtcNow).AddDays(-days) && t.Key <= endDate)));
}
- List GetAllConnectedEndpoints(string name) => endpoints.Where(w => w.SanitizedName == name).ToList();
+ List GetAllConnectedEndpoints(string name) => endpoints.Where(w => w.SanitizedName == name || w.Id.Name == name).ToList();
public Task GetBrokerMetadata(CancellationToken cancellationToken) => Task.FromResult(brokerMetadata);
diff --git a/src/ProjectReferences.Transports.props b/src/ProjectReferences.Transports.props
index 71ce167f9d..c82221e628 100644
--- a/src/ProjectReferences.Transports.props
+++ b/src/ProjectReferences.Transports.props
@@ -7,6 +7,7 @@
+
diff --git a/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs b/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs
index 872dce0634..6aef55b25e 100644
--- a/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs
+++ b/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs
@@ -24,11 +24,11 @@ public async Task InitializeSettings()
var transportIntegration = new ConfigureEndpointLearningTransport();
settings = new Settings(
+ transportType: transportIntegration.TypeName,
forwardErrorMessages: false,
errorRetentionPeriod: TimeSpan.FromDays(1),
persisterType: "RavenDB")
{
- TransportType = transportIntegration.TypeName,
TransportConnectionString = transportIntegration.ConnectionString,
AssemblyLoadContextResolver = static _ => AssemblyLoadContext.Default
};
diff --git a/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs b/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs
index 27aa531580..d77d5758a9 100644
--- a/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs
+++ b/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs
@@ -88,5 +88,6 @@ public Task CreateTransportInfrastructure(string name,
OnError onError = null, Func onCriticalError = null,
TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) =>
throw new NotImplementedException();
+ public string ToTransportQualifiedQueueName(string queueName) => queueName;
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/App.config b/src/ServiceControl.Audit/App.config
index 7497b67bcd..1c6276815e 100644
--- a/src/ServiceControl.Audit/App.config
+++ b/src/ServiceControl.Audit/App.config
@@ -17,6 +17,7 @@ These settings are only here so that we can debug ServiceControl while developin
+
@@ -46,5 +47,8 @@ These settings are only here so that we can debug ServiceControl while developin
+
+
+
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
index 6474f28e84..84d110a1f2 100644
--- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
+++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
@@ -15,11 +15,10 @@
using Recoverability;
using SagaAudit;
using ServiceControl.Infrastructure.Metrics;
+ using ServiceControl.Transports;
public class AuditIngestor
{
- static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
-
public AuditIngestor(
Metrics metrics,
Settings settings,
@@ -27,7 +26,8 @@ public AuditIngestor(
EndpointInstanceMonitoring endpointInstanceMonitoring,
IEnumerable auditEnrichers, // allows extending message enrichers with custom enrichers registered in the DI container
IMessageSession messageSession,
- Lazy messageDispatcher
+ Lazy messageDispatcher,
+ ITransportCustomization transportCustomization
)
{
this.settings = settings;
@@ -49,14 +49,16 @@ Lazy messageDispatcher
new SagaRelationshipsEnricher()
}.Concat(auditEnrichers).ToArray();
+ logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);
+
auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
}
public async Task Ingest(List contexts)
{
- if (log.IsDebugEnabled)
+ if (Log.IsDebugEnabled)
{
- log.Debug($"Ingesting {contexts.Count} message contexts");
+ Log.Debug($"Ingesting {contexts.Count} message contexts");
}
var stored = await auditPersister.Persist(contexts);
@@ -65,14 +67,14 @@ public async Task Ingest(List contexts)
{
if (settings.ForwardAuditMessages)
{
- if (log.IsDebugEnabled)
+ if (Log.IsDebugEnabled)
{
- log.Debug($"Forwarding {stored.Count} messages");
+ Log.Debug($"Forwarding {stored.Count} messages");
}
- await Forward(stored, settings.AuditLogQueue);
- if (log.IsDebugEnabled)
+ await Forward(stored, logQueueAddress);
+ if (Log.IsDebugEnabled)
{
- log.Debug("Forwarded messages");
+ Log.Debug("Forwarded messages");
}
}
@@ -83,9 +85,9 @@ public async Task Ingest(List contexts)
}
catch (Exception e)
{
- if (log.IsWarnEnabled)
+ if (Log.IsWarnEnabled)
{
- log.Warn("Forwarding messages failed", e);
+ Log.Warn("Forwarding messages failed", e);
}
// making sure to rethrow so that all messages get marked as failed
@@ -140,7 +142,7 @@ public async Task VerifyCanReachForwardingAddress()
new TransportOperation(
new OutgoingMessage(Guid.Empty.ToString("N"),
[], Array.Empty()),
- new UnicastAddressTag(settings.AuditLogQueue)
+ new UnicastAddressTag(logQueueAddress)
)
);
@@ -155,7 +157,9 @@ public async Task VerifyCanReachForwardingAddress()
readonly AuditPersister auditPersister;
readonly Settings settings;
readonly Lazy messageDispatcher;
+ readonly string logQueueAddress;
- static readonly ILog log = LogManager.GetLogger();
+ static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
+ static readonly ILog Log = LogManager.GetLogger();
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs
index 513ac8486f..869fc03cda 100644
--- a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs
+++ b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs
@@ -44,7 +44,7 @@ public static void Configure(Settings.Settings settings, ITransportCustomization
routing.RouteToEndpoint(typeof(RegisterNewEndpoint), serviceControlLogicalQueue);
routing.RouteToEndpoint(typeof(MarkMessageFailureResolvedByRetry), serviceControlLogicalQueue);
- configuration.ReportCustomChecksTo(settings.ServiceControlQueueAddress);
+ configuration.ReportCustomChecksTo(transportCustomization.ToTransportQualifiedQueueName(settings.ServiceControlQueueAddress));
}
configuration.GetSettings().Set(settings.LoggingSettings);
diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
index be61d7b028..c14bc525b5 100644
--- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
+++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
@@ -43,9 +43,9 @@ public Settings(string transportType = null, string persisterType = null, Loggin
{
Hostname = SettingsReader.Read(SettingsRootNamespace, "Hostname", "localhost");
Port = SettingsReader.Read(SettingsRootNamespace, "Port", 44444);
- }
+ };
- MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", 32);
+ MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultAuditMaximumConcurrencyLevel ?? 32);
ServiceControlQueueAddress = SettingsReader.Read(SettingsRootNamespace, "ServiceControlQueueAddress");
TimeToRestartAuditIngestionAfterFailure = GetTimeToRestartAuditIngestionAfterFailure();
EnableFullTextSearchOnBodies = SettingsReader.Read(SettingsRootNamespace, "EnableFullTextSearchOnBodies", true);
diff --git a/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs
index 72a5b27129..fcb0e36b9c 100644
--- a/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs
+++ b/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs
@@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});
- if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
+ if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
diff --git a/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs
index 7045fcc9d5..5acd682d02 100644
--- a/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs
+++ b/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs
@@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});
- if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
+ if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
diff --git a/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs
index 1c5e337f01..ff6e2e98cf 100644
--- a/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs
+++ b/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs
@@ -68,7 +68,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});
- if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
+ if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
diff --git a/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs
index 137aba3348..12830edf11 100644
--- a/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs
+++ b/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs
@@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});
- if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
+ if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
diff --git a/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs
index 629a0d2c15..af2e19a9ef 100644
--- a/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs
+++ b/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs
@@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});
- if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
+ if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
diff --git a/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs
index d60c32a5f0..d76f3400fc 100644
--- a/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs
+++ b/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs
@@ -108,7 +108,7 @@ public void Transport_cannot_be_empty_when_adding_audit_instance()
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_audit_instance(string transportInfoName)
{
var viewModel = new ServiceControlAddViewModel
@@ -126,7 +126,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void
Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_audit_instance(
string transportInfoName)
diff --git a/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs
index 362ac50802..534ba7f705 100644
--- a/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs
+++ b/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs
@@ -107,7 +107,7 @@ public void Transport_cannot_be_empty_when_adding_error_instance()
Assert.That(errors, Is.Not.Empty);
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_error_instance(
string transportInfoName)
{
@@ -127,7 +127,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_error_instance(
string transportInfoName)
{
diff --git a/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs
index afc6761b0d..42ccf0e6dd 100644
--- a/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs
+++ b/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs
@@ -366,7 +366,7 @@ public void Transport_cannot_be_empty_when_adding_monitoring_instance()
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_monitoring_instance(string transportInfoName)
{
@@ -384,7 +384,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_monitoring_instance(
string transportInfoName)
{
diff --git a/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs
index 3e5a55ede7..73abc4d6d2 100644
--- a/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs
+++ b/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs
@@ -10,7 +10,7 @@ public class EditAuditInstanceValidationTests
{
#region transport
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_editing_audit_instance(
string transportInfoName)
{
@@ -28,7 +28,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_editing_audit_instance(
string transportInfoName)
{
diff --git a/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs
index 63bfb30202..a1e744191a 100644
--- a/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs
+++ b/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs
@@ -10,7 +10,7 @@ public class EditErrorInstanceValidationTests
{
#region transport
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_editing_error_instance(
string transportInfoName)
{
@@ -28,7 +28,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}
- [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
+ [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_editing_error_instance(
string transportInfoName)
{
diff --git a/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs b/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs
index 6ab42fd97e..59ca54e8bd 100644
--- a/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs
+++ b/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs
@@ -26,7 +26,7 @@ class PerformanceTests : AcceptanceTest
retriesStore = new RetriesStore();
queueLengthStore = new QueueLengthStore();
- var settings = new Settings { EndpointUptimeGracePeriod = TimeSpan.FromMinutes(5) };
+ var settings = new Settings(transportType: "Unknown") { EndpointUptimeGracePeriod = TimeSpan.FromMinutes(5) };
activityTracker = new EndpointInstanceActivityTracker(settings, TimeProvider.System);
messageTypeRegistry = new MessageTypeRegistry();
diff --git a/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs
index 2537e08268..debc3ed629 100644
--- a/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs
+++ b/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs
@@ -33,9 +33,8 @@ class ServiceControlComponentRunner(
async Task InitializeServiceControl(ScenarioContext context)
{
- settings = new Settings
+ settings = new Settings(transportType: transportToUse.TypeName)
{
- TransportType = transportToUse.TypeName,
ConnectionString = transportToUse.ConnectionString,
HttpHostName = "localhost",
OnMessage = (id, headers, body, @continue) =>
diff --git a/src/ServiceControl.Monitoring/App.config b/src/ServiceControl.Monitoring/App.config
index fad7edd774..577b757748 100644
--- a/src/ServiceControl.Monitoring/App.config
+++ b/src/ServiceControl.Monitoring/App.config
@@ -17,6 +17,8 @@ These settings are only here so that we can debug ServiceControl while developin
+
+
@@ -42,5 +44,8 @@ These settings are only here so that we can debug ServiceControl while developin
+
+
+
\ No newline at end of file
diff --git a/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs
index b338a90e03..dc14169d15 100644
--- a/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs
+++ b/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs
@@ -98,7 +98,7 @@ static void ConfigureEndpoint(EndpointConfiguration config, Func c.NumberOfRetries(3));
recoverability.Delayed(c => c.NumberOfRetries(0));
- config.SendFailedMessagesTo(settings.ErrorQueue);
+ config.SendFailedMessagesTo(transportCustomization.ToTransportQualifiedQueueName(settings.ErrorQueue));
config.DisableFeature();
diff --git a/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs b/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs
index fe15116ac9..70606ed4c0 100644
--- a/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs
+++ b/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs
@@ -9,13 +9,16 @@
using NServiceBus.Metrics;
using NServiceBus.Unicast.Queuing;
using ServiceControl.Monitoring.Infrastructure.Api;
+ using ServiceControl.Transports;
- class ReportThroughputHostedService(ILogger logger, IMessageSession session, IEndpointMetricsApi endpointMetricsApi, Settings settings, TimeProvider timeProvider) : BackgroundService
+ class ReportThroughputHostedService(ILogger logger, IMessageSession session, IEndpointMetricsApi endpointMetricsApi, Settings settings, TimeProvider timeProvider, ITransportCustomization transportCustomization) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
logger.LogInformation($"Starting {nameof(ReportThroughputHostedService)}");
+ var serviceControlThroughputDataQueue = transportCustomization.ToTransportQualifiedQueueName(settings.ServiceControlThroughputDataQueue);
+
try
{
using PeriodicTimer timer = new(TimeSpan.FromMinutes(ReportSendingIntervalInMinutes), timeProvider);
@@ -24,7 +27,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
try
{
- await ReportOnThroughput(cancellationToken);
+ await ReportOnThroughput(serviceControlThroughputDataQueue, cancellationToken);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
@@ -45,7 +48,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
}
- async Task ReportOnThroughput(CancellationToken cancellationToken)
+ async Task ReportOnThroughput(string serviceControlThroughputDataQueue, CancellationToken cancellationToken)
{
var endpointData = endpointMetricsApi.GetAllEndpointsMetrics(ReportSendingIntervalInMinutes);
@@ -61,13 +64,17 @@ async Task ReportOnThroughput(CancellationToken cancellationToken)
for (int i = 0; i < endpointData.Length; i++)
{
var average = endpointData[i].Metrics["Throughput"]?.Average ?? 0;
- throughputData.EndpointThroughputData[i] = new EndpointThroughputData { Name = endpointData[i].Name, Throughput = Convert.ToInt64(average * ReportSendingIntervalInMinutes * 60) };
+ throughputData.EndpointThroughputData[i] = new EndpointThroughputData
+ {
+ Name = endpointData[i].Name,
+ Throughput = Convert.ToInt64(average * ReportSendingIntervalInMinutes * 60)
+ };
}
- await session.Send(settings.ServiceControlThroughputDataQueue, throughputData, cancellationToken);
+ await session.Send(serviceControlThroughputDataQueue, throughputData, cancellationToken);
}
}
- static int ReportSendingIntervalInMinutes = 5;
+ const int ReportSendingIntervalInMinutes = 5;
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Monitoring/Settings.cs b/src/ServiceControl.Monitoring/Settings.cs
index 8084e43d55..a9fe9a626c 100644
--- a/src/ServiceControl.Monitoring/Settings.cs
+++ b/src/ServiceControl.Monitoring/Settings.cs
@@ -12,14 +12,14 @@ namespace ServiceControl.Monitoring
public class Settings
{
- public Settings(LoggingSettings loggingSettings = null)
+ public Settings(LoggingSettings loggingSettings = null, string transportType = null)
{
LoggingSettings = loggingSettings ?? new(SettingsRootNamespace);
// Overwrite the instance name if it is specified in ENVVAR, reg, or config file
InstanceName = SettingsReader.Read(SettingsRootNamespace, "InstanceName", InstanceName);
- TransportType = SettingsReader.Read(SettingsRootNamespace, "TransportType");
+ TransportType = SettingsReader.Read(SettingsRootNamespace, "TransportType", transportType);
ConnectionString = GetConnectionString();
ErrorQueue = SettingsReader.Read(SettingsRootNamespace, "ErrorQueue", "error");
@@ -37,7 +37,7 @@ public Settings(LoggingSettings loggingSettings = null)
}
EndpointUptimeGracePeriod = TimeSpan.Parse(SettingsReader.Read(SettingsRootNamespace, "EndpointUptimeGracePeriod", "00:00:40"));
- MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", 32);
+ MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultMonitoringMaximumConcurrencyLevel ?? 32);
ServiceControlThroughputDataQueue = SettingsReader.Read(SettingsRootNamespace, "ServiceControlThroughputDataQueue", "ServiceControl.ThroughputData");
AssemblyLoadContextResolver = static assemblyPath => new PluginAssemblyLoadContext(assemblyPath);
diff --git a/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs b/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs
index d39893fabd..005c9f1c09 100644
--- a/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs
+++ b/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs
@@ -186,14 +186,27 @@ public async Task UpdateUserIndicatorOnEndpoints(List userI
using IAsyncDocumentSession session = store.OpenAsyncSession(databaseConfiguration.Name);
var query = session.Query()
- .Where(document => document.SanitizedName.In(updates.Keys));
+ .Where(document => document.SanitizedName.In(updates.Keys) || document.EndpointId.Name.In(updates.Keys));
var documents = await query.ToListAsync(cancellationToken);
foreach (var document in documents)
{
- if (updates.TryGetValue(document.SanitizedName, out var newValue))
+ if (updates.TryGetValue(document.SanitizedName, out var newValueFromSanitizedName))
{
- document.UserIndicator = newValue;
+ document.UserIndicator = newValueFromSanitizedName;
+ }
+ else if (updates.TryGetValue(document.EndpointId.Name, out var newValueFromEndpoint))
+ {
+ document.UserIndicator = newValueFromEndpoint;
+ //update all that match this sanitized name
+ var sanitizedMatchingQuery = session.Query()
+ .Where(sanitizedDocument => sanitizedDocument.SanitizedName == document.SanitizedName && sanitizedDocument.EndpointId.Name != document.EndpointId.Name);
+ var sanitizedMatchingDocuments = await sanitizedMatchingQuery.ToListAsync(cancellationToken);
+
+ foreach (var matchingDocumentOnSanitizedName in sanitizedMatchingDocuments)
+ {
+ matchingDocumentOnSanitizedName.UserIndicator = newValueFromEndpoint;
+ }
}
}
diff --git a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs
index 664e19c33c..dcfec5888b 100644
--- a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs
+++ b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs
@@ -1,9 +1,11 @@
namespace ServiceControl.Persistence.Tests
{
using System;
+ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NServiceBus.Transport;
using NUnit.Framework;
@@ -14,6 +16,7 @@
using ServiceControl.MessageFailures;
using ServiceControl.Persistence;
using ServiceControl.Recoverability;
+ using ServiceControl.Transports;
[NonParallelizable]
class RetryStateTests : PersistenceTestBase
@@ -279,7 +282,7 @@ class FakeApplicationLifetime : IHostApplicationLifetime
class TestReturnToSenderDequeuer : ReturnToSenderDequeuer
{
public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName)
- : base(returnToSender, store, domainEvents, null, null, new Settings { InstanceName = endpointName })
+ : base(returnToSender, store, domainEvents, new TestTransportCustomization(), null, new Settings { InstanceName = endpointName })
{
}
@@ -289,6 +292,19 @@ public override Task Run(string forwardingBatchId, Predicate fil
}
}
+ public class TestTransportCustomization : ITransportCustomization
+ {
+ public void AddTransportForAudit(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
+ public void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
+ public void AddTransportForPrimary(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
+ public Task CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, Func onCriticalError = null, NServiceBus.TransportTransactionMode preferredTransactionMode = NServiceBus.TransportTransactionMode.ReceiveOnly) => throw new NotImplementedException();
+ public void CustomizeAuditEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
+ public void CustomizeMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
+ public void CustomizePrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
+ public Task ProvisionQueues(TransportSettings transportSettings, IEnumerable additionalQueues) => throw new NotImplementedException();
+ public string ToTransportQualifiedQueueName(string queueName) => queueName;
+ }
+
public class TestSender : IMessageDispatcher
{
public Action Callback { get; set; } = m => { };
diff --git a/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs b/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs
index 98168c3095..f677b03e8c 100644
--- a/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs
+++ b/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs
@@ -148,7 +148,7 @@ public async Task Should_not_add_endpoint_when_updating_user_indication()
}
[Test]
- public async Task Should_update_indicators_on_all_endpoint_sources()
+ public async Task Should_update_indicators_on_all_endpoint_sources_when_updated_based_on_sanitized_name()
{
// Arrange
var userIndicator = "someIndicator";
@@ -173,6 +173,32 @@ public async Task Should_update_indicators_on_all_endpoint_sources()
Assert.That(foundEndpointMonitoring.UserIndicator, Is.EqualTo(userIndicator));
}
+ [Test]
+ public async Task Should_update_indicators_on_all_endpoint_sources_when_updated_based_on_endpoint_name()
+ {
+ // Arrange
+ var userIndicator = "someIndicator";
+
+ var endpointAudit = new Endpoint("Endpoint1", ThroughputSource.Audit) { SanitizedName = "Endpoint1" };
+ var endpointMonitoring = new Endpoint("\"public\".\"Endpoint1\"", ThroughputSource.Monitoring) { SanitizedName = "Endpoint1" };
+
+ await LicensingDataStore.SaveEndpoint(endpointAudit, default);
+ await LicensingDataStore.SaveEndpoint(endpointMonitoring, default);
+
+ // Act
+ await LicensingDataStore.UpdateUserIndicatorOnEndpoints([new UpdateUserIndicator { Name = "\"public\".\"Endpoint1\"", UserIndicator = userIndicator }], default);
+
+ // Assert
+ var foundEndpointAudit = await LicensingDataStore.GetEndpoint("Endpoint1", ThroughputSource.Audit, default);
+ var foundEndpointMonitoring = await LicensingDataStore.GetEndpoint("\"public\".\"Endpoint1\"", ThroughputSource.Monitoring, default);
+
+ Assert.That(foundEndpointAudit, Is.Not.Null);
+ Assert.That(foundEndpointAudit.UserIndicator, Is.EqualTo(userIndicator));
+
+ Assert.That(foundEndpointMonitoring, Is.Not.Null);
+ Assert.That(foundEndpointMonitoring.UserIndicator, Is.EqualTo(userIndicator));
+ }
+
[TestCase(10, 5, false)]
[TestCase(10, 20, true)]
public async Task Should_correctly_report_throughput_existence_for_X_days(int daysSinceLastThroughputEntry, int timeFrameToCheck, bool expectedValue)
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/.editorconfig b/src/ServiceControl.Transports.PostgreSql.Tests/.editorconfig
new file mode 100644
index 0000000000..0279bdc2db
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/.editorconfig
@@ -0,0 +1,4 @@
+[*.cs]
+
+# Justification: Test project
+dotnet_diagnostic.CA2007.severity = none
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithInvalidConnectionStringSettings.approved.txt b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithInvalidConnectionStringSettings.approved.txt
new file mode 100644
index 0000000000..ef7ee5cd4e
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithInvalidConnectionStringSettings.approved.txt
@@ -0,0 +1,5 @@
+Connection settings to PostgreSql have some errors:
+PostgreSQL Connection String could not be parsed.
+
+Connection attempted with the following settings:
+ConnectionString set
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithValidSettings.approved.txt b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithValidSettings.approved.txt
new file mode 100644
index 0000000000..88acd38c33
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithValidSettings.approved.txt
@@ -0,0 +1,4 @@
+Connection test to PostgreSql was successful
+
+Connection settings used:
+ConnectionString set
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ConnectionStringExtensionsTests.cs b/src/ServiceControl.Transports.PostgreSql.Tests/ConnectionStringExtensionsTests.cs
new file mode 100644
index 0000000000..f736f184bf
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/ConnectionStringExtensionsTests.cs
@@ -0,0 +1,48 @@
+namespace ServiceControl.Transport.Tests;
+
+using Transports.PostgreSql;
+using NUnit.Framework;
+
+[TestFixture]
+class ConnectionStringExtensionsTests : TransportTestFixture
+{
+ [TestCase("table")]
+ [TestCase("schema.table")]
+ [TestCase("schema.my.table")]
+ public void ShouldParseSchemaFromSubscriptionTable(string customSubscriptionTableContainingSchema)
+ {
+ string connectionString = $"{configuration.ConnectionString};Subscriptions Table={customSubscriptionTableContainingSchema}";
+
+ _ = connectionString.RemoveCustomConnectionStringParts(out var _, out var subscriptionsTableSetting);
+ var subscriptionsAddress = QueueAddress.Parse(subscriptionsTableSetting);
+
+ Assert.That(subscriptionsAddress.Table, Is.Not.Null);
+
+ if (customSubscriptionTableContainingSchema.Contains("."))
+ {
+ Assert.That(subscriptionsAddress.Schema, Is.Not.Null);
+ Assert.That(subscriptionsAddress.Table, Is.EqualTo(customSubscriptionTableContainingSchema.Substring(customSubscriptionTableContainingSchema.IndexOf(".") + 1)));
+ Assert.That(subscriptionsAddress.Schema, Is.EqualTo(customSubscriptionTableContainingSchema.Substring(0, customSubscriptionTableContainingSchema.IndexOf("."))));
+ }
+ else
+ {
+ Assert.That(subscriptionsAddress.Schema, Is.Null);
+ Assert.That(subscriptionsAddress.Table, Is.EqualTo(customSubscriptionTableContainingSchema));
+ }
+ }
+
+ [TestCase("\"table\"")]
+ [TestCase("\"schema.table\"")]
+ [TestCase("\"schema.my.table\"")]
+ public void ShouldParseOnlyTableFromSubscriptionTableWhenEnclosedInQuotes(string customSubscriptionTableWithoutSchema)
+ {
+ string connectionString = $"{configuration.ConnectionString};Subscriptions Table={customSubscriptionTableWithoutSchema}";
+
+ _ = connectionString.RemoveCustomConnectionStringParts(out var _, out var subscriptionsTableSetting);
+ var subscriptionsAddress = QueueAddress.Parse(subscriptionsTableSetting);
+
+ Assert.That(subscriptionsAddress.Schema, Is.Null);
+ Assert.That(subscriptionsAddress.Table, Is.Not.Null);
+ Assert.That(subscriptionsAddress.Table, Is.EqualTo(PostgreSqlNameHelper.Unquote(customSubscriptionTableWithoutSchema)));
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs b/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs
new file mode 100644
index 0000000000..bee96d7921
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs
@@ -0,0 +1,119 @@
+namespace ServiceControl.Transport.Tests;
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Time.Testing;
+using NUnit.Framework;
+using Particular.Approvals;
+using Transports;
+using Transports.PostgreSql;
+using Transports.BrokerThroughput;
+
+[TestFixture]
+class PostgreSqlQueryTests : TransportTestFixture
+{
+ FakeTimeProvider provider;
+ TransportSettings transportSettings;
+ PostgreSqlQuery query;
+
+ [SetUp]
+ public void Initialise()
+ {
+ provider = new();
+ provider.SetUtcNow(DateTimeOffset.UtcNow);
+ transportSettings = new TransportSettings
+ {
+ ConnectionString = configuration.ConnectionString,
+ MaxConcurrency = 1,
+ EndpointName = Guid.NewGuid().ToString("N")
+ };
+ query = new PostgreSqlQuery(NullLogger.Instance, provider, transportSettings);
+ }
+
+ [Test]
+ public async Task TestConnectionWithInvalidConnectionStringSettings()
+ {
+ using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+ var dictionary = new Dictionary
+ {
+ { PostgreSqlQuery.PostgreSqlSettings.ConnectionString, "not valid" }
+ };
+ query.Initialize(new ReadOnlyDictionary(dictionary));
+ (bool success, List errors, string diagnostics) =
+ await query.TestConnection(cancellationTokenSource.Token);
+
+ Assert.That(success, Is.False);
+ Assert.That(errors.Single(), Is.EqualTo("PostgreSQL Connection String could not be parsed."));
+ Approver.Verify(diagnostics);
+ }
+
+ [Test]
+ public async Task TestConnectionWithValidSettings()
+ {
+ using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+ var dictionary = new Dictionary
+ {
+ { PostgreSqlQuery.PostgreSqlSettings.ConnectionString, configuration.ConnectionString }
+ };
+ query.Initialize(new ReadOnlyDictionary(dictionary));
+ (bool success, _, string diagnostics) = await query.TestConnection(cancellationTokenSource.Token);
+
+ Assert.That(success, Is.True);
+ Approver.Verify(diagnostics);
+ }
+
+ [Test]
+ public async Task RunScenario()
+ {
+ // We need to wait a bit of time, because the scenario running takes on average 1 sec per run.
+ using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(50));
+ CancellationToken token = cancellationTokenSource.Token;
+ var dictionary = new Dictionary
+ {
+ { PostgreSqlQuery.PostgreSqlSettings.ConnectionString, configuration.ConnectionString }
+ };
+
+ await CreateTestQueue(transportSettings.EndpointName);
+
+ query.Initialize(new ReadOnlyDictionary(dictionary));
+
+ var queueNames = new List();
+ await foreach (IBrokerQueue queueName in query.GetQueueNames(token))
+ {
+ queueNames.Add(queueName);
+ }
+
+ IBrokerQueue queue = queueNames.Find(name => ((BrokerQueueTable)name).SanitizedName == transportSettings.EndpointName);
+ Assert.That(queue, Is.Not.Null);
+
+ long total = 0L;
+ using var reset = new ManualResetEventSlim();
+
+ var runScenarioAndAdvanceTime = Task.Run(async () =>
+ {
+ while (!reset.IsSet)
+ {
+ await SendAndReceiveMessages(transportSettings.EndpointName, 1);
+ provider.Advance(TimeSpan.FromHours(1));
+ }
+ }, token);
+
+ await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, new DateOnly(), token))
+ {
+ total += queueThroughput.TotalThroughput;
+ }
+
+ reset.Set();
+ await runScenarioAndAdvanceTime.WaitAsync(token);
+
+ // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion.
+ Assert.That(total, Is.EqualTo(23));
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ServiceControl.Transports.PostgreSql.Tests.csproj b/src/ServiceControl.Transports.PostgreSql.Tests/ServiceControl.Transports.PostgreSql.Tests.csproj
new file mode 100644
index 0000000000..7a3acb266d
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/ServiceControl.Transports.PostgreSql.Tests.csproj
@@ -0,0 +1,32 @@
+
+
+
+ net8.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/TestsFilter.cs b/src/ServiceControl.Transports.PostgreSql.Tests/TestsFilter.cs
new file mode 100644
index 0000000000..aaa9ec7659
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/TestsFilter.cs
@@ -0,0 +1 @@
+[assembly: IncludeInPostgreSqlTests()]
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/TransportTestsConfiguration.cs b/src/ServiceControl.Transports.PostgreSql.Tests/TransportTestsConfiguration.cs
new file mode 100644
index 0000000000..819248c612
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql.Tests/TransportTestsConfiguration.cs
@@ -0,0 +1,31 @@
+namespace ServiceControl.Transport.Tests
+{
+ using System;
+ using System.Threading.Tasks;
+ using ServiceControl.Transports.PostgreSql;
+ using Transports;
+
+ partial class TransportTestsConfiguration
+ {
+ public string ConnectionString { get; private set; }
+
+ public ITransportCustomization TransportCustomization { get; private set; }
+
+ public Task Configure()
+ {
+ TransportCustomization = new PostgreSqlTransportCustomization();
+ ConnectionString = Environment.GetEnvironmentVariable(ConnectionStringKey);
+
+ if (string.IsNullOrEmpty(ConnectionString))
+ {
+ throw new Exception($"Environment variable {ConnectionStringKey} is required for PostgreSQL transport tests to run");
+ }
+
+ return Task.CompletedTask;
+ }
+
+ public Task Cleanup() => Task.CompletedTask;
+
+ const string ConnectionStringKey = "ServiceControl_TransportTests_PostgreSQL_ConnectionString";
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/.editorconfig b/src/ServiceControl.Transports.PostgreSql/.editorconfig
new file mode 100644
index 0000000000..ff993b49bb
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/.editorconfig
@@ -0,0 +1,4 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
diff --git a/src/ServiceControl.Transports.PostgreSql/ConnectionStringExtensions.cs b/src/ServiceControl.Transports.PostgreSql/ConnectionStringExtensions.cs
new file mode 100644
index 0000000000..07df2285ae
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/ConnectionStringExtensions.cs
@@ -0,0 +1,36 @@
+namespace ServiceControl.Transports.PostgreSql;
+
+using System.Data.Common;
+
+public static class ConnectionStringExtensions
+{
+ public static string RemoveCustomConnectionStringParts(this string connectionString, out string schema, out string subscriptionTable) =>
+ connectionString
+ .RemoveCustomConnectionStringPart(SubscriptionsTableName, out subscriptionTable)
+ .RemoveCustomConnectionStringPart(QueueSchemaName, out schema);
+
+ static string RemoveCustomConnectionStringPart(this string connectionString, string partName, out string partValue)
+ {
+ var builder = new DbConnectionStringBuilder
+ {
+ ConnectionString = connectionString
+ };
+
+ if (builder.TryGetValue(partName, out var customPartValue))
+ {
+ builder.Remove(partName);
+ }
+
+ partValue = (string)customPartValue;
+
+ if (partValue != null && connectionString.Contains(PostgreSqlNameHelper.Quote(partValue)))
+ {
+ partValue = PostgreSqlNameHelper.Quote(partValue);
+ }
+
+ return builder.ConnectionString;
+ }
+
+ const string QueueSchemaName = "Queue Schema";
+ const string SubscriptionsTableName = "Subscriptions Table";
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs b/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs
new file mode 100644
index 0000000000..556aba5a43
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs
@@ -0,0 +1,142 @@
+namespace ServiceControl.Transports.PostgreSql;
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Npgsql;
+
+public class DatabaseDetails
+{
+ readonly string connectionString;
+
+ public string DatabaseName { get; }
+
+ public DatabaseDetails(string connectionString)
+ {
+ try
+ {
+ var builder = new NpgsqlConnectionStringBuilder
+ {
+ ConnectionString = connectionString
+ };
+ DatabaseName = builder.Database;
+ this.connectionString = builder.ToString();
+ }
+ catch (Exception ex) when (ex is FormatException or ArgumentException)
+ {
+ throw new Exception("PostgreSQL Connection String could not be parsed.", ex);
+ }
+ }
+
+ public async Task TestConnection(CancellationToken cancellationToken)
+ {
+ try
+ {
+ return await GetPostgreSqlVersion(cancellationToken);
+ }
+ catch (NpgsqlException ex) when (IsConnectionOrLoginIssue(ex))
+ {
+ throw new Exception($"Could not connect to '{DatabaseName}' PostgreSQL database.", ex);
+ }
+ }
+
+ static bool IsConnectionOrLoginIssue(NpgsqlException x)
+ {
+ // Reference is here: https://www.postgresql.org/docs/current/errcodes-appendix.html
+
+ return x.SqlState switch
+ {
+ //28000 invalid_authorization_specification
+ //28P01 invalid_password
+ "28000" or "28P01" => true,
+
+ //08000 connection_exception
+ //08003 connection_does_not_exist
+ //08006 connection_failure
+ //08001 sqlclient_unable_to_establish_sqlconnection
+ //08004 sqlserver_rejected_establishment_of_sqlconnection
+ //08007 transaction_resolution_unknown
+ //08P01 protocol_violation
+ "08000" or "08003" or "08006" or "08001" or "08004" or "08007" or "08P01" => true,
+
+ // Everything else
+ _ => false
+ };
+ }
+
+ async Task GetPostgreSqlVersion(CancellationToken cancellationToken)
+ {
+ await using var conn = await OpenConnectionAsync(cancellationToken);
+ await using var cmd = conn.CreateCommand();
+ cmd.CommandText = "SELECT version()";
+
+ return (string)await cmd.ExecuteScalarAsync(cancellationToken);
+ }
+
+ public async Task> GetTables(CancellationToken cancellationToken = default)
+ {
+ List tables = [];
+
+ await using var conn = await OpenConnectionAsync(cancellationToken);
+ await using var cmd = conn.CreateCommand();
+ cmd.CommandText = GetQueueListCommandText;
+ await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
+ while (await reader.ReadAsync(cancellationToken))
+ {
+ var schema = reader.GetString(0);
+ var name = reader.GetString(1);
+ tables.Add(new BrokerQueueTable(this, new QueueAddress(name, schema)));
+ }
+
+ return tables;
+ }
+
+ public async Task GetSnapshot(BrokerQueueTable brokerQueueTable,
+ CancellationToken cancellationToken = default)
+ {
+ var table = new BrokerQueueTableSnapshot(brokerQueueTable);
+
+ await using var conn = await OpenConnectionAsync(cancellationToken);
+ await using var cmd = conn.CreateCommand();
+ cmd.CommandText = $"select last_value from \"{table.SequenceName}\";";
+ var value = await cmd.ExecuteScalarAsync(cancellationToken);
+
+ if (value is long longValue)
+ {
+ table.RowVersion = longValue;
+ }
+
+ return table;
+ }
+
+ async Task OpenConnectionAsync(CancellationToken cancellationToken = default)
+ {
+ var conn = new NpgsqlConnection(connectionString);
+ await conn.OpenAsync(cancellationToken);
+ return conn;
+ }
+
+
+ ///
+ /// Query works by finidng all the columns in any table that *could* be from an NServiceBus
+ /// queue table, grouping by schema+name, and then using the HAVING COUNT(*) = 5 clause
+ /// to ensure that all 5 columns are represented. Delay tables, for example, will match
+ /// on 3 of the columns (Headers, Body, RowVersion) and many user tables might have an
+ /// Id column, but the HAVING clause filters these out.
+ /// ///
+ const string GetQueueListCommandText = @"
+SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
+
+SELECT C.TABLE_SCHEMA as TableSchema, C.TABLE_NAME as TableName
+FROM information_schema.columns C
+WHERE
+ (C.COLUMN_NAME = 'id' AND C.DATA_TYPE = 'uuid') OR
+ (C.COLUMN_NAME = 'expires' AND C.DATA_TYPE = 'timestamp without time zone') OR
+ (C.COLUMN_NAME = 'headers' AND C.DATA_TYPE = 'text') OR
+ (C.COLUMN_NAME = 'body' AND C.DATA_TYPE = 'bytea') OR
+ (C.COLUMN_NAME = 'seq' AND C.DATA_TYPE = 'integer')
+GROUP BY C.TABLE_SCHEMA, C.TABLE_NAME
+HAVING COUNT(*) = 5
+";
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlNameHelper.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlNameHelper.cs
new file mode 100644
index 0000000000..3d1ca0023a
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlNameHelper.cs
@@ -0,0 +1,37 @@
+namespace ServiceControl.Transports.PostgreSql;
+
+// NOTE: Copied from the SQL Transport
+
+public static class PostgreSqlNameHelper
+{
+ const string Delimiter = "\"";
+ static readonly string EscapedDelimiter = Delimiter + Delimiter;
+
+ public static string Quote(string unquotedName)
+ {
+ if (unquotedName == null)
+ {
+ return null;
+ }
+ //Quotes are escaped by using double quotes
+ return Delimiter + unquotedName.Replace(Delimiter, EscapedDelimiter) + Delimiter;
+ }
+
+ public static string Unquote(string quotedString)
+ {
+ if (quotedString == null)
+ {
+ return null;
+ }
+
+ if (!quotedString.StartsWith(Delimiter) || !quotedString.EndsWith(Delimiter))
+ {
+ //Already unquoted
+ return quotedString;
+ }
+
+ return quotedString
+ .Substring(Delimiter.Length, quotedString.Length - (2 * Delimiter.Length))
+ .Replace(EscapedDelimiter, Delimiter);
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs
new file mode 100644
index 0000000000..12ed41546b
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs
@@ -0,0 +1,116 @@
+#nullable enable
+namespace ServiceControl.Transports.PostgreSql;
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using BrokerThroughput;
+using Microsoft.Extensions.Logging;
+
+public class PostgreSqlQuery(
+ ILogger logger,
+ TimeProvider timeProvider,
+ TransportSettings transportSettings) : BrokerThroughputQuery(logger, "PostgreSql")
+{
+ readonly List databases = [];
+
+ protected override void InitializeCore(ReadOnlyDictionary settings)
+ {
+ if (!settings.TryGetValue(PostgreSqlSettings.ConnectionString, out string? connectionString))
+ {
+ logger.LogInformation("Using ConnectionString used by instance");
+
+ connectionString = transportSettings.ConnectionString.RemoveCustomConnectionStringParts(out string _, out string _);
+
+ Diagnostics.AppendLine("ConnectionString not set, defaulted to using ConnectionString used by instance");
+ }
+ else
+ {
+ Diagnostics.AppendLine("ConnectionString set");
+ }
+
+ databases.Add(new DatabaseDetails(connectionString));
+ }
+
+ public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue,
+ DateOnly startDate,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ var queueTableName = (BrokerQueueTable)brokerQueue;
+ var startData =
+ await queueTableName.DatabaseDetails.GetSnapshot(queueTableName, cancellationToken);
+
+ // looping for 24 hours
+ for (var i = 0; i < 24; i++)
+ {
+ await Task.Delay(TimeSpan.FromHours(1), timeProvider, cancellationToken);
+ var endData =
+ await queueTableName.DatabaseDetails.GetSnapshot(queueTableName, cancellationToken);
+
+ yield return new QueueThroughput
+ {
+ DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
+ TotalThroughput = endData.RowVersion - startData.RowVersion
+ };
+
+ startData = endData;
+ }
+ }
+
+ public override async IAsyncEnumerable GetQueueNames(
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ var tables = new List();
+
+ foreach (var db in databases)
+ {
+ var version = await db.TestConnection(cancellationToken);
+ Data["SqlVersion"] = version;
+ tables.AddRange(await db.GetTables(cancellationToken));
+ }
+
+ ScopeType = "Catalog & Schema";
+
+ foreach (var tableName in tables)
+ {
+ yield return tableName;
+ }
+ }
+
+ public override KeyDescriptionPair[] Settings =>
+ [
+ new KeyDescriptionPair(PostgreSqlSettings.ConnectionString, PostgreSqlSettings.ConnectionStringDescription)
+ ];
+
+ protected override async Task<(bool Success, List Errors)> TestConnectionCore(
+ CancellationToken cancellationToken)
+ {
+ List errors = [];
+
+ foreach (DatabaseDetails db in databases)
+ {
+ try
+ {
+ await db.TestConnection(cancellationToken);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Test connection failed");
+ errors.Add(ex.Message);
+ }
+ }
+
+ return (errors.Count == 0, errors);
+ }
+
+ public static class PostgreSqlSettings
+ {
+ public static readonly string ConnectionString = "PostgreSQL/ConnectionString";
+
+ public static readonly string ConnectionStringDescription =
+ "Database connection string that will provide at least read access to all queue tables.";
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlTable.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTable.cs
new file mode 100644
index 0000000000..0e7ed6c5f0
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTable.cs
@@ -0,0 +1,51 @@
+#nullable enable
+namespace ServiceControl.Transports.PostgreSql;
+
+class PostgreSqlTable
+{
+ public PostgreSqlTable(string name, string schema)
+ {
+ //HINT: The query approximates queue length value based on max and min of the table sequence.
+ fullTableName = $"\"{schema}\".\"{name}\"";
+ //NOTE: Postgres doesn't have NOLOCK since it utilises snapshot isolation by default
+ LengthQuery = $$"""
+ SELECT CASE WHEN (EXISTS (SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{{schema}}' AND TABLE_NAME = '{{name}}')) THEN
+ COALESCE(cast(max(seq) - min(seq) + 1 AS int), 0)
+ ELSE
+ -1
+ END AS Id FROM {{fullTableName}};
+ """;
+ }
+
+ readonly string fullTableName;
+ public string LengthQuery { get; }
+
+ public override string ToString() =>
+ fullTableName;
+
+ bool Equals(PostgreSqlTable other) =>
+ string.Equals(fullTableName, other.fullTableName);
+
+ public override bool Equals(object? obj)
+ {
+ if (obj is null)
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ if (obj.GetType() != GetType())
+ {
+ return false;
+ }
+
+ return Equals((PostgreSqlTable)obj);
+ }
+
+ public override int GetHashCode() =>
+ fullTableName.GetHashCode();
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs
new file mode 100644
index 0000000000..9322f67f33
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs
@@ -0,0 +1,86 @@
+namespace ServiceControl.Transports.PostgreSql;
+
+using System.Linq;
+using System.Runtime.CompilerServices;
+using BrokerThroughput;
+using Microsoft.Extensions.DependencyInjection;
+using NServiceBus;
+using NServiceBus.Logging;
+using NServiceBus.Transport.PostgreSql;
+
+public class PostgreSqlTransportCustomization : TransportCustomization
+{
+ protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) =>
+ transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
+
+ protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) =>
+ transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
+
+ protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) =>
+ transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
+
+ protected override void AddTransportForPrimaryCore(IServiceCollection services,
+ TransportSettings transportSettings) =>
+ services.AddSingleton();
+
+ protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
+ {
+ services.AddSingleton();
+ services.AddHostedService(provider => provider.GetRequiredService());
+ }
+
+ protected override PostgreSqlTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
+ {
+ var connectionString = transportSettings.ConnectionString.RemoveCustomConnectionStringParts(out var customSchema, out var subscriptionsTableSetting);
+
+ var transport = new PostgreSqlTransport(connectionString);
+
+ var subscriptions = transport.Subscriptions;
+
+ if (customSchema != null)
+ {
+ transport.DefaultSchema = customSchema;
+ subscriptions.SubscriptionTableName = new SubscriptionTableName(DefaultSubscriptionTableName, customSchema);
+ }
+
+ if (subscriptionsTableSetting != null)
+ {
+ var subscriptionsAddress = QueueAddress.Parse(subscriptionsTableSetting);
+
+ subscriptions.SubscriptionTableName =
+ new SubscriptionTableName(subscriptionsAddress.Table,
+ subscriptionsAddress.Schema ?? customSchema);
+ }
+
+ if (transportSettings.GetOrDefault("TransportSettings.EnableDtc"))
+ {
+ Logger.Error("The EnableDtc setting is no longer supported natively within ServiceControl. If you require distributed transactions, you will have to use a Transport Adapter (https://docs.particular.net/servicecontrol/transport-adapter/)");
+ }
+
+ DisableDelayedDelivery(transport) = true;
+
+ transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
+
+ return transport;
+ }
+
+ protected override string ToTransportQualifiedQueueNameCore(string queueName)
+ {
+ const string delimiter = "\"";
+ const string escapedDelimiter = delimiter + delimiter;
+
+ if (queueName.StartsWith(delimiter) || queueName.EndsWith(delimiter))
+ {
+ return queueName;
+ }
+
+ return delimiter + queueName.Replace(delimiter, escapedDelimiter) + delimiter;
+ }
+
+ [UnsafeAccessor(UnsafeAccessorKind.Field, Name = "k__BackingField")]
+ static extern ref bool DisableDelayedDelivery(PostgreSqlTransport transport);
+
+ const string DefaultSubscriptionTableName = "SubscriptionRouting";
+
+ static readonly ILog Logger = LogManager.GetLogger(typeof(PostgreSqlTransportCustomization));
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/QueueAddress.cs b/src/ServiceControl.Transports.PostgreSql/QueueAddress.cs
new file mode 100644
index 0000000000..03060d8d17
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/QueueAddress.cs
@@ -0,0 +1,50 @@
+namespace ServiceControl.Transports.PostgreSql;
+
+using System;
+
+// NOTE: Copied from the SQL Transport
+public class QueueAddress
+{
+ public QueueAddress(string table, string schemaName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(table);
+ Table = SafeUnquote(table);
+ Schema = SafeUnquote(schemaName);
+ QualifiedTableName = $"{PostgreSqlNameHelper.Quote(Schema)}.{PostgreSqlNameHelper.Quote(Table)}";
+ }
+
+ public string Table { get; }
+ public string Schema { get; }
+ public string QualifiedTableName { get; }
+
+ public static QueueAddress Parse(string address)
+ {
+ var index = 0;
+ var quoteCount = 0;
+ while (index < address.Length)
+ {
+ if (address[index] == '"')
+ {
+ quoteCount++;
+ }
+ else if (address[index] == '.' && quoteCount % 2 == 0)
+ {
+ var schema = address.Substring(0, index);
+ var table = address.Substring(index + 1);
+
+ return new QueueAddress(table, schema);
+ }
+ index++;
+ }
+
+ return new QueueAddress(address, null);
+ }
+
+ static string SafeUnquote(string name)
+ {
+ var result = PostgreSqlNameHelper.Unquote(name);
+ return string.IsNullOrWhiteSpace(result)
+ ? null
+ : result;
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs b/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs
new file mode 100644
index 0000000000..28b4fdea4b
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs
@@ -0,0 +1,138 @@
+namespace ServiceControl.Transports.PostgreSql;
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Npgsql;
+using NServiceBus.Logging;
+
+class QueueLengthProvider : AbstractQueueLengthProvider
+{
+ public QueueLengthProvider(TransportSettings settings, Action store) : base(settings, store)
+ {
+ connectionString = ConnectionString
+ .RemoveCustomConnectionStringParts(out var customSchema, out _);
+
+ defaultSchema = customSchema ?? "public";
+ }
+ public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack)
+ {
+ var parsedAddress = QueueAddress.Parse(queueToTrack.InputQueue);
+
+ var sqlTable = new PostgreSqlTable(parsedAddress.Table, parsedAddress.Schema ?? defaultSchema);
+
+ tableNames.AddOrUpdate(queueToTrack, _ => sqlTable, (_, currentSqlTable) =>
+ {
+ if (!currentSqlTable.Equals(sqlTable))
+ {
+ tableSizes.TryRemove(currentSqlTable, out var _);
+ }
+
+ return sqlTable;
+ });
+
+ tableSizes.TryAdd(sqlTable, 0);
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(QueryDelayInterval, stoppingToken);
+
+ await QueryTableSizes(stoppingToken);
+
+ UpdateQueueLengthStore();
+ }
+ catch (OperationCanceledException)
+ {
+ // no-op
+ }
+ catch (Exception e)
+ {
+ Logger.Error("Error querying sql queue sizes.", e);
+ }
+ }
+ }
+
+ void UpdateQueueLengthStore()
+ {
+ var nowTicks = DateTime.UtcNow.Ticks;
+
+ foreach (var tableNamePair in tableNames)
+ {
+ Store(
+ [
+ new QueueLengthEntry
+ {
+ DateTicks = nowTicks,
+ Value = tableSizes.GetValueOrDefault(tableNamePair.Value, 0)
+ }
+ ],
+ tableNamePair.Key);
+ }
+ }
+
+ async Task QueryTableSizes(CancellationToken cancellationToken)
+ {
+ var chunks = tableSizes
+ .Select((i, index) => new
+ {
+ i,
+ index
+ })
+ .GroupBy(p => p.index / QueryChunkSize)
+ .Select(grp => grp.Select(g => g.i).ToArray())
+ .ToList();
+
+ await using var connection = new NpgsqlConnection(connectionString);
+ await connection.OpenAsync(cancellationToken);
+
+ foreach (var chunk in chunks)
+ {
+ await UpdateChunk(connection, chunk, cancellationToken);
+ }
+ }
+
+ async Task UpdateChunk(NpgsqlConnection connection, KeyValuePair[] chunk, CancellationToken cancellationToken)
+ {
+ var query = string.Join(Environment.NewLine, chunk.Select(c => c.Key.LengthQuery));
+
+ await using var command = new NpgsqlCommand(query, connection);
+ await using var reader = await command.ExecuteReaderAsync(cancellationToken);
+ foreach (var chunkPair in chunk)
+ {
+ await reader.ReadAsync(cancellationToken);
+
+ var queueLength = reader.GetInt32(0);
+
+ if (queueLength == -1)
+ {
+ Logger.Warn($"Table {chunkPair.Key} does not exist.");
+ }
+ else
+ {
+ tableSizes.TryUpdate(chunkPair.Key, queueLength, chunkPair.Value);
+ }
+
+ await reader.NextResultAsync(cancellationToken);
+ }
+ }
+
+ readonly ConcurrentDictionary tableNames = new ConcurrentDictionary();
+ readonly ConcurrentDictionary tableSizes = new ConcurrentDictionary();
+
+ readonly string connectionString;
+ readonly string defaultSchema;
+
+ static readonly ILog Logger = LogManager.GetLogger();
+
+ static readonly TimeSpan QueryDelayInterval = TimeSpan.FromSeconds(1);
+
+ const int QueryChunkSize = 10;
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/QueueTableName.cs b/src/ServiceControl.Transports.PostgreSql/QueueTableName.cs
new file mode 100644
index 0000000000..976d4ec29c
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/QueueTableName.cs
@@ -0,0 +1,16 @@
+#nullable enable
+namespace ServiceControl.Transports.PostgreSql;
+
+using System.Collections.Generic;
+using ServiceControl.Transports.BrokerThroughput;
+public class BrokerQueueTable(DatabaseDetails databaseDetails, QueueAddress queueAddress)
+ : IBrokerQueue
+{
+ public DatabaseDetails DatabaseDetails { get; } = databaseDetails;
+ public QueueAddress QueueAddress { get; } = queueAddress;
+ public string SequenceName => $"{QueueAddress.Table}_seq_seq";
+ public string QueueName => QueueAddress.QualifiedTableName;
+ public string SanitizedName => QueueAddress.Table;
+ public string? Scope => $"[{DatabaseDetails.DatabaseName}].[{QueueAddress.Schema}]";
+ public List EndpointIndicators { get; } = [];
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs b/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs
new file mode 100644
index 0000000000..ba4fc7180b
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs
@@ -0,0 +1,6 @@
+namespace ServiceControl.Transports.PostgreSql;
+
+public class BrokerQueueTableSnapshot(BrokerQueueTable details) : BrokerQueueTable(details.DatabaseDetails, details.QueueAddress)
+{
+ public long RowVersion { get; set; }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.PostgreSql/ServiceControl.Transports.PostgreSql.csproj b/src/ServiceControl.Transports.PostgreSql/ServiceControl.Transports.PostgreSql.csproj
new file mode 100644
index 0000000000..1fa0bde372
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/ServiceControl.Transports.PostgreSql.csproj
@@ -0,0 +1,25 @@
+
+
+
+ net8.0
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/ServiceControl.Transports.PostgreSql/transport.manifest b/src/ServiceControl.Transports.PostgreSql/transport.manifest
new file mode 100644
index 0000000000..ac3130f8de
--- /dev/null
+++ b/src/ServiceControl.Transports.PostgreSql/transport.manifest
@@ -0,0 +1,16 @@
+{
+ "Definitions": [
+ {
+ "Name": "PostgreSQL",
+ "DisplayName": "PostgreSQL",
+ "AssemblyName": "ServiceControl.Transports.PostgreSql",
+ "TypeName": "ServiceControl.Transports.PostgreSql.PostgreSqlTransportCustomization, ServiceControl.Transports.PostgreSql",
+ "DefaultPrimaryMaximumConcurrencyLevel": 10,
+ "DefaultAuditMaximumConcurrencyLevel": 10,
+ "DefaultMonitoringMaximumConcurrencyLevel": 10,
+ "SampleConnectionString": "Server=;Database=nservicebus;Port=5432;User Id=;Password=;Queue Schema=myschema;Subscriptions Table=schema.tablename",
+ "AvailableInSCMU": true,
+ "Help": "Specify optional 'Queue Schema' to override the default schema. Specify optional 'Subscriptions Table' to override the default subscriptions table location."
+ }
+ ]
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt b/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt
index 0288ca6324..242979cbf3 100644
--- a/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt
+++ b/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt
@@ -1,6 +1,5 @@
Connection test to SqlTransport failed:
-Cannot open database "not_here" requested by the login. The login failed.
-Login failed for user.
+Could not connect to 'not_here' SQL Server database.
Connection attempted with the following settings:
ConnectionString set
diff --git a/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs b/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs
index 2c0bddfd9b..ab8b40bce3 100644
--- a/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs
+++ b/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs
@@ -69,7 +69,7 @@ public async Task TestConnectionWithInvalidCatalogSettings()
await query.TestConnection(cancellationTokenSource.Token);
Assert.That(success, Is.False);
- Assert.That(errors.Single(), Does.StartWith("Cannot open database \"not_here\""));
+ Assert.That(errors.Single(), Does.StartWith("Could not connect to 'not_here'"));
Approver.Verify(diagnostics,
s => Regex.Replace(s, "^Login failed for user .*$", "Login failed for user.", RegexOptions.Multiline));
}
diff --git a/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs b/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs
index 0e0b2573e3..54bb7151af 100644
--- a/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs
+++ b/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs
@@ -30,11 +30,11 @@ public DatabaseDetails(string connectionString)
}
}
- public Task TestConnection(CancellationToken cancellationToken)
+ public async Task TestConnection(CancellationToken cancellationToken)
{
try
{
- return GetSqlVersion(cancellationToken);
+ return await GetSqlVersion(cancellationToken);
}
catch (SqlException ex) when (IsConnectionOrLoginIssue(ex))
{
diff --git a/src/ServiceControl.Transports.SqlServer/NameHelper.cs b/src/ServiceControl.Transports.SqlServer/NameHelper.cs
index cdc332a660..b21f097f67 100644
--- a/src/ServiceControl.Transports.SqlServer/NameHelper.cs
+++ b/src/ServiceControl.Transports.SqlServer/NameHelper.cs
@@ -5,27 +5,22 @@ class NameHelper
const string prefix = "[";
const string suffix = "]";
- public static string Quote(string unquotedName)
+ public static string Quote(string name)
{
- if (unquotedName == null)
+ if (name.StartsWith(prefix) && name.EndsWith(suffix))
{
- return null;
+ return name;
}
- return prefix + unquotedName.Replace(suffix, suffix + suffix) + suffix;
+
+ return prefix + name.Replace(suffix, suffix + suffix) + suffix;
}
public static string Unquote(string quotedString)
{
- if (quotedString == null)
- {
- return null;
- }
-
if (!quotedString.StartsWith(prefix) || !quotedString.EndsWith(suffix))
{
return quotedString;
}
-
return quotedString
.Substring(prefix.Length, quotedString.Length - prefix.Length - suffix.Length).Replace(suffix + suffix, suffix);
}
diff --git a/src/ServiceControl.Transports.SqlServer/QueueAddress.cs b/src/ServiceControl.Transports.SqlServer/QueueAddress.cs
index 69d81a9a93..98501755be 100644
--- a/src/ServiceControl.Transports.SqlServer/QueueAddress.cs
+++ b/src/ServiceControl.Transports.SqlServer/QueueAddress.cs
@@ -101,12 +101,12 @@ static string ExtractNextPart(string address, out string part)
static string Quote(string name)
{
- return NameHelper.Quote(name);
+ return SqlServerNameHelper.Quote(name);
}
static string SafeUnquote(string name)
{
- var result = NameHelper.Unquote(name);
+ var result = SqlServerNameHelper.Unquote(name);
return string.IsNullOrWhiteSpace(result)
? null
: result;
diff --git a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs
index bfcd15062e..fe06db3da6 100644
--- a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs
+++ b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs
@@ -130,7 +130,7 @@ async Task UpdateChunk(SqlConnection connection, KeyValuePair[] c
static readonly ILog Logger = LogManager.GetLogger();
- static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200);
+ static readonly TimeSpan QueryDelayInterval = TimeSpan.FromSeconds(1);
const int QueryChunkSize = 10;
}
diff --git a/src/ServiceControl.Transports.SqlServer/SqlNameHelper.cs b/src/ServiceControl.Transports.SqlServer/SqlServerNameHelper.cs
similarity index 54%
rename from src/ServiceControl.Transports.SqlServer/SqlNameHelper.cs
rename to src/ServiceControl.Transports.SqlServer/SqlServerNameHelper.cs
index 9552a7b68c..24d3869041 100644
--- a/src/ServiceControl.Transports.SqlServer/SqlNameHelper.cs
+++ b/src/ServiceControl.Transports.SqlServer/SqlServerNameHelper.cs
@@ -1,27 +1,32 @@
-#nullable enable
-namespace ServiceControl.Transports.SqlServer
+namespace ServiceControl.Transports.SqlServer
{
- static class SqlNameHelper
+ // NOTE: Copied from the SQL Transport
+ static class SqlServerNameHelper
{
const string prefix = "[";
const string suffix = "]";
- public static string Quote(string name)
+ public static string Quote(string unquotedName)
{
- if (name.StartsWith(prefix) && name.EndsWith(suffix))
+ if (unquotedName == null)
{
- return name;
+ return null;
}
-
- return prefix + name.Replace(suffix, suffix + suffix) + suffix;
+ return prefix + unquotedName.Replace(suffix, suffix + suffix) + suffix;
}
public static string Unquote(string quotedString)
{
+ if (quotedString == null)
+ {
+ return null;
+ }
+
if (!quotedString.StartsWith(prefix) || !quotedString.EndsWith(suffix))
{
return quotedString;
}
+
return quotedString
.Substring(prefix.Length, quotedString.Length - prefix.Length - suffix.Length).Replace(suffix + suffix, suffix);
}
diff --git a/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs b/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs
index 86b66146b0..e84e58b8c4 100644
--- a/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs
+++ b/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs
@@ -92,9 +92,6 @@ public override async IAsyncEnumerable GetQueueNames(
tables.AddRange(await db.GetTables(cancellationToken));
}
- var catalogCount = tables.Select(t => t.DatabaseDetails.DatabaseName).Distinct().Count();
- var schemaCount = tables.Select(t => $"{t.DatabaseDetails.DatabaseName}/{t.Schema}").Distinct().Count();
-
ScopeType = "Catalog & Schema";
foreach (var tableName in tables)
diff --git a/src/ServiceControl.Transports.SqlServer/SqlTable.cs b/src/ServiceControl.Transports.SqlServer/SqlTable.cs
index 5eeae69701..0e629b1762 100644
--- a/src/ServiceControl.Transports.SqlServer/SqlTable.cs
+++ b/src/ServiceControl.Transports.SqlServer/SqlTable.cs
@@ -6,10 +6,10 @@ class SqlTable
{
SqlTable(string name, string schema, string? catalog)
{
- var unquotedSchema = SqlNameHelper.Unquote(schema);
- var unquotedName = SqlNameHelper.Unquote(name);
- var quotedName = SqlNameHelper.Quote(name);
- var quotedSchema = SqlNameHelper.Quote(schema);
+ var unquotedSchema = NameHelper.Unquote(schema);
+ var unquotedName = NameHelper.Unquote(name);
+ var quotedName = NameHelper.Quote(name);
+ var quotedSchema = NameHelper.Quote(schema);
//HINT: The query approximates queue length value based on max and min
// of RowVersion IDENTITY(1,1) column. There are couple of scenarios
// that might lead to the approximation being off. More details here:
@@ -29,7 +29,7 @@ SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) FROM {_
}
else
{
- var quotedCatalog = SqlNameHelper.Quote(catalog);
+ var quotedCatalog = NameHelper.Quote(catalog);
_fullTableName = $"{quotedCatalog}.{quotedSchema}.{quotedName}";
LengthQuery = $"""
diff --git a/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt b/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt
index 9856c18f66..d7cc93804f 100644
--- a/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt
+++ b/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt
@@ -32,6 +32,7 @@ namespace ServiceControl.Transports
void CustomizeMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, ServiceControl.Transports.TransportSettings transportSettings);
void CustomizePrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, ServiceControl.Transports.TransportSettings transportSettings);
System.Threading.Tasks.Task ProvisionQueues(ServiceControl.Transports.TransportSettings transportSettings, System.Collections.Generic.IEnumerable additionalQueues);
+ string ToTransportQualifiedQueueName(string queueName);
}
public class QueueLengthEntry
{
@@ -59,6 +60,8 @@ namespace ServiceControl.Transports
protected abstract void CustomizeTransportForMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TTransport transportDefinition, ServiceControl.Transports.TransportSettings transportSettings);
protected abstract void CustomizeTransportForPrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TTransport transportDefinition, ServiceControl.Transports.TransportSettings transportSettings);
public virtual System.Threading.Tasks.Task ProvisionQueues(ServiceControl.Transports.TransportSettings transportSettings, System.Collections.Generic.IEnumerable additionalQueues) { }
+ public string ToTransportQualifiedQueueName(string queueName) { }
+ protected virtual string ToTransportQualifiedQueueNameCore(string queueName) { }
}
public static class TransportFactory
{
@@ -75,6 +78,9 @@ namespace ServiceControl.Transports
public TransportManifestDefinition() { }
public string[] Aliases { get; set; }
public string AssemblyName { get; set; }
+ public int? DefaultAuditMaximumConcurrencyLevel { get; set; }
+ public int? DefaultMonitoringMaximumConcurrencyLevel { get; set; }
+ public int? DefaultPrimaryMaximumConcurrencyLevel { get; set; }
public string DisplayName { get; set; }
public string Location { get; set; }
public string Name { get; set; }
diff --git a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt
index 6318beb0ad..6d201c38cc 100644
--- a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt
+++ b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt
@@ -5,6 +5,7 @@
"LearningTransport",
"MSMQ",
"NetStandardAzureServiceBus",
+ "PostgreSQL",
"RabbitMQ.ClassicConventionalRouting",
"RabbitMQ.ClassicDirectRouting",
"RabbitMQ.ConventionalRouting",
diff --git a/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs b/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs
index 35860cffe9..dc45c3ffb0 100644
--- a/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs
+++ b/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs
@@ -41,7 +41,7 @@ await StartQueueIngestor(
for (int i = 0; i < numMessagesToIngest; i++)
{
- await Dispatcher.SendTestMessage(queueName, $"message{i}");
+ await Dispatcher.SendTestMessage(queueName, $"message{i}", configuration.TransportCustomization);
}
var allMessagesProcessed = await onMessagesProcessed.Task;
@@ -67,7 +67,7 @@ await StartQueueIngestor(
return Task.FromResult(ErrorHandleResult.Handled);
});
- await Dispatcher.SendTestMessage(queueName, $"some failing message");
+ await Dispatcher.SendTestMessage(queueName, $"some failing message", configuration.TransportCustomization);
var onErrorWasCalled = await onErrorCalled.Task;
diff --git a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs
index 9a6aeaa7c9..5504962847 100644
--- a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs
+++ b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs
@@ -24,7 +24,7 @@ public async Task Should_report_queue_length()
}
});
- await Dispatcher.SendTestMessage(queueName, "some content");
+ await Dispatcher.SendTestMessage(queueName, "some content", configuration.TransportCustomization);
var queueLengthEntry = await onQueueLengthEntryReceived.Task;
diff --git a/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs b/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs
index 98e5942431..518e5242ee 100644
--- a/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs
+++ b/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs
@@ -16,10 +16,10 @@ public async Task Should_provision_queues()
await ProvisionQueues(queueName, errorQueue, [additionalQueue1, additionalQueue2]);
- Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(queueName, "some content"));
- Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(errorQueue, "some content"));
- Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue1, "some content"));
- Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue2, "some content"));
+ Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(queueName, "some content", configuration.TransportCustomization));
+ Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(errorQueue, "some content", configuration.TransportCustomization));
+ Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue1, "some content", configuration.TransportCustomization));
+ Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue2, "some content", configuration.TransportCustomization));
}
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs b/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs
index 785b87551b..23608ffead 100644
--- a/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs
+++ b/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs
@@ -6,15 +6,16 @@
using System.Threading.Tasks;
using NServiceBus.Routing;
using NServiceBus.Transport;
+ using ServiceControl.Transports;
static class TestDispatcherExtensions
{
- public static Task SendTestMessage(this IMessageDispatcher dispatcher, string queue, string content)
+ public static Task SendTestMessage(this IMessageDispatcher dispatcher, string queue, string content, ITransportCustomization transportCustomization)
{
var transportOperation = new TransportOperation(
new OutgoingMessage(Guid.NewGuid().ToString(), [],
Encoding.UTF8.GetBytes(content)),
- new UnicastAddressTag(queue), []);
+ new UnicastAddressTag(transportCustomization.ToTransportQualifiedQueueName(queue)), []);
return dispatcher.Dispatch(new TransportOperations(transportOperation), new TransportTransaction(), CancellationToken.None);
}
diff --git a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
index 87caa7be55..ced3129705 100644
--- a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
+++ b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
@@ -194,7 +194,7 @@ await StartQueueIngestor(
for (int i = 0; i < numMessagesToIngest; i++)
{
- await Dispatcher.SendTestMessage(queueName, $"message{i}");
+ await Dispatcher.SendTestMessage(queueName, $"message{i}", configuration.TransportCustomization);
}
bool allMessagesProcessed = await onMessagesProcessed.Task;
diff --git a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs
index cf6d111c10..f58bcff9bd 100644
--- a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs
+++ b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs
@@ -24,6 +24,7 @@ static DevelopmentTransportLocations()
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.RabbitMQ"));
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.SqlServer"));
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.SQS"));
+ ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.PostgreSql"));
}
}
diff --git a/src/ServiceControl.Transports/TransportCustomization.cs b/src/ServiceControl.Transports/TransportCustomization.cs
index c6b6cd717a..025189a09e 100644
--- a/src/ServiceControl.Transports/TransportCustomization.cs
+++ b/src/ServiceControl.Transports/TransportCustomization.cs
@@ -23,6 +23,7 @@ public interface ITransportCustomization
void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings);
Task ProvisionQueues(TransportSettings transportSettings, IEnumerable additionalQueues);
+ string ToTransportQualifiedQueueName(string queueName);
Task CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, Func onCriticalError = null, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly);
}
@@ -109,6 +110,10 @@ protected void ConfigureDefaultEndpointSettings(EndpointConfiguration endpointCo
endpointConfiguration.SendFailedMessagesTo(transportSettings.ErrorQueue);
}
+ public string ToTransportQualifiedQueueName(string queueName) => ToTransportQualifiedQueueNameCore(queueName);
+
+ protected virtual string ToTransportQualifiedQueueNameCore(string queueName) => queueName;
+
public virtual async Task ProvisionQueues(TransportSettings transportSettings, IEnumerable additionalQueues)
{
var transport = CreateTransport(transportSettings);
@@ -129,7 +134,7 @@ public virtual async Task ProvisionQueues(TransportSettings transportSettings, I
false,
transportSettings.ErrorQueue)};
- var transportInfrastructure = await transport.Initialize(hostSettings, receivers, additionalQueues.Union(new[] { transportSettings.ErrorQueue }).ToArray());
+ var transportInfrastructure = await transport.Initialize(hostSettings, receivers, additionalQueues.Union([transportSettings.ErrorQueue]).Select(ToTransportQualifiedQueueNameCore).ToArray());
await transportInfrastructure.Shutdown();
}
@@ -160,7 +165,7 @@ public async Task CreateTransportInfrastructure(string
receivers = [];
}
- var transportInfrastructure = await transport.Initialize(hostSettings, receivers, new[] { transportSettings.ErrorQueue });
+ var transportInfrastructure = await transport.Initialize(hostSettings, receivers, new[] { ToTransportQualifiedQueueNameCore(transportSettings.ErrorQueue) });
if (createReceiver)
{
diff --git a/src/ServiceControl.Transports/TransportManifest.cs b/src/ServiceControl.Transports/TransportManifest.cs
index c37538867a..78d6324ae2 100644
--- a/src/ServiceControl.Transports/TransportManifest.cs
+++ b/src/ServiceControl.Transports/TransportManifest.cs
@@ -28,6 +28,10 @@ public class TransportManifestDefinition
public string[] Aliases { get; set; } = [];
+ public int? DefaultPrimaryMaximumConcurrencyLevel { get; set; }
+ public int? DefaultAuditMaximumConcurrencyLevel { get; set; }
+ public int? DefaultMonitoringMaximumConcurrencyLevel { get; set; }
+
internal bool IsMatch(string transportType) =>
string.Equals(TypeName, transportType, StringComparison.Ordinal) // Type names are case-sensitive
|| string.Equals(Name, transportType, StringComparison.OrdinalIgnoreCase)
diff --git a/src/ServiceControl.sln b/src/ServiceControl.sln
index 43223c31a3..04eb5829f0 100644
--- a/src/ServiceControl.sln
+++ b/src/ServiceControl.sln
@@ -157,13 +157,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Transports.M
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent.Persistence", "Particular.LicensingComponent.Persistence\Particular.LicensingComponent.Persistence.csproj", "{0EBBFE57-5760-43FC-A646-FE4C49F4BF59}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.RavenDB", "ServiceControl.RavenDB\ServiceControl.RavenDB.csproj", "{D8610AB6-EB24-4617-8089-F11732D3E033}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.RavenDB", "ServiceControl.RavenDB\ServiceControl.RavenDB.csproj", "{D8610AB6-EB24-4617-8089-F11732D3E033}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Particular.LicensingComponent", "Particular.LicensingComponent", "{2D732554-9C3D-4509-8D0B-5595266A4D92}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Persistence", "Persistence", "{C22BFC89-851E-4D42-A878-C13B3F206908}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.Api", "ServiceControl.Api\ServiceControl.Api.csproj", "{9682F7C5-C631-4924-A698-4C355BB14540}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Api", "ServiceControl.Api\ServiceControl.Api.csproj", "{9682F7C5-C631-4924-A698-4C355BB14540}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent.Persistence.InMemory", "Particular.LicensingComponent.Persistence.InMemory\Particular.LicensingComponent.Persistence.InMemory.csproj", "{9D134CF7-F1A3-4935-BCBD-458406F8AA97}"
EndProject
@@ -173,12 +173,16 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Persistence.
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Testing", "Testing", "{80C55E70-4B7A-4EF2-BB9E-C42F8DB0495D}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Particular.LicensingComponent", "Particular.LicensingComponent\Particular.LicensingComponent.csproj", "{E9407280-401E-4D23-ADF5-D146C5ACBE4D}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent", "Particular.LicensingComponent\Particular.LicensingComponent.csproj", "{E9407280-401E-4D23-ADF5-D146C5ACBE4D}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Particular.LicensingComponent.UnitTests", "Particular.LicensingComponent.UnitTests\Particular.LicensingComponent.UnitTests.csproj", "{51F5504E-E915-40EC-B96E-CA700A57982C}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent.UnitTests", "Particular.LicensingComponent.UnitTests\Particular.LicensingComponent.UnitTests.csproj", "{51F5504E-E915-40EC-B96E-CA700A57982C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HealthCheckApp", "HealthCheckApp\HealthCheckApp.csproj", "{D523E575-6975-4974-A173-B47996A73914}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Transports.PostgreSql", "ServiceControl.Transports.PostgreSql\ServiceControl.Transports.PostgreSql.csproj", "{448CBDCF-718D-4BC7-8F7C-099C9A362B59}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Transports.PostgreSql.Tests", "ServiceControl.Transports.PostgreSql.Tests\ServiceControl.Transports.PostgreSql.Tests.csproj", "{18DBEEF5-42EE-4C1D-A05B-87B21C067D53}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -861,6 +865,18 @@ Global
{5F8E6C64-B505-4FF7-81CC-9161FBC198A8}.Release|x64.Build.0 = Release|Any CPU
{5F8E6C64-B505-4FF7-81CC-9161FBC198A8}.Release|x86.ActiveCfg = Release|Any CPU
{5F8E6C64-B505-4FF7-81CC-9161FBC198A8}.Release|x86.Build.0 = Release|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.Build.0 = Debug|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.Build.0 = Debug|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.Build.0 = Release|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.ActiveCfg = Release|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.Build.0 = Release|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.ActiveCfg = Release|Any CPU
+ {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.Build.0 = Release|Any CPU
{D8610AB6-EB24-4617-8089-F11732D3E033}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D8610AB6-EB24-4617-8089-F11732D3E033}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D8610AB6-EB24-4617-8089-F11732D3E033}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -885,18 +901,6 @@ Global
{9682F7C5-C631-4924-A698-4C355BB14540}.Release|x64.Build.0 = Release|Any CPU
{9682F7C5-C631-4924-A698-4C355BB14540}.Release|x86.ActiveCfg = Release|Any CPU
{9682F7C5-C631-4924-A698-4C355BB14540}.Release|x86.Build.0 = Release|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.ActiveCfg = Debug|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.Build.0 = Debug|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.ActiveCfg = Debug|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.Build.0 = Debug|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.Build.0 = Release|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.ActiveCfg = Release|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.Build.0 = Release|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.ActiveCfg = Release|Any CPU
- {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.Build.0 = Release|Any CPU
{9D134CF7-F1A3-4935-BCBD-458406F8AA97}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D134CF7-F1A3-4935-BCBD-458406F8AA97}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9D134CF7-F1A3-4935-BCBD-458406F8AA97}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -969,6 +973,30 @@ Global
{D523E575-6975-4974-A173-B47996A73914}.Release|x64.Build.0 = Release|Any CPU
{D523E575-6975-4974-A173-B47996A73914}.Release|x86.ActiveCfg = Release|Any CPU
{D523E575-6975-4974-A173-B47996A73914}.Release|x86.Build.0 = Release|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x64.Build.0 = Debug|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x86.Build.0 = Debug|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|Any CPU.Build.0 = Release|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x64.ActiveCfg = Release|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x64.Build.0 = Release|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x86.ActiveCfg = Release|Any CPU
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x86.Build.0 = Release|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x64.Build.0 = Debug|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x86.Build.0 = Debug|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|Any CPU.Build.0 = Release|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x64.ActiveCfg = Release|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x64.Build.0 = Release|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x86.ActiveCfg = Release|Any CPU
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -1040,16 +1068,18 @@ Global
{E067C14F-867B-4479-BC85-39F2AFAF25D0} = {E2249BAA-D9E9-4369-9C70-0E21C69A3E56}
{F04B9D2C-7E31-4697-BAE3-D3FFC5FBBDFE} = {A21A1A89-0B07-4E87-8E3C-41D9C280DCB8}
{5F8E6C64-B505-4FF7-81CC-9161FBC198A8} = {E0E45F22-35E3-4AD8-B09E-EFEA5A2F18EE}
- {D8610AB6-EB24-4617-8089-F11732D3E033} = {9AF9D3C7-E859-451B-BA4D-B954D289213A}
- {9682F7C5-C631-4924-A698-4C355BB14540} = {9AF9D3C7-E859-451B-BA4D-B954D289213A}
{0EBBFE57-5760-43FC-A646-FE4C49F4BF59} = {C22BFC89-851E-4D42-A878-C13B3F206908}
+ {D8610AB6-EB24-4617-8089-F11732D3E033} = {9AF9D3C7-E859-451B-BA4D-B954D289213A}
{C22BFC89-851E-4D42-A878-C13B3F206908} = {2D732554-9C3D-4509-8D0B-5595266A4D92}
+ {9682F7C5-C631-4924-A698-4C355BB14540} = {9AF9D3C7-E859-451B-BA4D-B954D289213A}
{9D134CF7-F1A3-4935-BCBD-458406F8AA97} = {C22BFC89-851E-4D42-A878-C13B3F206908}
{6D07AC20-25B3-423F-A88E-64DED435DA06} = {2D732554-9C3D-4509-8D0B-5595266A4D92}
{71F18E74-944B-49B9-9AA9-EA2B253CAF0C} = {350F72AB-142D-4AAD-9EF1-1A83DC991D87}
{80C55E70-4B7A-4EF2-BB9E-C42F8DB0495D} = {2D732554-9C3D-4509-8D0B-5595266A4D92}
{E9407280-401E-4D23-ADF5-D146C5ACBE4D} = {2D732554-9C3D-4509-8D0B-5595266A4D92}
{51F5504E-E915-40EC-B96E-CA700A57982C} = {80C55E70-4B7A-4EF2-BB9E-C42F8DB0495D}
+ {448CBDCF-718D-4BC7-8F7C-099C9A362B59} = {A21A1A89-0B07-4E87-8E3C-41D9C280DCB8}
+ {18DBEEF5-42EE-4C1D-A05B-87B21C067D53} = {E0E45F22-35E3-4AD8-B09E-EFEA5A2F18EE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3B9E5B72-F580-465A-A22C-2D2148AF4EB4}
diff --git a/src/ServiceControl/App.config b/src/ServiceControl/App.config
index 430444537b..654ab4b8d9 100644
--- a/src/ServiceControl/App.config
+++ b/src/ServiceControl/App.config
@@ -20,6 +20,7 @@ These settings are only here so that we can debug ServiceControl while developin
+
@@ -47,5 +48,8 @@ These settings are only here so that we can debug ServiceControl while developin
+
+
+
\ No newline at end of file
diff --git a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs
index 7dfaf90afb..6b15a86936 100644
--- a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs
+++ b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs
@@ -45,7 +45,7 @@ public static void Configure(Settings.Settings settings, ITransportCustomization
recoverability.Delayed(c => c.NumberOfRetries(0));
recoverability.AddUnrecoverableException();
- configuration.SendFailedMessagesTo(transportSettings.ErrorQueue);
+ configuration.SendFailedMessagesTo(transportCustomization.ToTransportQualifiedQueueName(transportSettings.ErrorQueue));
recoverability.CustomPolicy(SendEmailNotificationHandler.RecoverabilityPolicy);
diff --git a/src/ServiceControl/Infrastructure/Settings/Settings.cs b/src/ServiceControl/Infrastructure/Settings/Settings.cs
index 399fbd344f..d6433a29f1 100644
--- a/src/ServiceControl/Infrastructure/Settings/Settings.cs
+++ b/src/ServiceControl/Infrastructure/Settings/Settings.cs
@@ -57,7 +57,7 @@ public Settings(
}
ProcessRetryBatchesFrequency = TimeSpan.FromSeconds(30);
- MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", 10);
+ MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultPrimaryMaximumConcurrencyLevel ?? 10);
RetryHistoryDepth = SettingsReader.Read(SettingsRootNamespace, "RetryHistoryDepth", 10);
AllowMessageEditing = SettingsReader.Read(SettingsRootNamespace, "AllowMessageEditing");
NotificationsFilter = SettingsReader.Read(SettingsRootNamespace, "NotificationsFilter");
diff --git a/src/ServiceControl/Operations/ErrorIngestor.cs b/src/ServiceControl/Operations/ErrorIngestor.cs
index 9edfc492a7..a2f13028cf 100644
--- a/src/ServiceControl/Operations/ErrorIngestor.cs
+++ b/src/ServiceControl/Operations/ErrorIngestor.cs
@@ -14,6 +14,7 @@
using Recoverability;
using ServiceBus.Management.Infrastructure.Settings;
using ServiceControl.Persistence.UnitOfWork;
+ using ServiceControl.Transports;
public class ErrorIngestor
{
@@ -25,6 +26,7 @@ public ErrorIngestor(Metrics metrics,
IDomainEvents domainEvents,
IIngestionUnitOfWorkFactory unitOfWorkFactory,
Lazy messageDispatcher,
+ ITransportCustomization transportCustomization,
Settings settings)
{
this.unitOfWorkFactory = unitOfWorkFactory;
@@ -44,6 +46,7 @@ public ErrorIngestor(Metrics metrics,
errorProcessor = new ErrorProcessor(enrichers, failedMessageEnrichers.ToArray(), domainEvents, ingestedMeter);
retryConfirmationProcessor = new RetryConfirmationProcessor(domainEvents);
+ logQueueAddress = new UnicastAddressTag(transportCustomization.ToTransportQualifiedQueueName(this.settings.ErrorLogQueue));
}
public async Task Ingest(List contexts)
@@ -167,7 +170,7 @@ Task Forward(IReadOnlyCollection messageContexts)
// Forwarded messages should last as long as possible
outgoingMessage.Headers.Remove(NServiceBus.Headers.TimeToBeReceived);
- transportOperations[index] = new TransportOperation(outgoingMessage, new UnicastAddressTag(settings.ErrorLogQueue));
+ transportOperations[index] = new TransportOperation(outgoingMessage, logQueueAddress);
index++;
}
@@ -186,7 +189,7 @@ public async Task VerifyCanReachForwardingAddress()
new TransportOperation(
new OutgoingMessage(Guid.Empty.ToString("N"),
[], Array.Empty()),
- new UnicastAddressTag(settings.ErrorLogQueue)
+ logQueueAddress
)
);
@@ -204,6 +207,8 @@ public async Task VerifyCanReachForwardingAddress()
readonly ErrorProcessor errorProcessor;
readonly Lazy messageDispatcher;
readonly RetryConfirmationProcessor retryConfirmationProcessor;
+ readonly UnicastAddressTag logQueueAddress;
+
static readonly ILog Logger = LogManager.GetLogger();
}
}
\ No newline at end of file
diff --git a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs
index 11ab232ef0..edd08e220d 100644
--- a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs
+++ b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs
@@ -16,7 +16,7 @@ class ReturnToSenderDequeuer : IHostedService
{
public ReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore dataStore, IDomainEvents domainEvents, ITransportCustomization transportCustomization, TransportSettings transportSettings, Settings settings)
{
- InputAddress = settings.StagingQueue;
+ InputAddress = transportCustomization.ToTransportQualifiedQueueName(settings.StagingQueue);
this.returnToSender = returnToSender;
errorQueue = settings.ErrorQueue;
this.transportCustomization = transportCustomization;
diff --git a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt
index d172846297..bb2e392a4e 100644
--- a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt
+++ b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt
@@ -55,6 +55,11 @@
"ServiceControl.Transports.AzureServiceBus.AzureServiceBusTransport, ServiceControl.Transports.AzureServiceBus"
]
},
+ {
+ "Name": "PostgreSQL",
+ "DisplayName": "PostgreSQL",
+ "Aliases": []
+ },
{
"Name": "RabbitMQ.ClassicConventionalRouting",
"DisplayName": "RabbitMQ - Conventional routing topology (classic queues)",
diff --git a/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj b/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj
index 254c3ac192..79dab8b17a 100644
--- a/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj
+++ b/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj
@@ -17,6 +17,7 @@
+
diff --git a/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs b/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs
index 1136f47255..b0f91b7fe6 100644
--- a/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs
+++ b/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs
@@ -5,6 +5,7 @@
using System.Linq;
using Accounts;
using Microsoft.Data.SqlClient;
+ using Npgsql;
class ConnectionStringValidator
{
@@ -21,6 +22,10 @@ public static void Validate(IServiceControlAuditInstance instance)
{
validator.CheckMsSqlConnectionString();
}
+ else if (instance.TransportPackage.Name == "PostgreSQL")
+ {
+ validator.CheckPostgreSqlConnectString();
+ }
}
public static void Validate(IServiceControlInstance instance)
@@ -30,6 +35,10 @@ public static void Validate(IServiceControlInstance instance)
{
validator.CheckMsSqlConnectionString();
}
+ else if (instance.TransportPackage.Name == "PostgreSQL")
+ {
+ validator.CheckPostgreSqlConnectString();
+ }
}
public static void Validate(IMonitoringInstance instance)
@@ -39,6 +48,10 @@ public static void Validate(IMonitoringInstance instance)
{
validator.CheckMsSqlConnectionString();
}
+ else if (instance.TransportPackage.Name == "PostgreSQL")
+ {
+ validator.CheckPostgreSqlConnectString();
+ }
}
void CheckMsSqlConnectionString()
@@ -96,6 +109,41 @@ void CheckMsSqlConnectionString()
}
}
+ void CheckPostgreSqlConnectString()
+ {
+ string[] customKeys = { "Queue Schema", "Subscriptions Table" };
+
+ try
+ {
+ //Check validity of connection string. This will throw if invalid
+ var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };
+
+ //The NSB PostgreSQL Transport can have custom key/value pairs in the connection string
+ // that won't make sense to PostgreSQL. Remove these from the string we want to validate.
+ foreach (var customKey in customKeys)
+ {
+ if (builder.ContainsKey(customKey))
+ {
+ builder.Remove(customKey);
+ }
+ }
+
+ //Attempt to connect to DB
+ using (var s = new NpgsqlConnection(builder.ConnectionString))
+ {
+ s.Open();
+ }
+ }
+ catch (ArgumentException argumentException)
+ {
+ throw new EngineValidationException($"Connection String is invalid - {argumentException.Message}");
+ }
+ catch (SqlException sqlEx)
+ {
+ throw new EngineValidationException($"PostgreSQL connection failed - {sqlEx.Message}");
+ }
+ }
+
string connectionString;
string serviceAccount;
}
diff --git a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs
index 6d4b44dc99..d2111f730f 100644
--- a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs
+++ b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs
@@ -87,7 +87,8 @@ public void Should_package_all_transports()
"RabbitMQ",
"MSMQ",
"AmazonSQS",
- "LearningTransport"};
+ "LearningTransport",
+ "PostgreSql"};
var bundledTransports = deploymentPackage.DeploymentUnits
.Where(u => u.Category == "Transports")
diff --git a/src/TestHelper/IncludeInPostgreSqlTestsAttribute.cs b/src/TestHelper/IncludeInPostgreSqlTestsAttribute.cs
new file mode 100644
index 0000000000..4691a26412
--- /dev/null
+++ b/src/TestHelper/IncludeInPostgreSqlTestsAttribute.cs
@@ -0,0 +1,4 @@
+public class IncludeInPostgreSqlTestsAttribute : IncludeInTestsAttribute
+{
+ protected override string Filter => "PostgreSql";
+}
diff --git a/src/container-integration-test/postgres.yml b/src/container-integration-test/postgres.yml
new file mode 100644
index 0000000000..63807d29f1
--- /dev/null
+++ b/src/container-integration-test/postgres.yml
@@ -0,0 +1,28 @@
+services:
+
+ postgres:
+ image: postgres
+ restart: unless-stopped
+ ports:
+ - "5432:5432"
+ environment:
+ POSTGRES_PASSWORD: ServiceControl1!
+ healthcheck:
+ test: ["CMD-SHELL", "psql -U postgres -d postgres -c 'SELECT 1' || exit 1"]
+ interval: 10s
+ timeout: 3s
+ retries: 3
+
+ # Add service health dependencies to ServiceControl instances
+ servicecontrol:
+ depends_on:
+ postgres:
+ condition: service_healthy
+ servicecontrol-audit:
+ depends_on:
+ postgres:
+ condition: service_healthy
+ servicecontrol-monitoring:
+ depends_on:
+ postgres:
+ condition: service_healthy
\ No newline at end of file