Skip to content

Commit

Permalink
feature: #964: Added new local function to allow import and scoring o…
Browse files Browse the repository at this point in the history
…f WMDA consensus dataset exercises.
  • Loading branch information
zabeen committed May 10, 2023
1 parent 546d26e commit bfe4766
Show file tree
Hide file tree
Showing 13 changed files with 456 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Atlas.Functions.PublicApi/CHANGELOG_Atlas.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ The project version will be appropriately incremented with each change to the pr
#### Match Prediction
* Changed the way match prediction requests are queued for processing by activity functions, to prevent search requests with many donors from blocking the completion of smaller search requests.

#### Manual Testing
* Two locally running functions added to `Atlas.ManualTesting.Functions` to allow exercises 1 and 2 of the WMDA consensus dataset to be run.

### 1.5.0

Expand Down
29 changes: 16 additions & 13 deletions Atlas.ManualTesting.Common/SubjectImport/SubjectInfoReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,26 @@ public class SubjectInfoReader : ISubjectInfoReader
{
/// <summary>
/// Reads a semicolon-delimited subject file and returns its rows as <see cref="ImportedSubject"/> records.
/// </summary>
/// <param name="filePath">Path of the file to read.</param>
/// <returns>All records found in the file, fully materialised before the file is closed.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="filePath"/> is null or empty.</exception>
/// <exception cref="ArgumentException">Thrown when no file exists at <paramref name="filePath"/>.</exception>
public async Task<IReadOnlyCollection<ImportedSubject>> Read(string filePath)
{
    if (string.IsNullOrEmpty(filePath))
    {
        throw new ArgumentNullException(nameof(filePath));
    }

    if (!File.Exists(filePath))
    {
        throw new ArgumentException($"File not found at {filePath}.");
    }

    await using var stream = File.OpenRead(filePath);
    using var reader = new StreamReader(stream);
    using var csv = new CsvReader(reader);

    csv.Configuration.Delimiter = ";";

    // Header validation and missing-field callbacks are switched off;
    // presumably so that files lacking some columns can still be read - confirm against the input files.
    csv.Configuration.HeaderValidated = null;
    csv.Configuration.MissingFieldFound = null;

    // Empty string fields are registered as null values for string members.
    csv.Configuration.TypeConverterOptionsCache.GetOptions<string>().NullValues.Add("");

    // GetRecords is consumed via ToList here, i.e. before the reader and stream are disposed.
    return csv.GetRecords<ImportedSubject>().ToList();
}
}
}
2 changes: 2 additions & 0 deletions Atlas.ManualTesting/Atlas.ManualTesting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
<ProjectReference Include="..\Atlas.Client.Models\Atlas.Client.Models.csproj" />
<ProjectReference Include="..\Atlas.Common\Atlas.Common.csproj" />
<ProjectReference Include="..\Atlas.DonorImport\Atlas.DonorImport.csproj" />
<ProjectReference Include="..\Atlas.ManualTesting.Common\Atlas.ManualTesting.Common.csproj" />
<ProjectReference Include="..\Atlas.MatchingAlgorithm.Client.Models\Atlas.MatchingAlgorithm.Client.Models.csproj" />
<ProjectReference Include="..\Atlas.MatchingAlgorithm.Common\Atlas.MatchingAlgorithm.Common.csproj" />
<ProjectReference Include="..\Atlas.MatchingAlgorithm\Atlas.MatchingAlgorithm.csproj" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using Atlas.Common.Utils.Extensions;
using Atlas.DonorImport.Data.Repositories;
using Atlas.DonorImport.ExternalInterface.Models;
using Atlas.ManualTesting.Common.SubjectImport;
using Atlas.ManualTesting.Services;
using Atlas.ManualTesting.Services.Scoring;
using Atlas.ManualTesting.Services.ServiceBus;
using Atlas.ManualTesting.Settings;
using Atlas.MatchingAlgorithm.Common.Models;
Expand Down Expand Up @@ -34,6 +36,7 @@ private static void RegisterSettings(this IServiceCollection services)
services.RegisterAsOptions<MessagingServiceBusSettings>("MessagingServiceBus");
services.RegisterAsOptions<MatchingSettings>("Matching");
services.RegisterAsOptions<DonorManagementSettings>("Matching:DonorManagement");
services.RegisterAsOptions<ScoringSettings>("Scoring");
services.RegisterAsOptions<SearchSettings>("Search");
}

