Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for streaming non-encoded binaries from plugins #31

Merged
merged 1 commit into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions Dan.Common/Enums/EvidenceValueType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public enum EvidenceValueType
DateTime = 4,

/// <summary>
/// Binary attachment
/// Binary attachment (Base64-encoded)
/// </summary>
[EnumMember(Value = "attachment")]
Attachment = 5,
Expand All @@ -48,8 +48,14 @@ public enum EvidenceValueType
Amount = 7,

/// <summary>
/// Currency value
/// Arbitrary JSON
/// </summary>
[EnumMember(Value = "jsonSchema")]
JsonSchema = 8
JsonSchema = 8,

/// <summary>
/// Raw binary (only available without envelope, cannot be combined with other values)
/// </summary>
[EnumMember(Value = "binary")]
Binary = 9,
}
42 changes: 35 additions & 7 deletions Dan.Core.UnitTest/EvidenceHarvesterServiceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ public class EvidenceHarvesterServiceTest
private const string EVIDENCECODE_LEGALBASISORCONSENT_WITH_LEGALBASIS = "EvidenceCodeRequiringLegalBasisOrConsentWithLegalBasis";
private const string EVIDENCECODE_ASYNC = "EvidenceCodeAsync";
private const string EVIDENCECODE_SCOPE = "EvidenceCodeScope";
private const string EVIDENCECODE_STREAM = "EvidenceCodeStream";

private const string MOCK_HTTP_CLIENT_RESPONSE_BODY = "[{}]";

