From 4de454c143139e0af80df866d9e529c9a4605496 Mon Sep 17 00:00:00 2001 From: JobJob Date: Mon, 23 Nov 2015 13:32:06 +0200 Subject: [PATCH] Faster polling IO improvements Instead of sleeping during the read loop to throttle the number of stream messages sent, we now continually read the read_stdout and read_stderr streams (whose buffers are limited to 64KB on OSX/Linux, 4KB Windows?) and add data read into our own IOBuffers for STDOUT/STDERR. The rate of sending is now controlled by a stream_interval parameter; a stream message is now sent at most once every stream_interval seconds (currently 0.1). The exception to this is if the buffer has reached the size specified in max_size (currently 10KB), and will then be sent immediately. This is to avoid overly large stream messages being sent. Improves flush() so that it will have a very high chance of flushing data already written to stdout/stderr. The above changes fix #372 #342 #238 #347 Adds timestamps to the debug logging and the task which the vprintln call is made from. Fixes using ?keyword (e.g. ?try) --- src/IJulia.jl | 5 +- src/execute_request.jl | 51 +++++++-------- src/msg.jl | 6 +- src/stdio.jl | 137 +++++++++++++++++++++++++++++++++-------- 4 files changed, 140 insertions(+), 59 deletions(-) diff --git a/src/IJulia.jl b/src/IJulia.jl index 250cad4d..1d02b320 100644 --- a/src/IJulia.jl +++ b/src/IJulia.jl @@ -133,6 +133,7 @@ include("handlers.jl") include("heartbeat.jl") function eventloop(socket) + task_local_storage(:IJulia_task, "write task") try while true msg = recv_ipython(socket) @@ -147,7 +148,7 @@ function eventloop(socket) if !isa(e, InterruptException) content = error_content(e, msg="KERNEL EXCEPTION") map(s -> println(orig_STDERR, s), content["traceback"]) - send_ipython(publish, + send_ipython(publish, execute_msg == nothing ? Msg([ "error" ], @compat(Dict("msg_id" => uuid4(), @@ -156,7 +157,7 @@ function eventloop(socket) "msg_type" => "error", "version" => "5.0")), content) : - msg_pub(execute_msg, "error", content)) + msg_pub(execute_msg, "error", content)) end finally send_status("idle", msg.header) diff --git a/src/execute_request.jl b/src/execute_request.jl index aa585f0a..c67cda44 100644 --- a/src/execute_request.jl +++ b/src/execute_request.jl @@ -23,7 +23,7 @@ metadata(x) = Dict() # return a AbstractString=>AbstractString dictionary of mimetype=>data # for passing to Jupyter display_data and execute_result messages. function display_dict(x) - data = @compat Dict{ASCIIString,ByteString}("text/plain" => + data = @compat Dict{ASCIIString,ByteString}("text/plain" => sprint(writemime, "text/plain", x)) if mimewritable(image_svg, x) data[string(image_svg)] = stringmime(image_svg, x) @@ -74,7 +74,7 @@ end function error_content(e; backtrace_top::Symbol=:execute_request_0x535c5df2, msg::AbstractString="") bt = catch_backtrace() tb = map(utf8, @compat(split(sprint(show_bt, - backtrace_top, + backtrace_top, bt, 1:typemax(Int)), "\n", keep=true))) if !isempty(tb) && ismatch(r"^\s*in\s+include_string\s+", tb[end]) @@ -131,8 +131,11 @@ function helpcode(code::AbstractString) else # new Base.Docs.@repl macro from julia@08663d4bb05c5b8805a57f46f4feacb07c7f2564 code_ = strip(code) # as in base/REPL.jl, special-case keywords so that they parse - return "Base.Docs.@repl " * (haskey(Docs.keywords, symbol(code_)) ? - ":"*code_ : code_) + if(haskey(Docs.keywords, symbol(code_))) + return "eval(:(Base.Docs.@repl \$(symbol(\"$code_\"))))" + else + return "Base.Docs.@repl $code_" + end end end @@ -148,20 +151,20 @@ function execute_request_0x535c5df2(socket, msg) if !silent _n += 1 - send_ipython(publish, + send_ipython(publish, msg_pub(msg, "execute_input", @compat Dict("execution_count" => _n, "code" => code))) end - + silent = silent || ismatch(r";\s*$", code) if store_history In[_n] = code end # "; ..." cells are interpreted as shell commands for run - code = replace(code, r"^\s*;.*$", - m -> string(replace(m, r"^\s*;", "Base.repl_cmd(`"), + code = replace(code, r"^\s*;.*$", + m -> string(replace(m, r"^\s*;", "Base.repl_cmd(`"), "`)"), 0) # a cell beginning with "? ..." is interpreted as a help request @@ -170,17 +173,20 @@ function execute_request_0x535c5df2(socket, msg) code = helpcode(hcode) end - try + try for hook in preexecute_hooks hook() end + + #run the code! ans = result = include_string(code, "In[$_n]") + if silent result = nothing elseif result != nothing if store_history if result != Out # workaround for Julia #3066 - Out[_n] = result + Out[_n] = result end end end @@ -194,33 +200,23 @@ function execute_request_0x535c5df2(socket, msg) hook() end - # flush pending stdio - flush_cstdio() # flush writes to stdout/stderr by external C code - yield() - send_stream(read_stdout, "stdout") - send_stream(read_stderr, "stderr") + # flush pending stdio + flush_all() undisplay(result) # dequeue if needed, since we display result in pyout display() # flush pending display requests if result != nothing - # Work around for Julia issue #265 (see # #7884 for context) # We have to explicitly invoke the correct metadata method. result_metadata = invoke(metadata, (typeof(result),), result) - send_ipython(publish, msg_pub(msg, "execute_result", @compat Dict("execution_count" => _n, "metadata" => result_metadata, "data" => display_dict(result)))) - - flush_cstdio() # flush writes to stdout/stderr by external C code - yield() - send_stream(read_stdout, "stdout") - send_stream(read_stderr, "stderr") + end - send_ipython(requests, msg_reply(msg, "execute_reply", @compat Dict("status" => "ok", @@ -230,10 +226,7 @@ function execute_request_0x535c5df2(socket, msg) catch e try # flush pending stdio - flush_cstdio() # flush writes to stdout/stderr by external C code - yield() - send_stream(read_stdout, "stdout") - send_stream(read_stderr, "stderr") + flush_all() for hook in posterror_hooks hook() end @@ -255,9 +248,7 @@ end # output only when new output is available, for minimal flickering. function clear_output(wait=false) # flush pending stdio - flush_cstdio() # flush writes to stdout/stderr by external C code - send_stream(read_stdout, "stdout") - send_stream(read_stderr, "stderr") + flush_all() send_ipython(publish, msg_reply(execute_msg::Msg, "clear_output", @compat Dict("wait" => wait))) end diff --git a/src/msg.jl b/src/msg.jl index 34049986..85917d3a 100644 --- a/src/msg.jl +++ b/src/msg.jl @@ -21,7 +21,7 @@ end # of the spec yet" and is in practice currently ignored since "all # subscribers currently subscribe to all topics".] msg_pub(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) = - Msg([ msg_type == "stream" ? content["name"] : msg_type ], + Msg([ msg_type == "stream" ? content["name"] : msg_type ], @compat(Dict("msg_id" => uuid4(), "username" => m.header["username"], "session" => m.header["session"], @@ -30,7 +30,7 @@ msg_pub(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) = content, m.header, metadata) msg_reply(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) = - Msg(m.idents, + Msg(m.idents, @compat(Dict("msg_id" => uuid4(), "username" => m.header["username"], "session" => m.header["session"], @@ -41,7 +41,7 @@ msg_reply(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) = function show(io::IO, msg::Msg) print(io, "IPython Msg [ idents ") print_joined(io, msg.idents, ", ") - print(io, " ] {\n header = $(msg.header),\n metadata = $(msg.metadata),\n content = $(msg.content)\n}") + print(io, " ] {\n parent_header = $(msg.parent_header),\n header = $(msg.header),\n metadata = $(msg.metadata),\n content = $(msg.content)\n}") end function send_ipython(socket, m::Msg) diff --git a/src/stdio.jl b/src/stdio.jl index 5f0322d9..5a52f27c 100644 --- a/src/stdio.jl +++ b/src/stdio.jl @@ -4,13 +4,22 @@ # logging in verbose mode goes to original stdio streams. Use macros # so that we do not even evaluate the arguments in no-verbose modes + +function get_log_preface() + t = now() + taskname = get(task_local_storage(), :IJulia_task, "") + @sprintf("%02d:%02d:%02d(%s): ", Dates.hour(t),Dates.minute(t),Dates.second(t),taskname) +end + + macro vprintln(x...) quote if verbose::Bool - println(orig_STDOUT, $(x...)) + println(orig_STDOUT, get_log_preface(), $(x...)) end end end + macro verror_show(e, bt) quote if verbose::Bool @@ -19,27 +28,31 @@ macro verror_show(e, bt) end end -function send_stream(rd::IO, name::AbstractString) - nb = nb_available(rd) - if nb > 0 - d = readbytes(rd, nb) - s = try - bytestring(d) - catch - # FIXME: what should we do here? - string("") - end - send_ipython(publish, - msg_pub(execute_msg, "stream", - @compat Dict("name" => name, "text" => s))) - end -end +#name=>iobuffer for each stream ("stdout","stderr") so they can be sent in flush +const bufs = Dict{ASCIIString,IOBuffer}() +const stream_interval = 0.1 +const max_bytes = 10*1024 +"""Continually read from (size limited) Libuv/OS buffer into an (effectively unlimited) `IObuffer` +to avoid problems when the Libuv/OS buffer gets full (https://github.com/JuliaLang/julia/issues/8789). +Send data immediately when buffer contains more than `max_bytes` bytes. Otherwise, if data is available +it will be sent every `stream_interval` seconds (see the Timers set up in watch_stdio)""" function watch_stream(rd::IO, name::AbstractString) + task_local_storage(:IJulia_task, "read $name task") try + buf = IOBuffer() + bufs[name] = buf while !eof(rd) # blocks until something is available - send_stream(rd, name) - sleep(0.1) # a little delay to accumulate output + nb = nb_available(rd) + if nb > 0 + write(buf, readbytes(rd, nb)) + end + if buf.size > 0 + if buf.size >= max_bytes + #send immediately + send_stream(name) + end + end end catch e # the IPython manager may send us a SIGINT if the user @@ -52,6 +65,62 @@ function watch_stream(rd::IO, name::AbstractString) end end +function send_stdio(name) + if verbose::Bool && !haskey(task_local_storage(), :IJulia_task) + task_local_storage(:IJulia_task, "send $name task") + end + send_stream(name) +end + +send_stdout(t::Timer) = send_stdio("stdout") +send_stderr(t::Timer) = send_stdio("stderr") + +function send_stream(name::AbstractString) + buf = bufs[name] + if buf.size > 0 + d = takebuf_array(buf) + n = num_utf8_trailing(d) + dextra = d[end-(n-1):end] + resize!(d, length(d) - n) + s = UTF8String(d) + if isvalid(s) + write(buf, dextra) # assume that the rest of the string will be written later + length(d) == 0 && return + else + # fallback: base64-encode non-UTF8 binary data + sbuf = IOBuffer() + print(sbuf, "base64 binary data: ") + b64 = Base64EncodePipe(sbuf) + write(b64, d) + write(b64, dextra) + close(b64) + print(sbuf, '\n') + s = takebuf_string(sbuf) + end + send_ipython(publish, + msg_pub(execute_msg, "stream", + @compat Dict("name" => name, "text" => s))) + end +end + +""" +If `d` ends with an incomplete UTF8-encoded character, return the number of trailing incomplete bytes. +Otherwise, return `0`. +""" +function num_utf8_trailing(d::Vector{UInt8}) + i = length(d) + # find last non-continuation byte in d: + while i >= 1 && ((d[i] & 0xc0) == 0x80) + i -= 1 + end + i < 1 && return 0 + c = d[i] + # compute number of expected UTF-8 bytes starting at i: + n = c <= 0x7f ? 1 : c < 0xe0 ? 2 : c < 0xf0 ? 3 : 4 + nend = length(d) + 1 - i # num bytes from i to end + return nend == n ? 0 : nend +end + # this is hacky: we overload some of the I/O functions on pipe endpoints # in order to fix some interactions with stdio. if VERSION < v"0.4.0-dev+6987" # JuliaLang/julia#12739 @@ -87,20 +156,40 @@ function readline(io::StdioPipe) end function watch_stdio() - @async watch_stream(read_stdout, "stdout") + task_local_storage(:IJulia_task, "init task") + read_task = @async watch_stream(read_stdout, "stdout") + #send STDOUT stream msgs every stream_interval secs (if there is output to send) + Timer(send_stdout, stream_interval, stream_interval) if capture_stderr - @async watch_stream(read_stderr, "stderr") + readerr_task = @async watch_stream(read_stderr, "stderr") + #send STDERR stream msgs every stream_interval secs (if there is output to send) + Timer(send_stderr, stream_interval, stream_interval) end end +function flush_all() + flush_cstdio() # flush writes to stdout/stderr by external C code + flush(STDOUT) + flush(STDERR) +end + +function oslibuv_flush() + #refs: https://github.com/JuliaLang/IJulia.jl/issues/347#issuecomment-144505862 + # https://github.com/JuliaLang/IJulia.jl/issues/347#issuecomment-144605024 + @windows_only ccall(:SwitchToThread, stdcall, Void, ()) + yield() + yield() +end + import Base.flush function flush(io::StdioPipe) invoke(flush, (super(StdioPipe),), io) - # send any available bytes to IPython (don't use readavailable, - # since we don't want to block). if io == STDOUT - send_stream(read_stdout, "stdout") + oslibuv_flush() + send_stream("stdout") elseif io == STDERR - send_stream(read_stderr, "stderr") + oslibuv_flush() + send_stream("stderr") end end +