Skip to content

Commit

Permalink
Simplify AsyncCacheCell (#229)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Aug 4, 2020
1 parent 48cb77e commit a4c94a3
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Remove `module Commands` convention from in examples
- Revise semantics of Cart Sample Command handling
- `Cosmos:` Removed [warmup call](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/1436)
- Simplify `AsyncCacheCell` [#229](https://github.com/jet/equinox/pull/229)

### Removed
### Fixed
Expand Down
72 changes: 38 additions & 34 deletions src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
@@ -1,47 +1,51 @@
namespace Equinox.Core

/// Asynchronous Lazy<'T> that guarantees workflow will be executed at most once.
/// Asynchronous Lazy<'T> used to gate a workflow to ensure at most once execution of a computation.
type AsyncLazy<'T>(workflow : Async<'T>) =
let task = lazy(Async.StartAsTask workflow)
let task = lazy (Async.StartAsTask workflow)

/// Await the outcome of the computation.
/// NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this
member __.AwaitValue() = Async.AwaitTaskCorrect task.Value
member internal __.PeekInternalTask = task

/// Generic async lazy caching implementation that admits expiration/recomputation semantics
/// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
type AsyncCacheCell<'T>(workflow : Async<'T>, ?isExpired : 'T -> bool) =
let mutable currentCell = AsyncLazy workflow
/// Synchronously check whether the value has been computed (and/or remains valid)
member this.IsValid(?isExpired) =
if not task.IsValueCreated then false else

let initializationFailed (value : System.Threading.Tasks.Task<_>) =
// for TMI on this, see https://stackoverflow.com/a/33946166/11635
value.IsCompleted && value.Status <> System.Threading.Tasks.TaskStatus.RanToCompletion
let value = task.Value
if not value.IsCompleted || value.IsFaulted then false else

let update cell = async {
// avoid unnecessary recomputation in cases where competing threads detect expiry;
// the first write attempt wins, and everybody else reads off that value
let _ = System.Threading.Interlocked.CompareExchange(&currentCell, AsyncLazy workflow, cell)
return! currentCell.AwaitValue()
}
match isExpired with
| Some f -> not (f value.Result)
| _ -> true

/// Enables callers to short-circuit the gate by checking whether a value has been computed
member __.PeekIsValid() =
let cell = currentCell
let currentState = cell.PeekInternalTask
if not currentState.IsValueCreated then false else
/// Used to rule out values where the computation yielded an exception or the result has now expired
member this.TryAwaitValid(?isExpired) : Async<'T option> = async {
// Determines if the last attempt completed, but failed; For TMI see https://stackoverflow.com/a/33946166/11635
if task.Value.IsFaulted then return None else

let value = currentState.Value
not (initializationFailed value)
&& (match isExpired with Some f -> not (f value.Result) | _ -> false)
let! result = this.AwaitValue()
match isExpired with
| Some f when f result -> return None
| _ -> return Some result
}

/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
/// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
/// The first caller through the gate triggers a recomputation attempt if the previous attempt ended in failure
type AsyncCacheCell<'T>(workflow : Async<'T>, ?isExpired : 'T -> bool) =
let mutable cell = AsyncLazy workflow

