Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Data Pipeline Operators #1378

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
441b232
Added support for Dapr data processor - still working on unit tests
WhitWaldo Oct 19, 2024
4c521a9
Adding missed unit test
WhitWaldo Oct 19, 2024
008c2a7
Adding unit tests - still a few remaining failures
WhitWaldo Oct 19, 2024
0fcd4f5
Building out unit tests, made some perf changes.
WhitWaldo Oct 22, 2024
701f326
Shortened operator names, made more consistent
WhitWaldo Oct 22, 2024
7f558a6
Fixed unit tests, removed mapping operations
WhitWaldo Oct 23, 2024
3982164
Added another unit test
WhitWaldo Oct 23, 2024
1420a39
Updated to support duplicate operations with appropriate metadata ret…
WhitWaldo Oct 23, 2024
1d57fd5
Updated unit tests + unit test class name
WhitWaldo Oct 23, 2024
0cd2586
Minor file rename
WhitWaldo Oct 23, 2024
7ca1b6b
Removed outdated project reference
WhitWaldo Oct 23, 2024
fcf02b0
Removed unnecessary usings that were inadvertently added
WhitWaldo Oct 23, 2024
cdc73ed
Merge branch 'master' into dapr-data-operator
WhitWaldo Oct 23, 2024
2b584ad
Merge branch 'master' into dapr-data-operator
WhitWaldo Oct 25, 2024
cf9e5c1
Merge branch 'master' into dapr-data-operator
WhitWaldo Nov 1, 2024
56303f9
Merge branch 'master' into dapr-data-operator
WhitWaldo Nov 1, 2024
5bb151e
Merge branch 'master' into dapr-data-operator
WhitWaldo Nov 4, 2024
2b0cafc
Merge branch 'master' into dapr-data-operator
WhitWaldo Nov 5, 2024
9e1361c
Merge branch 'master' into dapr-data-operator
WhitWaldo Nov 30, 2024
1ff81f2
Merge branch 'master' into dapr-data-operator
WhitWaldo Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/Dapr.Common/Data/Attributes/DataPipelineAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Common.Data.Operations;

namespace Dapr.Common.Data.Attributes;

/// <summary>
/// Attribute-based approach for indicating which data operations should be performed on a type and in what order.
/// </summary>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = false)]
public sealed class DataPipelineAttribute : Attribute
{
    /// <summary>
    /// Contains the various data operation types available and the order in which to apply them.
    /// </summary>
    public readonly IReadOnlyList<Type> DataOperationTypes;

    /// <summary>
    /// Initializes a new <see cref="DataPipelineAttribute"/>.
    /// </summary>
    /// <param name="dataOperationTypes">
    /// The operation types to register, in the order they were supplied. Every entry must be
    /// assignable to <see cref="IDaprDataOperation"/>.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="dataOperationTypes"/> is <c>null</c>.</exception>
    /// <exception cref="DaprException">Thrown when an entry does not implement <see cref="IDaprDataOperation"/>.</exception>
    public DataPipelineAttribute(params Type[] dataOperationTypes)
    {
        if (dataOperationTypes is null)
        {
            throw new ArgumentNullException(nameof(dataOperationTypes));
        }

        var registeredTypes = new List<Type>(dataOperationTypes.Length);

        foreach (var type in dataOperationTypes)
        {
            // IsAssignableFrom returns false for a null entry, so null elements are rejected here as well.
            if (!typeof(IDaprDataOperation).IsAssignableFrom(type))
            {
                // Report the actual offending type; nameof(type) would only ever yield the literal text "type".
                var typeName = type?.FullName ?? type?.Name ?? "<null>";
                throw new DaprException(
                    $"Unable to register data preparation operation as {typeName} does not implement `{nameof(IDaprDataOperation)}`");
            }

            registeredTypes.Add(type);
        }

        DataOperationTypes = registeredTypes;
    }
}
145 changes: 145 additions & 0 deletions src/Dapr.Common/Data/DaprDecoderPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Common.Data.Operations;

namespace Dapr.Common.Data;

