Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5406,6 +5406,7 @@ namespace Akka.Util
public AtomicReference() { }
public T Value { get; set; }
public bool CompareAndSet(T expected, T newValue) { }
public T CompareExchange(T expected, T newValue) { }
public T GetAndSet(T newValue) { }
public static T op_Implicit(Akka.Util.AtomicReference<T> atomicReference) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5415,6 +5415,7 @@ namespace Akka.Util
public AtomicReference() { }
public T Value { get; set; }
public bool CompareAndSet(T expected, T newValue) { }
public T CompareExchange(T expected, T newValue) { }
public T GetAndSet(T newValue) { }
public static T op_Implicit(Akka.Util.AtomicReference<T> atomicReference) { }
}
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowIdleInjectSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public async Task KeepAlive_must_work_if_timer_fires_before_initial_request_afte
.RunWith(Sink.FromSubscriber(downstream), Materializer);

await downstream.RequestAsync(10);
downstream.ExpectNextN(Enumerable.Range(1, 10));
var received = downstream.ExpectNextN(10);
received.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 10));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the ordering deterministics


await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await downstream.RequestAsync(1);
Expand Down Expand Up @@ -181,7 +182,8 @@ public async Task KeepAlive_must_prefer_upstream_element_over_injected_after_bus
.RunWith(Sink.FromSubscriber(downstream), Materializer);

await downstream.RequestAsync(10);
downstream.ExpectNextN(Enumerable.Range(1, 10));
var received = downstream.ExpectNextN(10);
received.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 10));

await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await upstream.SendNextAsync(1);
Expand Down
47 changes: 30 additions & 17 deletions src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1242,31 +1242,44 @@ public Logic(SubSource<T> stage) : base(stage.Shape)

private void SetCallback(Action<IActorSubscriberMessage> callback)
{
var status = _stage._status.Value;

if (status == null)
// Single atomic operation that both attempts the change AND returns the previous value
var previous = _stage._status.CompareExchange(null, callback);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real fix - only read the value once, as the result of the CompareExchange


switch (previous)
{
if (!_stage._status.CompareAndSet(null, callback))
SetCallback(callback);
case null:
return; // Success - we set the callback, previous was null
// CompareExchange failed - handle the different states based on what was actually there
case OnComplete:
CompleteStage();
break;
case OnError error:
FailStage(error.Cause);
break;
case Action<IActorSubscriberMessage>:
throw new IllegalStateException("Substream Source cannot be materialized more than once");
default:
// Unexpected state - should not happen but be safe
throw new IllegalStateException($"Substream Source cannot be materialized more than once - found [{previous}]");
}
else if (status is OnComplete)
CompleteStage();
else if (status is OnError error)
FailStage(error.Cause);
else if (status is Action<IActorSubscriberMessage>)
throw new IllegalStateException("Substream Source cannot be materialized more than once");
}

public override void PreStart()
{
var ourOwnCallback = GetAsyncCallback<IActorSubscriberMessage>(msg =>
{
if (msg is OnComplete)
CompleteStage();
else if (msg is OnError error)
FailStage(error.Cause);
else if (msg is OnNext next)
Push(_stage._out, (T) next.Element);
switch (msg)
{
case OnComplete:
CompleteStage();
break;
case OnError error:
FailStage(error.Cause);
break;
case OnNext next:
Push(_stage._out, (T) next.Element);
break;
}
});
SetCallback(ourOwnCallback);
}
Expand Down
17 changes: 16 additions & 1 deletion src/core/Akka/Util/AtomicReference.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,27 @@ public T Value
/// <param name="expected">The value expected to be referenced currently.</param>
/// <param name="newValue">The new value to reference if the current matches the expected value.</param>
/// <returns><c>true</c> if <paramref name="newValue"/> was set</returns>
/// <remarks>
/// WARNING: if you need to know the previous value, use <see cref="CompareExchange(T,T)"/> instead.
/// </remarks>
public bool CompareAndSet(T expected, T newValue)
{
var previous = Interlocked.CompareExchange(ref atomicValue, newValue, expected);
var previous = CompareExchange(expected, newValue);
return ReferenceEquals(previous, expected);
}

/// <summary>
/// Atomically compares the current value with <paramref name="expected"/> and, if they are equal,
/// replaces the current value with <paramref name="newValue"/>.
/// </summary>
/// <param name="expected">The value expected to be referenced currently.</param>
/// <param name="newValue">The new value to reference if the current matches the expected value.</param>
/// <returns>The original value that was in the atomic reference before the operation.</returns>
public T CompareExchange(T expected, T newValue)
{
return Interlocked.CompareExchange(ref atomicValue, newValue, expected);
}

/// <summary>
/// Atomically sets the <see cref="Value"/> to <paramref name="newValue"/> and returns the old <see cref="Value"/>.
/// </summary>
Expand Down
Loading