[TestInitialize]
public void Initialize()
{
_mockHttpClientFactory.Setup(_ => _.CreateClient(It.IsAny<string>())).Returns(TestHelpers.GetHttpClientMock("[{}]"));
_mockHttpClientFactory.Setup(_ => _.CreateClient(It.IsAny<string>())).Returns(TestHelpers.GetHttpClientMock(MOCK_HTTP_CLIENT_RESPONSE_BODY));
_mockTokenRequesterService.Setup(_ => _.GetMaskinportenToken(It.IsAny<string>(), It.IsAny<string>()))
.Returns(Task.FromResult("{\"access_token\":\"\"}"));
_mockRequestContextService.SetupProperty(_ => _.Request,
Expand Down Expand Up @@ -240,6 +243,30 @@ public async Task Harvest_Success_AsyncOpen()
Assert.AreEqual((int)StatusCodeId.Available, response.EvidenceStatus.Status.Code);
Assert.IsNotNull(response.EvidenceValues);
}

[TestMethod]
public async Task Harvest_Success_Stream()
{
_mockEvidenceStatusService.Setup(_ =>
_.GetEvidenceStatusAsync(It.IsAny<Accreditation>(), It.IsAny<EvidenceCode>(), It.IsAny<bool>()))
.Returns(
Task.FromResult(new EvidenceStatus()
{
Status = EvidenceStatusCode.Available
}
)
);

Accreditation accreditation = MakeAccreditation("aid", Certificates.DEFAULT_ORG, DateTime.Now.AddDays(-1));

var evidenceHarvesterService = new EvidenceHarvesterService(_loggerFactory, _mockHttpClientFactory.Object, _mockConsentService.Object, _mockEvidenceStatusService.Object, _mockTokenRequesterService.Object, _mockRequestContextService.Object);
var response = await evidenceHarvesterService.HarvestStream(EVIDENCECODE_STREAM, accreditation);

var sr = new StreamReader(response);
var responseString = await sr.ReadToEndAsync();

Assert.AreEqual(responseString, MOCK_HTTP_CLIENT_RESPONSE_BODY);
}

private static Accreditation MakeAccreditation(string id, string org, DateTime? validTo = null, string authorizationCode = null, List<string> evidenceCodes = null)
{
Expand All @@ -265,13 +292,14 @@ private static Accreditation MakeAccreditation(string id, string org, DateTime?

private static List<EvidenceCode> GetEvidenceCodes(string provider = "unittest")
{
return new List<EvidenceCode>()
return new List<EvidenceCode>
{
new EvidenceCode() { EvidenceCodeName = EVIDENCECODE_OPEN, EvidenceSource = provider, RequiredScopes = "foo" },
new EvidenceCode() { EvidenceCodeName = EVIDENCECODE_LEGALBASIS, EvidenceSource = provider },
new EvidenceCode() { EvidenceCodeName = EVIDENCECODE_CONSENT, EvidenceSource = provider, ServiceCode = "1", ServiceEditionCode = 1, },
new EvidenceCode() { EvidenceCodeName = EVIDENCECODE_ASYNC, IsAsynchronous = true, EvidenceSource = "unittest" },
new EvidenceCode() { EvidenceCodeName = EVIDENCECODE_SCOPE, EvidenceSource = "unittest", RequiredScopes = "foo"},
new() { EvidenceCodeName = EVIDENCECODE_OPEN, EvidenceSource = provider, RequiredScopes = "foo" },
new() { EvidenceCodeName = EVIDENCECODE_LEGALBASIS, EvidenceSource = provider },
new() { EvidenceCodeName = EVIDENCECODE_CONSENT, EvidenceSource = provider, ServiceCode = "1", ServiceEditionCode = 1, },
new() { EvidenceCodeName = EVIDENCECODE_ASYNC, IsAsynchronous = true, EvidenceSource = "unittest" },
new() { EvidenceCodeName = EVIDENCECODE_SCOPE, EvidenceSource = "unittest", RequiredScopes = "foo"},
new() { EvidenceCodeName = EVIDENCECODE_STREAM, EvidenceSource = "unittest", Values = new List<EvidenceValue> { new() { EvidenceValueName = "streamtest", ValueType = EvidenceValueType.Binary }}},
};
}

Expand Down
44 changes: 26 additions & 18 deletions Dan.Core/FuncDirectHarvester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,25 @@ public async Task<HttpResponseData> RunAsync(
HttpRequestData req,
string evidenceCodeName)
{

await _requestContextService.BuildRequestContext(req);

var authorizationRequest = await GetAuthorizationRequest(req, evidenceCodeName);

await _authorizationRequestValidatorService.Validate(authorizationRequest);
var evidenceCodes = _authorizationRequestValidatorService.GetEvidenceCodes();
if (evidenceCodes.Any(x => x.IsAsynchronous || (x.AuthorizationRequirements ?? new List<Requirement>()).Any(y => y is ConsentRequirement)))
var evidenceCodeForHarvest = _authorizationRequestValidatorService.GetEvidenceCodes().First();

if (evidenceCodeForHarvest.IsAsynchronous || evidenceCodeForHarvest.AuthorizationRequirements.Any(y => y is ConsentRequirement))
{
throw new InvalidEvidenceRequestException("Unable to directly harvest async or consent-based evidence code");
}

foreach (var es in evidenceCodes)
{
es.AuthorizationRequirements = new List<Requirement>();
}
evidenceCodeForHarvest.AuthorizationRequirements = new List<Requirement>();

// Create a dummy accreditation for the harvest
var validTo = _authorizationRequestValidatorService.GetValidTo();
var accreditation = new Accreditation
{
AccreditationId = Guid.NewGuid().ToString(),
EvidenceCodes = evidenceCodes,
EvidenceCodes = new List<EvidenceCode> { evidenceCodeForHarvest },
Requestor = authorizationRequest.Requestor,
RequestorParty = authorizationRequest.RequestorParty,
Subject = authorizationRequest.Subject,
Expand All @@ -87,23 +83,35 @@ public async Task<HttpResponseData> RunAsync(
ServiceContext = _requestContextService.ServiceContext.Name
};

var evidence = await _evidenceHarvesterService.Harvest(evidenceCodeName, accreditation, _requestContextService.GetEvidenceHarvesterOptionsFromRequest());
_logger.DanLog(accreditation, LogAction.AuthorizationGranted);

var response = req.CreateResponse(HttpStatusCode.OK);
if (req.HasQueryParam("envelope") && !req.GetBoolQueryParam("envelope"))

if (evidenceCodeForHarvest.Values.Any(x => x.ValueType == EvidenceValueType.Binary))
{
await response.SetUnenvelopedEvidenceValuesAsync(evidence.EvidenceValues, req.GetQueryParam(JmesPathTransfomer.QueryParameter));
// Binary content, "stream" response to client, This is not actually streaming, as
// the HttpResponseData representation does not provide access to the actual HttpRequest.
response.Headers.Add("Content-Type", "application/octet-stream");
var upstreamResponse = await _evidenceHarvesterService.HarvestStream(evidenceCodeName, accreditation,
_requestContextService.GetEvidenceHarvesterOptionsFromRequest());

await upstreamResponse.CopyToAsync(response.Body);
}
else
{
await response.SetEvidenceAsync(evidence);
var evidence = await _evidenceHarvesterService.Harvest(evidenceCodeName, accreditation, _requestContextService.GetEvidenceHarvesterOptionsFromRequest());
if (req.HasQueryParam("envelope") && !req.GetBoolQueryParam("envelope"))
{
await response.SetUnenvelopedEvidenceValuesAsync(evidence.EvidenceValues, req.GetQueryParam(JmesPathTransfomer.QueryParameter));
}
else
{
await response.SetEvidenceAsync(evidence);
}
}


_logger.DanLog(accreditation, LogAction.AuthorizationGranted);
_logger.DanLog(accreditation, LogAction.DataRetrieved);

return response;

}

private static async Task<AuthorizationRequest> GetAuthorizationRequest(HttpRequestData req, string evidenceCodeName)
Expand Down
40 changes: 27 additions & 13 deletions Dan.Core/FuncEvidenceHarvester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,47 @@ public async Task<HttpResponseData> RunAsync(

var authorizationRequest = GetAuthorizationRequest(accreditation);
await _authorizationRequestValidatorService.Validate(authorizationRequest);

var evidence = await _evidenceHarvesterService.Harvest(evidenceCodeName, accreditation,
_requestContextService.GetEvidenceHarvesterOptionsFromRequest());

var evidenceCodeForHarvest = accreditation.GetValidEvidenceCode(evidenceCodeName);

var response = req.CreateResponse(HttpStatusCode.OK);
if (req.HasQueryParam("envelope") && !req.GetBoolQueryParam("envelope"))

if (evidenceCodeForHarvest.Values.Any(x => x.ValueType == EvidenceValueType.Binary))
{
await response.SetUnenvelopedEvidenceValuesAsync(evidence.EvidenceValues, req.GetQueryParam(JmesPathTransfomer.QueryParameter));
// Binary content, "stream" response to client, This is not actually streaming, as
// the HttpResponseData representation does not provide access to the actual HttpRequest.
response.Headers.Add("Content-Type", "application/octet-stream");
var upstreamResponse = await _evidenceHarvesterService.HarvestStream(evidenceCodeName, accreditation,
_requestContextService.GetEvidenceHarvesterOptionsFromRequest());

await upstreamResponse.CopyToAsync(response.Body);
}
else
{
await response.SetEvidenceAsync(evidence);
}
var evidence = await _evidenceHarvesterService.Harvest(evidenceCodeName, accreditation,
_requestContextService.GetEvidenceHarvesterOptionsFromRequest());

var evidenceCode = accreditation.GetValidEvidenceCode(evidenceCodeName);
if (req.HasQueryParam("envelope") && !req.GetBoolQueryParam("envelope"))
{
await response.SetUnenvelopedEvidenceValuesAsync(evidence.EvidenceValues,
req.GetQueryParam(JmesPathTransfomer.QueryParameter));
}
else
{
await response.SetEvidenceAsync(evidence);
}
}

if (_consentService.EvidenceCodeRequiresConsent(evidenceCode))
if (_consentService.EvidenceCodeRequiresConsent(evidenceCodeForHarvest))
{
using (var t = _logger.Timer("consent-log-usage"))
{
_logger.LogInformation(
"Start logging consent based harvest aid={accreditationId} evidenceCode={evidenceCode}",
accreditation.AccreditationId, evidenceCode.EvidenceCodeName);
await LogConsentBasedHarvest(evidenceCode, accreditation);
accreditation.AccreditationId, evidenceCodeForHarvest.EvidenceCodeName);
await LogConsentBasedHarvest(evidenceCodeForHarvest, accreditation);
_logger.LogInformation(
"Completed logging consent based harvest aid={accreditationId} evidenceCode={evidenceCode} elapsedMs={elapsedMs}",
accreditation.AccreditationId, evidenceCode.EvidenceCodeName, t.ElapsedMilliseconds);
accreditation.AccreditationId, evidenceCodeForHarvest.EvidenceCodeName, t.ElapsedMilliseconds);
}
}

Expand Down
97 changes: 78 additions & 19 deletions Dan.Core/Services/EvidenceHarvesterService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Dan.Common.Enums;
using System.Net;
using Dan.Common.Enums;
using Dan.Common.Models;
using Dan.Core.Config;
using Dan.Core.Exceptions;
Expand Down Expand Up @@ -57,6 +58,29 @@ public async Task<Evidence> Harvest(string evidenceCodeName, Accreditation accre
return evidence;
}

public async Task<Stream> HarvestStream(string evidenceCodeName, Accreditation accreditation,
EvidenceHarvesterOptions? evidenceHarvesterOptions = default)
{
var evidenceCode = accreditation.GetValidEvidenceCode(evidenceCodeName);

_log.LogInformation("Start get evidence status | aid={accreditationId}, evidenceCode={evidenceCodeName}", accreditation.AccreditationId, evidenceCode.EvidenceCodeName);
var evidenceStatus = await _evidenceStatusService.GetEvidenceStatusAsync(accreditation, evidenceCode, false);

ThrowIfNotAvailableForHarvest(evidenceStatus);

Stream harvestedEvidenceStream;
using (var _ = _log.Timer($"{evidenceCode.EvidenceCodeName}-harvest-stream"))
{
_log.LogInformation("Start harvesting evidence as stream | aid={accreditationId}, status={evidenceStatus}, evidenceCode={evidenceCodeName}",
accreditation.AccreditationId, evidenceStatus.Status.Description, evidenceCode.EvidenceCodeName
);
harvestedEvidenceStream = await HarvestEvidenceStream(evidenceCode, accreditation, evidenceHarvesterOptions);
_log.LogInformation("Completed harvesting evidence as stream | aid={accreditationId}", accreditation.AccreditationId);
}

return harvestedEvidenceStream;
}

public async Task<Evidence> HarvestOpenData(EvidenceCode evidenceCode, string identifier = "")
{
_log.LogDebug("Running HaaS (Harvest as a Service) for open data with dataset {evidenceCodeName} and identifier {identifier}", evidenceCode.EvidenceCodeName, identifier == "" ? "(empty)" : identifier);
Expand Down Expand Up @@ -110,17 +134,65 @@ public async Task<Evidence> HarvestOpenData(EvidenceCode evidenceCode, string id
return evidence;
}

private async Task<List<EvidenceValue>> HarvestEvidenceValues(EvidenceCode evidenceCode, Accreditation accreditation, EvidenceHarvesterOptions? evidenceHarvesterOptions = default)
private async Task<Stream> HarvestEvidenceStream(EvidenceCode evidenceCode, Accreditation accreditation,
EvidenceHarvesterOptions? evidenceHarvesterOptions = default)
{
var url = evidenceCode.GetEvidenceSourceUrl();
var request = await GetEvidenceHarvesterRequestMessage(accreditation, evidenceCode, evidenceHarvesterOptions);
var timeoutSeconds = evidenceCode.Timeout ??= Settings.DefaultHarvestTaskCancellation;
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(timeoutSeconds));
request.SetPolicyExecutionContext(new Context(request.Key(CacheArea.Absolute)));
try
{
var client = _httpClientFactory.CreateClient("SafeHttpClient");

// When attempting to stream from the evidence source, we simplify error handling
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cts.Token);
if (response.IsSuccessStatusCode)
{
return await response.Content.ReadAsStreamAsync(cts.Token);
}

var request = new HttpRequestMessage(HttpMethod.Post, url);
request.Headers.TryAddWithoutValidation("Content-Type", "application/json");
throw new ServiceNotAvailableException(
$"Unable to stream response from source (got {response.StatusCode})");

}
catch (TaskCanceledException)
{
_log.LogError("Streaming evidence for evidenceCode={evidenceCodeName} and subject {subject} was cancelled", evidenceCode.EvidenceCodeName, accreditation.SubjectParty.GetAsString());
throw new ServiceNotAvailableException($"The request was cancelled after exceeding max duration ({timeoutSeconds} seconds)");
}
}

private async Task<List<EvidenceValue>> HarvestEvidenceValues(EvidenceCode evidenceCode, Accreditation accreditation, EvidenceHarvesterOptions? evidenceHarvesterOptions = default)
{
var request = await GetEvidenceHarvesterRequestMessage(accreditation, evidenceCode, evidenceHarvesterOptions);
var timeoutSeconds = evidenceCode.Timeout ??= Settings.DefaultHarvestTaskCancellation;

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(timeoutSeconds));
request.SetPolicyExecutionContext(new Context(request.Key(CacheArea.Absolute)));
try
{
var client = _httpClientFactory.CreateClient("SafeHttpClient");
return (await EvidenceSourceHelper.DoRequest<List<EvidenceValue>>(
request,
() => client.SendAsync(request, cts.Token)))!;
}
catch (TaskCanceledException)
{
_log.LogError("Harvesting evidence values for open data evidenceCode={evidenceCodeName} and subject {subject} was cancelled", evidenceCode.EvidenceCodeName, accreditation.SubjectParty.GetAsString());
throw new ServiceNotAvailableException($"The request was cancelled after exceeding max duration of {timeoutSeconds} seconds)");
}
}

