From d44fd7d50d51ead533db234ce8aa6eda021c7e21 Mon Sep 17 00:00:00 2001 From: Sergio Alejandro Vargas Date: Mon, 1 Aug 2022 11:24:31 -0500 Subject: [PATCH] Format WorkspaceManager - Prefer quote blocks to colon quote expressions - Prefer if blocks to ternary operators - Remove single pipes - Minor changes to docstrings and comments --- src/evaluation/WorkspaceManager.jl | 232 +++++++++++++++++------------ 1 file changed, 134 insertions(+), 98 deletions(-) diff --git a/src/evaluation/WorkspaceManager.jl b/src/evaluation/WorkspaceManager.jl index e34d82df87..0fc39ff0e2 100644 --- a/src/evaluation/WorkspaceManager.jl +++ b/src/evaluation/WorkspaceManager.jl @@ -8,7 +8,11 @@ import ..Pluto.ExpressionExplorer: FunctionName import ..PlutoRunner import Distributed -"Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` (it cannot `eval` code in this case)." +""" +Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. +Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` +(it cannot `eval` code in this case). +""" Base.@kwdef mutable struct Workspace pid::Integer notebook_id::UUID @@ -22,21 +26,26 @@ Base.@kwdef mutable struct Workspace original_ACTIVE_PROJECT::Union{Nothing,String}=nothing end +const SN = Tuple{ServerSession, Notebook} + "These expressions get evaluated whenever a new `Workspace` process is created." -process_preamble() = quote +const process_preamble = quote ccall(:jl_exit_on_sigint, Cvoid, (Cint,), 0) include($(project_relative_path(joinpath("src", "runner"), "Loader.jl"))) ENV["GKSwstype"] = "nul" ENV["JULIA_REVISE_WORKER_ONLY"] = "1" end +const Distributed_expr = quote + Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] +end + const workspaces = Dict{UUID,Task}() + "Set of notebook IDs that we will never make a process for again." const discarded_workspaces = Set{UUID}() -const SN = Tuple{ServerSession,Notebook} - -"""Create a workspace for the notebook, optionally in the main process.""" +"Create a workspace for the notebook, optionally in the main process." function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false)::Workspace is_offline_renderer || (notebook.process_status = ProcessStatus.starting) @@ -60,18 +69,18 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false pid end - Distributed.remotecall_eval(Main, [pid], :(PlutoRunner.notebook_id[] = $(notebook.notebook_id))) + Distributed.remotecall_eval(Main, [pid], quote + PlutoRunner.notebook_id[] = $(notebook.notebook_id) + end) remote_log_channel = Core.eval(Main, quote $(Distributed).RemoteChannel(() -> eval(quote - channel = Channel{Any}(10) Main.PlutoRunner.setup_plutologger( - $($(notebook.notebook_id)), - channel; + $($(notebook.notebook_id)), + channel; make_global=$($(use_distributed)) ) - channel end), $pid) end) @@ -79,6 +88,7 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false run_channel = Core.eval(Main, quote $(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.run_channel)), $pid) end) + module_name = create_emptyworkspacemodule(pid) original_LOAD_PATH, original_ACTIVE_PROJECT = Distributed.remotecall_eval(Main, pid, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[])) @@ -106,14 +116,10 @@ end function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) enabled = notebook.nbpkg_ctx !== nothing - if workspace.nbpkg_was_active == enabled - return - end + workspace.nbpkg_was_active == enabled && return workspace = workspace !== nothing ? workspace : get_workspace((session, notebook)) - if workspace.discarded - return - end + workspace.discarded && return workspace.nbpkg_was_active = enabled if workspace.pid != Distributed.myid() @@ -125,7 +131,7 @@ function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) Base.ACTIVE_PROJECT[] = $(new_AP) end) else - # uhmmmmmm TODO + # TODO end end @@ -157,26 +163,24 @@ function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.R fn = next_log["file"] match = findfirst("#==#", fn) - # We always show the log at the currently running cell, which is given by + # Show the log at the currently running cell, which is given by running_cell_id = next_log["cell_id"]::UUID # Some logs originate from outside of the running code, through function calls. Some code here to deal with that: begin source_cell_id = if match !== nothing - # the log originated from within the notebook - + # The log originated from within the notebook UUID(fn[match[end]+1:end]) else - # the log originated from a function call defined outside of the notebook - - # we will show the log at the currently running cell, at "line -1", i.e. without line info. + # The log originated from a function call defined outside of the notebook. + # Show the log at the currently running cell, at "line -1", i.e. without line info. next_log["line"] = -1 running_cell_id end if running_cell_id != source_cell_id - # the log originated from a function in another cell of the notebook - # we will show the log at the currently running cell, at "line -1", i.e. without line info. + # The log originated from a function in another cell of the notebook + # Show the log at the currently running cell, at "line -1", i.e. without line info. next_log["line"] = -1 end end @@ -224,7 +228,7 @@ end "Call `cd(\$path)` inside the workspace. This is done when creating a workspace, and whenever the notebook changes path." function cd_workspace(workspace, path::AbstractString) eval_in_workspace(workspace, quote - cd($(path |> dirname)) + cd(dirname($path)) end) end @@ -239,21 +243,18 @@ end function possible_bond_values(session_notebook::SN, n::Symbol; get_length::Bool=false) workspace = get_workspace(session_notebook) - pid = workspace.pid - Distributed.remotecall_eval(Main, pid, quote + Distributed.remotecall_eval(Main, workspace.pid, quote PlutoRunner.possible_bond_values($(QuoteNode(n)); get_length=$(get_length)) end) end function create_emptyworkspacemodule(pid::Integer)::Symbol - Distributed.remotecall_eval(Main, pid, :(PlutoRunner.increment_current_module())) + Distributed.remotecall_eval(Main, pid, quote + PlutoRunner.increment_current_module() + end) end -const Distributed_expr = :( - Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] -) - # NOTE: this function only start a worker process using given # compiler options, it does not resolve paths for notebooks # compiler configurations passed to it should be resolved before this @@ -264,15 +265,16 @@ function create_workspaceprocess(; compiler_options=CompilerOptions())::Integer $(Distributed_expr).addprocs(1; exeflags=$(_convert_to_flags(compiler_options))) |> first end) - Distributed.remotecall_eval(Main, [pid], process_preamble()) + Distributed.remotecall_eval(Main, [pid], process_preamble) # so that we NEVER break the workspace with an interrupt 🤕 - @async Distributed.remotecall_eval(Main, [pid], - :(while true + @async Distributed.remotecall_eval(Main, [pid], quote + while true try wait() catch end - end)) + end + end) pid end @@ -289,14 +291,15 @@ function get_workspace(session_notebook::SN; allow_creation::Bool=true)::Union{N error("Cannot run code in this notebook: it has already shut down.") end - task = !allow_creation ? - get(workspaces, notebook.notebook_id, nothing) : - get!(workspaces, notebook.notebook_id) do - Task(() -> make_workspace(session_notebook)) + task = if !allow_creation + get(workspaces, notebook.notebook_id, nothing) + else + get!(workspaces, notebook.notebook_id) do + Task(() -> make_workspace(session_notebook)) + end end isnothing(task) && return nothing - istaskstarted(task) || schedule(task) fetch(task) end @@ -343,15 +346,12 @@ function distributed_exception_result(ex::Base.IOError, workspace::Workspace) ) end - - function distributed_exception_result(exs::CompositeException, workspace::Workspace) - ex = exs.exceptions |> first + ex = first(exs.exceptions) if ex isa Distributed.RemoteException && - ex.pid == workspace.pid && - ex.captured.ex isa InterruptException - + ex.pid == workspace.pid && + ex.captured.ex isa InterruptException ( output_formatted=PlutoRunner.format_output(CapturedException(InterruptException(), [])), errored=true, @@ -386,9 +386,12 @@ function distributed_exception_result(exs::CompositeException, workspace::Worksp end -"Evaluate expression inside the workspace - output is fetched and formatted, errors are caught and formatted. Returns formatted output and error flags. +""" +Evaluate expression inside the workspace - output is fetched and formatted, +errors are caught and formatted. Returns formatted output and error flags. -`expr` has to satisfy `ExpressionExplorer.is_toplevel_expr`." +`expr` has to satisfy `ExpressionExplorer.is_toplevel_expr`. +""" function eval_format_fetch_in_workspace( session_notebook::Union{SN,Workspace}, expr::Expr, @@ -413,40 +416,46 @@ function eval_format_fetch_in_workspace( use_nbpkg_environment(session_notebook, workspace) end - # run the code 🏃‍♀️ + # Run the code 🏃 - # a try block (on this process) to catch an InterruptException + # A try block (on this process) to catch an InterruptException take!(workspace.dowork_token) early_result = try - # we use [pid] instead of pid to prevent fetching output - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.run_expression( - getfield(Main, $(QuoteNode(workspace.module_name))), - $(QuoteNode(expr)), - $(workspace.notebook_id), - $cell_id, - $function_wrapped_info, - $forced_expr_id; - user_requested_run=$user_requested_run, - capture_stdout=$(capture_stdout && !is_on_this_process), - ))) + # Use [pid] instead of pid to prevent fetching output + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.run_expression( + getfield(Main, $(QuoteNode(workspace.module_name))), + $(QuoteNode(expr)), + $(workspace.notebook_id), + $cell_id, + $function_wrapped_info, + $forced_expr_id; + user_requested_run=$user_requested_run, + capture_stdout=$(capture_stdout && !is_on_this_process), + ) + end) put!(workspace.dowork_token) nothing - catch exs - # We don't use a `finally` because the token needs to be back asap for the interrupting code to pick it up. + catch e + # Don't use a `finally` because the token needs to be back asap for the interrupting code to pick it up. put!(workspace.dowork_token) - distributed_exception_result(exs, workspace) + distributed_exception_result(e, workspace) end - early_result === nothing ? - format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) : - early_result + if early_result === nothing + format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) + else + early_result + end end "Evaluate expression inside the workspace - output is not fetched, errors are rethrown. For internal use." function eval_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, [workspace.pid], :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + Distributed.remotecall_eval(Main, [workspace.pid], quote + Core.eval($(workspace.module_name), $(QuoteNode(expr))) + end) nothing end @@ -459,42 +468,42 @@ function format_fetch_in_workspace( )::PlutoRunner.FormattedCellResult workspace = get_workspace(session_notebook) - # instead of fetching the output value (which might not make sense in our context, since the user can define structs, types, functions, etc), we format the cell output on the worker, and fetch the formatted output. + # Instead of fetching the output value (which might not make sense in our context, + # since the user can define structs, types, functions, etc), + # we format the cell output on the worker, and fetch the formatted output. withtoken(workspace.dowork_token) do try - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.formatted_result_of( - $(workspace.notebook_id), - $cell_id, - $ends_with_semicolon, - $known_published_objects, - $showmore_id, - getfield(Main, $(QuoteNode(workspace.module_name))), - ))) - catch ex - distributed_exception_result(CompositeException([ex]), workspace) + Distributed.remotecall_eval(Main, workspace.pid, quote + PlutoRunner.formatted_result_of( + $(workspace.notebook_id), + $cell_id, + $ends_with_semicolon, + $known_published_objects, + $showmore_id, + getfield(Main, $(QuoteNode(workspace.module_name))), + ) + end) + catch e + distributed_exception_result(CompositeException([e]), workspace) end end end function collect_soft_definitions(session_notebook::SN, modules::Set{Expr}) workspace = get_workspace(session_notebook) - module_name = workspace.module_name - - ex = quote - PlutoRunner.collect_soft_definitions($module_name, $modules) - end - Distributed.remotecall_eval(Main, workspace.pid, ex) + Distributed.remotecall_eval(Main, workspace.pid, quote + PlutoRunner.collect_soft_definitions($(workspace.module_name), $modules) + end) end - function macroexpand_in_workspace(session_notebook::Union{SN,Workspace}, macrocall, cell_id, module_name=nothing)::Tuple{Bool,Any} workspace = get_workspace(session_notebook) module_name = module_name === nothing ? workspace.module_name : module_name Distributed.remotecall_eval(Main, workspace.pid, quote try - (true, PlutoRunner.try_macroexpand($(module_name), $(workspace.notebook_id), $(cell_id), $(macrocall |> QuoteNode))) + (true, PlutoRunner.try_macroexpand($module_name, $(workspace.notebook_id), $cell_id, $(QuoteNode(macrocall)))) catch error # We have to be careful here, for example a thrown `MethodError()` will contain the called method and arguments. # which normally would be very useful for debugging, but we can't serialize it! @@ -512,21 +521,46 @@ end function eval_fetch_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, workspace.pid, :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + Distributed.remotecall_eval(Main, workspace.pid, quote + Core.eval($(workspace.module_name), $(QuoteNode(expr))) + end) end function do_reimports(session_notebook::Union{SN,Workspace}, module_imports_to_move::Set{Expr}) workspace = get_workspace(session_notebook) - workspace_name = workspace.module_name - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.do_reimports($(workspace_name), $module_imports_to_move))) + + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.do_reimports($(workspace.module_name), $module_imports_to_move) + end) end -"Move variables to a new module. A given set of variables to be 'deleted' will not be moved to the new module, making them unavailable. " -function move_vars(session_notebook::Union{SN,Workspace}, old_workspace_name::Symbol, new_workspace_name::Union{Nothing,Symbol}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) +""" +Move variables to a new module. A given set of variables to be 'deleted' will +not be moved to the new module, making them unavailable. +""" +function move_vars( + session_notebook::Union{SN,Workspace}, + old_workspace_name::Symbol, + new_workspace_name::Union{Nothing,Symbol}, + to_delete::Set{Symbol}, + methods_to_delete::Set{Tuple{UUID,FunctionName}}, + module_imports_to_move::Set{Expr}, + invalidated_cell_uuids::Set{UUID}; + kwargs...) + workspace = get_workspace(session_notebook) new_workspace_name = something(new_workspace_name, workspace.module_name) - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $methods_to_delete, $module_imports_to_move, $invalidated_cell_uuids))) + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.move_vars( + $(QuoteNode(old_workspace_name)), + $(QuoteNode(new_workspace_name)), + $to_delete, + $methods_to_delete, + $module_imports_to_move, + $invalidated_cell_uuids, + ) + end) end move_vars(session_notebook::Union{SN,Workspace}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) = @@ -603,10 +637,12 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true https://docs.microsoft.com/en-us/windows/wsl" return false end + if workspace.pid == Distributed.myid() verbose && @warn """Cells in this workspace can't be stopped, because it is not running in a separate workspace. Use `ENV["PLUTO_WORKSPACE_USE_DISTRIBUTED"]` to control whether future workspaces are generated in a separate process.""" return false end + if isready(workspace.dowork_token) verbose && @info "Tried to stop idle workspace - ignoring." return true @@ -640,10 +676,10 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true verbose && println() verbose && println("Cell interrupted!") true - catch ex - if !(ex isa KeyError) + catch e + if !(e isa KeyError) @warn "Interrupt failed for unknown reason" - showerror(ex, stacktrace(catch_backtrace())) + showerror(e, stacktrace(catch_backtrace())) end false end