-
Notifications
You must be signed in to change notification settings - Fork 604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Topic/scope #974
Topic/scope #974
Conversation
…/fs2 into topic/scope # Conflicts: # core/shared/src/main/scala/fs2/internal/Algebra.scala
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pchlupacek I'm generally good with this approach. I added some minor style comments.
Let me know when you have all the tests passing. I don't understand some of the changes from the old algorithm to this one, but maybe once all tests pass, things will be clearer to me.
* Note that acquisition of the scope is synchronous, so it is safe to await its evaluation. | ||
* | ||
*/ | ||
def acquire: F[Option[F[Unit]]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we name this something else? I think the name is too close to the resource management functions above (e.g. beginAcquire)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about acquireScope?
, children: ListMap[Token, Scope[F]] | ||
) { | ||
|
||
def open_? : Boolean = closed.isEmpty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style thing: we don't use the _?
syntax anywhere else in the project so I'd prefer to call these isOpen
, isClosed
, isClosing
, isClosed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
* | ||
* @tparam F | ||
*/ | ||
final private[Scope] case class ScopeState[F[_]]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is defined in Scope
companion, can we name it State
? Also, no need for the [Scope]
modifier on private -- Scope
can see private members of its companion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, sure.
|
||
object SyncRef { | ||
|
||
def apply[F[_] : Sync, A](a: A): SyncRef[F, A] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should return F[SyncRef[F,A]]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah sure, if we do it public
* Think of it as an java AtomicReference wrapped in `F` | ||
* | ||
*/ | ||
final class SyncRef[F[_], A] (private val ar: AtomicReference[A])(implicit F: Sync[F]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've wanted this type a number of times and I've seen people re-invent it. Perhaps we should consider putting a more polished version of it in fs2.async
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, is a bit of awkward to put in async :-) beacuse it is sync, but I may just add it there and see if that works ?
|
||
final class Scope[F[_]] private (private val id: Token, private val parent: Option[Scope[F]])(implicit F: Sync[F]) { self => | ||
|
||
private val state: SyncRef[F, ScopeState[F]] = SyncRef(ScopeState.initial.asInstanceOf[ScopeState[F]]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change initial
to be polymorphic effect and remove this cast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, I wanted to have this inital as val, so it won't get recreated with every new Scope acquisition. and I wanted SyncRef to be not +F? I may put the initial to be def, and put case somehow there if you think is better style ? Or would you be so kind and suggest something instead?
@mpilquist thanks so much. Will keep you updated. |
I haven't had any time to go over this, but I have a question: does this open the possibility to have a resource safe |
- Resources now track leases - Scope code much more simplified - Catenable got some more combinators (collect, flatMap, delete)
…/fs2 into topic/scope
@SystemFw just missed your question. I think sort of by definition of IO this is not possible. This change is sort just escaping from imperative code and monitors to |
@mpilquist just pushed changes. I sort of reverted to your previous model with ExportResources, however ImportResources is not necessary. Couple of notes:
Also, I think as result of this changes runFold may not need Effect/EC and may be now defined only with Sync. However there is still caveat (which doesn't seem to be related to my changes) and that is when code in eval_() fails, in that case somehow the exception is not getting propagates as it should be. This seems to deadlock interpreter, and I am still investigating it. The last point where I see exception is here : Also I would like to extract SyncRef by another PR than this one ? |
@pchlupacek Yeah let's do the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm about a third of the way through reviewing but I figured I'd release this batch of minor comments. I'd like to do an editing pass of the ScalaDoc at some point but that can wait until we're done (and I don't mind doing that in a separate PR that you can review).
@@ -1299,6 +1291,10 @@ object Stream { | |||
def iterateEval[F[_],A](start: A)(f: A => F[A]): Stream[F,A] = | |||
emit(start) ++ eval(f(start)).flatMap(iterateEval(_)(f)) | |||
|
|||
/** Allows to get current scope during evaluation of the stream **/ | |||
def getScope[F[_]]: Stream[F, Scope[F]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're making Scope
part of the public api here, we'll need a trait Scope[F[_]]
in the fs2
package. We can keep the implementation of that trait in fs2.internal
if that makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok makes sense will do
case None => this | ||
} | ||
final class Token { | ||
override def toString: String = s"T[${hashCode.toHexString}]" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean to leave this?
* | ||
* 1. Resource is created and registered with the scope (Algebra.Acquire) | ||
* 2. Resource is acquired | ||
* 3. Resource `acquired` is invoked to conform acquisition of the resource. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/conform/confirm/
* the Resource and acts like (1). | ||
* (3) The `acquired` was evaluated after scope was `released` by either (1) or (2). In this case | ||
* resource will invoke the finalization immediately if the resource is not leased. | ||
* (4) The `cancelLEase` is invoked for previously leased resource. This will invoke resource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/cancelLEase/cancelLease/
* (4) The `cancelLEase` is invoked for previously leased resource. This will invoke resource | ||
* finalization if resource was already acquired and released and there are no more outstanding leases. | ||
* | ||
* Resources may be leased to other scopes. in that case, each scope must lease resource with `lease` and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/in/In/
* Resources may be leased to other scopes. in that case, each scope must lease resource with `lease` and | ||
* when scope is closed (or when the resource lease is no longer required) release the lease with `cancelLease`. | ||
* | ||
* Note thate every method which as part of its invocation may potentially call resource finalizer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/thate/that/
*/ | ||
private[internal] sealed trait Resource[F[_]] { | ||
|
||
/** if of the resource **/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/if/id/ -- also, end the summary doc with a period to be consistent with other doc in project
* thus this may result in finalizer to fail. | ||
* | ||
*/ | ||
def cancelLease: F[Either[Throwable, Unit]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might not be a good idea but we could encode the lease state machine directly in the types by changing lease
to be def lease: F[Option[Lease[F]]]
where:
trait Lease[F[_]] {
def cancel: F[Either[Throwable,Unit]]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps you right just lease: F[Lease[F]]. Not sure if we really need that option here.
} | ||
|
||
/** | ||
* Traverses supplued catenable with `f` that may produce failure, and collects theses failures. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/supplued/supplied/
…/fs2 into topic/scope
+ Introduces CompositeFailure + Introduces Lease[F] as type wraping resource leases + Introduces StreamScope[F] a public API to Scope[F]
* In these scenarios, COmposite failure assures that user won't lose any intermediate failures. | ||
*/ | ||
class CompositeFailure( | ||
first: Throwable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps use cats.data.NonEmptyList
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
class CompositeFailure( | ||
first: Throwable | ||
, others: List[Throwable] | ||
) extends Throwable(s"Resources failed to finalize: [${first.getClass.getName}: ${first.getMessage}] + ${others.size} others", first) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Recommend reorganizing this message to the "+ N others" part of the string shows up in the prefix and is hence unambiguous as to whether it is part of the first.getMessage. Something like: if (exceptions.tail.headOption.isEmpty) exceptions.head.getMessage else s"Multiple exceptions were thrown (${exceptions.size}), first: ${exceptions.head.getMessage}"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
* | ||
* Scope's methods are used to perform low-level actions on stream interpretation, such as leasing the resources. | ||
*/ | ||
trait StreamScope[F[_]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to call this Scope
(and the one in fs2.internal
could be called ScopeImpl
or DefaultScope
or something). Keep the uglier name for the internals and the nicer name for the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok so how about Scope as public and ScopeRunFold in implementation?
/** | ||
* Wraps leased resources from the scope of the other Stream. | ||
*/ | ||
trait Lease[F[_]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts on this being in Scope
companion, to keep top-level package more concise?
object Scope {
trait Lease[F[_]] { ... }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense
@mpilquist, @pchiusano, @SystemFw
This is an attempt to move Scope to
F
.The reasoning behind Scope move to
F
is to prevent any monitors (synchronisations) in fs2 codebase and to improve correctness and reasoning about scopes.I also added documentation to Scope for people to get in internals easier. Perhaps one day we shall add this to Algebra too.
Worth of note :
Introduction of SyncRef (internal only) that is essentially
F
wrapper around AtomicRef, providing the simplified API cosistent withRef
. It should provide less overhead than actor basedRef
and just needsSync[F]
instance which is available in runFold. Not sure if that shall be public, but perhaps it is useful.instead [Import/Export]Resources we have now possibility to acquire scope as resource to another stream. Doing so, shall prevent that scope to be released until stream that acquired that scope as resource will terminate and thus release its acquisition
The mechanism of scope-acquisition is based on midAcquires concept already present, where each acquisition is incrementing the mid-acquire (now renamed to refCount) value.
The scope can be now
visible
while evaluating the stream. I am not quite sure if that is useful, but I leaved this now public, perhaps we can make it fs2 private. One of the ideas I was playing here with was to use this to implement interruption of the downstream as discussed here Add primitive support for interruption #916Still have couple tests failing, so I will work on this during next few days, hopefully resolving all of them.
I don't expect this to have significant impact on the benchmarks and performance.
Any initial input is welcome.