Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.

# Claude configuration
.claude/

# User-specific files
*.suo
*.user
Expand Down
15 changes: 13 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
#### 1.5.48 August 14th, 2025 ####
#### 1.5.48 August 21st, 2025 ####

**Placeholder for nightly build**
Akka.NET v1.5.48 is a minor patch containing stability improvement to Akka.TestKit.

* [TestKit: Fix deadlock during parallel test execution](https://github.com/akkadotnet/akka.net/pull/7787)

2 contributors since release 1.5.47

| COMMITS | LOC+ | LOC- | AUTHOR |
|---------|------|------|---------------------|
| 4 | 5494 | 5561 | Aaron Stannard |
| 2 | 204 | 66 | Gregorius Soedharmo |

To [see the full set of changes in Akka.NET v1.5.48, click here](https://github.com/akkadotnet/akka.net/milestone/131?closed=1)

#### 1.5.47 August 12th, 2025 ####

Expand Down
13 changes: 13 additions & 0 deletions build-system/azure-pipeline.mntr-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ jobs:
inputs:
packageType: 'sdk'
useGlobalJson: true

# Set the Incrementalist base branch based on PR target branch
- pwsh: |
if ('$(Build.Reason)' -eq 'PullRequest') {
# Extract branch name from refs/heads/branch format
$targetBranch = '$(System.PullRequest.TargetBranch)'.Replace('refs/heads/', '')
Write-Host "PR detected - using base branch: $targetBranch"
Write-Host "##vso[task.setvariable variable=IncrementalistBaseBranch]$targetBranch"
} else {
Write-Host "Not a PR - using default base branch: dev"
Write-Host "##vso[task.setvariable variable=IncrementalistBaseBranch]dev"
}
displayName: 'Set Incrementalist base branch'

- script: dotnet tool restore
displayName: 'Restore dotnet tools'
Expand Down
13 changes: 13 additions & 0 deletions build-system/azure-pipeline.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ jobs:
inputs:
packageType: 'sdk'
useGlobalJson: true

# Set the Incrementalist base branch based on PR target branch
- pwsh: |
if ('$(Build.Reason)' -eq 'PullRequest') {
# Extract branch name from refs/heads/branch format
$targetBranch = '$(System.PullRequest.TargetBranch)'.Replace('refs/heads/', '')
Write-Host "PR detected - using base branch: $targetBranch"
Write-Host "##vso[task.setvariable variable=IncrementalistBaseBranch]$targetBranch"
} else {
Write-Host "Not a PR - using default base branch: dev"
Write-Host "##vso[task.setvariable variable=IncrementalistBaseBranch]dev"
}
displayName: 'Set Incrementalist base branch'

- script: dotnet tool restore
displayName: 'Restore dotnet tools'
Expand Down
8 changes: 4 additions & 4 deletions build-system/pr-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
name: "netfx_tests_windows"
displayName: ".NET Framework Unit Tests (Windows)"
vmImage: "windows-latest"
command: "dotnet incrementalist run --config .incrementalist/testsOnly.json -- test -c Release --no-build --framework net48 --logger:trx --results-directory TestResults"
command: "dotnet incrementalist run --config .incrementalist/testsOnly.json --branch $(IncrementalistBaseBranch) -- test -c Release --no-build --framework net48 --logger:trx --results-directory TestResults"
outputDirectory: "TestResults"
artifactName: "netfx_tests_windows-$(Build.BuildId)"

Expand All @@ -80,7 +80,7 @@ jobs:
name: "net_tests_windows"
displayName: ".NET Unit Tests (Windows)"
vmImage: "windows-latest"
command: "dotnet incrementalist run --config .incrementalist/testsOnly.json -- test -c Release --no-build --framework net8.0 --logger:trx --results-directory TestResults"
command: "dotnet incrementalist run --config .incrementalist/testsOnly.json --branch $(IncrementalistBaseBranch) -- test -c Release --no-build --framework net8.0 --logger:trx --results-directory TestResults"
outputDirectory: "TestResults"
artifactName: "net_tests_windows-$(Build.BuildId)"

Expand All @@ -89,7 +89,7 @@ jobs:
name: "net_tests_linux"
displayName: ".NET Unit Tests (Linux)"
vmImage: "ubuntu-latest"
command: "dotnet incrementalist run --config .incrementalist/testsOnly.json -- test -c Release --no-build --framework net8.0 --logger:trx --results-directory TestResults"
command: "dotnet incrementalist run --config .incrementalist/testsOnly.json --branch $(IncrementalistBaseBranch) -- test -c Release --no-build --framework net8.0 --logger:trx --results-directory TestResults"
outputDirectory: "TestResults"
artifactName: "net_tests_linux-$(Build.BuildId)"

Expand All @@ -98,7 +98,7 @@ jobs:
name: "net_mntr_windows"
displayName: ".NET Multi-Node Tests (Windows)"
vmImage: "windows-latest"
command: "dotnet incrementalist run --config .incrementalist/mutliNodeOnly.json -- test -c Release --no-build --framework net8.0 --logger:trx --results-directory TestResults/multinode"
command: "dotnet incrementalist run --config .incrementalist/mutliNodeOnly.json --branch $(IncrementalistBaseBranch) -- test -c Release --no-build --framework net8.0 --logger:trx --results-directory TestResults/multinode"
outputDirectory: "TestResults"
artifactName: "net_mntr_windows-$(Build.BuildId)"
mntrFailuresDir: 'TestResults\\multinode'
Expand Down
2 changes: 2 additions & 0 deletions src/contrib/testkits/Akka.TestKit.Xunit/Internals/Loggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output)
Receive<InitializeLogger>(e =>
{
e.LoggingBus.Subscribe(Self, typeof (LogEvent));
// Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression
// so it won't interfere with dead letter detection or TestActor message expectations
Sender.Tell(new LoggerInitialized());
});
}
Expand Down
36 changes: 27 additions & 9 deletions src/contrib/testkits/Akka.TestKit.Xunit/TestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Actor.Setup;
using Akka.Configuration;
using Akka.Event;
Expand Down Expand Up @@ -170,22 +171,39 @@ protected void InitializeLogger(ActorSystem system)
if (Output == null)
return;