/// <summary>
/// Reverses a previously encoded payload by running the provided <see cref="IDaprDataOperation"/>
/// instances backwards: byte-based operations first, then the string/byte transition, then the
/// string-based operations and finally the conversion from a string back to <typeparamref name="TOutput"/>.
/// </summary>
/// <typeparam name="TOutput">The type the payload is ultimately decoded into.</typeparam>
internal sealed class DaprDecoderPipeline<TOutput>
{
    private readonly Stack<string> prefixKeys;
    private readonly IDaprTStringTransitionOperation<TOutput>? genericToStringOp;
    private readonly List<IDaprStringBasedOperation> stringOps = new();
    private readonly IDaprStringByteTransitionOperation? stringToByteOp;
    private readonly List<IDaprByteBasedOperation> byteOps = new();

    /// <summary>
    /// Used to initialize a new <see cref="DaprDecoderPipeline{TOutput}"/>.
    /// </summary>
    /// <param name="operations">
    /// The operations making up the pipeline. Each one is sorted into its category by interface;
    /// operations matching none of the recognized interfaces are ignored.
    /// </param>
    /// <param name="prefixKeys">The metadata prefix keys popped, one per operation, while decoding.</param>
    /// <exception cref="DaprException">
    /// Thrown when more than one operation is supplied for either transition step, or when either
    /// transition step has no operation at all.
    /// </exception>
    public DaprDecoderPipeline(IEnumerable<IDaprDataOperation> operations, Stack<string> prefixKeys)
    {
        this.prefixKeys = prefixKeys;

        foreach (var op in operations)
        {
            switch (op)
            {
                // The guarded "already assigned" cases must precede their unguarded twins so duplicates are rejected.
                case IDaprTStringTransitionOperation<TOutput> when genericToStringOp is not null:
                    throw new DaprException(
                        $"Multiple types are declared for the conversion of the data to a string in the data pipeline for {typeof(TOutput)} - only one is allowed");
                case IDaprTStringTransitionOperation<TOutput> genToStrOp:
                    genericToStringOp = genToStrOp;
                    break;
                case IDaprStringBasedOperation strOp:
                    stringOps.Add(strOp);
                    break;
                case IDaprStringByteTransitionOperation when stringToByteOp is not null:
                    throw new DaprException(
                        $"Multiple types are declared for the pipeline conversion from a string to a byte array for {typeof(TOutput)} - only one is allowed");
                case IDaprStringByteTransitionOperation strToByte:
                    stringToByteOp = strToByte;
                    break;
                case IDaprByteBasedOperation byteOp:
                    byteOps.Add(byteOp);
                    break;
            }
        }

        if (genericToStringOp is null)
        {
            throw new DaprException(
                $"A pipeline operation must be specified to convert a {typeof(TOutput)} into a serializable string");
        }

        if (stringToByteOp is null)
        {
            throw new DaprException(
                $"A pipeline operation must be specified to convert a {typeof(TOutput)} into a byte array");
        }
    }

    /// <summary>
    /// Processes the data through the registered operations in reverse order.
    /// </summary>
    /// <param name="payload">The data to process in reverse.</param>
    /// <param name="metadata">The metadata providing the mechanism(s) used to encode the data.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The evaluated data.</returns>
    public async Task<DaprOperationPayload<TOutput?>> ReverseProcessAsync(ReadOnlyMemory<byte> payload,
        Dictionary<string, string> metadata, CancellationToken cancellationToken = default)
    {
        // Pop from a private copy so that the shared prefixKeys stack is not consumed by this call.
        // NOTE(review): Stack<T>(IEnumerable<T>) pushes elements in enumeration order, and a Stack enumerates
        // top-to-bottom, so this copy pops in the opposite order to prefixKeys - confirm the caller relies on that.
        var metadataPrefixes = new Stack<string>(prefixKeys);

        //First, perform byte-based operations
        var inboundPayload = new DaprOperationPayload<ReadOnlyMemory<byte>>(payload) { Metadata = metadata };
        var byteBasedResult = await ReverseByteOperationsAsync(inboundPayload, metadataPrefixes, cancellationToken);

        //Convert this back to a string from a byte array
        var currentPrefix = metadataPrefixes.Pop();
        var stringResult = await stringToByteOp!.ReverseAsync(byteBasedResult, currentPrefix, cancellationToken);

        //Perform the string-based operations
        var stringBasedResult = await ReverseStringOperationsAsync(stringResult, metadataPrefixes, cancellationToken);

        //Convert from a string back into its generic type
        currentPrefix = metadataPrefixes.Pop();
        var genericResult = await genericToStringOp!.ReverseAsync(stringBasedResult, currentPrefix, cancellationToken);

        return genericResult;
    }

