Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype of file rotation / resource proxying #1667

Merged
merged 19 commits into from
Nov 4, 2019

Conversation

mpilquist
Copy link
Member

@mpilquist mpilquist commented Oct 16, 2019

Interested to hear thoughts on this. The ResourceProxy type is general enough for cats-effect but probably only really useful when used with Stream. AFAIK, this is the first resource & memory safe way to do file rotation with fs2. I've implemented versions of this through the years dating back to 0.8 but they all at least leaked a Release node in the algebra if nothing else.

Needs a lot of docs and tests and the signature of writeRotate isn't general enough probably. Should also make writeRotate behave like writeAll in that it respects the APPEND file option.

@SystemFw
Copy link
Collaborator

SystemFw commented Oct 16, 2019

I like the addition. As with many things I'd like to refine it here in fs2 before even considering it for cats-effect, especially given that as you say it appears mostly useful with Stream.

Is the big uncancelable there because this is a draft btw?

I collected the questions I originally had in the channel. It would be good to spec the behaviour out precisely (not necessarily in a test to begin with ofc, comments only are fine)
what are your desired semantics re: acquire of newValue vs release of current resource?

  • lazy or eager? (eager)
  • any sync requirement between new acquire vs old release?
  • does swap return immediately or waits for the new value to be acquired?
  • when do errors in acquisition materialise? (swap vs get vs nowhere and F[R] never returns)
  • the other question is when you would like to release the current resource: when the new one has finished allocating or immediately
  • and what happens if you interrupt a swap? interrupted exception on get, or old resource stays there

Looks like a cool idea though :)

@mpilquist
Copy link
Member Author

mpilquist commented Oct 16, 2019

Is the big uncancelable there because this is a draft btw?

Not sure. I was thinking that was a nice way to avoid dealing with cancellation occurring mid-swap. This has a nice symmetry with acquire being uncancelable -- swap is really an acquire + a release.

lazy or eager? (eager)

Eager

any sync requirement between new acquire vs old release?

New acquire completes and then old release occurs. This simplifies of bunch of cases -- e.g., no need for special error handling when acquiring fails as part of swap.

does swap return immediately or waits for the new value to be acquired?

Waits for new acquire, then waits for old release, then returns new R

when do errors in acquisition materialise? (swap vs get vs nowhere and F[R] never returns)

On swap, and the already acquired resource remains the target of the proxy

the other question is when you would like to release the current resource: when the new one has finished allocating or immediately

After successful acquisition of new resource

and what happens if you interrupt a swap? interrupted exception on get, or old resource stays there

Avoided this issue by making swap uncancellable.

@SystemFw
Copy link
Collaborator

Thanks for all the answers :)

This has a nice symmetry with acquire being uncancelable -- swap is really an acquire + a release.

A single acquire is uncancelable, but Resource isn't: if a resource is made of multiple steps, it can be canceled in between (and all the finalisers run)

@mpilquist
Copy link
Member Author

Oh good point. I tend to think of a resource as a single acquire. Any ideas on how to maintain cancellation in swap? Or alternatively, any use cases where uncancellable swap is problematic?

@SystemFw
Copy link
Collaborator

Any ideas on how to maintain cancellation in swap?

Not right now, but this is why I wanted the spec (which you provided above), so I can play with implementing that.

Or alternatively, any use cases where uncancellable swap is problematic?

well, my instinct is that Resource is becoming more and more widespread to tackle several aspects (not just simple "traditional" resources like files), so it's better to not assume too much from the call site (e.g. that we can turn off cancelation). Let's see if it's possible to implement with cancelation sanely are reassess. I am extremely busy at work this month but hopefully I can have a look during the weekend

@Daenyth
Copy link
Contributor

Daenyth commented Oct 17, 2019

It feels like this is similar to some work I've done previously; namely this: https://gist.github.com/Daenyth/28243952f1fcfac6e8ef838040e8638e

@mpilquist
Copy link
Member Author

I pushed WriteCursor and ReadCursor to this PR too.

@SystemFw
Copy link
Collaborator

SystemFw commented Oct 20, 2019

I have been doing some thinking about this, and I have the two followup questions:

  • Right now the semantics are "acquire previous resource, then release old one". Wouldn't this lead to potential deadlocks if the two Resource somehow contend over the same resource (e.g. a lock)? Is this even a valid use case?
  • What happens if two fibers call swap concurrently? I think the current implementation cannot really handle this case