var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout)
.ConfigureAwait(false).GetAwaiter().GetResult();
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// Create logger actor synchronously to avoid deadlock during parallel test execution
// Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(Output)),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}

protected void InitializeLogger(ActorSystem system, string prefix)
{
if (Output == null)
return;

var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test");
logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout)
.ConfigureAwait(false).GetAwaiter().GetResult();
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// Create logger actor synchronously to avoid deadlock during parallel test execution
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions src/contrib/testkits/Akka.TestKit.Xunit2/Internals/Loggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public TestOutputLogger(ITestOutputHelper output)
Receive<InitializeLogger>(e =>
{
e.LoggingBus.Subscribe(Self, typeof (LogEvent));
// Send response to maintain protocol - LoggerInitialized implements IDeadLetterSuppression
// so it won't interfere with dead letter detection or TestActor message expectations
Sender.Tell(new LoggerInitialized());
});
}
Expand Down
40 changes: 25 additions & 15 deletions src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Actor.Setup;
using Akka.Configuration;
using Akka.Event;
Expand Down Expand Up @@ -140,30 +141,39 @@ protected void InitializeLogger(ActorSystem system)
{
if (Output != null)
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
// Start the logger initialization task but don't wait for it yet
var loggerTask = logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout);
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// By the time TestActor is ready (which happens in base constructor),
// the logger is likely ready too. Now we can safely wait.
loggerTask.ConfigureAwait(false).GetAwaiter().GetResult();
// Create logger actor synchronously to avoid deadlock during parallel test execution
// Use AttachChildWithAsync with isAsync:false to create LocalActorRef instead of RepointableActorRef
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(Output)),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}
}

protected void InitializeLogger(ActorSystem system, string prefix)
{
if (Output != null)
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))), "log-test");
// Start the logger initialization task but don't wait for it yet
var loggerTask = logger.Ask<LoggerInitialized>(new InitializeLogger(system.EventStream), TestKitSettings.TestKitStartupTimeout);
var systemImpl = system as ActorSystemImpl ?? throw new InvalidOperationException("Expected ActorSystemImpl");

// Create logger actor synchronously to avoid deadlock during parallel test execution
var logger = systemImpl.Provider.SystemGuardian.Cell.AttachChildWithAsync(
Props.Create(() => new TestOutputLogger(
string.IsNullOrEmpty(prefix) ? Output : new PrefixedOutput(Output, prefix))),
isSystemService: true, // Mark as system service
isAsync: false, // Create synchronously to avoid deadlock
name: "log-test");

// By the time TestActor is ready (which happens in base constructor),
// the logger is likely ready too. Now we can safely wait.
loggerTask.ConfigureAwait(false).GetAwaiter().GetResult();
// Send the initialization message without waiting for response to avoid deadlock
// The logger will subscribe to the event stream when it processes this message
logger.Tell(new InitializeLogger(system.EventStream), ActorRefs.NoSender);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Streams.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.TestKit.Xunit2")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
Expand Down
55 changes: 55 additions & 0 deletions src/core/Akka.Streams.Tests/Issue7794Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// -----------------------------------------------------------------------
// <copyright file="Issue7794Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Implementation;

public class Issue7794Spec: AkkaSpec
{
private ActorMaterializer Materializer { get; }

public Issue7794Spec(ITestOutputHelper helper) : base(helper)
{
Materializer = Sys.Materializer();
}

[Fact(DisplayName = "ChannelSource should not throw NRE when Channel completes")]
public async Task Issue_7794_ChannelSource_NRE()
{
var channel = Channel.CreateBounded<Message<string, string>>(new BoundedChannelOptions(capacity: 100)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = false
});

var streamRes = ChannelSource.FromReader(channel.Reader)
.Select(e => e)
.RunWith(Sink.Ignore<Message<string, string>>(), Materializer);

_ = Task.Run(async () =>
{
await Task.Delay(100);
channel.Writer.Complete();
});

await streamRes;
}

private class Message<TKey, TValue>
{
public TKey Key { get; set; }
public TValue Value { get; set; }
}
}
17 changes: 12 additions & 5 deletions src/core/Akka.Streams/Stage/GraphStage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -885,11 +885,18 @@ public ConcurrentAsyncCallback(Action<T> handler, GraphStageLogic ownedStage)
_ownedStage = ownedStage;
_wrappedHandler = obj =>
{
if (obj is T e)
handler1(e);
else
throw new ArgumentException(
$"Expected {nameof(obj)} to be of type {typeof(T)}, but was {obj.GetType()}");
switch (obj)
{
// Always assume that T can be null and the handler will handle null values
case null:
handler1(default);
break;
case T e:
handler1(e);
break;
default:
throw new ArgumentException($"Expected {nameof(obj)} to be of type {typeof(T)}, but was {obj.GetType()}");
}
};
}

Expand Down
Loading
Loading