    /// <summary>
    /// Performs a reversal operation for the string-based operations, last registered first.
    /// </summary>
    /// <param name="payload">The payload to run the reverse operation against.</param>
    /// <param name="metadataPrefixes">The prefix values for retrieving data from the metadata for this operation.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The payload after every string-based operation has been reversed.</returns>
    private async Task<DaprOperationPayload<string?>> ReverseStringOperationsAsync(
        DaprOperationPayload<string?> payload,
        Stack<string> metadataPrefixes, CancellationToken cancellationToken)
    {
        // Iterate backwards by index rather than calling List<T>.Reverse(): that method reverses the
        // field in place, which would flip the operation order on every other invocation.
        for (var index = stringOps.Count - 1; index >= 0; index--)
        {
            var currentPrefix = metadataPrefixes.Pop();
            payload = await stringOps[index].ReverseAsync(payload, currentPrefix, cancellationToken);
        }

        return payload;
    }

    /// <summary>
    /// Performs a reversal operation for the byte-based operations, last registered first.
    /// </summary>
    /// <param name="payload">The current state of the payload.</param>
    /// <param name="metadataPrefixes">The prefix values for retrieving data from the metadata for this operation.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The most up-to-date payload.</returns>
    private async Task<DaprOperationPayload<ReadOnlyMemory<byte>>>
        ReverseByteOperationsAsync(DaprOperationPayload<ReadOnlyMemory<byte>> payload, Stack<string> metadataPrefixes,
            CancellationToken cancellationToken)
    {
        // Iterate backwards by index rather than calling List<T>.Reverse(): that method reverses the
        // field in place, which would flip the operation order on every other invocation.
        for (var index = byteOps.Count - 1; index >= 0; index--)
        {
            var currentPrefix = metadataPrefixes.Pop();
            payload = await byteOps[index].ReverseAsync(payload, currentPrefix, cancellationToken);
        }

        return payload;
    }
}
155 changes: 155 additions & 0 deletions src/Dapr.Common/Data/DaprEncoderPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Common.Data.Extensions;
using Dapr.Common.Data.Operations;

namespace Dapr.Common.Data;

/// <summary>
/// Encodes a <typeparamref name="TInput"/> value by running it through the provided
/// <see cref="IDaprDataOperation"/> instances: conversion to a string, string-based operations,
/// conversion to bytes and finally byte-based operations.
/// </summary>
/// <typeparam name="TInput">The type of the value being encoded.</typeparam>
internal sealed class DaprEncoderPipeline<TInput>
{
    /// <summary>
    /// The metadata key containing the operations.
    /// </summary>
    private const string OperationKey = "ops";

    private readonly IDaprTStringTransitionOperation<TInput>? genericToStringOp;
    private readonly List<IDaprStringBasedOperation> stringOps = new();
    private readonly IDaprStringByteTransitionOperation? stringToByteOp;
    private readonly List<IDaprByteBasedOperation> byteOps = new();

    /// <summary>
    /// Used to initialize a new <see cref="DaprEncoderPipeline{TInput}"/>.
    /// </summary>
    /// <param name="operations">
    /// The operations making up the pipeline. Each one is sorted into its category by interface;
    /// operations matching none of the recognized interfaces are ignored.
    /// </param>
    /// <exception cref="DaprException">
    /// Thrown when more than one operation is supplied for either transition step, or when either
    /// transition step has no operation at all.
    /// </exception>
    public DaprEncoderPipeline(IEnumerable<IDaprDataOperation> operations)
    {
        foreach (var op in operations)
        {
            switch (op)
            {
                // The guarded "already assigned" cases must precede their unguarded twins so duplicates are rejected.
                case IDaprTStringTransitionOperation<TInput> when genericToStringOp is not null:
                    throw new DaprException(
                        $"Multiple types are declared for the conversion of the data to a string in the data pipeline for {typeof(TInput)} - only one is allowed");
                case IDaprTStringTransitionOperation<TInput> genToStrOp:
                    genericToStringOp = genToStrOp;
                    break;
                case IDaprStringBasedOperation strOp:
                    stringOps.Add(strOp);
                    break;
                case IDaprStringByteTransitionOperation when stringToByteOp is not null:
                    throw new DaprException(
                        $"Multiple types are declared for the pipeline conversion from a string to a byte array for {typeof(TInput)} - only one is allowed");
                case IDaprStringByteTransitionOperation strToByte:
                    stringToByteOp = strToByte;
                    break;
                case IDaprByteBasedOperation byteOp:
                    byteOps.Add(byteOp);
                    break;
            }
        }

        if (genericToStringOp is null)
        {
            throw new DaprException(
                $"A pipeline operation must be specified to convert a {typeof(TInput)} into a serializable string");
        }

        if (stringToByteOp is null)
        {
            throw new DaprException(
                $"A pipeline operation must be specified to convert a {typeof(TInput)} into a byte array");
        }
    }

    /// <summary>
    /// Processes the data through the registered operations in order and collects their metadata.
    /// </summary>
    /// <param name="input">The data to evaluate.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// The encoded bytes together with the combined metadata, including an <c>ops</c> entry listing
    /// every executed operation as <c>name[invocationIndex]</c>, comma separated, in execution order.
    /// </returns>
    public async Task<DaprOperationPayload<ReadOnlyMemory<byte>>> ProcessAsync(TInput input, CancellationToken cancellationToken = default)
    {
        //Combines the metadata across each operation to be returned with the result
        var combinedMetadata = new Dictionary<string, string>();

        // Invocation tracking is scoped to this call. Holding it in instance fields would leak the names and
        // counters of earlier calls into the "ops" entry whenever the same pipeline instance is used again.
        var operationNames = new List<string>();
        var operationInvocations = new Dictionary<string, int>();

        // Records one invocation of the named operation and returns "name[zeroBasedCount]", which is
        // both appended to the ordered operation list and used as the key passed to MergeFrom.
        string RegisterOperationInvocation(string operationName)
        {
            // TryGetValue leaves the count at 0 for an operation that has not been seen yet.
            operationInvocations.TryGetValue(operationName, out var invocationCount);
            operationInvocations[operationName] = invocationCount + 1;

            var formattedName = $"{operationName}[{invocationCount}]";
            operationNames.Add(formattedName);
            return formattedName;
        }

        //Start by serializing the input to a string
        var serializationPayload = await genericToStringOp!.ExecuteAsync(input, cancellationToken);
        // NOTE(review): MergeFrom is an extension declared elsewhere; presumably it copies the operation's
        // metadata into combinedMetadata under the supplied prefix - confirm against its definition.
        combinedMetadata.MergeFrom(serializationPayload.Metadata, RegisterOperationInvocation(genericToStringOp.Name));

        //Run through any provided string-based operations
        var stringPayload = new DaprOperationPayload<string?>(serializationPayload.Payload);
        foreach (var strOp in stringOps)
        {
            stringPayload = await strOp.ExecuteAsync(stringPayload.Payload, cancellationToken);
            combinedMetadata.MergeFrom(stringPayload.Metadata, RegisterOperationInvocation(strOp.Name));
        }

        //Encode the string payload to a byte array
        var encodedPayload = await stringToByteOp!.ExecuteAsync(stringPayload.Payload, cancellationToken);
        combinedMetadata.MergeFrom(encodedPayload.Metadata, RegisterOperationInvocation(stringToByteOp.Name));

        //Run through any provided byte-based operations
        var bytePayload = new DaprOperationPayload<ReadOnlyMemory<byte>>(encodedPayload.Payload);
        foreach (var byteOp in byteOps)
        {
            bytePayload = await byteOp.ExecuteAsync(bytePayload.Payload, cancellationToken);
            combinedMetadata.MergeFrom(bytePayload.Metadata, RegisterOperationInvocation(byteOp.Name));
        }

        //Persist the op names to the metadata
        combinedMetadata[OperationKey] = string.Join(',', operationNames);

        //Create a payload that combines the payload and metadata
        return new DaprOperationPayload<ReadOnlyMemory<byte>>(bytePayload.Payload)
        {
            Metadata = combinedMetadata
        };
    }
}
Loading
Loading