-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streams update 2.4.18 #2737
Streams update 2.4.18 #2737
Conversation
…dling logic to GotCompletionEvent function
Add try..catch block to LazySink
@Arkatufus you need to update the ApiApproval file because you've changed the public API. |
Done :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only a few small remarks, looks very good overall @Arkatufus 👍
@@ -149,6 +150,44 @@ public void SynchronousFileSink_should_allow_appending_to_file() | |||
} | |||
|
|||
[Fact] | |||
public void SynchronousFileSink_should_allow_writing_from_specific_position_to_the_file() | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.AssertAllStagesStopped(() => {
is missing
new string('x', 1000) + "\n", | ||
}; | ||
|
||
Func<List<string>, long, Task<IOResult>> write = (lines, pos) => Source.From(lines) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use local function, is easier to read ;-)
.RunWith(FileIO.ToFile(f, fileMode:FileMode.OpenOrCreate, startPosition:pos), _materializer); | ||
|
||
var completion1 = write(_testLines, 0); | ||
completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have a AwaitResult
overload that does the same as the scala version, so you can write var result1 = completion.AwaitResult();
3 seconds is the default timeout
var result1 = completion1.Result; | ||
|
||
var completion2 = write(testLinesPart2, startPosition); | ||
completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
@@ -209,6 +248,29 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att | |||
}, _materializer); | |||
} | |||
|
|||
// Needed help converting this test case |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be removed ? :)
() => Task.FromResult(IOResult.Success(0))) | ||
.MapMaterializedValue(t => { | ||
t.Wait(TimeSpan.FromSeconds(3)); | ||
return t.Result; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more like t.ContinueWIth(r=>r.Result.Result)
but not sure if it matters, if the test passes leave it as it is ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure either, the origin code is a flatMap to ExecutionContexts, so the closest I can think of is to await the Task<Task<IOResult>>
TMat. It does work though, I ran all the tests.
Should I leave AKKAIO defined for Stream.Tests?
} catch (Exception ex) { | ||
/* | ||
case NonFatal(ex) => | ||
promise.tryFailure(ex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment can be removed
for(int i=0; i<10; ++i) | ||
{ | ||
var actual = c.ExpectNext().DecodeString(Encoding.UTF8); | ||
actual.Should().Be(expectedChunk); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not simply .Be(nextChunk())
?
@@ -76,6 +82,9 @@ protected override void PreStart() | |||
try | |||
{ | |||
_chan = _f.Open(_fileMode, FileAccess.Write); | |||
if (_startPosition > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either no curly braces or in the next line, but no java style 😛
@@ -101,6 +108,9 @@ protected override void PreStart() | |||
try | |||
{ | |||
_chan = _f.Open(FileMode.Open, FileAccess.Read); | |||
if (_startPosition > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either no curly braces or in the next line, but no java style 😛
Sorry for the delay on this @Arkatufus - got @Silv3rcircl3 's upstream changes merged in and we're going to work on bringing this in too now. My fault for the delay; was preoccupied getting 1.2.1 out today. |
It's no problem, really :) |
Port of akka/akka#22657, akka/akka#22647 and akka/akka#22033.
Re-base from 59af0c2 for LazySource class.