From 4f38f738d62bb06b5a833c17f80b94609adf7a92 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 5 Jun 2017 16:56:37 +0700 Subject: [PATCH 1/9] Initial scala => C# port --- .../Akka.Streams.Tests/IO/FileSinkSpec.cs | 18 +++++++++++++++++ src/core/Akka.Streams/Implementation/Sinks.cs | 20 ++++++++++++++----- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 34c26686f0e..b3557781001 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -18,6 +18,7 @@ using Akka.Streams.IO; using Akka.Streams.TestKit.Tests; using Akka.TestKit; +using Akka.Util; using Akka.Util.Internal; using FluentAssertions; using Xunit; @@ -209,6 +210,23 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att }, _materializer); } + // Needed help converting this test case + [Fact] + public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sink() + { + this.AssertAllStagesStopped(() => { + TargetFile(f => { + var completion = Source.From(_testByteStrings); + completion.RunWith(Sink.LazySink>( + b => Task.FromResult(FileIO.ToFile(f)), + () => Task.FromResult(new IOResult(0, new Result(NotUsed.Instance))) + ), _materializer); + completion.MapMaterializedValue(); + + }); + }, _materializer); + } + private static void TargetFile(Action block, bool create = true) { var targetFile = new FileInfo(Path.Combine(Path.GetTempPath(), "synchronous-file-sink.tmp")); diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index 0c0dc4bcebd..d02c995ee8e 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -908,6 +908,7 @@ private sealed class Logic : InGraphStageLogic private readonly LazySink _stage; private readonly TaskCompletionSource _completion; private readonly Lazy _decider; + private SubSource _sourceOut; public Logic(LazySink stage, Attributes inheritedAttributes, TaskCompletionSource completion) : base(stage.Shape) @@ -938,6 +939,15 @@ public override void OnPush() _stage._sinkFactory(element) .ContinueWith(t => callback(Result.FromTask(t)), TaskContinuationOptions.ExecuteSynchronously); + SetHandler(_stage.In, new LambdaInHandler( + onPush: () => { }, + onUpstreamFinish: () => + { + SetKeepGoing(true); + _sourceOut.Completed = true; + }, + onUpstreamFailure: Failure + )); } catch (Exception ex) { @@ -973,8 +983,8 @@ private void Failure(Exception ex) private void InitInternalSource(Sink sink, TIn firstElement) { - var sourceOut = new SubSource(this, firstElement); - _completion.TrySetResult(Source.FromGraph(sourceOut.Source) + _sourceOut = new SubSource(this, firstElement); + _completion.TrySetResult(Source.FromGraph(_sourceOut.Source) .RunWith(sink, Interpreter.SubFusingMaterializer)); } @@ -984,7 +994,7 @@ private sealed class SubSource : SubSourceOutlet { private readonly Logic _logic; private readonly LazySink _stage; - private bool _completed; + internal bool Completed; public SubSource(Logic logic, TIn firstElement) : base(logic, "LazySink") { @@ -994,7 +1004,7 @@ public SubSource(Logic logic, TIn firstElement) : base(logic, "LazySink") SetHandler(new LambdaOutHandler(onPull: () => { Push(firstElement); - if (_completed) + if (Completed) SourceComplete(); else SwitchToFinalHandler(); @@ -1005,7 +1015,7 @@ public SubSource(Logic logic, TIn firstElement) : base(logic, "LazySink") onUpstreamFinish: () => { logic.SetKeepGoing(true); - _completed = true; + Completed = true; }, onUpstreamFailure: SourceFailure)); } From f8262d69c12ab2bf5768a78c63d2dd818b39e5ac Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 7 Jun 2017 00:22:03 +0700 Subject: [PATCH 2/9] Move completed variable to logic class and onUpstreamFinish event handling logic to GotCompletionEvent function --- src/core/Akka.Streams/Implementation/Sinks.cs | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index d02c995ee8e..4a149097076 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -908,7 +908,8 @@ private sealed class Logic : InGraphStageLogic private readonly LazySink _stage; private readonly TaskCompletionSource _completion; private readonly Lazy _decider; - private SubSource _sourceOut; + + internal bool Completed; public Logic(LazySink stage, Attributes inheritedAttributes, TaskCompletionSource completion) : base(stage.Shape) @@ -941,11 +942,7 @@ public override void OnPush() TaskContinuationOptions.ExecuteSynchronously); SetHandler(_stage.In, new LambdaInHandler( onPush: () => { }, - onUpstreamFinish: () => - { - SetKeepGoing(true); - _sourceOut.Completed = true; - }, + onUpstreamFinish: GotCompletionEvent, onUpstreamFailure: Failure )); } @@ -973,6 +970,12 @@ public override void OnUpstreamFinish() public override void OnUpstreamFailure(Exception e) => Failure(e); + private void GotCompletionEvent() + { + SetKeepGoing(true); + Completed = true; + } + public override void PreStart() => Pull(_stage.In); private void Failure(Exception ex) @@ -983,8 +986,8 @@ private void Failure(Exception ex) private void InitInternalSource(Sink sink, TIn firstElement) { - _sourceOut = new SubSource(this, firstElement); - _completion.TrySetResult(Source.FromGraph(_sourceOut.Source) + var sourceOut = new SubSource(this, firstElement); + _completion.TrySetResult(Source.FromGraph(sourceOut.Source) .RunWith(sink, Interpreter.SubFusingMaterializer)); } @@ -994,7 +997,6 @@ private sealed class SubSource : SubSourceOutlet { private readonly Logic _logic; private readonly LazySink _stage; - internal bool Completed; public SubSource(Logic logic, TIn firstElement) : base(logic, "LazySink") { @@ -1004,7 +1006,7 @@ public SubSource(Logic logic, TIn firstElement) : base(logic, "LazySink") SetHandler(new LambdaOutHandler(onPull: () => { Push(firstElement); - if (Completed) + if (_logic.Completed) SourceComplete(); else SwitchToFinalHandler(); @@ -1012,11 +1014,7 @@ public SubSource(Logic logic, TIn firstElement) : base(logic, "LazySink") logic.SetHandler(_stage.In, new LambdaInHandler( onPush: () => Push(logic.Grab(_stage.In)), - onUpstreamFinish: () => - { - logic.SetKeepGoing(true); - Completed = true; - }, + onUpstreamFinish: logic.GotCompletionEvent, onUpstreamFailure: SourceFailure)); } From 7d61cb87a2028fb6b66024cf105b1867300f3a37 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 7 Jun 2017 01:14:08 +0700 Subject: [PATCH 3/9] Port #22647 Add try..catch block to LazySink --- src/core/Akka.Streams/Implementation/Sinks.cs | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index 4a149097076..46a6ab5976a 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -909,7 +909,7 @@ private sealed class Logic : InGraphStageLogic private readonly TaskCompletionSource _completion; private readonly Lazy _decider; - internal bool Completed; + private bool _completed; public Logic(LazySink stage, Attributes inheritedAttributes, TaskCompletionSource completion) : base(stage.Shape) @@ -973,7 +973,7 @@ public override void OnUpstreamFinish() private void GotCompletionEvent() { SetKeepGoing(true); - Completed = true; + _completed = true; } public override void PreStart() => Pull(_stage.In); @@ -987,8 +987,21 @@ private void Failure(Exception ex) private void InitInternalSource(Sink sink, TIn firstElement) { var sourceOut = new SubSource(this, firstElement); - _completion.TrySetResult(Source.FromGraph(sourceOut.Source) - .RunWith(sink, Interpreter.SubFusingMaterializer)); + + try { + var matVal = Source.FromGraph(sourceOut.Source) + .RunWith(sink, Interpreter.SubFusingMaterializer); + _completion.TrySetResult(matVal); + } catch (Exception ex) { + /* + case NonFatal(ex) => + promise.tryFailure(ex) + failStage(ex) + */ + _completion.TrySetException(ex); + FailStage(ex); + } + } #region SubSource @@ -1006,7 +1019,7 @@ public SubSource(Logic logic, TIn firstElement) : base(logic, "LazySink") SetHandler(new LambdaOutHandler(onPull: () => { Push(firstElement); - if (_logic.Completed) + if (_logic._completed) SourceComplete(); else SwitchToFinalHandler(); From dd2e3bd7b3fbf187e771250c3750324389934b9e Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 7 Jun 2017 04:09:49 +0700 Subject: [PATCH 4/9] Port https://github.com/akka/akka/pull/22647 --- .../Dsl/FlowFlattenMergeSpec.cs | 52 +++++++++++++++++ .../Dsl/FlowRecoverWithSpec.cs | 55 ++++++++++++++++++ .../Akka.Streams.Tests/Dsl/LazySinkSpec.cs | 56 +++++++++++++++++++ .../Akka.Streams.Tests/Dsl/LazySourceSpec.cs | 54 ++++++++++++++++++ .../Akka.Streams/Implementation/Sources.cs | 4 +- 5 files changed, 220 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowFlattenMergeSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowFlattenMergeSpec.cs index 2586d8e053a..c0ee9ebd322 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowFlattenMergeSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowFlattenMergeSpec.cs @@ -289,5 +289,57 @@ public void A_FlattenMerge_must_propagate_attributes_to_inner_stream() }, Materializer); } + + [Fact] + public void A_FlattenMerge_must_bubble_up_substream_materialization_exception() + { + this.AssertAllStagesStopped(() => { + var matFail = new TestException("fail!"); + + var task = Source.Single("whatever") + .MergeMany(4, x => Source.FromGraph(new FailingInnerMat(matFail))) + .RunWith(Sink.Ignore(), Materializer); + + try + { + task.Wait(TimeSpan.FromSeconds(1)); + } + catch (AggregateException) { } + + task.IsFaulted.ShouldBe(true); + task.Exception.ShouldNotBe(null); + task.Exception.InnerException.ShouldBeEquivalentTo(matFail); + + }, Materializer); + } + + private sealed class FailingInnerMat : GraphStage> + { + #region Logic + private sealed class FailingLogic : GraphStageLogic + { + public FailingLogic(Shape shape, TestException ex) : base(shape) + { + throw ex; + } + } + #endregion + + public FailingInnerMat(TestException ex) + { + var outlet = new Outlet("out"); + Shape = new SourceShape(outlet); + _ex = ex; + } + + private readonly TestException _ex; + + public override SourceShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new FailingLogic(Shape, _ex); + } + } } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs index d90fe716424..83dd284aee5 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs @@ -8,6 +8,7 @@ using System; using System.Linq; using Akka.Streams.Dsl; +using Akka.Streams.Stage; using Akka.Streams.TestKit; using Akka.Streams.TestKit.Tests; using Akka.TestKit; @@ -278,5 +279,59 @@ public void A_RecoverWith_must_throw_ArgumentException_if_number_of_retries_is_l .ShouldThrow(); }, Materializer); } + + [Fact] + public void A_RecoverWith_must_fail_correctly_when_materialization_of_recover_source_fails() + { + this.AssertAllStagesStopped(() => + { + var matFail = new TestException("fail!"); + + var task = Source.Failed(new TestException("trigger")) + .RecoverWithRetries(ex => Source.FromGraph(new FailingInnerMat(matFail)), 1) + .RunWith(Sink.Ignore(), Materializer); + + try + { + task.Wait(TimeSpan.FromSeconds(1)); + } + catch (AggregateException) { } + + task.IsFaulted.ShouldBe(true); + task.Exception.ShouldNotBe(null); + task.Exception.InnerException.ShouldBeEquivalentTo(matFail); + + }, Materializer); + } + + private sealed class FailingInnerMat : GraphStage> + { + #region Logic + private sealed class FailingLogic : GraphStageLogic + { + public FailingLogic(Shape shape, TestException ex) : base(shape) + { + throw ex; + } + } + #endregion + + public FailingInnerMat(TestException ex) + { + var outlet = new Outlet("out"); + Shape = new SourceShape(outlet); + _ex = ex; + } + + private readonly TestException _ex; + + public override SourceShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new FailingLogic(Shape, _ex); + } + } + } } diff --git a/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs index 58f38f8a900..0462a39b494 100644 --- a/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs @@ -9,6 +9,7 @@ using System.Linq; using System.Threading.Tasks; using Akka.Streams.Dsl; +using Akka.Streams.Stage; using Akka.Streams.Supervision; using Akka.Streams.TestKit; using Akka.Streams.TestKit.Tests; @@ -238,5 +239,60 @@ public void A_LazySink_must_fail_task_when_zero_throws_expception() taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).ShouldThrow(); }, Materializer); } + + [Fact] + public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails() + { + this.AssertAllStagesStopped(() => + { + var matFail = new TestException("fail!"); + + var task = Source.Single("whatever") + .RunWith(Sink.LazySink( + str => Task.FromResult(Sink.FromGraph(new FailingInnerMat(matFail))), + () => NotUsed.Instance), Materializer); + + try + { + task.Wait(TimeSpan.FromSeconds(1)); + } + catch (AggregateException) { } + + task.IsFaulted.ShouldBe(true); + task.Exception.ShouldNotBe(null); + task.Exception.InnerException.ShouldBeEquivalentTo(matFail); + + }, Materializer); + } + + private sealed class FailingInnerMat : GraphStage> + { + #region Logic + private sealed class FailingLogic : GraphStageLogic + { + public FailingLogic(Shape shape, TestException ex) : base(shape) + { + throw ex; + } + } + #endregion + + public FailingInnerMat(TestException ex) + { + var inlet = new Inlet("in"); + Shape = new SinkShape(inlet); + _ex = ex; + } + + private readonly TestException _ex; + + public override SinkShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new FailingLogic(Shape, _ex); + } + } + } } diff --git a/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs index 000e89bac44..6a4094fec88 100644 --- a/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using System; using System.Collections.Immutable; using System.Linq; using Akka.Streams.Dsl; @@ -152,6 +153,59 @@ public void A_lazy_source_must_propagate_attributes_to_inner_stream() }, Materializer); } + [Fact] + public void A_lazy_source_must_fail_correctly_when_materialization_of_inner_source_fails() + { + this.AssertAllStagesStopped(() => + { + var matFail = new TestException("fail!"); + + var task = Source.Lazily(() => Source.FromGraph(new FailingInnerMat(matFail))) + .To(Sink.Ignore()) + .Run(Materializer); + + try + { + task.Wait(TimeSpan.FromSeconds(1)); + } + catch (AggregateException) {} + + task.IsFaulted.ShouldBe(true); + task.Exception.ShouldNotBe(null); + task.Exception.InnerException.ShouldBeEquivalentTo(matFail); + + }, Materializer); + } + + private sealed class FailingInnerMat : GraphStage> + { + #region Logic + private sealed class FailingLogic : GraphStageLogic + { + public FailingLogic(Shape shape, TestException ex) : base(shape) + { + throw ex; + } + } + #endregion + + public FailingInnerMat(TestException ex) + { + var outlet = new Outlet("out"); + Shape = new SourceShape(outlet); + _ex = ex; + } + + private readonly TestException _ex; + + public override SourceShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new FailingLogic(Shape, _ex); + } + } + private sealed class AttibutesSourceStage : GraphStage> { #region Logic diff --git a/src/core/Akka.Streams/Implementation/Sources.cs b/src/core/Akka.Streams/Implementation/Sources.cs index 83941d63587..a07f4a66482 100644 --- a/src/core/Akka.Streams/Implementation/Sources.cs +++ b/src/core/Akka.Streams/Implementation/Sources.cs @@ -798,7 +798,9 @@ public override void OnPull() } catch (Exception e) { - _completion.SetException(e); + subSink.Cancel(); + FailStage(e); + _completion.TrySetException(e); } } From 0639b775763e965212bc280fcdc4e7561987c181 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 8 Jun 2017 14:33:34 +0700 Subject: [PATCH 5/9] Port of https://github.com/akka/akka/pull/22657 --- .../Akka.Streams.Tests/IO/FileSinkSpec.cs | 47 ++++++++++++---- .../Akka.Streams.Tests/IO/FileSourceSpec.cs | 53 ++++++++++++++++++- src/core/Akka.Streams/Dsl/FileIO.cs | 15 +++--- .../Implementation/IO/FilePublisher.cs | 16 ++++-- .../Implementation/IO/FileSubscriber.cs | 17 ++++-- .../Akka.Streams/Implementation/IO/IOSinks.cs | 11 ++-- .../Implementation/IO/IOSources.cs | 15 ++++-- 7 files changed, 141 insertions(+), 33 deletions(-) diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index b3557781001..375221c85e2 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -149,6 +149,44 @@ public void SynchronousFileSink_should_allow_appending_to_file() }, _materializer); } + [Fact] + public void SynchronousFileSink_should_allow_writing_from_specific_position_to_the_file() + { + TargetFile(f => { + var testLinesCommon = new List + { + new string('a', 1000) + "\n", + new string('b', 1000) + "\n", + new string('c', 1000) + "\n", + new string('d', 1000) + "\n", + }; + + var commonByteString = ByteString.FromString(testLinesCommon.Join("")).Compact(); + var startPosition = commonByteString.Count; + + var testLinesPart2 = new List() + { + new string('x', 1000) + "\n", + new string('x', 1000) + "\n", + }; + + Func, long, Task> write = (lines, pos) => Source.From(lines) + .Select(ByteString.FromString) + .RunWith(FileIO.ToFile(f, fileMode:FileMode.Create, startPosition:pos), _materializer); + + var completion1 = write(_testLines, 0); + completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + var result1 = completion1.Result; + + var completion2 = write(testLinesPart2, startPosition); + completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + var result2 = completion2.Result; + + f.Length.ShouldBe(startPosition + result2.Count); + CheckFileContent(f, testLinesCommon.Join("") + testLinesPart2.Join("")); + }); + } + [Fact] public void SynchronousFileSink_should_use_dedicated_blocking_io_dispatcher_by_default() { @@ -215,15 +253,6 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sink() { this.AssertAllStagesStopped(() => { - TargetFile(f => { - var completion = Source.From(_testByteStrings); - completion.RunWith(Sink.LazySink>( - b => Task.FromResult(FileIO.ToFile(f)), - () => Task.FromResult(new IOResult(0, new Result(NotUsed.Instance))) - ), _materializer); - completion.MapMaterializedValue(); - - }); }, _materializer); } diff --git a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs index 97fd3bf00dd..95655b33285 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs @@ -4,7 +4,7 @@ // Copyright (C) 2013-2016 Akka.NET project // //----------------------------------------------------------------------- -#if AKKAIO +//#if AKKAIO using System; using System.Collections.Generic; using System.IO; @@ -15,9 +15,11 @@ using Akka.IO; using Akka.Streams.Dsl; using Akka.Streams.Implementation; +using Akka.Streams.Implementation.Stages; using Akka.Streams.TestKit; using Akka.Streams.TestKit.Tests; using Akka.TestKit; +using Akka.Util.Internal; using FluentAssertions; using Xunit; using Xunit.Abstractions; @@ -99,6 +101,55 @@ public void FileSource_should_read_contents_from_a_file() }, _materializer); } + [Fact] + public void Filesource_could_read_partial_contents_from_a_file() + { + this.AssertAllStagesStopped(() => { + var chunkSize = 512; + var startPosition = 1000; + var bufferAttributes = Attributes.CreateInputBuffer(1, 2); + + var p = FileIO.FromFile(_testFilePath, chunkSize, startPosition) + .WithAttributes(bufferAttributes) + .RunWith(Sink.AsPublisher(false), _materializer); + + var c = this.CreateManualSubscriberProbe(); + p.Subscribe(c); + var sub = c.ExpectSubscription(); + + var remaining = _testText.Substring(1000); + + var nextChunk = new Func(() => { + string chunks; + + if (remaining.Length <= chunkSize) + { + chunks = remaining; + remaining = string.Empty; + } + else + { + chunks = remaining.Substring(0, chunkSize); + remaining = remaining.Substring(chunkSize); + } + + return chunks; + }); + + sub.Request(5000); + + var expectedChunk = nextChunk(); + for(int i=0; i<10; ++i) + { + var actual = c.ExpectNext().DecodeString(Encoding.UTF8); + actual.Should().Be(expectedChunk); + expectedChunk = nextChunk(); + } + c.ExpectComplete(); + + }, _materializer); + } + [Fact] public void FileSource_should_complete_only_when_all_contents_of_a_file_have_been_signalled() { diff --git a/src/core/Akka.Streams/Dsl/FileIO.cs b/src/core/Akka.Streams/Dsl/FileIO.cs index 9b6724e4823..6a0471a6cbc 100644 --- a/src/core/Akka.Streams/Dsl/FileIO.cs +++ b/src/core/Akka.Streams/Dsl/FileIO.cs @@ -33,12 +33,12 @@ public static class FileIO /// /// the File to read from /// the size of each read operation, defaults to 8192 + /// the start position to read from, defaults to 0 /// TBD - public static Source> FromFile(FileInfo f, int chunkSize = 8192) => - new Source>(new FileSource(f, chunkSize, DefaultAttributes.FileSource, + public static Source> FromFile(FileInfo f, int chunkSize = 8192, long startPosition = 0) => + new Source>(new FileSource(f, chunkSize, startPosition, DefaultAttributes.FileSource, new SourceShape(new Outlet("FileSource")))); - /// /// Creates a Sink which writes incoming elements to the given file and either overwrites /// or appends to it. @@ -49,11 +49,12 @@ public static Source> FromFile(FileInfo f, int chunkS /// This source is backed by an Actor which will use the dedicated "akka.stream.blocking-io-dispatcher", /// unless configured otherwise by using . /// - /// TBD - /// TBD + /// the file to write to + /// the write file mode, defaults to + /// the start position to write to, defaults to 0 /// TBD - public static Sink> ToFile(FileInfo f, FileMode? fileMode = null) => - new Sink>(new FileSink(f, fileMode ?? FileMode.OpenOrCreate, DefaultAttributes.FileSink, + public static Sink> ToFile(FileInfo f, FileMode? fileMode = null, long startPosition = 0) => + new Sink>(new FileSink(f, startPosition, fileMode ?? FileMode.OpenOrCreate, DefaultAttributes.FileSink, new SinkShape(new Inlet("FileSink")))); } } diff --git a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs index c1087edfbd9..1d414264c2d 100644 --- a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs +++ b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs @@ -31,6 +31,7 @@ internal class FilePublisher : Actors.ActorPublisher /// TBD /// TBD /// TBD + /// TBD /// TBD /// TBD /// @@ -38,22 +39,25 @@ internal class FilePublisher : Actors.ActorPublisher /// ///
    ///
  • The specified is less than or equal to zero.
  • + ///
  • The specified is less than zero
  • ///
  • The specified is less than or equal to zero.
  • ///
  • The specified is less than the specified .
  • ///