Expand Down Expand Up @@ -80,6 +83,10 @@ Func<IServiceProvider, DonorManagementSettings> fetchDonorManagementSettings
services.AddScoped<ISearchableDonorUpdatesPeeker, SearchableDonorUpdatesPeeker>();

services.AddScoped<IDonorStoresInspector, DonorStoresInspector>();

services.AddScoped<ISubjectInfoReader, SubjectInfoReader>();
services.AddScoped<IScoreBatchRequester, ScoreBatchRequester>();
services.AddScoped<IScoreRequestProcessor, ScoreRequestProcessor>();
}

private static void RegisterDatabaseServices(
Expand Down
100 changes: 100 additions & 0 deletions Atlas.ManualTesting/Functions/WmdaConsensusDatasetScoringFunctions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Atlas.Client.Models.Search.Requests;
using Atlas.Client.Models.Search.Results.Matching.PerLocus;
using Atlas.Common.Public.Models.GeneticData;
using Atlas.ManualTesting.Models;
using Atlas.ManualTesting.Services.Scoring;
using Atlas.MatchingAlgorithm.Client.Models.Scoring;
using AzureFunctions.Extensions.Swashbuckle.Attribute;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Newtonsoft.Json;

namespace Atlas.ManualTesting.Functions
{
/// <summary>
/// Functions that process the WMDA consensus datasets for exercises that involve the counting of mismatches.
/// </summary>
public class WmdaConsensusDatasetScoringFunctions
{
    private readonly IScoreRequestProcessor scoreRequestProcessor;

    public WmdaConsensusDatasetScoringFunctions(IScoreRequestProcessor scoreRequestProcessor)
    {
        this.scoreRequestProcessor = scoreRequestProcessor;
    }

    /// <summary>
    /// Passes the posted <see cref="ImportAndScoreRequest"/> to the score request processor,
    /// using three-locus scoring criteria and the exercise 1 result format (mismatch count per locus).
    /// </summary>
    /// <param name="request">HTTP POST request whose body is a JSON-serialised <see cref="ImportAndScoreRequest"/>.</param>
    /// <exception cref="System.ArgumentException">Thrown when the request body does not deserialise to a request object.</exception>
    [FunctionName(nameof(ProcessWmdaConsensusDataset_Exercise1))]
    public async Task ProcessWmdaConsensusDataset_Exercise1(
        [RequestBodyType(typeof(ImportAndScoreRequest), nameof(ImportAndScoreRequest))]
        [HttpTrigger(AuthorizationLevel.Function, "post")]
        HttpRequest request)
    {
        var importAndScoreRequest = await ReadImportAndScoreRequest(request);

        await scoreRequestProcessor.ProcessScoreRequest(new ScoreRequestProcessorInput
        {
            ImportAndScoreRequest = importAndScoreRequest,
            ScoringCriteria = BuildThreeLocusScoringCriteria(),
            ResultTransformer = TransformScoringResultForExercise1
        });
    }

    /// <summary>
    /// Passes the posted <see cref="ImportAndScoreRequest"/> to the score request processor,
    /// using three-locus scoring criteria and the exercise 2 result format
    /// (mismatch count and antigen mismatch count per locus).
    /// </summary>
    /// <param name="request">HTTP POST request whose body is a JSON-serialised <see cref="ImportAndScoreRequest"/>.</param>
    /// <exception cref="System.ArgumentException">Thrown when the request body does not deserialise to a request object.</exception>
    [FunctionName(nameof(ProcessWmdaConsensusDataset_Exercise2))]
    public async Task ProcessWmdaConsensusDataset_Exercise2(
        [RequestBodyType(typeof(ImportAndScoreRequest), nameof(ImportAndScoreRequest))]
        [HttpTrigger(AuthorizationLevel.Function, "post")]
        HttpRequest request)
    {
        var importAndScoreRequest = await ReadImportAndScoreRequest(request);

        await scoreRequestProcessor.ProcessScoreRequest(new ScoreRequestProcessorInput
        {
            ImportAndScoreRequest = importAndScoreRequest,
            ScoringCriteria = BuildThreeLocusScoringCriteria(),
            ResultTransformer = TransformScoringResultForExercise2
        });
    }

    /// <summary>
    /// Reads the whole request body and deserialises it to an <see cref="ImportAndScoreRequest"/>.
    /// Shared by both exercise functions so the body is read, and the reader disposed, in one place.
    /// </summary>
    private static async Task<ImportAndScoreRequest> ReadImportAndScoreRequest(HttpRequest request)
    {
        using var reader = new StreamReader(request.Body);
        var body = await reader.ReadToEndAsync();

        // Json.NET yields null for an empty (or literal "null") body; fail here with a clear
        // message rather than handing a null request on to the processor.
        return JsonConvert.DeserializeObject<ImportAndScoreRequest>(body)
               ?? throw new System.ArgumentException(
                   $"Request body could not be read as a {nameof(ImportAndScoreRequest)}.", nameof(request));
    }

    /// <summary>
    /// Builds criteria that score loci A, B and DRB1, with no loci excluded from the aggregate score.
    /// </summary>
    private static ScoringCriteria BuildThreeLocusScoringCriteria()
    {
        return new ScoringCriteria
        {
            LociToScore = new[] { Locus.A, Locus.B, Locus.Drb1 },
            LociToExcludeFromAggregateScore = new List<Locus>()
        };
    }

    /// <summary>
    /// Formats one result as: patientId;donorId;mismatches at A;mismatches at B;mismatches at DRB1.
    /// </summary>
    private static string TransformScoringResultForExercise1(string patientId, string donorId, ScoringResult result)
    {
        return $"{patientId};{donorId};" +
               $"{CountMismatches(result.SearchResultAtLocusA)};" +
               $"{CountMismatches(result.SearchResultAtLocusB)};" +
               $"{CountMismatches(result.SearchResultAtLocusDrb1)}";
    }

    /// <summary>
    /// Formats one result as: patientId;donorId; followed by the mismatch count and antigen mismatch count
    /// for each of loci A, B and DRB1, in that order.
    /// </summary>
    private static string TransformScoringResultForExercise2(string patientId, string donorId, ScoringResult result)
    {
        return $"{patientId};{donorId};" +
               $"{CountMismatches(result.SearchResultAtLocusA)};{CountAntigenMismatches(result.SearchResultAtLocusA)};" +
               $"{CountMismatches(result.SearchResultAtLocusB)};{CountAntigenMismatches(result.SearchResultAtLocusB)};" +
               $"{CountMismatches(result.SearchResultAtLocusDrb1)};{CountAntigenMismatches(result.SearchResultAtLocusDrb1)}";
    }

    /// <summary>
    /// Mismatch count at a locus, formatted as text: two minus the locus match count
    /// (presumably because each locus has two typed positions).
    /// </summary>
    private static string CountMismatches(LocusSearchResult locusResult) => $"{2 - locusResult.MatchCount}";

    /// <summary>
    /// Number of positions (of the two at the locus) explicitly flagged as not antigen matched.
    /// Positions with no antigen match value are not counted.
    /// </summary>
    private static int CountAntigenMismatches(LocusSearchResult locusResult)
    {
        return new List<bool?>
        {
            locusResult.ScoreDetailsAtPositionOne.IsAntigenMatch,
            locusResult.ScoreDetailsAtPositionTwo.IsAntigenMatch
        }.Count(x => x.HasValue && !x.Value);
    }
}
}
33 changes: 33 additions & 0 deletions Atlas.ManualTesting/Models/ImportAndScoreRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace Atlas.ManualTesting.Models
{
/// <summary>
/// Request body accepted by the WMDA consensus dataset scoring functions:
/// names the patient and donor input files, the results output file, and optional restart points.
/// </summary>
public class ImportAndScoreRequest
{
/// <summary>
/// Path to the file containing patient HLA.
/// </summary>
public string PatientFilePath { get; set; }

/// <summary>
/// Path to the file containing donor HLA.
/// </summary>
public string DonorFilePath { get; set; }

/// <summary>
/// Path to the file where results should be written.
/// </summary>
public string ResultsFilePath { get; set; }

/// <summary>
/// Optional. If a scoring request was interrupted, it can be restarted from a later patient id.
/// If not set, scoring will commence from the start of the patient collection.
/// </summary>
public string StartFromPatientId { get; set; }

/// <summary>
/// Optional. If a scoring request was interrupted, it can be restarted from a later donor id for the patient listed in <see cref="StartFromPatientId"/>.
/// The id is only applied to the first patient processed; thereafter the entire donor collection will be scored.
/// If not set, scoring will commence from the start of the donor collection.
/// </summary>
public string StartFromDonorId { get; set; }
}
}
25 changes: 25 additions & 0 deletions Atlas.ManualTesting/Models/ImportedSubjectExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Atlas.Common.Public.Models.GeneticData.PhenotypeInfo;
using Atlas.Common.Public.Models.GeneticData.PhenotypeInfo.TransferModels;
using Atlas.ManualTesting.Common.SubjectImport;

namespace Atlas.ManualTesting.Models
{
/// <summary>
/// Conversion helpers for <see cref="ImportedSubject"/>.
/// </summary>
internal static class ImportedSubjectExtensions
{
    /// <summary>
    /// Copies the subject's A, B, C, DQB1 and DRB1 typings into a <see cref="PhenotypeInfo{T}"/>
    /// and returns that phenotype's transfer model. No DPB1 values are supplied.
    /// </summary>
    /// <param name="subject">Imported subject whose HLA typings are to be converted.</param>
    public static PhenotypeInfoTransfer<string> ToPhenotypeInfoTransfer(this ImportedSubject subject)
    {
        var phenotype = new PhenotypeInfo<string>(
            valueA_1: subject.A_1,
            valueA_2: subject.A_2,
            valueB_1: subject.B_1,
            valueB_2: subject.B_2,
            valueC_1: subject.C_1,
            valueC_2: subject.C_2,
            valueDqb1_1: subject.DQB1_1,
            valueDqb1_2: subject.DQB1_2,
            valueDrb1_1: subject.DRB1_1,
            valueDrb1_2: subject.DRB1_2);

        return phenotype.ToPhenotypeInfoTransfer();
    }
}
}
130 changes: 130 additions & 0 deletions Atlas.ManualTesting/Services/Scoring/ScoreBatchRequester.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Atlas.Client.Models.Search.Requests;
using Atlas.ManualTesting.Common.SubjectImport;
using Atlas.ManualTesting.Models;
using Atlas.ManualTesting.Settings;
using Atlas.MatchingAlgorithm.Client.Models.Scoring;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Polly;

namespace Atlas.ManualTesting.Services.Scoring
{
/// <summary>
/// Input to <see cref="IScoreBatchRequester.ScoreBatch"/>: one patient and the donors to be scored against that patient.
/// </summary>
public class ScoreBatchRequest
{
/// <summary>Patient whose HLA is sent as the patient side of the scoring request. Required.</summary>
public ImportedSubject Patient { get; set; }
/// <summary>Donors whose HLA is sent for scoring against <see cref="Patient"/>. Must contain at least one donor.</summary>
public IEnumerable<ImportedSubject> Donors { get; set; }
/// <summary>Directory in which the record of failed scoring requests is written. Required.</summary>
public string ResultsDirectory { get; set; }
/// <summary>Scoring criteria forwarded unchanged in the scoring request. Required.</summary>
public ScoringCriteria ScoringCriteria { get; set; }
}

/// <summary>
/// Requests scoring of a batch of donors against a single patient.
/// </summary>
public interface IScoreBatchRequester
{
/// <summary>Scores the donors in <paramref name="request"/> against the request's patient.</summary>
/// <param name="request">Patient, donors, scoring criteria and results directory for the batch.</param>
/// <returns>
/// The donor scoring results received for the batch.
/// NOTE(review): the implementation in this file returns an empty collection when the request fails - confirm any other implementations do the same.
/// </returns>
Task<IEnumerable<DonorScoringResult>> ScoreBatch(ScoreBatchRequest request);
}

/// <summary>
/// Posts batch scoring requests to the URL held in <see cref="ScoringSettings"/>, re-attempting failed
/// requests and recording any batch that still fails to a file in the request's results directory.
/// </summary>
internal class ScoreBatchRequester : IScoreBatchRequester
{
    /// <summary>Number of re-attempts made after a failed request before the batch is recorded as failed.</summary>
    private const int MaxRetryAttempts = 10;

    /// <summary>Name of the file, within the results directory, to which failed batches are appended.</summary>
    private const string FailedRequestsFileName = "failedScoringRequests.txt";

    // Single client instance shared by every request made through this class.
    private static readonly HttpClient HttpRequestClient = new();
    private readonly ScoringSettings scoringSettings;

    public ScoreBatchRequester(IOptions<ScoringSettings> settings)
    {
        scoringSettings = settings.Value;
    }

    /// <summary>
    /// Validates the request, then submits it for scoring.
    /// </summary>
    /// <param name="request">Patient, donors, scoring criteria and results directory for the batch.</param>
    /// <returns>
    /// The scoring results for the batch, or an empty collection if the request still failed after all re-attempts
    /// (in which case the batch is appended to the failed requests file).
    /// </returns>
    /// <exception cref="ArgumentException">
    /// Thrown when the request or its patient is null, the donor collection is null or empty,
    /// the results directory is blank, or the scoring criteria are null.
    /// </exception>
    public async Task<IEnumerable<DonorScoringResult>> ScoreBatch(ScoreBatchRequest request)
    {
        if (request?.Patient is null ||
            request.Donors is null || // guard before Any(), which would otherwise throw a NullReferenceException
            !request.Donors.Any() ||
            string.IsNullOrWhiteSpace(request.ResultsDirectory) ||
            request.ScoringCriteria is null)
        {
            throw new ArgumentException("ScoreBatch request is missing required data.");
        }

        return await ScoreBatchWithRetries(request);
    }

    /// <summary>
    /// Submits the batch, retrying on any exception; on final failure the batch is written to the
    /// failed requests file and an empty result is returned rather than an exception being thrown.
    /// </summary>
    private async Task<IEnumerable<DonorScoringResult>> ScoreBatchWithRetries(ScoreBatchRequest scoreBatchRequest)
    {
        var retryPolicy = Policy.Handle<Exception>().RetryAsync(MaxRetryAttempts);
        var request = BuildScoreBatchRequest(scoreBatchRequest);

        var requestResponse = await retryPolicy.ExecuteAndCaptureAsync(
            async () => await SubmitScoreBatchRequest(request, scoreBatchRequest.Patient.ID));

        if (requestResponse.Outcome == OutcomeType.Successful)
        {
            return requestResponse.Result;
        }

        await WriteFailuresToFile(scoreBatchRequest);
        return new List<DonorScoringResult>();
    }

    /// <summary>
    /// Maps the imported patient and donors onto the scoring request model.
    /// </summary>
    private static BatchScoringRequest BuildScoreBatchRequest(ScoreBatchRequest request)
    {
        return new BatchScoringRequest
        {
            PatientHla = request.Patient.ToPhenotypeInfoTransfer(),
            // Materialised once so the projection is not re-run on every serialisation (one per attempt).
            DonorsHla = request.Donors.Select(ToIdentifiedDonorHla).ToList(),
            ScoringCriteria = request.ScoringCriteria
        };
    }

    /// <summary>
    /// Maps an imported donor onto the identified donor HLA model, carrying over the donor's ID.
    /// </summary>
    private static IdentifiedDonorHla ToIdentifiedDonorHla(ImportedSubject donor)
    {
        var donorHla = donor.ToPhenotypeInfoTransfer();

        return new IdentifiedDonorHla
        {
            DonorId = donor.ID,
            A = donorHla.A,
            B = donorHla.B,
            C = donorHla.C,
            Dqb1 = donorHla.Dqb1,
            Drb1 = donorHla.Drb1,
            Dpb1 = donorHla.Dpb1
        };
    }

    /// <summary>
    /// Makes a single attempt at posting the batch and deserialising the response.
    /// Any exception is logged to debug output and rethrown so that the caller's retry policy can act on it.
    /// </summary>
    private async Task<IReadOnlyCollection<DonorScoringResult>> SubmitScoreBatchRequest(BatchScoringRequest scoreBatchRequest, string patientId)
    {
        // Only used to identify the batch in debug output.
        var firstDonorIdInBatch = scoreBatchRequest.DonorsHla.FirstOrDefault()?.DonorId;

        try
        {
            using var response = await HttpRequestClient.PostAsync(
                scoringSettings.ScoreBatchRequestUrl, new StringContent(JsonConvert.SerializeObject(scoreBatchRequest)));
            response.EnsureSuccessStatusCode();
            var scoringResult = JsonConvert.DeserializeObject<List<DonorScoringResult>>(await response.Content.ReadAsStringAsync());

            Debug.WriteLine($"ScoreBatch result received for patient {patientId}, first donor in batch {firstDonorIdInBatch}");

            return scoringResult;
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"ScoreBatch request for failed for patient {patientId}, first donor in batch {firstDonorIdInBatch}. Details: {ex.Message} " +
                            "Re-attempting until success or re-attempt count reached.");
            throw;
        }
    }

    /// <summary>
    /// Appends one line of the form "patientId:donorId,donorId,..." to the failed requests file.
    /// </summary>
    private static async Task WriteFailuresToFile(ScoreBatchRequest scoreBatchRequest)
    {
        var failedRequestsPath = Path.Combine(scoreBatchRequest.ResultsDirectory, FailedRequestsFileName);

        // Each record is newline-terminated so that successive failures do not run together on one line.
        var contents = $"{scoreBatchRequest.Patient.ID}:{string.Join(",", scoreBatchRequest.Donors.Select(d => d.ID))}{Environment.NewLine}";
        await File.AppendAllTextAsync(failedRequestsPath, contents);
    }
}
}
Loading

0 comments on commit bfe4766

Please sign in to comment.