InputStreamPublisher cannot be cancelled and StreamRef will be blocked #4744

Closed
ClockGet opened this issue Jan 26, 2021 · 4 comments · Fixed by #4788

Comments

@ClockGet

ClockGet commented Jan 26, 2021

Akka.Net version: 1.4.14
Akka.Streams version: 1.4.14
platform: win 10, .net core 3.1
gist: https://gist.github.com/ClockGet/a3c114c053caba681eddbec4b3b3c75b

Hello everyone,
Thank you for this great project, which I have recently started using. I found that when a task in ReadStreamActor throws an ArgumentException, the task in SendStreamActor blocks.
While debugging against the Akka.Streams source code, I also found that when the ArgumentException is thrown, the InputStreamPublisher receives an Akka.Streams.Actors.Cancel message but does not handle it. After I replaced Akka.Streams.Implementation.Cancel with Akka.Streams.Actors.Cancel (With<Cancel>() -> With<Actors.Cancel>()), the task in SendStreamActor completed normally. I am not sure whether this is a bug or a problem with my usage. I look forward to your reply.
Thank you very much.

@ClockGet ClockGet changed the title from "InputStreamPublisher cannot be cancelled" to "InputStreamPublisher cannot be cancelled and StreamRef will be blocked" Jan 26, 2021
@ClockGet
Author

I wrote a less elegant workaround:

var StreamWatchMonitorProps = Props.Create(() => new StreamMonitor());
var streamMonitorWatchActorRef = system.ActorOf(StreamWatchMonitorProps, "StreamMonitorActor");

...

internal class StreamMonitor : ReceiveActor
{
    public StreamMonitor()
    {
        // React only to unhandled messages whose payload is Akka.Streams.Actors.Cancel.
        Receive<UnhandledMessage>(unhandled => unhandled.Message is Akka.Streams.Actors.Cancel, p =>
        {
            // Stop the actor that failed to handle the Cancel message.
            p.Recipient.Tell(PoisonPill.Instance);
        });
    }

    protected override void PreStart()
    {
        // Listen for UnhandledMessage events published on the system event stream.
        Context.System.EventStream.Subscribe<UnhandledMessage>(Self);
        base.PreStart();
    }

    protected override void PostStop()
    {
        Context.System.EventStream.Unsubscribe<UnhandledMessage>(Self);
        base.PostStop();
    }
}

Now, the task in SendStreamActor can be completed normally.

@Aaronontheweb
Member

mind taking a look at this one @IgorFedchenko ?

@IgorFedchenko
Contributor

Sure, will do

@IgorFedchenko
Contributor

Nice work with debugging this, @ClockGet !

Indeed. The current implementation of SourceRefs is a bit different in Scala Akka, but at the time of our port it looked like this:

def receive = {
  case ActorPublisherMessage.Request(elements) => readAndSignal()
  case Continue                                => readAndSignal()
  case ActorPublisherMessage.Cancel            => context.stop(self)
}

That is, the Cancel message came from the ActorPublisher context. We missed this namespace here and in one other place. I have fixed both in my PR and made sure it is working properly now.
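
For anyone following along, the mismatch can be reproduced without Akka.NET at all. The sketch below is only an illustration: the `Demo.Implementation` and `Demo.Actors` namespaces, the `Publisher` class and its two methods are hypothetical stand-ins, not the library's actual code. It shows that two types named `Cancel` in different namespaces are unrelated, so a handler that matches one of them never sees the other.

```csharp
using System;

namespace Demo.Implementation
{
    // Hypothetical stand-in for the Cancel type the handler was matching on.
    public sealed class Cancel
    {
        public static readonly Cancel Instance = new Cancel();
        private Cancel() { }
    }
}

namespace Demo.Actors
{
    // Hypothetical stand-in for the Cancel message that is actually sent.
    public sealed class Cancel
    {
        public static readonly Cancel Instance = new Cancel();
        private Cancel() { }
    }
}

namespace Demo
{
    public static class Publisher
    {
        // Matches the wrong type, so an Actors.Cancel falls through unhandled.
        public static bool HandlesBeforeFix(object message) =>
            message is Implementation.Cancel;

        // Matches the type that is actually sent.
        public static bool HandlesAfterFix(object message) =>
            message is Actors.Cancel;
    }

    public static class Program
    {
        public static void Main()
        {
            object received = Actors.Cancel.Instance;
            Console.WriteLine(Publisher.HandlesBeforeFix(received)); // False
            Console.WriteLine(Publisher.HandlesAfterFix(received));  // True
        }
    }
}
```

Because both types share a short name, an unqualified `Cancel` compiles against whichever namespace happens to be imported, which is probably why the mistake was easy to miss during the port.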