///
/// TBD public static Props Props(FileInfo f, TaskCompletionSource completionPromise, int chunkSize, - int initialBuffer, int maxBuffer) + long startPosition, int initialBuffer, int maxBuffer) { if (chunkSize <= 0) throw new ArgumentException($"chunkSize must be > 0 (was {chunkSize})", nameof(chunkSize)); + if(startPosition < 0) + throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition)); if (initialBuffer <= 0) throw new ArgumentException($"initialBuffer must be > 0 (was {initialBuffer})", nameof(initialBuffer)); if (maxBuffer < initialBuffer) throw new ArgumentException($"maxBuffer must be >= initialBuffer (was {maxBuffer})", nameof(maxBuffer)); - return Actor.Props.Create(() => new FilePublisher(f, completionPromise, chunkSize, maxBuffer)) + return Actor.Props.Create(() => new FilePublisher(f, completionPromise, chunkSize, startPosition, maxBuffer)) .WithDeploy(Deploy.Local); } @@ -65,6 +69,7 @@ private struct Continue : IDeadLetterSuppression private readonly FileInfo _f; private readonly TaskCompletionSource _completionPromise; private readonly int _chunkSize; + private readonly long _startPosition; private readonly int _maxBuffer; private readonly byte[] _buffer; private readonly ILoggingAdapter _log; @@ -79,12 +84,14 @@ private struct Continue : IDeadLetterSuppression /// TBD /// TBD /// TBD + /// TBD /// TBD - public FilePublisher(FileInfo f, TaskCompletionSource completionPromise, int chunkSize, int maxBuffer) + public FilePublisher(FileInfo f, TaskCompletionSource completionPromise, int chunkSize, long startPosition, int maxBuffer) { _f = f; _completionPromise = completionPromise; _chunkSize = chunkSize; + _startPosition = startPosition; _maxBuffer = maxBuffer; _log = Context.GetLogger(); @@ -101,6 +108,9 @@ protected override void PreStart() try { _chan = _f.Open(FileMode.Open, FileAccess.Read); + if (_startPosition > 0) { + _chan.Position = _startPosition; + } } catch (Exception ex) { diff --git a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs index b8e60e1b7fb..16e5f03278f 100644 --- a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs +++ b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs @@ -28,19 +28,23 @@ internal class FileSubscriber : ActorSubscriber /// TBD /// TBD /// TBD + /// TBD /// TBD /// TBD /// TBD - public static Props Props(FileInfo f, TaskCompletionSource completionPromise, int bufferSize, FileMode fileMode) + public static Props Props(FileInfo f, TaskCompletionSource completionPromise, int bufferSize, long startPosition, FileMode fileMode) { if(bufferSize <= 0) - throw new ArgumentException("Buffer size must be > 0"); + throw new ArgumentException($"bufferSize must be > 0 (was {bufferSize})", nameof(bufferSize)); + if(startPosition < 0) + throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition)); - return Actor.Props.Create(()=> new FileSubscriber(f, completionPromise, bufferSize, fileMode)).WithDeploy(Deploy.Local); + return Actor.Props.Create(()=> new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode)).WithDeploy(Deploy.Local); } private readonly FileInfo _f; private readonly TaskCompletionSource _completionPromise; + private readonly long _startPosition; private readonly FileMode _fileMode; private readonly ILoggingAdapter _log; private readonly WatermarkRequestStrategy _requestStrategy; @@ -53,11 +57,13 @@ public static Props Props(FileInfo f, TaskCompletionSource completionP /// TBD /// TBD /// TBD + /// TBD /// TBD - public FileSubscriber(FileInfo f, TaskCompletionSource completionPromise, int bufferSize, FileMode fileMode) + public FileSubscriber(FileInfo f, TaskCompletionSource completionPromise, int bufferSize, long startPosition, FileMode fileMode) { _f = f; _completionPromise = completionPromise; + _startPosition = startPosition; _fileMode = fileMode; _log = Context.GetLogger(); _requestStrategy = new WatermarkRequestStrategy(highWatermark: bufferSize); @@ -76,6 +82,9 @@ protected override void PreStart() try { _chan = _f.Open(_fileMode, FileAccess.Write); + if (_startPosition > 0) { + _chan.Position = _startPosition; + } base.PreStart(); } catch (Exception ex) diff --git a/src/core/Akka.Streams/Implementation/IO/IOSinks.cs b/src/core/Akka.Streams/Implementation/IO/IOSinks.cs index ad020cca10e..ca7614a0249 100644 --- a/src/core/Akka.Streams/Implementation/IO/IOSinks.cs +++ b/src/core/Akka.Streams/Implementation/IO/IOSinks.cs @@ -24,18 +24,21 @@ namespace Akka.Streams.Implementation.IO internal sealed class FileSink : SinkModule> { private readonly FileInfo _f; + private readonly long _startPosition; private readonly FileMode _fileMode; /// /// TBD /// /// TBD + /// TBD /// TBD /// TBD /// TBD - public FileSink(FileInfo f, FileMode fileMode, Attributes attributes, SinkShape shape) : base(shape) + public FileSink(FileInfo f, long startPosition, FileMode fileMode, Attributes attributes, SinkShape shape) : base(shape) { _f = f; + _startPosition = startPosition; _fileMode = fileMode; Attributes = attributes; @@ -58,7 +61,7 @@ public FileSink(FileInfo f, FileMode fileMode, Attributes attributes, SinkShape< /// TBD /// TBD public override IModule WithAttributes(Attributes attributes) - => new FileSink(_f, _fileMode, attributes, AmendShape(attributes)); + => new FileSink(_f, _startPosition, _fileMode, attributes, AmendShape(attributes)); /// @@ -67,7 +70,7 @@ public override IModule WithAttributes(Attributes attributes) /// TBD /// TBD protected override SinkModule> NewInstance(SinkShape shape) - => new FileSink(_f, _fileMode, Attributes, shape); + => new FileSink(_f, _startPosition, _fileMode, Attributes, shape); /// /// TBD @@ -81,7 +84,7 @@ public override object Create(MaterializationContext context, out Task var settings = mat.EffectiveSettings(context.EffectiveAttributes); var ioResultPromise = new TaskCompletionSource(); - var props = FileSubscriber.Props(_f, ioResultPromise, settings.MaxInputBufferSize, _fileMode); + var props = FileSubscriber.Props(_f, ioResultPromise, settings.MaxInputBufferSize, _startPosition, _fileMode); var dispatcher = context.EffectiveAttributes.GetAttribute(DefaultAttributes.IODispatcher.AttributeList.First()) as ActorAttributes.Dispatcher; var actorRef = mat.ActorOf(context, props.WithDispatcher(dispatcher.Name)); diff --git a/src/core/Akka.Streams/Implementation/IO/IOSources.cs b/src/core/Akka.Streams/Implementation/IO/IOSources.cs index 1e85ffbd1d5..0af04d2a71f 100644 --- a/src/core/Akka.Streams/Implementation/IO/IOSources.cs +++ b/src/core/Akka.Streams/Implementation/IO/IOSources.cs @@ -24,23 +24,28 @@ internal sealed class FileSource : SourceModule> { private readonly FileInfo _f; private readonly int _chunkSize; + private readonly long _startPosition; /// /// TBD /// /// TBD /// TBD + /// TBD /// TBD /// TBD /// TBD /// TBD - public FileSource(FileInfo f, int chunkSize, Attributes attributes, SourceShape shape) : base(shape) + public FileSource(FileInfo f, int chunkSize, long startPosition, Attributes attributes, SourceShape shape) : base(shape) { if(chunkSize <= 0) - throw new ArgumentException("chunkSize must be greater than 0"); + throw new ArgumentException($"chunkSize must be > 0 (was {chunkSize})", nameof(chunkSize)); + if(startPosition < 0) + throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition)); _f = f; _chunkSize = chunkSize; + _startPosition = startPosition; Attributes = attributes; Label = $"FileSource({f}, {chunkSize})"; @@ -62,7 +67,7 @@ public FileSource(FileInfo f, int chunkSize, Attributes attributes, SourceShape< /// TBD /// TBD public override IModule WithAttributes(Attributes attributes) - => new FileSource(_f, _chunkSize, attributes, AmendShape(attributes)); + => new FileSource(_f, _chunkSize, _startPosition, attributes, AmendShape(attributes)); /// /// TBD @@ -70,7 +75,7 @@ public override IModule WithAttributes(Attributes attributes) /// TBD /// TBD protected override SourceModule> NewInstance(SourceShape shape) - => new FileSource(_f, _chunkSize, Attributes, shape); + => new FileSource(_f, _chunkSize, _startPosition, Attributes, shape); /// /// TBD @@ -85,7 +90,7 @@ public override IPublisher Create(MaterializationContext context, ou var settings = materializer.EffectiveSettings(context.EffectiveAttributes); var ioResultPromise = new TaskCompletionSource(); - var props = FilePublisher.Props(_f, ioResultPromise, _chunkSize, settings.InitialInputBufferSize, settings.MaxInputBufferSize); + var props = FilePublisher.Props(_f, ioResultPromise, _chunkSize, _startPosition, settings.InitialInputBufferSize, settings.MaxInputBufferSize); var dispatcher = context.EffectiveAttributes.GetAttribute(DefaultAttributes.IODispatcher.GetAttribute()); var actorRef = materializer.ActorOf(context, props.WithDispatcher(dispatcher.Name)); From 128418b73f91b139b8ed342a658fab3364291e5e Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 8 Jun 2017 17:27:07 +0700 Subject: [PATCH 6/9] https://github.com/akka/akka/pull/22033, FileSinkSpec scala port to C# --- .../Akka.Streams.Tests.csproj | 2 +- src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs | 17 ++++++++++++++++- .../Akka.Streams.Tests/IO/FileSourceSpec.cs | 4 ++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj index 2b6ff7d2f87..bb9900a087f 100644 --- a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj +++ b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj @@ -39,7 +39,7 @@ - $(DefineConstants);SERIALIZATION;CONFIGURATION;UNSAFE_THREADING + TRACE;DEBUG;SERIALIZATION;CONFIGURATION;UNSAFE_THREADING;NET452;NET452 diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 375221c85e2..4b7edd4bca3 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -172,7 +172,7 @@ public void SynchronousFileSink_should_allow_writing_from_specific_position_to_t Func, long, Task> write = (lines, pos) => Source.From(lines) .Select(ByteString.FromString) - .RunWith(FileIO.ToFile(f, fileMode:FileMode.Create, startPosition:pos), _materializer); + .RunWith(FileIO.ToFile(f, fileMode:FileMode.OpenOrCreate, startPosition:pos), _materializer); var completion1 = write(_testLines, 0); completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); @@ -253,6 +253,21 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sink() { this.AssertAllStagesStopped(() => { + TargetFile(f => { + var lazySink = Sink.LazySink( + (ByteString _) => Task.FromResult(FileIO.ToFile(f)), + () => Task.FromResult(IOResult.Success(0))) + .MapMaterializedValue(t => { + t.Wait(TimeSpan.FromSeconds(3)); + return t.Result; + }); + + var completion = Source.From(new []{_testByteStrings.Head()}) + .RunWith(lazySink, _materializer); + + completion.Wait(TimeSpan.FromSeconds(3)); + CheckFileContent(f, _testLines.Head()); + }); }, _materializer); } diff --git a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs index 95655b33285..d8a71cd8cfc 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs @@ -4,7 +4,7 @@ // Copyright (C) 2013-2016 Akka.NET project // //----------------------------------------------------------------------- -//#if AKKAIO +#if AKKAIO using System; using System.Collections.Generic; using System.IO; @@ -109,7 +109,7 @@ public void Filesource_could_read_partial_contents_from_a_file() var startPosition = 1000; var bufferAttributes = Attributes.CreateInputBuffer(1, 2); - var p = FileIO.FromFile(_testFilePath, chunkSize, startPosition) + var p = FileIO.FromFile(TestFile(), chunkSize, startPosition) .WithAttributes(bufferAttributes) .RunWith(Sink.AsPublisher(false), _materializer); From b663263a8163c5f311a497ccda0781bdf46521ad Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 8 Jun 2017 23:40:56 +0700 Subject: [PATCH 7/9] Update CoreAPISpec list --- .../Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 48ef4263411..d63f5ff3c23 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1056,8 +1056,8 @@ namespace Akka.Streams.Dsl } public class static FileIO { - public static Akka.Streams.Dsl.Source> FromFile(System.IO.FileInfo f, int chunkSize = 8192) { } - public static Akka.Streams.Dsl.Sink> ToFile(System.IO.FileInfo f, System.Nullable fileMode = null) { } + public static Akka.Streams.Dsl.Source> FromFile(System.IO.FileInfo f, int chunkSize = 8192, long startPosition = 0) { } + public static Akka.Streams.Dsl.Sink> ToFile(System.IO.FileInfo f, System.Nullable fileMode = null, long startPosition = 0) { } } public class static Flow { From fe63b3fc669af45c6925960516a363a4416de05c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 9 Jun 2017 06:13:15 +0700 Subject: [PATCH 8/9] Apply requested code formatting changes --- .../Akka.Streams.Tests/IO/FileSinkSpec.cs | 70 +++++++++---------- .../Akka.Streams.Tests/IO/FileSourceSpec.cs | 6 +- .../Implementation/IO/FilePublisher.cs | 3 +- .../Implementation/IO/FileSubscriber.cs | 3 +- src/core/Akka.Streams/Implementation/Sinks.cs | 9 +-- 5 files changed, 43 insertions(+), 48 deletions(-) diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 4b7edd4bca3..591408da429 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -152,39 +152,41 @@ public void SynchronousFileSink_should_allow_appending_to_file() [Fact] public void SynchronousFileSink_should_allow_writing_from_specific_position_to_the_file() { - TargetFile(f => { - var testLinesCommon = new List + this.AssertAllStagesStopped(() => + { + TargetFile(f => { - new string('a', 1000) + "\n", - new string('b', 1000) + "\n", - new string('c', 1000) + "\n", - new string('d', 1000) + "\n", - }; + var testLinesCommon = new List + { + new string('a', 1000) + "\n", + new string('b', 1000) + "\n", + new string('c', 1000) + "\n", + new string('d', 1000) + "\n", + }; - var commonByteString = ByteString.FromString(testLinesCommon.Join("")).Compact(); - var startPosition = commonByteString.Count; + var commonByteString = ByteString.FromString(testLinesCommon.Join("")).Compact(); + var startPosition = commonByteString.Count; - var testLinesPart2 = new List() - { - new string('x', 1000) + "\n", - new string('x', 1000) + "\n", - }; + var testLinesPart2 = new List() + { + new string('x', 1000) + "\n", + new string('x', 1000) + "\n", + }; - Func, long, Task> write = (lines, pos) => Source.From(lines) - .Select(ByteString.FromString) - .RunWith(FileIO.ToFile(f, fileMode:FileMode.OpenOrCreate, startPosition:pos), _materializer); + Task write(List lines, long pos) => Source.From(lines) + .Select(ByteString.FromString) + .RunWith(FileIO.ToFile(f, fileMode: FileMode.OpenOrCreate, startPosition: pos), _materializer); - var completion1 = write(_testLines, 0); - completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - var result1 = completion1.Result; + var completion1 = write(_testLines, 0); + var result1 = completion1.AwaitResult(); - var completion2 = write(testLinesPart2, startPosition); - completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - var result2 = completion2.Result; + var completion2 = write(testLinesPart2, startPosition); + var result2 = completion2.AwaitResult(); - f.Length.ShouldBe(startPosition + result2.Count); - CheckFileContent(f, testLinesCommon.Join("") + testLinesPart2.Join("")); - }); + f.Length.ShouldBe(startPosition + result2.Count); + CheckFileContent(f, testLinesCommon.Join("") + testLinesPart2.Join("")); + }); + }, _materializer); } [Fact] @@ -248,24 +250,22 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att }, _materializer); } - // Needed help converting this test case [Fact] public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sink() { - this.AssertAllStagesStopped(() => { - TargetFile(f => { + this.AssertAllStagesStopped(() => + { + TargetFile(f => + { var lazySink = Sink.LazySink( (ByteString _) => Task.FromResult(FileIO.ToFile(f)), - () => Task.FromResult(IOResult.Success(0))) - .MapMaterializedValue(t => { - t.Wait(TimeSpan.FromSeconds(3)); - return t.Result; - }); + () => Task.FromResult(IOResult.Success(0))) + .MapMaterializedValue(t => t.AwaitResult()); var completion = Source.From(new []{_testByteStrings.Head()}) .RunWith(lazySink, _materializer); - completion.Wait(TimeSpan.FromSeconds(3)); + completion.AwaitResult(); CheckFileContent(f, _testLines.Head()); }); }, _materializer); diff --git a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs index d8a71cd8cfc..9d8bccedf53 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs @@ -104,7 +104,8 @@ public void FileSource_should_read_contents_from_a_file() [Fact] public void Filesource_could_read_partial_contents_from_a_file() { - this.AssertAllStagesStopped(() => { + this.AssertAllStagesStopped(() => + { var chunkSize = 512; var startPosition = 1000; var bufferAttributes = Attributes.CreateInputBuffer(1, 2); @@ -141,8 +142,7 @@ public void Filesource_could_read_partial_contents_from_a_file() var expectedChunk = nextChunk(); for(int i=0; i<10; ++i) { - var actual = c.ExpectNext().DecodeString(Encoding.UTF8); - actual.Should().Be(expectedChunk); + c.ExpectNext().DecodeString(Encoding.UTF8).Should().Be(expectedChunk); expectedChunk = nextChunk(); } c.ExpectComplete(); diff --git a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs index 1d414264c2d..d60d81452df 100644 --- a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs +++ b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs @@ -108,9 +108,8 @@ protected override void PreStart() try { _chan = _f.Open(FileMode.Open, FileAccess.Read); - if (_startPosition > 0) { + if (_startPosition > 0) _chan.Position = _startPosition; - } } catch (Exception ex) { diff --git a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs index 16e5f03278f..1068a963300 100644 --- a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs +++ b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs @@ -82,9 +82,8 @@ protected override void PreStart() try { _chan = _f.Open(_fileMode, FileAccess.Write); - if (_startPosition > 0) { + if (_startPosition > 0) _chan.Position = _startPosition; - } base.PreStart(); } catch (Exception ex) diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index 46a6ab5976a..cae628e8e63 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -992,12 +992,9 @@ private void InitInternalSource(Sink sink, TIn firstElement) var matVal = Source.FromGraph(sourceOut.Source) .RunWith(sink, Interpreter.SubFusingMaterializer); _completion.TrySetResult(matVal); - } catch (Exception ex) { - /* - case NonFatal(ex) => - promise.tryFailure(ex) - failStage(ex) - */ + } + catch (Exception ex) + { _completion.TrySetException(ex); FailStage(ex); } From 6b2f726cc247d818894cc8cc9072b5dfc1c9cf2a Mon Sep 17 00:00:00 2001 From: Marc Piechura Date: Sun, 25 Jun 2017 11:28:41 +0200 Subject: [PATCH 9/9] fix merge issue --- src/core/Akka.Streams/ActorMaterializer.cs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/core/Akka.Streams/ActorMaterializer.cs b/src/core/Akka.Streams/ActorMaterializer.cs index 03dacfb45dc..4faa41d45bb 100644 --- a/src/core/Akka.Streams/ActorMaterializer.cs +++ b/src/core/Akka.Streams/ActorMaterializer.cs @@ -140,15 +140,6 @@ private static ActorSystem ActorSystemOf(IActorRefFactory context) /// public abstract TMat Materialize(IGraph runnable, Attributes initialAttributes); - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - public abstract TMat Materialize(IGraph runnable, Attributes initialAttributes); - /// /// TBD /// @@ -630,4 +621,4 @@ public static class ActorMaterializerExtensions public static ActorMaterializer Materializer(this IActorRefFactory context, ActorMaterializerSettings settings = null, string namePrefix = null) => ActorMaterializer.Create(context, settings, namePrefix); } -} \ No newline at end of file +}