@mpilquist
Copy link
Member Author

Hm, I don't recall any obvious issues that occur when two fibers call swap concurrently. Last one in wins and all finalizers except for the winner's are run.

@SystemFw
Copy link
Collaborator

Actually you are right, apologies, I have made a mistake thinking about that access there :)

@SystemFw
Copy link
Collaborator

SystemFw commented Oct 21, 2019

I don't recall any obvious issues that occur when two fibers call swap concurrently.

First of all sorry for the endless questions, hopefully I'm not giving you too much of a hard time on this PR :)
What I mean by this, after more thought, is: what if something calls swap whilst something else is still using the Resource? It will not fail graciously.
I guess we can either say

  • "don't do that", but then it kinda makes any concurrent usage unsafe, or
  • "swap will wait until current use of the Resource is done" , which probably means changing the api to unify swap and get
  • we will try to interrupt current usage , which also probably requires unifying swap and get

If we do unify, it's still a bit unclear to me what the type should be, my first thought was to pass in a function representing the usage of the Resource, but it's then debatable whether the function should return IO , or Stream, for example


P.S: On a separate point, I'm making progress on the interruption aspect. I'm also tempted to write one version with the close old then open new semantics and another with the open new then close old and see which one we like best

@Daenyth
Copy link
Contributor

Daenyth commented Oct 21, 2019

changing the api to unify swap and get

This core problem is why I used the use shape in the CachedResource I linked

@mpilquist
Copy link
Member Author

We could replace get with useCurrent[A](f: R => F[A]): F[A] and then guard finalization by a semaphore?

@Daenyth
Copy link
Contributor

Daenyth commented Oct 21, 2019

That's effectively what I did, except using Ref+Deferred along the lines of Fabio's talk rather than using Semaphore

@Daenyth
Copy link
Contributor

Daenyth commented Oct 21, 2019

Notably, I was able to also guard incoming useCurrent calls against that same Deferred so that after invalidation, new calls wouldn't begin until the new resource was ready for them

@SystemFw
Copy link
Collaborator

I was able to also guard incoming useCurrent calls against that same Deferred so that after invalidation, new calls wouldn't begin until the new resource was ready for them

Yeah I was planning to do this as well.
In the meantime I'll see to finish the other mostly unrelated changes and push

@SystemFw
Copy link
Collaborator

I think the api will be:

trait ResourceProxy[F, A] {
  def swap(next: Resource[F, A]): Resource[F, A]
}
object ResourceProxy {
   def create[F[_]: Concurrent, A](initial: Resource[F, A]): Resource[F, ResourceProxy[F, A]]
}

With the following semantics: swap will wait until the previous consumer is done (which can be tracked with the type above), then will shutdown the current resource and acquire the new one.
I'm working on the implementation on this (I have it mostly done in my head :P). Let me know if the above makes sense

@mpilquist
Copy link
Member Author

mpilquist commented Oct 22, 2019 via email

@SystemFw
Copy link
Collaborator

How would you get access to the initial allocated resource?

This turned out to be easy with some slight changes to the api. However, the idea of tracking the consumer is not possible unfortunately: having swap take an A => F[B] does not work because you need to recurse in Pull, and having swap return a Resource, which would be the nicest option is also not possible because Pull is not powerful enough to embed a Resource (which makes sense since the whole point of this PR is remove Pull.bracket*).

That means that unavoidably is up to the user to ensure no consumer is using R when swap is called, lest the consumer will fail since R gets closed.

On the bright side, I have made swap interruptible without much effort.
I have not changed the order in which newAcquisition-oldFinalization happens, since the point above means that this combinator is not quite right for other concurrent use cases anyway. I do still fear some people might find it surprising, and I'll change it if anyone can sway me with a short argument

@mpilquist
Copy link
Member Author

Looks great! Think we're ready for merge soon?

One question on writeRotate -- what do you think of the F[Path] param versus something like a fold? S => F[(S, Path)]. My original thinking was that F[Path] is sufficient b/c you'd typically generate the next name based on current files on disk and you can always use a Ref if you want.

@SystemFw
Copy link
Collaborator

Think we're ready for merge soon?

I think so yeah. The only question is the order of finalisation-acquisition, in an ideal world someone would come in with a really strong argument one way or another.

Oh, and I guess more testing on the semantics of ResourceProxy in general wouldn't be out of place :P

One question on writeRotate -- what do you think of the F[Path] param versus something like a fold? S => F[(S, Path)]