/// Synchronously check the value remains valid (to short-circuit an Async AwaitValue step where value not required)
member __.IsValid() = cell.IsValid(?isExpired=isExpired)
/// Gets or asynchronously recomputes a cached value depending on expiry and availability
member __.AwaitValue() = async {
let cell = currentCell
let currentState = cell.PeekInternalTask
// If the last attempt completed, but failed, we need to treat it as expired
if currentState.IsValueCreated && initializationFailed currentState.Value then
return! update cell
else
let! current = cell.AwaitValue()
match isExpired with
| Some f when f current -> return! update cell
| _ -> return current
let current = cell
match! current.TryAwaitValid(?isExpired=isExpired) with
| Some res -> return res
| None ->
// avoid unnecessary recomputation in cases where competing threads detect expiry;
// the first write attempt wins, and everybody else reads off that value
let _ = System.Threading.Interlocked.CompareExchange(&cell, AsyncLazy workflow, current)
return! cell.AwaitValue()
}
2 changes: 1 addition & 1 deletion src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ type private ContainerWrapper(container : Container, ?initContainer : Container
let initGuard = initContainer |> Option.map (fun init -> AsyncCacheCell<unit>(init container))

member __.Container = container
member internal __.InitializationGate = match initGuard with Some g when g.PeekIsValid() |> not -> Some g.AwaitValue | _ -> None
member internal __.InitializationGate = match initGuard with Some g when not (g.IsValid()) -> Some g.AwaitValue | _ -> None

/// Defines a process for mapping from a Stream Name to the appropriate storage area, allowing control over segregation / co-locating of data
type Containers(categoryAndIdToDatabaseContainerStream : string -> string -> string*string*string, [<O; D(null)>]?disableInitialization) =
Expand Down
41 changes: 28 additions & 13 deletions tests/Equinox.Cosmos.Integration/CacheCellTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,39 @@

open Equinox.Core
open Swensen.Unquote
open System
open System.Threading
open Xunit
open System

[<Fact>]
let ``AsyncLazy correctness`` () = async {
// ensure that the encapsulated computation fires only once
let count = ref 0
let cell = AsyncLazy (async { return Interlocked.Increment count })
let! accessResult = [|1 .. 100|] |> Array.map (fun i -> cell.AwaitValue ()) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 1) @> }
false =! cell.IsValid()
let! accessResult = [|1 .. 100|] |> Array.map (fun _ -> cell.AwaitValue()) |> Async.Parallel
true =! cell.IsValid()
test <@ accessResult |> Array.forall ((=) 1) @>
}

[<Fact>]
let ``AsyncCacheCell correctness`` () = async {
// ensure that the encapsulated computation fires only once and that expiry functions as expected
let state = ref 0
let expectedValue = ref 1
let cell = AsyncCacheCell (async { return Interlocked.Increment state }, fun value -> value <> !expectedValue)
false =! cell.IsValid()

let! accessResult = [|1 .. 100|] |> Array.map (fun _i -> cell.AwaitValue ()) |> Async.Parallel
let! accessResult = [|1 .. 100|] |> Array.map (fun _i -> cell.AwaitValue()) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 1) @>
true =! cell.IsValid()

incr expectedValue

let! accessResult = [|1 .. 100|] |> Array.map (fun _i -> cell.AwaitValue ()) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 2) @> }
let! accessResult = [|1 .. 100|] |> Array.map (fun _i -> cell.AwaitValue()) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 2) @>
true =! cell.IsValid()
}

[<Theory; InlineData false; InlineData true>]
let ``AsyncCacheCell correctness with throwing`` initiallyThrowing = async {
Expand All @@ -44,33 +51,41 @@ let ``AsyncCacheCell correctness with throwing`` initiallyThrowing = async {
}

let cell = AsyncCacheCell (update, fun value -> value <> !expectedValue)
false =! cell.IsValid()

// If the runner is throwing, we want to be sure it doesn't place us in a failed state forever, per the semantics of Lazy<T>
// However, we _do_ want to be sure that the function only runs once
if initiallyThrowing then
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.AwaitValue () |> Async.Catch) |> Async.Parallel
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.AwaitValue() |> Async.Catch) |> Async.Parallel
test <@ accessResult |> Array.forall (function Choice2Of2 (:? InvalidOperationException) -> true | _ -> false) @>
throwing <- false
false =! cell.IsValid()
else
let! r = cell.AwaitValue()
true =! cell.IsValid()
test <@ 1 = r @>

incr expectedValue

let! accessResult = [|1 .. 100|] |> Array.map (fun _ -> cell.AwaitValue ()) |> Async.Parallel
let! accessResult = [|1 .. 100|] |> Array.map (fun _ -> cell.AwaitValue()) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 2) @>
true =! cell.IsValid()

// invalidate the cached value
incr expectedValue
// but make the comptutation ultimately fail
false =! cell.IsValid()
// but make the computation ultimately fail
throwing <- true
// All share the failure
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.AwaitValue () |> Async.Catch) |> Async.Parallel
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.AwaitValue() |> Async.Catch) |> Async.Parallel
test <@ accessResult |> Array.forall (function Choice2Of2 (:? InvalidOperationException) -> true | _ -> false) @>
// Restore normality
throwing <- false
false =! cell.IsValid()

incr expectedValue

let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.AwaitValue ()) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 4) @> }
let! accessResult = [|1 .. 10|] |> Array.map (fun _ -> cell.AwaitValue()) |> Async.Parallel
test <@ accessResult |> Array.forall ((=) 4) @>
true =! cell.IsValid()
}

0 comments on commit a4c94a3

Please sign in to comment.