From 58c02e2ce4117369951d2ad85cdd089007d6996c Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Fri, 24 Aug 2018 16:29:00 -0400 Subject: [PATCH] spawn: supercharge the API - Permit IOBuffer as an input/output, and automatically create a Pipe to feed to/from it. - Improved handling for using CmdRedirect on fd >= 3 - Allow any number of stdio handles to be passed to the child (including zero), not just precisely 0-2 - Clearer ownership expectations (less code duplication) in `_spawn` calls, ensuring we always call `setup_stdio` early and exactly once - Stop capturing all IO handles in Process.in/.out/.err Since usually we only have the child side of the handle at the point we were trying to set this, while we want this to be the parent handle. Instead, we now handle this at the caller level in the `open` method, which has better information and intent given for this. (To preserve the old behavior, we also include the old heuristic in `run` for setting these handles.) - Ensure that most of the code won't need to be specialized on every (stdin, stdout, stderr) tuple combination that gets used - Implement `open(::OS_HANDLE)` for taking ownership of a raw `fd` handle, and the corresponding constructors `PipeEndpoint(::OS_HANDLE)` and `TCP(::OS_HANDLE)`. (such as may have been passed via CmdRedirect). --- base/process.jl | 431 ++++++++++++++++++++-------------- base/stream.jl | 91 ++++++- src/init.c | 14 +- src/jl_uv.c | 18 -- stdlib/Sockets/src/Sockets.jl | 14 +- test/spawn.jl | 12 +- test/testhelpers/FakePTYs.jl | 2 +- 7 files changed, 352 insertions(+), 230 deletions(-) diff --git a/base/process.jl b/base/process.jl index a61046d6fdd63..655d6157073c2 100644 --- a/base/process.jl +++ b/base/process.jl @@ -135,9 +135,10 @@ const STDOUT_NO = 1 const STDERR_NO = 2 struct FileRedirect - filename::AbstractString + filename::String append::Bool - function FileRedirect(filename, append) + FileRedirect(filename::AbstractString, append::Bool) = FileRedirect(convert(String, filename), append) + function FileRedirect(filename::String, append::Bool) if lowercase(filename) == (@static Sys.iswindows() ? "nul" : "/dev/null") @warn "For portability use devnull instead of a file redirect" maxlog=1 end @@ -145,6 +146,8 @@ struct FileRedirect end end +# setup_stdio ≈ cconvert +# rawhandle ≈ unsafe_convert rawhandle(::DevNull) = C_NULL rawhandle(x::OS_HANDLE) = x if OS_HANDLE !== RawFD @@ -158,19 +161,24 @@ struct CmdRedirect <: AbstractCmd cmd::AbstractCmd handle::Redirectable stream_no::Int + readable::Bool end +CmdRedirect(cmd, handle, stream_no) = CmdRedirect(cmd, handle, stream_no, stream_no == STDIN_NO) function show(io::IO, cr::CmdRedirect) print(io, "pipeline(") show(io, cr.cmd) print(io, ", ") if cr.stream_no == STDOUT_NO - print(io, "stdout=") + print(io, "stdout") elseif cr.stream_no == STDERR_NO - print(io, "stderr=") + print(io, "stderr") elseif cr.stream_no == STDIN_NO - print(io, "stdin=") + print(io, "stdin") + else + print(io, cr.stream_no) end + print(io, readable ? "<" : ">") show(io, cr.handle) print(io, ")") end @@ -294,7 +302,7 @@ run(pipeline(`ls`, "out.txt")) run(pipeline("out.txt", `grep xyz`)) ``` """ -pipeline(a, b, c, d...) = pipeline(pipeline(a,b), c, d...) +pipeline(a, b, c, d...) = pipeline(pipeline(a, b), c, d...) mutable struct Process <: AbstractPipe cmd::Cmd @@ -306,20 +314,8 @@ mutable struct Process <: AbstractPipe termsignal::Int32 exitnotify::Condition closenotify::Condition - function Process(cmd::Cmd, handle::Ptr{Cvoid}, - in::Union{Redirectable, Ptr{Cvoid}}, - out::Union{Redirectable, Ptr{Cvoid}}, - err::Union{Redirectable, Ptr{Cvoid}}) - if !isa(in, IO) - in = devnull - end - if !isa(out, IO) - out = devnull - end - if !isa(err, IO) - err = devnull - end - this = new(cmd, handle, in, out, err, + function Process(cmd::Cmd, handle::Ptr{Cvoid}) + this = new(cmd, handle, devnull, devnull, devnull, typemin(fieldtype(Process, :exitcode)), typemin(fieldtype(Process, :termsignal)), Condition(), Condition()) @@ -330,45 +326,21 @@ end pipe_reader(p::Process) = p.out pipe_writer(p::Process) = p.in -struct ProcessChain <: AbstractPipe +# Represents a whole pipeline of any number of related processes +# so the entire pipeline can be treated as one entity +mutable struct ProcessChain <: AbstractPipe processes::Vector{Process} - in::Redirectable - out::Redirectable - err::Redirectable - ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3]) + in::IO + out::IO + err::IO + function ProcessChain() + return new(Process[], devnull, devnull, devnull) + end end pipe_reader(p::ProcessChain) = p.out pipe_writer(p::ProcessChain) = p.in -function _jl_spawn(file, argv, cmd::Cmd, stdio) - loop = eventloop() - handles = Tuple{Cint, UInt}[ # assuming little-endian layout - let h = rawhandle(io) - h === C_NULL ? (0x00, UInt(0)) : - h isa OS_HANDLE ? (0x02, UInt(cconvert(@static(Sys.iswindows() ? Ptr{Cvoid} : Cint), h))) : - h isa Ptr{Cvoid} ? (0x04, UInt(h)) : - error("invalid spawn handle $h from $io") - end - for io in stdio] - proc = Libc.malloc(_sizeof_uv_process) - disassociate_julia_struct(proc) - error = ccall(:jl_spawn, Int32, - (Cstring, Ptr{Cstring}, Ptr{Cvoid}, Ptr{Cvoid}, - Ptr{Tuple{Cint, UInt}}, Int, - UInt32, Ptr{Cstring}, Cstring, Ptr{Cvoid}), - file, argv, loop, proc, - handles, length(handles), - cmd.flags, - cmd.env === nothing ? C_NULL : cmd.env, - isempty(cmd.dir) ? C_NULL : cmd.dir, - uv_jl_return_spawn::Ptr{Cvoid}) - if error != 0 - ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), proc) - throw(_UVError("could not spawn " * string(cmd), error)) - end - return proc -end - +# release ownership of the libuv handle function uvfinalize(proc::Process) if proc.handle != C_NULL disassociate_julia_struct(proc.handle) @@ -378,6 +350,7 @@ function uvfinalize(proc::Process) nothing end +# called when the process dies function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32) data = ccall(:jl_uv_process_data, Ptr{Cvoid}, (Ptr{Cvoid},), p) data == C_NULL && return @@ -390,27 +363,93 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32) nothing end +# called when the libuv handle is destroyed function _uv_hook_close(proc::Process) proc.handle = C_NULL notify(proc.closenotify) + nothing end -function _spawn(redirect::CmdRedirect, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - _spawn(redirect.cmd, - (redirect.stream_no == STDIN_NO ? redirect.handle : stdios[1], - redirect.stream_no == STDOUT_NO ? redirect.handle : stdios[2], - redirect.stream_no == STDERR_NO ? redirect.handle : stdios[3]), - chain=chain) +const SpawnIOs = Vector{Any} # convenience name for readability + +# handle marshalling of `Cmd` arguments from Julia to C +@noinline function _spawn_primitive(file, cmd::Cmd, stdio::SpawnIOs) + loop = eventloop() + iohandles = Tuple{Cint, UInt}[ # assuming little-endian layout + let h = rawhandle(io) + h === C_NULL ? (0x00, UInt(0)) : + h isa OS_HANDLE ? (0x02, UInt(cconvert(@static(Sys.iswindows() ? Ptr{Cvoid} : Cint), h))) : + h isa Ptr{Cvoid} ? (0x04, UInt(h)) : + error("invalid spawn handle $h from $io") + end + for io in stdio] + handle = Libc.malloc(_sizeof_uv_process) + disassociate_julia_struct(handle) # ensure that data field is set to C_NULL + error = ccall(:jl_spawn, Int32, + (Cstring, Ptr{Cstring}, Ptr{Cvoid}, Ptr{Cvoid}, + Ptr{Tuple{Cint, UInt}}, Int, + UInt32, Ptr{Cstring}, Cstring, Ptr{Cvoid}), + file, cmd.exec, loop, handle, + iohandles, length(iohandles), + cmd.flags, + cmd.env === nothing ? C_NULL : cmd.env, + isempty(cmd.dir) ? C_NULL : cmd.dir, + uv_jl_return_spawn::Ptr{Cvoid}) + if error != 0 + ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually + throw(_UVError("could not spawn " * repr(cmd), error)) + end + pp = Process(cmd, handle) + associate_julia_struct(handle, pp) + return pp +end + +_spawn(cmds::AbstractCmd) = _spawn(cmds, Any[]) + +# optimization: we can spawn `Cmd` directly without allocating the ProcessChain +function _spawn(cmd::Cmd, stdios::SpawnIOs) + isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) + pp = setup_stdios(stdios) do stdios + return _spawn_primitive(cmd.exec[1], cmd, stdios) + end + return pp end -function _spawn(cmds::OrCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if chain === nothing - chain = ProcessChain(stdios) +# assume that having a ProcessChain means that the stdio are setup +function _spawn(cmds::AbstractCmd, stdios::SpawnIOs) + pp = setup_stdios(stdios) do stdios + return _spawn(cmds, stdios, ProcessChain()) end + return pp +end + +# helper function for making a copy of a SpawnIOs, with replacement +function _stdio_copy(stdios::SpawnIOs, fd::Int, @nospecialize replace) + nio = max(fd, length(stdios)) + new = SpawnIOs(undef, nio) + copyto!(fill!(new, devnull), stdios) + new[fd] = replace + return new +end + +function _spawn(redirect::CmdRedirect, stdios::SpawnIOs, args...) + fdnum = redirect.stream_no + 1 + io, close_io = setup_stdio(redirect.handle, redirect.readable) + try + stdios = _stdio_copy(stdios, fdnum, io) + return _spawn(redirect.cmd, stdios, args...) + finally + close_io && close_stdio(io) + end +end + +function _spawn(cmds::OrCmds, stdios::SpawnIOs, chain::ProcessChain) in_pipe, out_pipe = link_pipe(false, false) try - _spawn(cmds.a, (stdios[1], out_pipe, stdios[3]), chain=chain) - _spawn(cmds.b, (in_pipe, stdios[2], stdios[3]), chain=chain) + stdios_left = _stdio_copy(stdios, 2, out_pipe) + _spawn(cmds.a, stdios_left, chain) + stdios_right = _stdio_copy(stdios, 1, in_pipe) + _spawn(cmds.b, stdios_right, chain) finally close_pipe_sync(out_pipe) close_pipe_sync(in_pipe) @@ -418,14 +457,13 @@ function _spawn(cmds::OrCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothi return chain end -function _spawn(cmds::ErrOrCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if chain === nothing - chain = ProcessChain(stdios) - end +function _spawn(cmds::ErrOrCmds, stdios::SpawnIOs, chain::ProcessChain) in_pipe, out_pipe = link_pipe(false, false) try - _spawn(cmds.a, (stdios[1], stdios[2], out_pipe), chain=chain) - _spawn(cmds.b, (in_pipe, stdios[2], stdios[3]), chain=chain) + stdios_left = _stdio_copy(stdios, 3, out_pipe) + _spawn(cmds.a, stdios_left, chain) + stdios_right = _stdio_copy(stdios, 1, in_pipe) + _spawn(cmds.b, stdios_right, chain) finally close_pipe_sync(out_pipe) close_pipe_sync(in_pipe) @@ -433,12 +471,55 @@ function _spawn(cmds::ErrOrCmds, stdios::StdIOSet; chain::Union{ProcessChain, No return chain end +function _spawn(cmds::AndCmds, stdios::SpawnIOs, chain::ProcessChain) + _spawn(cmds.a, stdios, chain) + _spawn(cmds.b, stdios, chain) + return chain +end + +function _spawn(cmd::Cmd, stdios::SpawnIOs, chain::ProcessChain) + isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) + pp = _spawn_primitive(cmd.exec[1], cmd, stdios) + push!(chain.processes, pp) + return chain +end + + +# open the child end of each element of `stdios`, and initialize the parent end +function setup_stdios(f, stdios::SpawnIOs) + nstdio = length(stdios) + open_io = Vector{Any}(undef, nstdio) + close_io = falses(nstdio) + try + for i in 1:nstdio + open_io[i], close_io[i] = setup_stdio(stdios[i], i == 1) + end + pp = f(open_io) + return pp + finally + for i in 1:nstdio + close_io[i] && close_stdio(open_io[i]) + end + end +end + function setup_stdio(stdio::PipeEndpoint, child_readable::Bool) if stdio.status == StatusInit + # if the PipeEndpoint isn't open, set it to the parent end + # and pass the other end to the child rd, wr = link_pipe(!child_readable, child_readable) - open_pipe!(stdio, child_readable ? wr : rd, !child_readable, child_readable) - return (child_readable ? rd : wr, true) + try + open_pipe!(stdio, child_readable ? wr : rd) + catch ex + close_pipe_sync(rd) + close_pipe_sync(wr) + rethrow(ex) + end + child = child_readable ? rd : wr + return (child, true) end + # if it's already open, assume that it's already the child end + # (since we can't do anything else) return (stdio, false) end @@ -471,92 +552,73 @@ function setup_stdio(stdio::FileRedirect, child_readable::Bool) return (io, true) end -function setup_stdio(io, child_readable::Bool) - # if there is no specialization, - # assume that rawhandle is defined for it - return (io, false) -end - -close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) -close_stdio(stdio::Nothing) = nothing -close_stdio(stdio) = close(stdio) - -function setup_stdio(anon::Function, stdio::StdIOSet) - in, close_in = setup_stdio(stdio[1], true) +# incrementally move data between an IOBuffer and a system Pipe +# TODO: probably more efficient (when valid) to use `stdio` directly as the +# PipeEndpoint buffer field in some cases +function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool) + parent = PipeEndpoint() + rd, wr = link_pipe(!child_readable, child_readable) try - out, close_out = setup_stdio(stdio[2], false) - try - err, close_err = setup_stdio(stdio[3], false) - try - anon((in, out, err)) + open_pipe!(parent, child_readable ? wr : rd) + catch ex + close_pipe_sync(rd) + close_pipe_sync(wr) + rethrow(ex) + end + child = child_readable ? rd : wr + try + let in = (child_readable ? parent : stdio), + out = (child_readable ? stdio : parent) + @async try + write(in, out) + catch ex + @warn "Process error" exception=(ex, catch_backtrace()) finally - close_err && close_stdio(err) + close(parent) end - finally - close_out && close_stdio(out) end - finally - close_in && close_stdio(in) + catch ex + close_pipe_sync(child) + rethrow(ex) end - nothing + return (child, true) end -function _spawn(cmd::Cmd, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if isempty(cmd.exec) - throw(ArgumentError("cannot spawn empty command")) - end - pp = Process(cmd, C_NULL, stdios[1], stdios[2], stdios[3]) - setup_stdio(stdios) do stdios - handle = _jl_spawn(cmd.exec[1], cmd.exec, cmd, stdios) - associate_julia_struct(handle, pp) - pp.handle = handle - end - if chain !== nothing - push!(chain.processes, pp) - end - return pp +function setup_stdio(io, child_readable::Bool) + # if there is no specialization, + # assume that rawhandle is defined for it + return (io, false) end -function _spawn(cmds::AndCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if chain === nothing - chain = ProcessChain(stdios) - end - setup_stdio(stdios) do stdios - _spawn(cmds.a, stdios, chain=chain) - _spawn(cmds.b, stdios, chain=chain) - end - return chain -end +close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) +close_stdio(stdio) = close(stdio) # INTERNAL -# returns stdios: -# A set of up to 256 stdio instructions, where each entry can be either: -# | - An IO to be passed to the child -# | - devnull to pass /dev/null -# | - An Filesystem.File object to redirect the output to -# \ - A string specifying a filename to be opened - -spawn_opts_swallow(stdios::StdIOSet) = (stdios,) -spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull, args...) = - ((in, out, err), args...) -spawn_opts_inherit(stdios::StdIOSet) = (stdios,) +# pad out stdio to have at least three elements, +# passing either `devnull` or the corresponding `stdio` +# A Redirectable can be any of: +# - A system IO handle, to be passed to the child +# - An uninitialized pipe, to be created +# - devnull (to pass /dev/null for 0-2, or to leave undefined for fd > 2) +# - An Filesystem.File or IOStream object to redirect the output to +# - A FileRedirect, containing a string specifying a filename to be opened for the child + +spawn_opts_swallow(stdios::StdIOSet) = Any[stdios...] +spawn_opts_inherit(stdios::StdIOSet) = Any[stdios...] +spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) = + Any[in, out, err] # pass original descriptors to child processes by default, because we might # have already exhausted and closed the libuv object for our standard streams. -# this caused issue #8529. -spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2), args...) = - ((in, out, err), args...) - -_spawn(cmds::AbstractCmd, args...; chain::Union{ProcessChain, Nothing}=nothing) = - _spawn(cmds, spawn_opts_swallow(args...)...; chain=chain) +# ref issue #8529 +spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) = + Any[in, out, err] function eachline(cmd::AbstractCmd; keep::Bool=false) - _stdout = Pipe() - processes = _spawn(cmd, (devnull, _stdout, stderr)) - close(_stdout.in) - out = _stdout.out - # implicitly close after reading lines, since we opened - return EachLine(out, keep=keep, - ondone=()->(close(out); success(processes) || pipeline_error(processes)))::EachLine + out = PipeEndpoint() + processes = _spawn(cmd, Any[devnull, out, stderr]) + # if the user consumes all the data, also check process exit status for success + ondone = () -> (success(processes) || pipeline_error(processes); nothing) + return EachLine(out, keep=keep, ondone=ondone)::EachLine end function open(cmds::AbstractCmd, mode::AbstractString, other::Redirectable=devnull) @@ -573,34 +635,34 @@ end # return a Process object to read-to/write-from the pipeline """ - open(command, stdio=devnull; write::Bool = false, read::Bool = !write) + open(command, other=devnull; write::Bool = false, read::Bool = !write) -Start running `command` asynchronously, and return a `process` object. If `read` is -true, then `process` reads from the process's standard output and `stdio` optionally -specifies the process's standard input stream. If `write` is true, then `process` writes to -the process's standard input and `stdio` optionally specifies the process's standard output +Start running `command` asynchronously, and return a `process::IO` object. If `read` is +true, then reads from the process come from the process's standard output and `other` optionally +specifies the process's standard input stream. If `write` is true, then writes go to +the process's standard input and `other` optionally specifies the process's standard output stream. +The process's standard error stream is connected to the current global `stderr`. """ -function open(cmds::AbstractCmd, other::Redirectable=devnull; write::Bool = false, read::Bool = !write) +function open(cmds::AbstractCmd, other::Redirectable=devnull; write::Bool=false, read::Bool=!write) if read && write - other === devnull || throw(ArgumentError("no other stream can be specified in read-write mode")) - in = Pipe() - out = Pipe() - processes = _spawn(cmds, (in,out,stderr)) - close(in.out) - close(out.in) + other === devnull || throw(ArgumentError("no stream can be specified for `other` in read-write mode")) + in = PipeEndpoint() + out = PipeEndpoint() + processes = _spawn(cmds, Any[in, out, stderr]) + processes.in = in + processes.out = out elseif read - in = other - out = Pipe() - processes = _spawn(cmds, (in,out,stderr)) - close(out.in) + out = PipeEndpoint() + processes = _spawn(cmds, Any[other, out, stderr]) + processes.out = out elseif write - in = Pipe() - out = other - processes = _spawn(cmds, (in,out,stderr)) - close(in.out) + in = PipeEndpoint() + processes = _spawn(cmds, Any[in, other, stderr]) + processes.in = in else - processes = _spawn(cmds) + other === devnull || throw(ArgumentError("no stream can be specified for `other` in no-access mode")) + processes = _spawn(cmds, Any[devnull, devnull, stderr]) end return processes end @@ -660,10 +722,27 @@ Use [`pipeline`](@ref) to control I/O redirection. """ function run(cmds::AbstractCmd, args...; wait::Bool = true) if wait - ps = _spawn(cmds, spawn_opts_inherit(args...)...) + ps = _spawn(cmds, spawn_opts_inherit(args...)) success(ps) || pipeline_error(ps) else - ps = _spawn(cmds, spawn_opts_swallow(args...)...) + stdios = spawn_opts_swallow(args...) + ps = _spawn(cmds, stdios) + # for each stdio input argument, guess whether the user + # passed a `stdio` placeholder object as input, and thus + # might be able to use the return AbstractProcess as an IO object + # (this really only applies to PipeEndpoint, Pipe, TCPSocket, or an AbstractPipe wrapping one of those) + if length(stdios) > 0 + in = stdios[1] + isa(in, IO) && (ps.in = in) + if length(stdios) > 1 + out = stdios[2] + isa(out, IO) && (ps.out = out) + if length(stdios) > 2 + err = stdios[3] + isa(err, IO) && (ps.err = err) + end + end + end end return ps end @@ -681,7 +760,7 @@ function test_success(proc::Process) @assert process_exited(proc) if proc.exitcode < 0 #TODO: this codepath is not currently tested - throw(_UVError("could not start process $(string(proc.cmd))", proc.exitcode)) + throw(_UVError("could not start process " * repr(proc.cmd), proc.exitcode)) end return proc.exitcode == 0 && (proc.termsignal == 0 || proc.termsignal == SIGPIPE) end @@ -788,22 +867,18 @@ process_exited(s::ProcessChain) = process_exited(s.processes) process_signaled(s::Process) = (s.termsignal > 0) -#process_stopped (s::Process) = false #not supported by libuv. Do we need this? -#process_stop_signal(s::Process) = false #not supported by libuv. Do we need this? - function process_status(s::Process) - process_running(s) ? "ProcessRunning" : - process_signaled(s) ? "ProcessSignaled("*string(s.termsignal)*")" : - #process_stopped(s) ? "ProcessStopped("*string(process_stop_signal(s))*")" : - process_exited(s) ? "ProcessExited("*string(s.exitcode)*")" : - error("process status error") + return process_running(s) ? "ProcessRunning" : + process_signaled(s) ? "ProcessSignaled(" * string(s.termsignal) * ")" : + process_exited(s) ? "ProcessExited(" * string(s.exitcode) * ")" : + error("process status error") end ## implementation of `cmd` syntax ## -arg_gen() = String[] +arg_gen() = String[] arg_gen(x::AbstractString) = String[cstr(x)] -arg_gen(cmd::Cmd) = cmd.exec +arg_gen(cmd::Cmd) = cmd.exec function arg_gen(head) if isiterable(typeof(head)) diff --git a/base/stream.jl b/base/stream.jl index 3d54865c89732..e17e32b8004fc 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -148,6 +148,18 @@ function PipeEndpoint() return pipe end +function PipeEndpoint(fd::OS_HANDLE) + pipe = PipeEndpoint() + err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) + uv_error("pipe_open", err) + pipe.status = StatusOpen + return pipe +end +if OS_HANDLE != RawFD + PipeEndpoint(fd::RawFD) = PipeEndpoint(Libc._get_osfhandle(fd)) +end + + mutable struct TTY <: LibuvStream handle::Ptr{Cvoid} status::Int @@ -177,16 +189,17 @@ mutable struct TTY <: LibuvStream end end -function TTY(fd::RawFD; readable::Bool = false) +function TTY(fd::OS_HANDLE) tty = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit) - # This needs to go after associate_julia_struct so that there - # is no garbage in the ->data field - err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, RawFD, Int32), - eventloop(), tty.handle, fd, readable) + err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int32), + eventloop(), tty.handle, fd, 0) uv_error("TTY", err) tty.status = StatusOpen return tty end +if OS_HANDLE != RawFD + TTY(fd::RawFD) = TTY(Libc._get_osfhandle(fd)) +end show(io::IO, stream::LibuvServer) = print(io, typeof(stream), "(", _fd(stream), " ", @@ -240,6 +253,63 @@ function init_stdio(handle::Ptr{Cvoid}) end end +""" + open(fd::OS_HANDLE) -> IO + +Take a raw file descriptor wrap it in a Julia-aware IO type, +and take ownership of the fd handle. +Call `open(Libc.dup(fd))` to avoid the ownership capture +of the original handle. + +WARNING: do not call this on a handle that's already owned by +some other part of the system. +""" +function open(h::OS_HANDLE) + t = ccall(:uv_guess_handle, Cint, (OS_HANDLE,), h) + if t == UV_FILE + @static if Sys.iswindows() + # TODO: Get ios.c to understand native handles + h = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), h, 0) + end + # TODO: Get fdio to work natively with file descriptors instead of integers + return fdio(cconvert(Cint, h)) + elseif t == UV_TTY + return TTY(h) + elseif t == UV_TCP + Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets")) + return Sockets.TCPSocket(h) + elseif t == UV_NAMED_PIPE + pipe = PipeEndpoint(h) + @static if Sys.iswindows() + if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), pipe.handle) != 0 + # replace the Julia `PipeEndpoint` type with a `TTY` type, + # if we detect that this is a cygwin pty object + pipe_handle, pipe_status = pipe.handle, pipe.status + pipe.status = StatusClosed + pipe.handle = C_NULL + return TTY(pipe_handle, pipe_status) + end + end + return pipe + else + throw(ArgumentError("invalid stdio type: $t")) + end +end + +if OS_HANDLE != RawFD + function open(fd::RawFD) + h = Libc.dup(Libc._get_osfhandle(fd)) # make a dup to steal ownership away from msvcrt + try + io = open(h) + ccall(:_close, Cint, (RawFD,), fd) # on success, destroy the old libc handle + return io + catch ex + ccall(:CloseHandle, stdcall, Cint, (OS_HANDLE,), h) # on failure, destroy the new nt handle + rethrow(ex) + end + end +end + function isopen(x::Union{LibuvStream, LibuvServer}) if x.status == StatusUninit || x.status == StatusInit throw(ArgumentError("$x is not initialized")) @@ -575,12 +645,12 @@ show(io::IO, stream::Pipe) = print(io, ## Functions for PipeEndpoint and PipeServer ## -function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE, readable::Bool, writable::Bool) +function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE) if p.status != StatusInit error("pipe is already in use or has been closed") end - err = ccall(:jl_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE, Cint, Cint), p.handle, handle, readable, writable) - uv_error("open_pipe", err) + err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), p.handle, handle) + uv_error("pipe_open", err) p.status = StatusOpen return p end @@ -591,13 +661,13 @@ function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool, rd, wr = link_pipe(reader_supports_async, writer_supports_async) try try - open_pipe!(read_end, rd, true, false) + open_pipe!(read_end, rd) catch close_pipe_sync(rd) rethrow() end read_end.status = StatusOpen - open_pipe!(write_end, wr, false, true) + open_pipe!(write_end, wr) catch close_pipe_sync(wr) rethrow() @@ -891,6 +961,7 @@ function uv_writecb_task(req::Ptr{Cvoid}, status::Cint) end _fd(x::IOStream) = RawFD(fd(x)) +_fd(x::Union{OS_HANDLE, RawFD}) = x function _fd(x::Union{LibuvStream, LibuvServer}) fd = Ref{OS_HANDLE}(INVALID_OS_HANDLE) diff --git a/src/init.c b/src/init.c index cf3a420d76d98..c3e4ede715b56 100644 --- a/src/init.c +++ b/src/init.c @@ -309,11 +309,6 @@ void *jl_winsock_handle; uv_loop_t *jl_io_loop; -#ifndef _OS_WINDOWS_ -#define UV_STREAM_READABLE 0x20 /* The stream is readable */ -#define UV_STREAM_WRITABLE 0x40 /* The stream is writable */ -#endif - #ifdef _OS_WINDOWS_ int uv_dup(uv_os_fd_t fd, uv_os_fd_t* dupfd) { HANDLE current_process; @@ -368,7 +363,7 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable) switch(uv_guess_handle(fd)) { case UV_TTY: handle = malloc(sizeof(uv_tty_t)); - if ((err = uv_tty_init(jl_io_loop, (uv_tty_t*)handle, fd, readable))) { + if ((err = uv_tty_init(jl_io_loop, (uv_tty_t*)handle, fd, 0))) { jl_errorf("error initializing %s in uv_tty_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } ((uv_tty_t*)handle)->data = NULL; @@ -413,13 +408,6 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable) if ((err = uv_pipe_open((uv_pipe_t*)handle, fd))) { jl_errorf("error initializing %s in uv_pipe_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } -#ifndef _OS_WINDOWS_ - // remove flags set erroneously by libuv: - if (readable) - ((uv_pipe_t*)handle)->flags &= ~UV_STREAM_WRITABLE; - else - ((uv_pipe_t*)handle)->flags &= ~UV_STREAM_READABLE; -#endif ((uv_pipe_t*)handle)->data = NULL; break; case UV_TCP: diff --git a/src/jl_uv.c b/src/jl_uv.c index a98108e0953ce..6eb01267bdf52 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -210,24 +210,6 @@ JL_DLLEXPORT int jl_process_events(uv_loop_t *loop) else return 0; } -#ifndef _OS_WINDOWS_ -#define UV_STREAM_READABLE 0x20 /* The stream is readable */ -#define UV_STREAM_WRITABLE 0x40 /* The stream is writable */ -#endif - -JL_DLLEXPORT int jl_pipe_open(uv_pipe_t *pipe, uv_os_fd_t fd, int readable, int writable) -{ - int err = uv_pipe_open(pipe, fd); -#ifndef _OS_WINDOWS_ - // clear flags set erroneously by libuv: - if (!readable) - pipe->flags &= ~UV_STREAM_READABLE; - if (!writable) - pipe->flags &= ~UV_STREAM_WRITABLE; -#endif - return err; -} - static void jl_proc_exit_cleanup(uv_process_t *process, int64_t exit_status, int term_signal) { uv_close((uv_handle_t*)process, (uv_close_cb)&free); diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 6b39ae0a427e2..2965966fdd1c0 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -29,7 +29,7 @@ import Base: isless, show, print, parse, bind, convert, isreadable, iswritable, using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize, notify_error, stream_wait, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError, eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive, - uv_status_string, check_open, wait_connected, + uv_status_string, check_open, wait_connected, OS_HANDLE, RawFD, UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL, UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS, UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL, @@ -86,6 +86,18 @@ function TCPSocket(; delay=true) return tcp end +function TCPSocket(fd::OS_HANDLE) + tcp = TCPSocket() + err = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) + uv_error("tcp_open", err) + tcp.status = StatusOpen + return tcp +end +if OS_HANDLE != RawFD + TCPSocket(fd::RawFD) = TCPSocket(Libc._get_osfhandle(fd)) +end + + mutable struct TCPServer <: LibuvServer handle::Ptr{Cvoid} status::Int diff --git a/test/spawn.jl b/test/spawn.jl index 0cfc23a8a02df..884806d333d6f 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -57,16 +57,10 @@ out = read(`$echocmd hello` & `$echocmd world`, String) # Test for SIGPIPE being treated as normal termination (throws an error if broken) Sys.isunix() && run(pipeline(yescmd, `head`, devnull)) -let a, p - a = Base.Condition() - t = @async begin - p = run(pipeline(yescmd,devnull), wait=false) - Base.notify(a,p) - @test !success(p) - end - p = wait(a) +let p = run(pipeline(yescmd, devnull), wait=false) + t = @async !success(p) kill(p) - wait(t) + @test fetch(t) end if valgrind_off diff --git a/test/testhelpers/FakePTYs.jl b/test/testhelpers/FakePTYs.jl index f53cc2ba339e5..4004d427be074 100644 --- a/test/testhelpers/FakePTYs.jl +++ b/test/testhelpers/FakePTYs.jl @@ -22,7 +22,7 @@ function open_fake_pty() # slave slave = RawFD(fds) - master = Base.TTY(RawFD(fdm); readable = true) + master = Base.TTY(RawFD(fdm)) slave, master end