From 84cf9214c28acfb0251c7741902fe8ab6153c4e6 Mon Sep 17 00:00:00 2001 From: Sergio Alejandro Vargas Date: Mon, 1 Aug 2022 11:24:31 -0500 Subject: [PATCH] Format WorkspaceManager - Prefer quote blocks to colon quote expressions - Prefer if blocks to ternary operators - Remove single pipes - Remove trailing whitespace - Minor changes to docstrings and comments --- src/evaluation/WorkspaceManager.jl | 272 ++++++++++++++++------------- 1 file changed, 154 insertions(+), 118 deletions(-) diff --git a/src/evaluation/WorkspaceManager.jl b/src/evaluation/WorkspaceManager.jl index e972505e54..93f97eac9c 100644 --- a/src/evaluation/WorkspaceManager.jl +++ b/src/evaluation/WorkspaceManager.jl @@ -8,7 +8,11 @@ import ..Pluto.ExpressionExplorer: FunctionName import ..PlutoRunner import Distributed -"Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` (it cannot `eval` code in this case)." +""" +Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. +Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` +(it cannot `eval` code in this case). +""" Base.@kwdef mutable struct Workspace pid::Integer notebook_id::UUID @@ -22,21 +26,26 @@ Base.@kwdef mutable struct Workspace original_ACTIVE_PROJECT::Union{Nothing,String}=nothing end +const SN = Tuple{ServerSession, Notebook} + "These expressions get evaluated whenever a new `Workspace` process is created." -process_preamble() = quote +const process_preamble = quote ccall(:jl_exit_on_sigint, Cvoid, (Cint,), 0) include($(project_relative_path(joinpath("src", "runner"), "Loader.jl"))) ENV["GKSwstype"] = "nul" ENV["JULIA_REVISE_WORKER_ONLY"] = "1" end +const Distributed_expr = quote + Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] +end + const workspaces = Dict{UUID,Task}() + "Set of notebook IDs that we will never make a process for again." const discarded_workspaces = Set{UUID}() -const SN = Tuple{ServerSession,Notebook} - -"""Create a workspace for the notebook, optionally in the main process.""" +"Create a workspace for the notebook, optionally in the main process." function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false)::Workspace is_offline_renderer || (notebook.process_status = ProcessStatus.starting) @@ -60,33 +69,34 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false pid end - Distributed.remotecall_eval(Main, [pid], :(PlutoRunner.notebook_id[] = $(notebook.notebook_id))) - + Distributed.remotecall_eval(Main, [pid], quote + PlutoRunner.notebook_id[] = $(notebook.notebook_id) + end) + remote_log_channel = Core.eval(Main, quote $(Distributed).RemoteChannel(() -> eval(quote - channel = Channel{Any}(10) Main.PlutoRunner.setup_plutologger( - $($(notebook.notebook_id)), - channel; + $($(notebook.notebook_id)), + channel; make_global=$($(use_distributed)) ) - channel end), $pid) end) - + run_channel = Core.eval(Main, quote $(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.run_channel)), $pid) end) + module_name = create_emptyworkspacemodule(pid) - + original_LOAD_PATH, original_ACTIVE_PROJECT = Distributed.remotecall_eval(Main, pid, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[])) - + workspace = Workspace(; pid, notebook_id=notebook.notebook_id, - remote_log_channel, + remote_log_channel, module_name, original_LOAD_PATH, original_ACTIVE_PROJECT, @@ -106,26 +116,22 @@ end function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) enabled = notebook.nbpkg_ctx !== nothing - if workspace.nbpkg_was_active == enabled - return - end - + workspace.nbpkg_was_active == enabled && return + workspace = workspace !== nothing ? workspace : get_workspace((session, notebook)) - if workspace.discarded - return - end - + workspace.discarded && return + workspace.nbpkg_was_active = enabled if workspace.pid != Distributed.myid() new_LP = enabled ? ["@", "@stdlib"] : workspace.original_LOAD_PATH new_AP = enabled ? PkgCompat.env_dir(notebook.nbpkg_ctx) : workspace.original_ACTIVE_PROJECT - + Distributed.remotecall_eval(Main, [workspace.pid], quote copy!(LOAD_PATH, $(new_LP)) Base.ACTIVE_PROJECT[] = $(new_AP) end) else - # uhmmmmmm TODO + # TODO end end @@ -146,10 +152,10 @@ function start_relaying_self_updates((session, notebook)::SN, run_channel::Distr end function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.RemoteChannel) - update_throttled, flush_throttled = Pluto.throttled(0.1) do + update_throttled, flush_throttled = Pluto.throttled(0.1) do Pluto.send_notebook_changes!(Pluto.ClientRequest(session=session, notebook=notebook)) end - + while true try next_log::Dict{String,Any} = take!(log_channel) @@ -157,39 +163,37 @@ function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.R fn = next_log["file"] match = findfirst("#==#", fn) - # We always show the log at the currently running cell, which is given by + # Show the log at the currently running cell, which is given by running_cell_id = next_log["cell_id"]::UUID # Some logs originate from outside of the running code, through function calls. Some code here to deal with that: begin source_cell_id = if match !== nothing - # the log originated from within the notebook - + # The log originated from within the notebook UUID(fn[match[end]+1:end]) else - # the log originated from a function call defined outside of the notebook - - # we will show the log at the currently running cell, at "line -1", i.e. without line info. + # The log originated from a function call defined outside of the notebook. + # Show the log at the currently running cell, at "line -1", i.e. without line info. next_log["line"] = -1 running_cell_id end - + if running_cell_id != source_cell_id - # the log originated from a function in another cell of the notebook - # we will show the log at the currently running cell, at "line -1", i.e. without line info. + # The log originated from a function in another cell of the notebook + # Show the log at the currently running cell, at "line -1", i.e. without line info. next_log["line"] = -1 end end - + source_cell = get(notebook.cells_dict, source_cell_id, nothing) running_cell = get(notebook.cells_dict, running_cell_id, nothing) - + display_cell = if running_cell === nothing || (source_cell !== nothing && source_cell.output.has_pluto_hook_features) source_cell else running_cell end - + @assert !isnothing(display_cell) maybe_max_log = findfirst(((key, _),) -> key == "maxlog", next_log["kwargs"]) @@ -224,7 +228,7 @@ end "Call `cd(\$path)` inside the workspace. This is done when creating a workspace, and whenever the notebook changes path." function cd_workspace(workspace, path::AbstractString) eval_in_workspace(workspace, quote - cd($(path |> dirname)) + cd(dirname($path)) end) end @@ -239,21 +243,18 @@ end function possible_bond_values(session_notebook::SN, n::Symbol; get_length::Bool=false) workspace = get_workspace(session_notebook) - pid = workspace.pid - Distributed.remotecall_eval(Main, pid, quote + Distributed.remotecall_eval(Main, workspace.pid, quote PlutoRunner.possible_bond_values($(QuoteNode(n)); get_length=$(get_length)) end) end function create_emptyworkspacemodule(pid::Integer)::Symbol - Distributed.remotecall_eval(Main, pid, :(PlutoRunner.increment_current_module())) + Distributed.remotecall_eval(Main, pid, quote + PlutoRunner.increment_current_module() + end) end -const Distributed_expr = :( - Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] -) - # NOTE: this function only start a worker process using given # compiler options, it does not resolve paths for notebooks # compiler configurations passed to it should be resolved before this @@ -264,15 +265,16 @@ function create_workspaceprocess(;compiler_options=CompilerOptions())::Integer $(Distributed_expr).addprocs(1; exeflags=$(_convert_to_flags(compiler_options))) |> first end) - Distributed.remotecall_eval(Main, [pid], process_preamble()) + Distributed.remotecall_eval(Main, [pid], process_preamble) # so that we NEVER break the workspace with an interrupt 🤕 - @async Distributed.remotecall_eval(Main, [pid], - :(while true + @async Distributed.remotecall_eval(Main, [pid], quote + while true try wait() catch end - end)) + end + end) pid end @@ -288,15 +290,16 @@ function get_workspace(session_notebook::SN; allow_creation::Bool=true)::Union{N @debug "This should not happen" notebook.process_status error("Cannot run code in this notebook: it has already shut down.") end - - task = !allow_creation ? - get(workspaces, notebook.notebook_id, nothing) : + + task = if !allow_creation + get(workspaces, notebook.notebook_id, nothing) + else get!(workspaces, notebook.notebook_id) do Task(() -> make_workspace(session_notebook)) end - + end + isnothing(task) && return nothing - istaskstarted(task) || schedule(task) fetch(task) end @@ -343,15 +346,12 @@ function distributed_exception_result(ex::Base.IOError, workspace::Workspace) ) end - - function distributed_exception_result(exs::CompositeException, workspace::Workspace) - ex = exs.exceptions |> first + ex = first(exs.exceptions) if ex isa Distributed.RemoteException && ex.pid == workspace.pid && ex.captured.ex isa InterruptException - ( output_formatted=PlutoRunner.format_output(CapturedException(InterruptException(), [])), errored=true, @@ -386,9 +386,12 @@ function distributed_exception_result(exs::CompositeException, workspace::Worksp end -"Evaluate expression inside the workspace - output is fetched and formatted, errors are caught and formatted. Returns formatted output and error flags. +""" +Evaluate expression inside the workspace - output is fetched and formatted, +errors are caught and formatted. Returns formatted output and error flags. -`expr` has to satisfy `ExpressionExplorer.is_toplevel_expr`." +`expr` has to satisfy `ExpressionExplorer.is_toplevel_expr`. +""" function eval_format_fetch_in_workspace( session_notebook::Union{SN,Workspace}, expr::Expr, @@ -402,7 +405,7 @@ function eval_format_fetch_in_workspace( )::PlutoRunner.FormattedCellResult workspace = get_workspace(session_notebook) - + is_on_this_process = workspace.pid == Distributed.myid() # if multiple notebooks run on the same process, then we need to `cd` between the different notebook paths @@ -412,41 +415,47 @@ function eval_format_fetch_in_workspace( end use_nbpkg_environment(session_notebook, workspace) end - - # run the code 🏃‍♀️ - - # a try block (on this process) to catch an InterruptException + + # Run the code 🏃 + + # A try block (on this process) to catch an InterruptException take!(workspace.dowork_token) early_result = try - # we use [pid] instead of pid to prevent fetching output - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.run_expression( - getfield(Main, $(QuoteNode(workspace.module_name))), - $(QuoteNode(expr)), - $(workspace.notebook_id), - $cell_id, - $function_wrapped_info, - $forced_expr_id; - user_requested_run=$user_requested_run, - capture_stdout=$(capture_stdout && !is_on_this_process), - ))) + # Use [pid] instead of pid to prevent fetching output + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.run_expression( + getfield(Main, $(QuoteNode(workspace.module_name))), + $(QuoteNode(expr)), + $(workspace.notebook_id), + $cell_id, + $function_wrapped_info, + $forced_expr_id; + user_requested_run=$user_requested_run, + capture_stdout=$(capture_stdout && !is_on_this_process), + ) + end) put!(workspace.dowork_token) nothing - catch exs - # We don't use a `finally` because the token needs to be back asap for the interrupting code to pick it up. + catch e + # Don't use a `finally` because the token needs to be back asap for the interrupting code to pick it up. put!(workspace.dowork_token) - distributed_exception_result(exs, workspace) + distributed_exception_result(e, workspace) end - early_result === nothing ? - format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) : + if early_result === nothing + format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) + else early_result + end end "Evaluate expression inside the workspace - output is not fetched, errors are rethrown. For internal use." function eval_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - - Distributed.remotecall_eval(Main, [workspace.pid], :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + + Distributed.remotecall_eval(Main, [workspace.pid], quote + Core.eval($(workspace.module_name), $(QuoteNode(expr))) + end) nothing end @@ -459,42 +468,42 @@ function format_fetch_in_workspace( )::PlutoRunner.FormattedCellResult workspace = get_workspace(session_notebook) - # instead of fetching the output value (which might not make sense in our context, since the user can define structs, types, functions, etc), we format the cell output on the worker, and fetch the formatted output. + # Instead of fetching the output value (which might not make sense in our context, + # since the user can define structs, types, functions, etc), + # we format the cell output on the worker, and fetch the formatted output. withtoken(workspace.dowork_token) do try - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.formatted_result_of( - $(workspace.notebook_id), - $cell_id, - $ends_with_semicolon, - $known_published_objects, - $showmore_id, - getfield(Main, $(QuoteNode(workspace.module_name))), - ))) - catch ex - distributed_exception_result(CompositeException([ex]), workspace) + Distributed.remotecall_eval(Main, workspace.pid, quote + PlutoRunner.formatted_result_of( + $(workspace.notebook_id), + $cell_id, + $ends_with_semicolon, + $known_published_objects, + $showmore_id, + getfield(Main, $(QuoteNode(workspace.module_name))), + ) + end) + catch e + distributed_exception_result(CompositeException([e]), workspace) end end end function collect_soft_definitions(session_notebook::SN, modules::Set{Expr}) workspace = get_workspace(session_notebook) - module_name = workspace.module_name - ex = quote - PlutoRunner.collect_soft_definitions($module_name, $modules) - end - - Distributed.remotecall_eval(Main, workspace.pid, ex) + Distributed.remotecall_eval(Main, workspace.pid, quote + PlutoRunner.collect_soft_definitions($(workspace.module_name), $modules) + end) end - function macroexpand_in_workspace(session_notebook::Union{SN,Workspace}, macrocall, cell_uuid, module_name = nothing)::Tuple{Bool, Any} workspace = get_workspace(session_notebook) module_name = module_name === nothing ? workspace.module_name : module_name Distributed.remotecall_eval(Main, workspace.pid, quote try - (true, PlutoRunner.try_macroexpand($(module_name), $(cell_uuid), $(macrocall |> QuoteNode))) + (true, PlutoRunner.try_macroexpand($module_name, $cell_uuid, $(QuoteNode(macrocall)))) catch error # We have to be careful here, for example a thrown `MethodError()` will contain the called method and arguments. # which normally would be very useful for debugging, but we can't serialize it! @@ -511,26 +520,51 @@ end "Evaluate expression inside the workspace - output is returned. For internal use." function eval_fetch_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - - Distributed.remotecall_eval(Main, workspace.pid, :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + + Distributed.remotecall_eval(Main, workspace.pid, quote + Core.eval($(workspace.module_name), $(QuoteNode(expr))) + end) end function do_reimports(session_notebook::Union{SN,Workspace}, module_imports_to_move::Set{Expr}) workspace = get_workspace(session_notebook) - workspace_name = workspace.module_name - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.do_reimports($(workspace_name), $module_imports_to_move))) + + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.do_reimports($(workspace.module_name), $module_imports_to_move) + end) end -"Move variables to a new module. A given set of variables to be 'deleted' will not be moved to the new module, making them unavailable. " -function move_vars(session_notebook::Union{SN,Workspace}, old_workspace_name::Symbol, new_workspace_name::Union{Nothing,Symbol}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) +""" +Move variables to a new module. A given set of variables to be 'deleted' will +not be moved to the new module, making them unavailable. +""" +function move_vars( + session_notebook::Union{SN,Workspace}, + old_workspace_name::Symbol, + new_workspace_name::Union{Nothing,Symbol}, + to_delete::Set{Symbol}, + methods_to_delete::Set{Tuple{UUID,FunctionName}}, + module_imports_to_move::Set{Expr}, + invalidated_cell_uuids::Set{UUID}; + kwargs...) + workspace = get_workspace(session_notebook) new_workspace_name = something(new_workspace_name, workspace.module_name) - - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $methods_to_delete, $module_imports_to_move, $invalidated_cell_uuids))) + + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.move_vars( + $(QuoteNode(old_workspace_name)), + $(QuoteNode(new_workspace_name)), + $to_delete, + $methods_to_delete, + $module_imports_to_move, + $invalidated_cell_uuids, + ) + end) end move_vars(session_notebook::Union{SN,Workspace}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) = -move_vars(session_notebook, bump_workspace_module(session_notebook)..., to_delete, methods_to_delete, module_imports_to_move, invalidated_cell_uuids; kwargs...) + move_vars(session_notebook, bump_workspace_module(session_notebook)..., to_delete, methods_to_delete, module_imports_to_move, invalidated_cell_uuids; kwargs...) # TODO: delete me @deprecate( @@ -586,7 +620,7 @@ end "Force interrupt (SIGINT) a workspace, return whether successful" function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true)::Bool workspace = get_workspace(session_notebook; allow_creation=false) - + if !(workspace isa Workspace) # verbose && @info "Can't interrupt this notebook: it is not running." return false @@ -603,10 +637,12 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true https://docs.microsoft.com/en-us/windows/wsl" return false end + if workspace.pid == Distributed.myid() verbose && @warn """Cells in this workspace can't be stopped, because it is not running in a separate workspace. Use `ENV["PLUTO_WORKSPACE_USE_DISTRIBUTED"]` to control whether future workspaces are generated in a separate process.""" return false end + if isready(workspace.dowork_token) verbose && @info "Tried to stop idle workspace - ignoring." return true @@ -626,7 +662,7 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true end verbose && println("Still running... starting sequence") - while !isready(workspace.dowork_token) + while !isready(workspace.dowork_token) for _ in 1:5 verbose && print(" 🔥 ") Distributed.interrupt(workspace.pid) @@ -640,10 +676,10 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true verbose && println() verbose && println("Cell interrupted!") true - catch ex - if !(ex isa KeyError) + catch e + if !(e isa KeyError) @warn "Interrupt failed for unknown reason" - showerror(ex, stacktrace(catch_backtrace())) + showerror(e, stacktrace(catch_backtrace())) end false end