private async Task<HttpRequestMessage> GetEvidenceHarvesterRequestMessage(Accreditation accreditation,
EvidenceCode evidenceCode, EvidenceHarvesterOptions? evidenceHarvesterOptions = default)
{
var url = evidenceCode.GetEvidenceSourceUrl();

var request = new HttpRequestMessage(HttpMethod.Post, url);
request.Headers.TryAddWithoutValidation("Content-Type", "application/json");

var evidenceHarvesterRequest = new EvidenceHarvesterRequest()
{
Expand Down Expand Up @@ -151,20 +223,7 @@ private async Task<List<EvidenceValue>> HarvestEvidenceValues(EvidenceCode evide

request.JsonContent(evidenceHarvesterRequest);

request.SetPolicyExecutionContext(new Context(request.Key(CacheArea.Absolute)));

try
{
var client = _httpClientFactory.CreateClient("SafeHttpClient");
return (await EvidenceSourceHelper.DoRequest<List<EvidenceValue>>(
request,
() => client.SendAsync(request, cts.Token)))!;
}
catch (TaskCanceledException)
{
_log.LogError("Harvesting evidence values for open data evidenceCode={evidenceCodeName} and subject {subject} was cancelled", evidenceCode.EvidenceCodeName, accreditation.SubjectParty.GetAsString(true));
throw new ServiceNotAvailableException($"The request was cancelled after exceeding max duration of {timeoutSeconds} seconds)");
}
return request;
}

private async Task<string> GetAccessToken(EvidenceCode evidenceCode, Accreditation accreditation, EvidenceHarvesterOptions evidenceHarvesterOptions)
Expand Down
Loading