Skip to content

Batch function Write #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -67,40 +67,36 @@ public Task<bool> SucceedFunction(
byte[]? result,
long timestamp,
int expectedEpoch,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
IReadOnlyList<StoredEffectChange>? effects,
ComplimentaryState complimentaryState
) => _inner.SucceedFunction(storedId, result, timestamp, expectedEpoch, effects, messages, complimentaryState);
) => _inner.SucceedFunction(storedId, result, timestamp, expectedEpoch, effects, complimentaryState);

public Task<bool> PostponeFunction(
StoredId storedId,
long postponeUntil,
long timestamp,
bool ignoreInterrupted,
int expectedEpoch,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
IReadOnlyList<StoredEffectChange>? effects,
ComplimentaryState complimentaryState
) => _inner.PostponeFunction(storedId, postponeUntil, timestamp, ignoreInterrupted, expectedEpoch, effects, messages, complimentaryState);
) => _inner.PostponeFunction(storedId, postponeUntil, timestamp, ignoreInterrupted, expectedEpoch, effects, complimentaryState);

public Task<bool> FailFunction(
StoredId storedId,
StoredException storedException,
long timestamp,
int expectedEpoch,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
IReadOnlyList<StoredEffectChange>? effects,
ComplimentaryState complimentaryState
) => _inner.FailFunction(storedId, storedException, timestamp, expectedEpoch, effects, messages, complimentaryState);
) => _inner.FailFunction(storedId, storedException, timestamp, expectedEpoch, effects, complimentaryState);

public Task<bool> SuspendFunction(
StoredId storedId,
long timestamp,
int expectedEpoch,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
IReadOnlyList<StoredEffectChange>? effects,
ComplimentaryState complimentaryState
) => _inner.SuspendFunction(storedId, timestamp, expectedEpoch, effects, messages, complimentaryState);
) => _inner.SuspendFunction(storedId, timestamp, expectedEpoch, effects, complimentaryState);

public Task<bool> Interrupt(StoredId storedId, bool onlyIfExecuting)
=> _inner.Interrupt(storedId, onlyIfExecuting);
Original file line number Diff line number Diff line change
@@ -166,14 +166,6 @@ public override Task ExistingStateCanBeReplacedRemovedAndAdded()
public override Task SaveChangesPersistsChangedResult()
=> SaveChangesPersistsChangedResult(Utils.CreateInMemoryFunctionStoreTask());

[TestMethod]
public override Task ExistingTimeoutCanBeUpdatedForAction()
=> ExistingTimeoutCanBeUpdatedForAction(Utils.CreateInMemoryFunctionStoreTask());

[TestMethod]
public override Task ExistingTimeoutCanBeUpdatedForFunc()
=> ExistingTimeoutCanBeUpdatedForFunc(Utils.CreateInMemoryFunctionStoreTask());

[TestMethod]
public override Task CorrelationsCanBeChanged()
=> CorrelationsCanBeChanged(FunctionStoreFactory.Create());
Original file line number Diff line number Diff line change
@@ -111,7 +111,6 @@ await store.PostponeFunction(
ignoreInterrupted: true,
expectedEpoch: 0,
effects: null,
messages: null,
complimentaryState: new ComplimentaryState(storedParameter.ToUtf8Bytes().ToFunc(), LeaseLength: 0)
).ShouldBeTrueAsync();

@@ -151,7 +150,6 @@ await store.PostponeFunction(
ignoreInterrupted: true,
expectedEpoch: 0,
effects: null,
messages: null,
new ComplimentaryState(storedParameter.ToFunc(), LeaseLength: 0)
);

Original file line number Diff line number Diff line change
@@ -34,6 +34,14 @@ public override Task TaskWhenAnyFuncTest()
public override Task TaskWhenAllFuncTest()
=> TaskWhenAllFuncTest(FunctionStoreFactory.Create());

[TestMethod]
public override Task TaskWhenAnyPostponeTest()
=> TaskWhenAnyPostponeTest(FunctionStoreFactory.Create());

[TestMethod]
public override Task TaskWhenAllPostponeTest()
=> TaskWhenAllPostponeTest(FunctionStoreFactory.Create());

[TestMethod]
public override Task ClearEffectsTest()
=> ClearEffectsTest(FunctionStoreFactory.Create());
@@ -42,6 +50,10 @@ public override Task ClearEffectsTest()
public override Task EffectsCrudTest()
=> EffectsCrudTest(FunctionStoreFactory.Create());

