diff --git a/src/Microsoft.Health.Fhir.Api/Features/ApiNotifications/ApiNotificationMiddleware.cs b/src/Microsoft.Health.Fhir.Api/Features/ApiNotifications/ApiNotificationMiddleware.cs index 2fd4f8efba..4bd723d7ba 100644 --- a/src/Microsoft.Health.Fhir.Api/Features/ApiNotifications/ApiNotificationMiddleware.cs +++ b/src/Microsoft.Health.Fhir.Api/Features/ApiNotifications/ApiNotificationMiddleware.cs @@ -14,6 +14,7 @@ using Microsoft.Health.Core; using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Fhir.Api.Extensions; +using Microsoft.Health.Fhir.Core.Extensions; using Microsoft.Health.Fhir.Core.Features.Context; namespace Microsoft.Health.Fhir.Api.Features.ApiNotifications @@ -79,7 +80,7 @@ protected virtual async Task PublishNotificationAsync(HttpContext context, Reque apiNotification.ResourceType = fhirRequestContext.ResourceType; apiNotification.StatusCode = (HttpStatusCode)context.Response.StatusCode; - await _mediator.Publish(apiNotification, CancellationToken.None); + await _mediator.PublishNotificationWithExceptionHandling(nameof(ApiNotificationMiddleware), apiNotification, _logger, CancellationToken.None); } } catch (Exception e) diff --git a/src/Microsoft.Health.Fhir.Api/Features/ExceptionNotifications/ExceptionNotificationMiddleware.cs b/src/Microsoft.Health.Fhir.Api/Features/ExceptionNotifications/ExceptionNotificationMiddleware.cs index 0408f0c636..3fea922323 100644 --- a/src/Microsoft.Health.Fhir.Api/Features/ExceptionNotifications/ExceptionNotificationMiddleware.cs +++ b/src/Microsoft.Health.Fhir.Api/Features/ExceptionNotifications/ExceptionNotificationMiddleware.cs @@ -72,7 +72,7 @@ public async Task Invoke(HttpContext context) exceptionNotification.IsRequestRateExceeded = exception.IsRequestRateExceeded(); exceptionNotification.BaseException = exception; - await _mediator.Publish(exceptionNotification, CancellationToken.None); + await _mediator.PublishNotificationWithExceptionHandling(nameof(ExceptionNotificationMiddleware), exceptionNotification, _logger, CancellationToken.None); } catch (Exception e) { diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs index e87579db2a..491e1f03c7 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using MediatR; +using Microsoft.Extensions.Logging; using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Features.Context; @@ -43,7 +44,7 @@ public BulkDeleteProcessingJobTests() .Returns(Task.FromResult(new SearchResult(5, new List>()))); _queueClient = Substitute.For(); _deleter = Substitute.For(); - _processingJob = new BulkDeleteProcessingJob(_deleter.CreateMockScopeFactory(), Substitute.For>(), Substitute.For(), _searchService.CreateMockScopeFactory(), _queueClient); + _processingJob = new BulkDeleteProcessingJob(_deleter.CreateMockScopeFactory(), Substitute.For>(), Substitute.For(), _searchService.CreateMockScopeFactory(), _queueClient, Substitute.For>()); } [Fact] diff --git a/src/Microsoft.Health.Fhir.Core/Extensions/FhirMediatorExtensions.cs b/src/Microsoft.Health.Fhir.Core/Extensions/FhirMediatorExtensions.cs new file mode 100644 index 0000000000..c3b678bd47 --- /dev/null +++ b/src/Microsoft.Health.Fhir.Core/Extensions/FhirMediatorExtensions.cs @@ -0,0 +1,47 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using EnsureThat; +using MediatR; +using Microsoft.Extensions.Logging; +using Microsoft.Health.Fhir.Core.Features.Operations.Export; +using Microsoft.Health.Fhir.Core.Messages.Export; +using Microsoft.Health.Fhir.Core.Models; + +namespace Microsoft.Health.Fhir.Core.Extensions +{ + public static class FhirMediatorExtensions + { + public static async Task PublishNotificationWithExceptionHandling(this IMediator mediator, string metricName, object notification, ILogger logger, CancellationToken cancellationToken) + { + try + { + await mediator.Publish(notification, cancellationToken); + } + catch (ObjectDisposedException ode) + { + if (cancellationToken.IsCancellationRequested) + { + logger.LogWarning(ode, $"ObjectDisposedException. Unable to publish {metricName} metric. Cancellation was requested."); + } + else + { + logger.LogCritical(ode, $"ObjectDisposedException. Unable to publish {metricName} metric."); + } + } + catch (OperationCanceledException oce) + { + logger.LogWarning(oce, $"OperationCanceledException. Unable to publish {metricName} metric. Cancellation was requested."); + } + catch (Exception ex) + { + logger.LogCritical(ex, $"Unable to publish {metricName} metric."); + } + } + } +} diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs index a92c531d70..c2d91ae7c5 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs @@ -11,10 +11,12 @@ using System.Threading.Tasks; using EnsureThat; using MediatR; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Primitives; using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Exceptions; +using Microsoft.Health.Fhir.Core.Extensions; using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete.Messages; using Microsoft.Health.Fhir.Core.Features.Persistence; @@ -33,19 +35,22 @@ public class BulkDeleteProcessingJob : IJob private readonly IMediator _mediator; private readonly Func> _searchService; private readonly IQueueClient _queueClient; + private readonly ILogger _logger; public BulkDeleteProcessingJob( Func> deleterFactory, RequestContextAccessor contextAccessor, IMediator mediator, Func> searchService, - IQueueClient queueClient) + IQueueClient queueClient, + ILogger logger) { _deleterFactory = EnsureArg.IsNotNull(deleterFactory, nameof(deleterFactory)); _contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor)); _mediator = EnsureArg.IsNotNull(mediator, nameof(mediator)); _searchService = EnsureArg.IsNotNull(searchService, nameof(searchService)); _queueClient = EnsureArg.IsNotNull(queueClient, nameof(queueClient)); + _logger = EnsureArg.IsNotNull(logger, nameof(logger)); } public async Task ExecuteAsync(JobInfo jobInfo, CancellationToken cancellationToken) @@ -100,7 +105,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, CancellationToken cancel result.ResourcesDeleted.Add(types[0], numDeleted); - await _mediator.Publish(new BulkDeleteMetricsNotification(jobInfo.Id, numDeleted), cancellationToken); + await _mediator.PublishNotificationWithExceptionHandling(nameof(BulkDeleteProcessingJob), new BulkDeleteMetricsNotification(jobInfo.Id, numDeleted), _logger, cancellationToken); if (exception != null) { diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs index 471a3bb738..1171ca4831 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs @@ -342,7 +342,7 @@ private async Task CompleteJobAsync(OperationStatus completionStatus, Cancellati dataSize, isAnonymizedExport); - await _mediator.Publish(new ExportTaskMetricsNotification(_exportJobRecord), CancellationToken.None); + await _mediator.PublishNotificationWithExceptionHandling(nameof(ExportJobTask), new ExportTaskMetricsNotification(_exportJobRecord), _logger, cancellationToken); } private async Task UpdateJobRecordAsync(CancellationToken cancellationToken) diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/CosmosResponseProcessor.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/CosmosResponseProcessor.cs index 88f09e43e4..bbac10d87f 100644 --- a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/CosmosResponseProcessor.cs +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/CosmosResponseProcessor.cs @@ -168,29 +168,7 @@ private async Task AddRequestChargeToFhirRequestContextAsync(double responseRequ cosmosMetrics.IsThrottled = true; } - try - { - await _mediator.Publish(cosmosMetrics, cancellationToken); - } - catch (ObjectDisposedException ode) - { - if (cancellationToken.IsCancellationRequested) - { - _logger.LogWarning(ode, "ObjectDisposedException. Unable to publish CosmosDB metric. Cancellation was requested."); - } - else - { - _logger.LogCritical(ode, "ObjectDisposedException. Unable to publish CosmosDB metric."); - } - } - catch (OperationCanceledException oce) - { - _logger.LogWarning(oce, "OperationCanceledException. Unable to publish CosmosDB metric. Cancellation was requested."); - } - catch (Exception ex) - { - _logger.LogCritical(ex, "Unable to publish CosmosDB metric."); - } + await _mediator.PublishNotificationWithExceptionHandling(string.Join(':', [nameof(CosmosResponseProcessor), nameof(AddRequestChargeToFhirRequestContextAsync)]), cosmosMetrics, _logger, cancellationToken); } private static string GetCustomerManagedKeyErrorMessage(int subStatusCode) @@ -257,18 +235,11 @@ private async Task EmitExceptionNotificationAsync(HttpStatusCode statusCode, Exc exceptionNotification.BaseException = exception; await _mediator.Publish(exceptionNotification, cancellationToken); - } - catch (ObjectDisposedException ode) - { - _logger.LogWarning(ode, "ObjectDisposedException. Failure while publishing Exception notification."); - } - catch (OperationCanceledException oce) - { - _logger.LogWarning(oce, "OperationCanceledException. Unable to publish CosmosDB metric. Cancellation was requested."); + await _mediator.PublishNotificationWithExceptionHandling(string.Join(':', [nameof(CosmosResponseProcessor), nameof(EmitExceptionNotificationAsync)]), exceptionNotification, _logger, cancellationToken); } catch (Exception ex) { - _logger.LogWarning(ex, "Unable to publish CosmosDB metric."); + _logger.LogCritical(ex, "Unable to publish CosmosDB metric."); } } } diff --git a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs index af614314be..e3821f43eb 100644 --- a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs +++ b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs @@ -433,7 +433,8 @@ private async Task PublishNotification(Hl7.Fhir.Model.Bundle responseBundle, Bun }); } - await _mediator.Publish(new BundleMetricsNotification(apiCallResults, bundleType == BundleType.Batch ? AuditEventSubType.Batch : AuditEventSubType.Transaction), CancellationToken.None); + var notification = new BundleMetricsNotification(apiCallResults, bundleType == BundleType.Batch ? AuditEventSubType.Batch : AuditEventSubType.Transaction); + await _mediator.PublishNotificationWithExceptionHandling(nameof(BundleHandler), notification, _logger, CancellationToken.None); } private async Task ExecuteTransactionForAllRequestsAsync(Hl7.Fhir.Model.Bundle responseBundle, BundleProcessingLogic processingLogic, CancellationToken cancellationToken)