Skip to content

Commit

Permalink
[Experimental] [WIP] Transparent Compiler (#15179)
Browse files Browse the repository at this point in the history
  • Loading branch information
0101 authored Jan 16, 2024
1 parent 1da7005 commit 2352770
Show file tree
Hide file tree
Showing 94 changed files with 10,694 additions and 2,128 deletions.
2 changes: 1 addition & 1 deletion eng/Versions.props
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
<MicroBuildCoreSentinelVersion>1.0.0</MicroBuildCoreSentinelVersion>
<MicroBuildPluginsSwixBuildVersion>1.1.33</MicroBuildPluginsSwixBuildVersion>
<!-- other packages -->
<BenchmarkDotNetVersion>0.13.2</BenchmarkDotNetVersion>
<BenchmarkDotNetVersion>0.13.10</BenchmarkDotNetVersion>
<FsCheckVersion>2.16.5</FsCheckVersion>
<FSharpDataTypeProvidersVersion>4.3.0.0</FSharpDataTypeProvidersVersion>
<MicrosoftCompositionVersion>1.0.31</MicrosoftCompositionVersion>
Expand Down
49 changes: 34 additions & 15 deletions src/Compiler/Driver/GraphChecking/Graph.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,43 @@ module internal Graph =
|> Array.map (fun (KeyValue(k, v)) -> k, v)
|> readOnlyDict

let transitive<'Node when 'Node: equality> (graph: Graph<'Node>) : Graph<'Node> =
/// Find transitive dependencies of a single node.
let transitiveDeps (node: 'Node) =
let visited = HashSet<'Node>()
let nodes (graph: Graph<'Node>) : Set<'Node> =
graph.Values |> Seq.collect id |> Seq.append graph.Keys |> Set

/// Find transitive dependencies of a single node.
let transitiveDeps (node: 'Node) (graph: Graph<'Node>) =
let visited = HashSet<'Node>()

let rec dfs (node: 'Node) =
graph[node]
// Add direct dependencies.
// Use HashSet.Add return value semantics to filter out those that were added previously.
|> Array.filter visited.Add
|> Array.iter dfs
let rec dfs (node: 'Node) =
graph[node]
// Add direct dependencies.
// Use HashSet.Add return value semantics to filter out those that were added previously.
|> Array.filter visited.Add
|> Array.iter dfs

dfs node
visited |> Seq.toArray
dfs node
visited |> Seq.toArray

let transitive<'Node when 'Node: equality> (graph: Graph<'Node>) : Graph<'Node> =
graph.Keys
|> Seq.toArray
|> Array.Parallel.map (fun node -> node, transitiveDeps node)
|> Array.Parallel.map (fun node -> node, graph |> transitiveDeps node)
|> readOnlyDict

// TODO: optimize
/// Get subgraph of the given graph that contains only nodes that are reachable from the given node.
let subGraphFor node graph =
let allDeps = graph |> transitiveDeps node
let relevant n = n = node || allDeps |> Array.contains n

graph
|> Seq.choose (fun (KeyValue(src, deps)) ->
if relevant src then
Some(src, deps |> Array.filter relevant)
else
None)
|> make

/// Create a reverse of the graph
let reverse (originalGraph: Graph<'Node>) : Graph<'Node> =
originalGraph
Expand All @@ -69,7 +86,7 @@ module internal Graph =
let print (graph: Graph<'Node>) : unit =
printCustom graph (fun node -> node.ToString())

let serialiseToMermaid path (graph: Graph<FileIndex * string>) =
let serialiseToMermaid (graph: Graph<FileIndex * string>) =
let sb = StringBuilder()
let appendLine (line: string) = sb.AppendLine(line) |> ignore

Expand All @@ -84,8 +101,10 @@ module internal Graph =
appendLine $" %i{idx} --> %i{depIdx}"

appendLine "```"
sb.ToString()

let writeMermaidToFile path (graph: Graph<FileIndex * string>) =
use out =
FileSystem.OpenFileForWriteShim(path, fileMode = System.IO.FileMode.Create)

out.WriteAllText(sb.ToString())
graph |> serialiseToMermaid |> out.WriteAllText
8 changes: 7 additions & 1 deletion src/Compiler/Driver/GraphChecking/Graph.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ module internal Graph =
/// Build the graph.
val make: nodeDeps: seq<'Node * 'Node array> -> Graph<'Node> when 'Node: equality
val map<'T, 'U when 'U: equality> : f: ('T -> 'U) -> graph: Graph<'T> -> Graph<'U>
/// Get all nodes of the graph.
val nodes: graph: Graph<'Node> -> Set<'Node>
/// Create a transitive closure of the graph in O(n^2) time (but parallelize it).
/// The resulting graph contains edge A -> C iff the input graph contains a (directed) non-zero length path from A to C.
val transitive<'Node when 'Node: equality> : graph: Graph<'Node> -> Graph<'Node>
/// Get a sub-graph of the graph containing only the nodes reachable from the given node.
val subGraphFor: node: 'Node -> graph: Graph<'Node> -> Graph<'Node> when 'Node: equality
/// Create a reverse of the graph.
val reverse<'Node when 'Node: equality> : originalGraph: Graph<'Node> -> Graph<'Node>
/// Print the contents of the graph to the standard output.
val print: graph: Graph<'Node> -> unit
/// Create a simple Mermaid graph
val serialiseToMermaid: graph: Graph<FileIndex * string> -> string
/// Create a simple Mermaid graph and save it under the path specified.
val serialiseToMermaid: path: string -> graph: Graph<FileIndex * string> -> unit
val writeMermaidToFile: path: string -> graph: Graph<FileIndex * string> -> unit
136 changes: 135 additions & 1 deletion src/Compiler/Driver/GraphChecking/GraphProcessing.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
module internal FSharp.Compiler.GraphChecking.GraphProcessing

open System.Threading
open FSharp.Compiler.GraphChecking
open System.Threading.Tasks
open System

/// Information about the node in a graph, describing its relation with other nodes.
type NodeInfo<'Item> =
Expand Down Expand Up @@ -32,6 +35,9 @@ type ProcessedNode<'Item, 'Result> =
Result: 'Result
}

type GraphProcessingException(msg, ex: System.Exception) =
inherit exn(msg, ex)

let processGraph<'Item, 'Result when 'Item: equality and 'Item: comparison>
(graph: Graph<'Item>)
(work: ('Item -> ProcessedNode<'Item, 'Result>) -> NodeInfo<'Item> -> 'Result)
Expand Down Expand Up @@ -150,7 +156,7 @@ let processGraph<'Item, 'Result when 'Item: equality and 'Item: comparison>
// If we stopped early due to an exception, reraise it.
match getExn () with
| None -> ()
| Some(item, ex) -> raise (System.Exception($"Encountered exception when processing item '{item}'", ex))
| Some(item, ex) -> raise (GraphProcessingException($"Encountered exception when processing item '{item}'", ex))

// All calculations succeeded - extract the results and sort in input order.
nodes.Values
Expand All @@ -162,3 +168,131 @@ let processGraph<'Item, 'Result when 'Item: equality and 'Item: comparison>
node.Info.Item, result)
|> Seq.sortBy fst
|> Seq.toArray

let processGraphAsync<'Item, 'Result when 'Item: equality and 'Item: comparison>
(graph: Graph<'Item>)
(work: ('Item -> ProcessedNode<'Item, 'Result>) -> NodeInfo<'Item> -> Async<'Result>)
: Async<('Item * 'Result)[]> =
async {
let transitiveDeps = graph |> Graph.transitive
let dependants = graph |> Graph.reverse
// Cancellation source used to signal either an exception in one of the items or end of processing.
let! parentCt = Async.CancellationToken
use localCts = new CancellationTokenSource()

let completionSignal = TaskCompletionSource()

use _ = parentCt.Register(fun () -> completionSignal.TrySetCanceled() |> ignore)

use cts = CancellationTokenSource.CreateLinkedTokenSource(parentCt, localCts.Token)

let makeNode (item: 'Item) : GraphNode<'Item, 'Result> =
let info =
let exists = graph.ContainsKey item

if
not exists
|| not (transitiveDeps.ContainsKey item)
|| not (dependants.ContainsKey item)
then
printfn $"Unexpected inconsistent state of the graph for item '{item}'"

{
Item = item
Deps = graph[item]
TransitiveDeps = transitiveDeps[item]
Dependants = dependants[item]
}

{
Info = info
Result = None
ProcessedDepsCount = IncrementableInt(0)
}

let nodes = graph.Keys |> Seq.map (fun item -> item, makeNode item) |> readOnlyDict

let lookupMany items =
items |> Array.map (fun item -> nodes[item])

let leaves =
nodes.Values |> Seq.filter (fun n -> n.Info.Deps.Length = 0) |> Seq.toArray

let getItemPublicNode item =
let node = nodes[item]

{
ProcessedNode.Info = node.Info
ProcessedNode.Result =
node.Result
|> Option.defaultWith (fun () -> failwith $"Results for item '{node.Info.Item}' are not yet available")
}

let processedCount = IncrementableInt(0)

let raiseExn (item, ex: exn) =
localCts.Cancel()

match ex with
| :? OperationCanceledException -> completionSignal.TrySetCanceled()
| _ ->
completionSignal.TrySetException(
GraphProcessingException($"[*] Encountered exception when processing item '{item}': {ex.Message}", ex)
)
|> ignore

let incrementProcessedNodesCount () =
if processedCount.Increment() = nodes.Count then
completionSignal.TrySetResult() |> ignore

let rec queueNode node =
Async.Start(
async {
let! res = processNode node |> Async.Catch

match res with
| Choice1Of2() -> ()
| Choice2Of2 ex -> raiseExn (node.Info.Item, ex)
},
cts.Token
)

and processNode (node: GraphNode<'Item, 'Result>) : Async<unit> =
async {

let info = node.Info

let! singleRes = work getItemPublicNode info
node.Result <- Some singleRes

let unblockedDependants =
node.Info.Dependants
|> lookupMany
// For every dependant, increment its number of processed dependencies,
// and filter dependants which now have all dependencies processed (but didn't before).
|> Array.filter (fun dependant ->
let pdc = dependant.ProcessedDepsCount.Increment()
// Note: We cannot read 'dependant.ProcessedDepsCount' again to avoid returning the same item multiple times.
pdc = dependant.Info.Deps.Length)

unblockedDependants |> Array.iter queueNode
incrementProcessedNodesCount ()
}

leaves |> Array.iter queueNode

// Wait for end of processing, an exception, or an external cancellation request.
do! completionSignal.Task |> Async.AwaitTask

// All calculations succeeded - extract the results and sort in input order.
return
nodes.Values
|> Seq.map (fun node ->
let result =
node.Result
|> Option.defaultWith (fun () -> failwith $"Unexpected lack of result for item '{node.Info.Item}'")

node.Info.Item, result)
|> Seq.sortBy fst
|> Seq.toArray
}
9 changes: 9 additions & 0 deletions src/Compiler/Driver/GraphChecking/GraphProcessing.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type ProcessedNode<'Item, 'Result> =
{ Info: NodeInfo<'Item>
Result: 'Result }

type GraphProcessingException =
inherit exn
new: msg: string * ex: System.Exception -> GraphProcessingException

/// <summary>
/// A generic method to generate results for a graph of work items in parallel.
/// Processes leaves first, and after each node has been processed, schedules any now unblocked dependants.
Expand All @@ -33,3 +37,8 @@ val processGraph<'Item, 'Result when 'Item: equality and 'Item: comparison> :
work: (('Item -> ProcessedNode<'Item, 'Result>) -> NodeInfo<'Item> -> 'Result) ->
parentCt: CancellationToken ->
('Item * 'Result)[]

val processGraphAsync<'Item, 'Result when 'Item: equality and 'Item: comparison> :
graph: Graph<'Item> ->
work: (('Item -> ProcessedNode<'Item, 'Result>) -> NodeInfo<'Item> -> Async<'Result>) ->
Async<('Item * 'Result)[]>
Loading

0 comments on commit 2352770

Please sign in to comment.