Stream.consumeAsFlow() finally block not getting called #1825

@josephlbarnett

We're using java.util.stream.Stream.consumeAsFlow() to process a stream as a flow.

Our usage looked like this:

val stream = jooqQuery.fetchLazy().stream().map { /* map result to data class */ }
return stream.consumeAsFlow().flowOn(Dispatchers.IO)

Occasionally, the finally { stream.close() } block in StreamFlow appears not to be called (possibly under certain cancellation or error conditions?), which results in connection leaks. However, if we add an onCompletion {} to the returned flow, that block does get called even when the finally is skipped:

val stream = jooqQuery.fetchLazy().stream().map { /* map result to data class */ }
return stream.consumeAsFlow().flowOn(Dispatchers.IO)
       .onCompletion { stream.close() } // should not need this, as the finally block in the StreamFlow should close it

We verified this by duplicating the StreamFlow class and adding logging to its finally block, alongside logging in the .onCompletion block above. We have seen a few instances of the completion log firing without the finally log firing, but we can't easily reproduce this (usually both fire).
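
For anyone who wants to try the same diagnostic, here is a minimal sketch of the logging setup described above. It is not the library's StreamFlow: consumeAsLoggingFlow is a hypothetical stand-in built with the flow { } builder that only mirrors the try/finally shape, and the events queue and the Stream.of(1, 2, 3) source are assumptions replacing our real loggers and the jOOQ stream. It needs kotlinx-coroutines on the classpath.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.stream.Stream

// Hypothetical stand-in for the duplicated StreamFlow: iterate the stream,
// emit each value, and record an event when the finally block runs.
fun <T> Stream<T>.consumeAsLoggingFlow(events: MutableCollection<String>): Flow<T> {
    val stream = this
    return flow {
        try {
            for (value in stream.iterator()) emit(value)
        } finally {
            events += "finally"   // the log line added to the duplicated class
            stream.close()
        }
    }
}

fun main() = runBlocking {
    // Thread-safe, because the upstream runs on Dispatchers.IO.
    val events = ConcurrentLinkedQueue<String>()

    // Assumption: a plain in-memory stream instead of jooqQuery.fetchLazy().stream().
    val stream = Stream.of(1, 2, 3).onClose { events += "closed" }

    val result = stream.consumeAsLoggingFlow(events)
        .flowOn(Dispatchers.IO)
        .onCompletion { events += "onCompletion" } // the second log from the workaround
        .toList()

    println(result)
    println(events)
}
```

In an ordinary run like this one, both the "finally" and "onCompletion" events are recorded, which matches the usual case we see. The leak corresponds to runs where "onCompletion" is recorded and "finally" is not; this sketch does not reproduce that on its own.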