This is where I left off yesterday, I'm actually not sure. I'm leaning towards F[Path] being enough but we should probably change the argument name to computePath and add more scaladocs with examples

@mpilquist
Copy link
Member Author

I'm hoping to make some progress on this today, time allowing.

The current API is kind of awkward given that ResourceProxy is really only useful with Stream. Every usage would need to follow this pattern:

Stream.resource(ResourceProxy.create[F, R]).flatMap { p => 
  Stream.eval(p.swap(mkResource)).flatMap { r => 
      // use r, call p.swap again if needed
  }
}

That is:

  1. Create empty proxy
  2. Lift via Stream.resource and immediately flatMap
  3. Swap an initial target in to the proxy and eval + flatMap result of that
  4. Use the resource and proxy

Thoughts on adding this as a combinator? Something like:

Stream.resourceProxy(mkResource).flatMap { case (p, r) =>
  // use r, call p.swap again if needed
}

// On Stream
def resourceProxy[F[_], R](mk: Resource[F, R]): Stream[F, (ResourceProxy[F, R], R)] =
  Stream.resource(ResourceProxy.create[F, R]).flatMap { p =>
    Stream.eval(p.swap(mk)).flatMap { r =>
      Stream((p, r))
    }
  }

Note this has nicer type inference.

Also, I wonder if the "proxy" notion is intuitive? Maybe instead we should call it Hotswap or HotswapResource?

@SystemFw
Copy link
Collaborator

SystemFw commented Oct 30, 2019

I agree that it's slightly clunky, but not 100% sure that adding the entire combinator to Stream is called for.
How about adding another constructor to ResourceProxy like (name up for debate:

def initial(r: Resource[F, R]): Resource[F, (ResourceProxy[F, R], R)] = 
   create.evalMap(...)

and the boilerplate is reduced to Stream.resource(initial).flatmap { case (proxy, r) =>, which isn't any worse (I think?) than any other resource-based thing.


I do agree that the proxy name is not super intuitive though

EDIT: if you still think that Stream.resource(mkResource).flatMap is nicer than Stream.resource(ResourceProxy.initial(mkResource)).flatMap, I'm fine with with btw, not a big deal

@mpilquist
Copy link
Member Author

Yep good idea.

What do you think of Hotswap instead of ResourceProxy?

@SystemFw
Copy link
Collaborator

I like Hotswap yeah

@mpilquist
Copy link
Member Author

OK I think this is ready for review.

One question -- I think we should support a clear: F[Unit] operation which puts the ref back to the initial empty state. Thoughts?

@mpilquist mpilquist marked this pull request as ready for review October 31, 2019 15:59
@SystemFw
Copy link
Collaborator

OK I think this is ready for review.

Maybe I will add a couple of tests around interruption but we can merge, no need to hold off longer

One question -- I think we should support a clear: F[Unit] operation which puts the ref back to the initial empty state. Thoughts?

what's the use case? Early finalisation of something when you don't yet know when to swap?

@mpilquist
Copy link
Member Author

Yeah, was imagining a use case where you want to close a resource but you don't have the info needed to create a new one until later.

* Creates a new `Hotswap` initialized with the specified resource.
* The `Hotswap` instance and the initial resource are returned.
*/
def apply[F[_]: Concurrent, R](initial: Resource[F, R]): Resource[F, (Hotswap[F, R], R)] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how inconvenient do you think it would be to rename this to create or make, and create to empty? apply is pretty much as convenient as the Stream combinator you wanted, so if you really want that convenience, ignore this comment. My doubt stems from the fact that apply on tagless stuff is generally for summoning

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK, I like using apply for the default constructor. We have precedence for it too with things like Deferred and Socket and FileHandle.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, leave it

* r2 released |
* r3 released
* }}}
*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's pretty slick :)

@mpilquist
Copy link
Member Author

@SystemFw I added the clear method -- let me know what you think. If no objections, I'll merge today and do a release.

}

override def clear: F[Unit] =
swapFinalizer(().pure[F])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs uncancelable here :)

@SystemFw
Copy link
Collaborator

SystemFw commented Nov 4, 2019

Let's merge after the one comment is resolved :)

@mpilquist mpilquist merged commit c65bfc9 into typelevel:master Nov 4, 2019
@mpilquist mpilquist added this to the 2.1.0 milestone Nov 5, 2019
@mpilquist mpilquist deleted the wip/rproxy branch February 18, 2020 12:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants