Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge feature/core-entities to dev #2574

Merged
merged 31 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4a5edf8
Merge pull request #2559 from Azure/dev
bachuv Aug 29, 2023
c9ad121
Merge pull request #2562 from Azure/dev
bachuv Aug 31, 2023
c36d3d9
udpate readme.
sebastianburckhardt Sep 7, 2023
10111e0
update durability provider class for new core-entities support. (#2570)
sebastianburckhardt Sep 11, 2023
6ff3e7b
update DurableClient to take advantage of native entity queries (#2571)
sebastianburckhardt Sep 11, 2023
d1d6074
implement passthrough middleware for entities (#2572)
sebastianburckhardt Sep 11, 2023
eb961e0
implement entity queries for grpc listener (#2573)
sebastianburckhardt Sep 11, 2023
e378bfa
Merge pull request #2579 from Azure/dev
jviau Sep 12, 2023
32680a3
Merge pull request #2583 from Azure/dev
jviau Sep 13, 2023
cc7b93a
Various fixes (#2585)
sebastianburckhardt Sep 19, 2023
db60e7f
simplify how entities are excluded from instance queries (#2586)
sebastianburckhardt Sep 19, 2023
2c5a7e5
add an entity example to the DotNetIsolated smoke test project. (#2584)
sebastianburckhardt Sep 21, 2023
ac6e0d2
Entities: Add worker side entity trigger and logic (#2576)
jviau Sep 21, 2023
6e9d615
Merge commit '2455636cca50cfb4d1543633d28657437bbb5173' into feature/…
sebastianburckhardt Sep 21, 2023
6251fca
another small fix that got lost somewhere. (#2596)
sebastianburckhardt Sep 25, 2023
06d0713
Update packages and version for entities preview (#2599)
jviau Sep 26, 2023
5429a27
Switch to Microsoft.DurableTask.Grpc (#2605)
jviau Sep 27, 2023
991c8f0
Fix grpc core (#2616)
jviau Oct 4, 2023
0e26d15
pass entity parameters for task orchestration. (#2611)
sebastianburckhardt Oct 5, 2023
07ecbc8
Core entities/various fixes and updates (#2619)
sebastianburckhardt Oct 5, 2023
62d7049
Update to entities preview 2 (#2620)
jviau Oct 6, 2023
9e311a6
Add callback handler for entity dispatching (#2624)
jviau Oct 6, 2023
c4a89b0
Core entities/propagate changes (#2625)
sebastianburckhardt Oct 9, 2023
565d548
Rev dependencies to entities-preview.2 (#2627)
jviau Oct 9, 2023
cc0d0ed
Call EnsureLegalAccess from EntityFeature in dotnet-isolated (#2633)
jviau Oct 10, 2023
c545e42
create a better error message in situations where client entity funct…
sebastianburckhardt Oct 12, 2023
55daf73
Merge branch 'main' into feature/core-entities
jviau Oct 12, 2023
c9747a1
Rev package versions, update release notes (#2638)
jviau Oct 18, 2023
97112d3
Merge branch 'dev' into feature/core-entities
jviau Oct 18, 2023
f3fa1b2
Address smoke test build issue (#2647)
jviau Oct 18, 2023
7a63ce3
fix translation of legacy query to new entity query support (#2648)
sebastianburckhardt Oct 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
pr:
- main
- dev
- feature/*

jobs:

- job: FunctionsV1Tests
Expand Down
23 changes: 21 additions & 2 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
# Release Notes

## Microsoft.Azure.Functions.Worker.Extensions.DurableTask v1.1.0-preview.1

### New Features

- Support entities for .NET isolated

### Bug Fixes

- Fix support for distributed tracing v2 in dotnet-isolated and Java (https://github.com/Azure/azure-functions-durable-extension/pull/2634)
- Update Microsoft.DurableTask.\* dependencies to v1.0.5
### Breaking Changes

### Dependency Updates

`Microsoft.DurableTask.*` to `1.1.0-preview.1`

## Microsoft.Azure.WebJobs.Extensions.DurableTask v2.12.0-preview.1

### New Features

- Updates to take advantage of new core-entity support

### Bug Fixes

### Breaking Changes

### Dependency Updates

`Microsoft.Azure.DurableTask.Core` to `2.16.0-preview.2`
`Microsoft.Azure.DurableTask.AzureStorage` to `1.16.0-preview.2`

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
Expand All @@ -19,6 +20,7 @@
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
using AzureStorage = DurableTask.AzureStorage;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -54,8 +56,6 @@ public AzureStorageDurabilityProvider(
this.logger = logger;
}

public override bool SupportsEntities => true;

public override bool CheckStatusBeforeRaiseEvent => true;

/// <summary>
Expand Down Expand Up @@ -98,6 +98,29 @@ public async override Task<IList<OrchestrationState>> GetAllOrchestrationStatesW

/// <inheritdoc/>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
EntityBackendQueries entityBackendQueries = (this.serviceClient as IEntityOrchestrationService)?.EntityBackendQueries;

if (entityBackendQueries != null) // entity queries are natively supported
{
var entity = await entityBackendQueries.GetEntityAsync(new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey), cancellation: default);

if (entity == null)
{
return null;
}
else
{
return entity.Value.SerializedState;
}
}
else // fall back to old implementation
{
return await this.LegacyImplementationOfRetrieveSerializedEntityState(entityId, serializerSettings);
}
}

private async Task<string> LegacyImplementationOfRetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
IList<OrchestrationState> stateList = await this.serviceClient.GetOrchestrationStateAsync(instanceId, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactor
private readonly AzureStorageOptions azureStorageOptions;
private readonly INameResolver nameResolver;
private readonly ILoggerFactory loggerFactory;
private readonly bool useSeparateQueueForEntityWorkItems;
private readonly bool inConsumption; // If true, optimize defaults for consumption
private AzureStorageDurabilityProvider defaultStorageProvider;

Expand Down Expand Up @@ -56,6 +57,7 @@ public AzureStorageDurabilityProviderFactory(
// different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxConcurrentEntitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000;

if (this.inConsumption)
Expand All @@ -71,9 +73,18 @@ public AzureStorageDurabilityProviderFactory(
}
}

WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
runtimeType == WorkerRuntimeType.Custom)
{
this.useSeparateQueueForEntityWorkItems = true;
}

// The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;
this.options.MaxConcurrentEntityFunctions = this.options.MaxConcurrentEntityFunctions ?? maxConcurrentEntitiesDefault;
this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault;

// Override the configuration defaults with user-provided values in host.json, if any.
Expand Down Expand Up @@ -188,6 +199,7 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
WorkItemQueueVisibilityTimeout = this.azureStorageOptions.WorkItemQueueVisibilityTimeout,
MaxConcurrentTaskOrchestrationWorkItems = this.options.MaxConcurrentOrchestratorFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskEntityWorkItems = this.options.MaxConcurrentEntityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentEntityFunctions)} needs a default value"),
ExtendedSessionsEnabled = this.options.ExtendedSessionsEnabled,
ExtendedSessionIdleTimeout = extendedSessionTimeout,
MaxQueuePollingInterval = this.azureStorageOptions.MaxQueuePollingInterval,
Expand All @@ -202,6 +214,9 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
LoggerFactory = this.loggerFactory,
UseLegacyPartitionManagement = this.azureStorageOptions.UseLegacyPartitionManagement,
UseTablePartitionManagement = this.azureStorageOptions.UseTablePartitionManagement,
UseSeparateQueueForEntityWorkItems = this.useSeparateQueueForEntityWorkItems,
EntityMessageReorderWindowInMinutes = this.options.EntityMessageReorderWindowInMinutes,
MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize,
};

if (this.inConsumption)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -57,6 +58,8 @@ public EntityTriggerAttributeBindingProvider(

private class EntityTriggerBinding : ITriggerBinding
{
private static readonly IReadOnlyDictionary<string, object?> EmptyBindingData = new Dictionary<string, object?>(capacity: 0);

private readonly DurableTaskExtension config;
private readonly ParameterInfo parameterInfo;
private readonly FunctionName entityName;
Expand All @@ -75,7 +78,10 @@ public EntityTriggerBinding(
this.BindingDataContract = GetBindingDataContract(parameterInfo);
}

public Type TriggerValueType => typeof(IDurableEntityContext);
// Out-of-proc V2 uses a different trigger value type
public Type TriggerValueType => this.config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough ?
typeof(RemoteEntityContext) :
typeof(IDurableEntityContext);

public IReadOnlyDictionary<string, Type> BindingDataContract { get; }

Expand All @@ -95,31 +101,52 @@ private static IReadOnlyDictionary<string, Type> GetBindingDataContract(Paramete

public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
var entityContext = (DurableEntityContext)value;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
if (value is DurableEntityContext entityContext)
{
convertedValue = entityContext;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
{
convertedValue = entityContext;
#if !FUNCTIONS_V1
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
#endif
}
else if (destinationType == typeof(string))
{
convertedValue = EntityContextToString(entityContext);
}

var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);
var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);

var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;
var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;

var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#if FUNCTIONS_V3_OR_GREATER
else if (value is RemoteEntityContext remoteContext)
{
// Generate a byte array which is the serialized protobuf payload
// https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization
var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest();

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
string encodedRequest = ProtobufUtils.Base64Encode(entityBatchRequest);
var contextValueProvider = new ObjectValueProvider(encodedRequest, typeof(string));
var triggerData = new TriggerData(contextValueProvider, EmptyBindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#endif
else
{
throw new ArgumentException($"Don't know how to bind to {value?.GetType().Name ?? "null"}.", nameof(value));
}
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public Task<ITriggerData> BindAsync(object? value, ValueBindingContext context)
InstanceId = remoteContext.InstanceId,
PastEvents = { remoteContext.PastEvents.Select(ProtobufUtils.ToHistoryEventProto) },
NewEvents = { remoteContext.NewEvents.Select(ProtobufUtils.ToHistoryEventProto) },
EntityParameters = remoteContext.EntityParameters.ToProtobuf(),
};

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.History;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.WebApiCompatShim;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -548,6 +550,28 @@ Task<EntityStateResponse<T>> IDurableEntityClient.ReadEntityStateAsync<T>(Entity
}

private async Task<EntityStateResponse<T>> ReadEntityStateAsync<T>(DurabilityProvider provider, EntityId entityId)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
EntityBackendQueries.EntityMetadata? metaData = await entityBackendQueries.GetEntityAsync(
new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey),
includeState: true,
includeStateless: false,
cancellation: default);

return new EntityStateResponse<T>()
{
EntityExists = metaData.HasValue,
EntityState = metaData.HasValue ? this.messageDataConverter.Deserialize<T>(metaData.Value.SerializedState) : default,
};
}
else
{
return await this.ReadEntityStateLegacyAsync<T>(provider, entityId);
}
}

private async Task<EntityStateResponse<T>> ReadEntityStateLegacyAsync<T>(DurabilityProvider provider, EntityId entityId)
{
string entityState = await provider.RetrieveSerializedEntityState(entityId, this.messageDataConverter.JsonSettings);

Expand Down Expand Up @@ -611,6 +635,40 @@ private static EntityQueryResult ConvertToEntityQueryResult(IEnumerable<DurableE

/// <inheritdoc />
async Task<EntityQueryResult> IDurableEntityClient.ListEntitiesAsync(EntityQuery query, CancellationToken cancellationToken)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
var result = await entityBackendQueries.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = query.EntityName != null ? $"@{query.EntityName.ToLowerInvariant()}@" : null,
IncludeTransient = query.IncludeDeleted,
IncludeState = query.FetchState,
LastModifiedFrom = query.LastOperationFrom == DateTime.MinValue ? (DateTime?)null : (DateTime?)query.LastOperationFrom,
LastModifiedTo = query.LastOperationTo,
PageSize = query.PageSize,
ContinuationToken = query.ContinuationToken,
},
cancellationToken);

return new EntityQueryResult()
{
Entities = result.Results.Select(ConvertEntityMetadata).ToList(),
ContinuationToken = result.ContinuationToken,
};

DurableEntityStatus ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metadata)
{
return new DurableEntityStatus(metadata);
}
}
else
{
return await this.ListEntitiesLegacyAsync(query, cancellationToken);
}
}

private async Task<EntityQueryResult> ListEntitiesLegacyAsync(EntityQuery query, CancellationToken cancellationToken)
{
var condition = new OrchestrationStatusQueryCondition(query);
EntityQueryResult entityResult;
Expand All @@ -633,6 +691,30 @@ async Task<EntityQueryResult> IDurableEntityClient.ListEntitiesAsync(EntityQuery

/// <inheritdoc />
async Task<CleanEntityStorageResult> IDurableEntityClient.CleanEntityStorageAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
var result = await entityBackendQueries.CleanEntityStorageAsync(
new EntityBackendQueries.CleanEntityStorageRequest()
{
RemoveEmptyEntities = removeEmptyEntities,
ReleaseOrphanedLocks = releaseOrphanedLocks,
},
cancellationToken);

return new CleanEntityStorageResult()
{
NumberOfEmptyEntitiesRemoved = result.EmptyEntitiesRemoved,
NumberOfOrphanedLocksRemoved = result.OrphanedLocksReleased,
};
}
else
{
return await this.CleanEntityStorageLegacyAsync(removeEmptyEntities, releaseOrphanedLocks, cancellationToken);
}
}

private async Task<CleanEntityStorageResult> CleanEntityStorageLegacyAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
{
DateTime now = DateTime.UtcNow;
CleanEntityStorageResult finalResult = default;
Expand Down Expand Up @@ -706,6 +788,12 @@ async Task CheckForOrphanedLockAndFixIt(DurableOrchestrationStatus status, strin
return finalResult;
}

private bool HasNativeEntityQuerySupport(DurabilityProvider provider, out EntityBackendQueries entityBackendQueries)
{
entityBackendQueries = (provider as IEntityOrchestrationService)?.EntityBackendQueries;
return entityBackendQueries != null;
}

private async Task<OrchestrationState> GetOrchestrationInstanceStateAsync(string instanceId)
{
return await GetOrchestrationInstanceStateAsync(this.client, instanceId);
Expand Down
Loading
Loading