Skip to content

Commit

Permalink
Bugfix/adapt chunking implementation to fix wrong chunk size based on…
Browse files Browse the repository at this point in the history
… utf8 strings (#107)

* Adapt release version.
Change owner of the assembly.

* Adapt release version to prepare release 3.1.0

* Adapt chunking implementation and chunk based on the byte[].
Adapt test case to real outcome checking.

* Remove abstract parameter class from payload parameters to decrease clutter.

* Add raw data fetching for data provider.

* Reduce logging for test cases.

* Change name to make the functionality a little more obvious.

* Changed implementation order.

* ...

* ...
  • Loading branch information
saschadoemer authored Mar 2, 2022
1 parent efceca1 commit ca99b16
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Agrirouter.Api.Service.Messaging
{
/// <summary>
/// An implementation for a cancellation token to run multiple attempts for fetching messages.
/// </summary>
public interface ICancellationToken
{
/// <summary>
/// Signal for the polling process to cancel the polling.
/// </summary>
/// <returns>true if the polling can be cancelled, false otherwise.</returns>
bool IsNotCancelled();

/// <summary>
/// Will wait a dedicated amount of time before starting the next step if the token is not cancelled.
/// </summary>
void WaitIfNotCancelled();

/// <summary>
/// Will be called after one step of the polling is completed and the next step is about to start.
/// </summary>
void NextStep();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Agrirouter.Api.Service.Parameters
/// <summary>
/// Parameter container definition.
/// </summary>
public class MessagePayloadParameters : Parameters
public class MessagePayloadParameters
{
/// <summary>
/// Every endpoint can send messages based on its capabilities. The size of a message is however limited. A message contains 2 parts: Header and Body. The limitation of a message is defined as follows:
Expand All @@ -16,7 +16,7 @@ public class MessagePayloadParameters : Parameters
/// The AR will return an error indicating that the message size is above the limit.
/// If the message size is above 5 MB the AR will not return any error. In order to send messages with sizes above threshold, these messages must be split into chunks with the above limit.
/// </summary>
public static int MaxLengthForRawMessageContent { get; } = 767997;
public static int MaxLengthForRawMessageContent => 767997;

/// <summary>
/// Type URL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
<PackageProjectUrl>https://github.com/DKE-Data/agrirouter-sdk-dotnet-standard</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/DKE-Data/agrirouter-sdk-dotnet-standard/blob/develop/LICENSE</PackageLicenseUrl>
<RepositoryUrl>https://github.com/DKE-Data/agrirouter-sdk-dotnet-standard</RepositoryUrl>
<PackageVersion>2.3.0</PackageVersion>
<AssemblyVersion>2.3.0</AssemblyVersion>
<PackageVersion>3.1.0</PackageVersion>
<AssemblyVersion>3.1.0</AssemblyVersion>
<Company>DKE-Data GmbH &amp; Co. KG</Company>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ public static string Encode(MessageHeaderParameters messageHeaderParameters,
/// The chunk information and all IDs will be set by the SDK and are no longer in control of the application.
/// </summary>
/// <returns></returns>
public static List<MessageParameterTuple> ChunkAndBase64EncodeEachChunk(MessageHeaderParameters messageHeaderParameters,
MessagePayloadParameters messagePayloadParameters, OnboardResponse onboardResponse)
public static List<MessageParameterTuple> ChunkAndBase64EncodeEachChunk(
MessageHeaderParameters messageHeaderParameters,
MessagePayloadParameters messagePayloadParameters)
{
if (null == messageHeaderParameters || null == messagePayloadParameters || null == onboardResponse)
if (null == messageHeaderParameters || null == messagePayloadParameters)
{
throw new MissingParameterException(
"Header and payload parameters are required, as well as the onboard response.");
Expand All @@ -68,15 +69,19 @@ public static List<MessageParameterTuple> ChunkAndBase64EncodeEachChunk(MessageH
Log.Debug(
"The message should be chunked, current size of the payload ({}) is above the limitation.",
messagePayloadParameters.Value.ToStringUtf8().Length);
var wholeMessage = messagePayloadParameters.Value.ToStringUtf8();
var wholeMessage = messagePayloadParameters.Value.ToByteArray();
var messageChunks = SplitByLength(wholeMessage,
MessagePayloadParameters.MaxLengthForRawMessageContent).ToList();
var chunkNr = 1;
var chunkContextId = ChunkContextIdService.ChunkContextId();

return (from messageChunk in messageChunks
let messageId = MessageIdService.ApplicationMessageId()
let chunkInfo = new ChunkComponent() { Current = chunkNr++, Total = messageChunks.Count(), ContextId = chunkContextId, TotalSize = wholeMessage.Length }
let chunkInfo = new ChunkComponent()
{
Current = chunkNr++, Total = messageChunks.Count(), ContextId = chunkContextId,
TotalSize = wholeMessage.Length
}
let messageHeaderParametersForChunk = new MessageHeaderParameters()
{
Metadata = messageHeaderParameters.Metadata,
Expand All @@ -87,9 +92,19 @@ public static List<MessageParameterTuple> ChunkAndBase64EncodeEachChunk(MessageH
TeamSetContextId = messageHeaderParameters.TeamSetContextId,
ChunkInfo = chunkInfo
}
let messagePayloadParametersForChunk = new MessagePayloadParameters() { Value = ByteString.CopyFromUtf8(Convert.ToBase64String(Encoding.UTF8.GetBytes(messageChunk))), TypeUrl = messagePayloadParameters.TypeUrl, }
select new MessageParameterTuple { MessageHeaderParameters = messageHeaderParametersForChunk, MessagePayloadParameters = messagePayloadParametersForChunk }).ToList();
let messagePayloadParametersForChunk = new MessagePayloadParameters()
{
Value = ByteString.CopyFromUtf8(
Convert.ToBase64String(messageChunk)),
TypeUrl = messagePayloadParameters.TypeUrl,
}
select new MessageParameterTuple
{
MessageHeaderParameters = messageHeaderParametersForChunk,
MessagePayloadParameters = messagePayloadParametersForChunk
}).ToList();
}

Log.Debug("The message type needs to be base64 encoded, therefore we are encoding the raw value.");
var messagePayloadParametersWithEncodedValue = new MessagePayloadParameters()
{
Expand All @@ -106,6 +121,7 @@ public static List<MessageParameterTuple> ChunkAndBase64EncodeEachChunk(MessageH
}
};
}

Log.Debug(
"The message type does not need base 64 encoding, we are returning the tuple 'as it is'.");
return new List<MessageParameterTuple>()
Expand All @@ -118,12 +134,22 @@ public static List<MessageParameterTuple> ChunkAndBase64EncodeEachChunk(MessageH
};
}

private static IEnumerable<string> SplitByLength(string str, int maxLength)
private static IEnumerable<byte[]> SplitByLength(byte[] bytes, int maxLength)
{
for (int index = 0; index < str.Length; index += maxLength)
var byteArrays = new List<byte[]>();
do
{
yield return str.Substring(index, Math.Min(maxLength, str.Length - index));
var chunk = bytes.Take(maxLength).ToArray();
bytes = bytes.TakeLast(bytes.Length - maxLength).ToArray();
byteArrays.Add(chunk);
} while (bytes.Length > maxLength);

if (bytes.Length > 0)
{
byteArrays.Add(bytes);
}

return byteArrays;
}

private static RequestEnvelope Header(MessageHeaderParameters messageHeaderParameters)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Threading;
using Agrirouter.Api.Service.Messaging;

namespace Agrirouter.Impl.Service.Messaging.CancellationToken
{
/// <summary>
/// A default implementation, based on tries and waiting time.
/// </summary>
public class DefaultCancellationToken : ICancellationToken
{
private readonly int _maxTries;
private readonly int _waitTimeInMilliseconds;

private int _nrOfRetries = 0;

public DefaultCancellationToken(int maxTries, int waitTimeInMilliseconds)
{
_maxTries = maxTries;
_waitTimeInMilliseconds = waitTimeInMilliseconds;
}

public bool IsNotCancelled()
{
return _nrOfRetries < _maxTries;
}

public void WaitIfNotCancelled()
{
if (IsNotCancelled())
{
Thread.Sleep(_waitTimeInMilliseconds);
}
}

public void NextStep()
{
_nrOfRetries++;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Agrirouter.Api.Dto.Messaging;
using Agrirouter.Api.Dto.Onboard;
using Agrirouter.Api.Exception;
using Agrirouter.Api.Service.Messaging;
using Newtonsoft.Json;
using Serilog;

Expand All @@ -29,10 +30,31 @@ public FetchMessageService(HttpClient httpClient)
}

/// <summary>
/// Fetch messages from the inbox using the given onboarding response.
/// Fetch messages from the inbox using the given onboard response and an implementation of a cancellation token to perform polling over a certain amount of time / tries.
/// </summary>
/// <param name="onboardResponse">All the messages that are in the inbox.</param>
/// <returns>-</returns>
/// <param name="onboardResponse">Onboard response to connect to the agrirouter.</param>
/// <param name="cancellationToken">A cancellation token to cancel the polling process.</param>
/// <returns>All the messages that are in the inbox.</returns>
/// <exception cref="CouldNotFetchMessagesException">Will be thrown if the messages can not be fetched.</exception>
public List<MessageResponse> Fetch(OnboardResponse onboardResponse, ICancellationToken cancellationToken)
{
var totalMessageResponses = new List<MessageResponse>();
while (cancellationToken.IsNotCancelled())
{
var messageResponses = Fetch(onboardResponse);
totalMessageResponses.AddRange(messageResponses);
cancellationToken.NextStep();
cancellationToken.WaitIfNotCancelled();
}

return totalMessageResponses;
}

/// <summary>
/// Fetch messages from the inbox using the given onboard response.
/// </summary>
/// <param name="onboardResponse">Onboard response to connect to the agrirouter.</param>
/// <returns>All the messages that are in the inbox.</returns>
/// <exception cref="CouldNotFetchMessagesException">Will be thrown if the messages can not be fetched.</exception>
public List<MessageResponse> Fetch(OnboardResponse onboardResponse)
{
Expand Down Expand Up @@ -60,12 +82,12 @@ public List<MessageResponse> Fetch(OnboardResponse onboardResponse)
public async Task<List<MessageResponse>> FetchAsync(OnboardResponse onboardResponse)
{
Log.Debug("Begin fetching messages.");

var httpRequestMessage = new HttpRequestMessage
{
RequestUri = new Uri(onboardResponse.ConnectionCriteria.Commands)
};

httpRequestMessage.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(MediaTypeNames.Application.Json));

var httpResponseMessage = await _httpClient.SendAsync(httpRequestMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
<PackageProjectUrl>https://github.com/DKE-Data/agrirouter-sdk-dotnet-standard</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/DKE-Data/agrirouter-sdk-dotnet-standard/blob/develop/LICENSE</PackageLicenseUrl>
<RepositoryUrl>https://github.com/DKE-Data/agrirouter-sdk-dotnet-standard</RepositoryUrl>
<PackageVersion>2.3.0</PackageVersion>
<AssemblyVersion>2.3.0</AssemblyVersion>
<PackageVersion>3.1.0</PackageVersion>
<AssemblyVersion>3.1.0</AssemblyVersion>
<Company>DKE-Data GmbH &amp; Co. KG</Company>
</PropertyGroup>

<ItemGroup>
Expand Down
19 changes: 9 additions & 10 deletions agrirouter-sdk-dotnet-standard-test/Data/DataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ public static string ReadBase64EncodedLargeBmp()
return Encode.ToMessageContent(allBytes);
}

public static byte[] ReadLargeBmp()
{
var path = Path.Combine(
Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location) ??
throw new InvalidOperationException(),
@"Data/Content/large_bmp.bmp");
return File.ReadAllBytes(path);
}

public static string ReadBase64EncodedSmallShape()
{
var path = Path.Combine(
Expand Down Expand Up @@ -56,15 +65,5 @@ public static string ReadBase64EncodedSmallTaskData()
var allBytes = File.ReadAllBytes(path);
return Encode.ToMessageContent(allBytes);
}

public static string ReadRawLargeContentThatNeedsToBeChunked()
{
var path = Path.Combine(
Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location) ??
throw new InvalidOperationException(),
@"Data/Content/Bugfixing/large_content_that_needs_to_be_chunked.zip");
var raw = File.ReadAllBytes(path);
return Convert.ToBase64String(raw);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class AbstractIntegrationTestForCommunicationUnits
protected AbstractIntegrationTestForCommunicationUnits()
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.MinimumLevel.Error()
.WriteTo.Console()
.WriteTo.Debug()
.CreateLogger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class AbstractSecuredIntegrationTestForFarmingSoftware
protected AbstractSecuredIntegrationTestForFarmingSoftware()
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.MinimumLevel.Error()
.WriteTo.Console()
.WriteTo.Debug()
.CreateLogger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class AbstractSecuredIntegrationTestForTelemetryPlatform
protected AbstractSecuredIntegrationTestForTelemetryPlatform()
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.MinimumLevel.Error()
.WriteTo.Console()
.WriteTo.Debug()
.CreateLogger();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using Agrirouter.Request.Payload.Endpoint;
using Agrirouter.Api.Dto.Onboard;
using Agrirouter.Api.Service.Parameters;
using Agrirouter.Api.Service.Parameters.Inner;
using Agrirouter.Impl.Service.Common;
using Agrirouter.Impl.Service.Messaging;
using Agrirouter.Impl.Service.Messaging.CancellationToken;
using Agrirouter.Test.Data;
using Agrirouter.Test.Helper;
using Xunit;

namespace Agrirouter.Test.Service.Messaging.Http
{
/// <summary>
/// Functional tests.
/// </summary>
[Collection("Integrationtest")]
public class CancellationTokenTest : AbstractIntegrationTestForCommunicationUnits
{
private static readonly HttpClient HttpClient = HttpClientFactory.AuthenticatedHttpClient(OnboardResponse);

private static OnboardResponse OnboardResponse =>
OnboardResponseIntegrationService.Read(Identifier.Http.CommunicationUnit.SingleEndpointWithP12Certificate);

[Fact(Timeout = 1600)]
public void
GivenDefaultCancellationTokenWhenFetchingMessagesTheCancellationTokenShouldBeCancelledAfterCertainRetries()
{
var fetchMessageService = new FetchMessageService(HttpClient);
var fetch = fetchMessageService.Fetch(OnboardResponse, new DefaultCancellationToken(3, 500));
Assert.Empty(fetch);
}
}
}
Loading

0 comments on commit ca99b16

Please sign in to comment.