[TestMethod]
public override Task EffectsCreateOrGetWithoutFlushTest()
=> EffectsCreateOrGetWithoutFlushTest(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue()
=> ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(FunctionStoreFactory.Create());
Original file line number Diff line number Diff line change
@@ -91,4 +91,20 @@ public override Task ParamlessCanBeCreatedWithInitialFailedEffect()
[TestMethod]
public override Task FunctionCanAcceptAndReturnOptionType()
=> FunctionCanAcceptAndReturnOptionType(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingEffectChangesArePersistedWithSucceedFunctionResult()
=> PendingEffectChangesArePersistedWithSucceedFunctionResult(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingEffectChangesArePersistedWithPostponedFunctionResult()
=> PendingEffectChangesArePersistedWithPostponedFunctionResult(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingEffectChangesArePersistedWithSuspendedFunctionResult()
=> PendingEffectChangesArePersistedWithSuspendedFunctionResult(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingEffectChangesArePersistedWithFailedFunctionResult()
=> PendingEffectChangesArePersistedWithFailedFunctionResult(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
@@ -14,18 +14,6 @@ public override Task ExpiredTimeoutIsAddedToMessages()
public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
=> ExpiredTimeoutMakesReactiveChainThrowTimeoutException(FunctionStoreFactory.Create());

[TestMethod]
public override Task RegisteredTimeoutIsCancelledAfterReactiveChainCompletes()
=> RegisteredTimeoutIsCancelledAfterReactiveChainCompletes(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingTimeoutCanBeRemovedFromControlPanel()
=> PendingTimeoutCanBeRemovedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingTimeoutCanBeUpdatedFromControlPanel()
=> PendingTimeoutCanBeUpdatedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredImplicitTimeoutsAreAddedToMessages()
=> ExpiredImplicitTimeoutsAreAddedToMessages(FunctionStoreFactory.Create());
Original file line number Diff line number Diff line change
@@ -184,7 +184,6 @@ await store.PostponeFunction(
ignoreInterrupted: true,
expectedEpoch: 0,
effects: null,
messages: null,
new ComplimentaryState(() => storedParameter.ToUtf8Bytes(), LeaseLength: 0)
).ShouldBeTrueAsync();

20 changes: 20 additions & 0 deletions Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs
Original file line number Diff line number Diff line change
@@ -227,4 +227,24 @@ public override Task RestartExecutionReturnsEffectsAndMessages()
[TestMethod]
public override Task RestartExecutionWorksWithEmptyEffectsAndMessages()
=> RestartExecutionWorksWithEmptyEffectsAndMessages(FunctionStoreFactory.Create());

[TestMethod]
public override Task EffectsArePersistedOnSuspendFunction()
=> EffectsArePersistedOnSuspendFunction(FunctionStoreFactory.Create());

[TestMethod]
public override Task EffectsArePersistedOnSucceededFunction()
=> EffectsArePersistedOnSucceededFunction(FunctionStoreFactory.Create());

[TestMethod]
public override Task EffectsArePersistedOnPostponeFunction()
=> EffectsArePersistedOnPostponeFunction(FunctionStoreFactory.Create());

[TestMethod]
public override Task EffectsArePersistedOnFailFunction()
=> EffectsArePersistedOnFailFunction(FunctionStoreFactory.Create());

[TestMethod]
public override Task AppendMessageNoStatusAndInterruptWorks()
=> AppendMessageNoStatusAndInterruptWorks(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests;
@@ -20,27 +19,11 @@ public override Task ExistingTimeoutCanUpdatedSuccessfully()
public override Task OverwriteFalseDoesNotAffectExistingTimeout()
=> OverwriteFalseDoesNotAffectExistingTimeout(FunctionStoreFactory.Create().SelectAsync(fs => fs.TimeoutStore));

[TestMethod]
public override Task RegisteredTimeoutIsReturnedFromRegisteredTimeouts()
=> RegisteredTimeoutIsReturnedFromRegisteredTimeouts(FunctionStoreFactory.Create());

[TestMethod]
public override Task TimeoutStoreCanBeInitializedMultipleTimes()
=> TimeoutStoreCanBeInitializedMultipleTimes(FunctionStoreFactory.Create().SelectAsync(fs => fs.TimeoutStore));

[TestMethod]
public override Task RegisteredTimeoutIsReturnedFromRegisteredTimeoutsForFunctionId()
=> RegisteredTimeoutIsReturnedFromRegisteredTimeoutsForFunctionId(FunctionStoreFactory.Create());

[TestMethod]
public override Task TimeoutIsNotRegisteredAgainWhenProviderAlreadyContainsTimeout()
=> TimeoutIsNotRegisteredAgainWhenProviderAlreadyContainsTimeout(FunctionStoreFactory.Create());


[TestMethod]
public override Task TimeoutsForDifferentTypesCanBeCreatedFetchedSuccessfully()
=> TimeoutsForDifferentTypesCanBeCreatedFetchedSuccessfully(FunctionStoreFactory.Create().SelectAsync(fs => fs.TimeoutStore));

[TestMethod]
public override Task CancellingNonExistingTimeoutDoesResultInIO()
=> CancellingNonExistingTimeoutDoesResultInIO(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
@@ -25,6 +25,10 @@ public override Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst()
public override Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond()
=> MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(FunctionStoreFactory.Create());

[TestMethod]
public override Task MessagesFirstOfTypesReturnsNoneForTimeout()
=> MessagesFirstOfTypesReturnsNoneForTimeout(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExistingEventsShouldBeSameAsAllAfterEmit()
=> ExistingEventsShouldBeSameAsAllAfterEmit(FunctionStoreFactory.Create());
Original file line number Diff line number Diff line change
@@ -31,21 +31,16 @@ await functionStore.CreateFunction(
);
var eventSerializer = new EventSerializer();
var messagesWriter = new MessageWriter(storedId, functionStore, eventSerializer, scheduleReInvocation: (_, _) => Task.CompletedTask);
var lazyExistingEffects = new Lazy<Task<IReadOnlyList<StoredEffect>>>(() => Task.FromResult((IReadOnlyList<StoredEffect>) new List<StoredEffect>()));
var effectResults = new EffectResults(flowId, storedId, lazyExistingEffects, functionStore.EffectsStore, DefaultSerializer.Instance);
var effect = new Effect(effectResults);
var registeredTimeouts = new RegisteredTimeouts(storedId, functionStore.TimeoutStore, effect);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
storedId,
defaultDelay: TimeSpan.FromSeconds(1),
defaultMaxWait: TimeSpan.Zero,
isWorkflowRunning: () => true,
functionStore,
eventSerializer,
registeredTimeouts,
initialMessages: null
);
var messages = new Messages(messagesWriter, registeredTimeouts, messagesPullerAndEmitter);
var messages = new Messages(messagesWriter, messagesPullerAndEmitter);

await messages.AppendMessage("hello world");

Loading