Skip to content

Commit

Permalink
Faster polling IO improvements
Browse files Browse the repository at this point in the history
Instead of sleeping during the read loop to throttle the number of
stream messages sent, we now continually read the read_stdout and
read_stderr streams (whose buffers are limited to 64KB on OS X/Linux and
possibly 4KB on Windows) and append the data read to our own IOBuffers
for STDOUT/STDERR.

The rate of sending is now controlled by a stream_interval parameter; a
stream message is now sent at most once every stream_interval seconds
(currently 0.1). The exception is when the buffer has reached the size
specified in max_bytes (currently 10KB), in which case it is sent
immediately. This avoids sending overly large stream messages.

Improves flush() so that it will have a very high chance of flushing data
already written to stdout/stderr.

The above changes fix JuliaLang#372 JuliaLang#342 JuliaLang#238 JuliaLang#347

Adds timestamps, and the name of the task from which the vprintln call
is made, to the debug logging.

Fixes using ?keyword (e.g. ?try)
  • Loading branch information
JobJob committed Nov 27, 2015
1 parent ca35a63 commit 4de454c
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 59 deletions.
5 changes: 3 additions & 2 deletions src/IJulia.jl
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ include("handlers.jl")
include("heartbeat.jl")

function eventloop(socket)
task_local_storage(:IJulia_task, "write task")
try
while true
msg = recv_ipython(socket)
Expand All @@ -147,7 +148,7 @@ function eventloop(socket)
if !isa(e, InterruptException)
content = error_content(e, msg="KERNEL EXCEPTION")
map(s -> println(orig_STDERR, s), content["traceback"])
send_ipython(publish,
send_ipython(publish,
execute_msg == nothing ?
Msg([ "error" ],
@compat(Dict("msg_id" => uuid4(),
Expand All @@ -156,7 +157,7 @@ function eventloop(socket)
"msg_type" => "error",
"version" => "5.0")),
content) :
msg_pub(execute_msg, "error", content))
msg_pub(execute_msg, "error", content))
end
finally
send_status("idle", msg.header)
Expand Down
51 changes: 21 additions & 30 deletions src/execute_request.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ metadata(x) = Dict()
# return a AbstractString=>AbstractString dictionary of mimetype=>data
# for passing to Jupyter display_data and execute_result messages.
function display_dict(x)
data = @compat Dict{ASCIIString,ByteString}("text/plain" =>
data = @compat Dict{ASCIIString,ByteString}("text/plain" =>
sprint(writemime, "text/plain", x))
if mimewritable(image_svg, x)
data[string(image_svg)] = stringmime(image_svg, x)
Expand Down Expand Up @@ -74,7 +74,7 @@ end
function error_content(e; backtrace_top::Symbol=:execute_request_0x535c5df2, msg::AbstractString="")
bt = catch_backtrace()
tb = map(utf8, @compat(split(sprint(show_bt,
backtrace_top,
backtrace_top,
bt, 1:typemax(Int)),
"\n", keep=true)))
if !isempty(tb) && ismatch(r"^\s*in\s+include_string\s+", tb[end])
Expand Down Expand Up @@ -131,8 +131,11 @@ function helpcode(code::AbstractString)
else # new Base.Docs.@repl macro from julia@08663d4bb05c5b8805a57f46f4feacb07c7f2564
code_ = strip(code)
# as in base/REPL.jl, special-case keywords so that they parse
return "Base.Docs.@repl " * (haskey(Docs.keywords, symbol(code_)) ?
":"*code_ : code_)
if(haskey(Docs.keywords, symbol(code_)))
return "eval(:(Base.Docs.@repl \$(symbol(\"$code_\"))))"
else
return "Base.Docs.@repl $code_"
end
end
end

Expand All @@ -148,20 +151,20 @@ function execute_request_0x535c5df2(socket, msg)

if !silent
_n += 1
send_ipython(publish,
send_ipython(publish,
msg_pub(msg, "execute_input",
@compat Dict("execution_count" => _n,
"code" => code)))
end

silent = silent || ismatch(r";\s*$", code)
if store_history
In[_n] = code
end

# "; ..." cells are interpreted as shell commands for run
code = replace(code, r"^\s*;.*$",
m -> string(replace(m, r"^\s*;", "Base.repl_cmd(`"),
code = replace(code, r"^\s*;.*$",
m -> string(replace(m, r"^\s*;", "Base.repl_cmd(`"),
"`)"), 0)

# a cell beginning with "? ..." is interpreted as a help request
Expand All @@ -170,17 +173,20 @@ function execute_request_0x535c5df2(socket, msg)
code = helpcode(hcode)
end

try
try
for hook in preexecute_hooks
hook()
end

#run the code!
ans = result = include_string(code, "In[$_n]")

if silent
result = nothing
elseif result != nothing
if store_history
if result != Out # workaround for Julia #3066
Out[_n] = result
Out[_n] = result
end
end
end
Expand All @@ -194,33 +200,23 @@ function execute_request_0x535c5df2(socket, msg)
hook()
end

# flush pending stdio
flush_cstdio() # flush writes to stdout/stderr by external C code
yield()
send_stream(read_stdout, "stdout")
send_stream(read_stderr, "stderr")
# flush pending stdio
flush_all()

undisplay(result) # dequeue if needed, since we display result in pyout
display() # flush pending display requests

if result != nothing

# Work around for Julia issue #265 (see # #7884 for context)
# We have to explicitly invoke the correct metadata method.
result_metadata = invoke(metadata, (typeof(result),), result)

send_ipython(publish,
msg_pub(msg, "execute_result",
@compat Dict("execution_count" => _n,
"metadata" => result_metadata,
"data" => display_dict(result))))

flush_cstdio() # flush writes to stdout/stderr by external C code
yield()
send_stream(read_stdout, "stdout")
send_stream(read_stderr, "stderr")

end

send_ipython(requests,
msg_reply(msg, "execute_reply",
@compat Dict("status" => "ok",
Expand All @@ -230,10 +226,7 @@ function execute_request_0x535c5df2(socket, msg)
catch e
try
# flush pending stdio
flush_cstdio() # flush writes to stdout/stderr by external C code
yield()
send_stream(read_stdout, "stdout")
send_stream(read_stderr, "stderr")
flush_all()
for hook in posterror_hooks
hook()
end
Expand All @@ -255,9 +248,7 @@ end
# output only when new output is available, for minimal flickering.
function clear_output(wait=false)
    # Flush pending stdio first so that output written before this call is
    # published ahead of the clear request. flush_all() is the single flush
    # path; reading read_stdout/read_stderr directly here would bypass the
    # per-stream buffers and could publish output out of order.
    flush_all()
    # Publish the "clear_output" message, forwarding `wait` unchanged in the
    # message content.
    send_ipython(publish, msg_reply(execute_msg::Msg, "clear_output",
                                    @compat Dict("wait" => wait)))
end
6 changes: 3 additions & 3 deletions src/msg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ end
# of the spec yet" and is in practice currently ignored since "all
# subscribers currently subscribe to all topics".]
msg_pub(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) =
Msg([ msg_type == "stream" ? content["name"] : msg_type ],
Msg([ msg_type == "stream" ? content["name"] : msg_type ],
@compat(Dict("msg_id" => uuid4(),
"username" => m.header["username"],
"session" => m.header["session"],
Expand All @@ -30,7 +30,7 @@ msg_pub(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) =
content, m.header, metadata)

msg_reply(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) =
Msg(m.idents,
Msg(m.idents,
@compat(Dict("msg_id" => uuid4(),
"username" => m.header["username"],
"session" => m.header["session"],
Expand All @@ -41,7 +41,7 @@ msg_reply(m::Msg, msg_type, content, metadata=Dict{AbstractString,Any}()) =
# Print a human-readable dump of `msg` to `io`: the identities joined by
# ", ", followed by the parent header, header, metadata and content fields.
# The body is printed exactly once.
function show(io::IO, msg::Msg)
    print(io, "IPython Msg [ idents ")
    print_joined(io, msg.idents, ", ")
    print(io, " ] {\n parent_header = $(msg.parent_header),\n header = $(msg.header),\n metadata = $(msg.metadata),\n content = $(msg.content)\n}")
end

function send_ipython(socket, m::Msg)
Expand Down
137 changes: 113 additions & 24 deletions src/stdio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,22 @@

# logging in verbose mode goes to original stdio streams. Use macros
# so that we do not even evaluate the arguments in no-verbose modes

# Build the prefix placed in front of verbose log lines: the current
# wall-clock time as HH:MM:SS, followed by the task label stored under
# :IJulia_task in task-local storage (empty string if none is set), in the
# form "HH:MM:SS(label): ".
function get_log_preface()
    stamp = now()
    label = get(task_local_storage(), :IJulia_task, "")
    hh, mm, ss = Dates.hour(stamp), Dates.minute(stamp), Dates.second(stamp)
    return @sprintf("%02d:%02d:%02d(%s): ", hh, mm, ss, label)
end


# Verbose-mode logging to the original stdout stream. Implemented as a macro
# so the arguments are not evaluated at all when `verbose` is false. Each
# message is printed once, prefixed with get_log_preface() (timestamp and
# task label).
macro vprintln(x...)
    quote
        if verbose::Bool
            println(orig_STDOUT, get_log_preface(), $(x...))
        end
    end
end

macro verror_show(e, bt)
quote
if verbose::Bool
Expand All @@ -19,27 +28,31 @@ macro verror_show(e, bt)
end
end

# Direct-read variant: publish whatever bytes are currently available on `rd`
# as a "stream" message whose "name" field is `name`. Does nothing when no
# bytes are available.
function send_stream(rd::IO, name::AbstractString)
nb = nb_available(rd)
if nb > 0
d = readbytes(rd, nb)
# Convert the raw bytes to a string; if the conversion throws, fall back to
# an error string that embeds the raw byte vector.
s = try
bytestring(d)
catch
# FIXME: what should we do here?
string("<ERROR: invalid UTF8 data ", d, ">")
end
send_ipython(publish,
msg_pub(execute_msg, "stream",
@compat Dict("name" => name, "text" => s)))
end
end
#name=>iobuffer for each stream ("stdout","stderr") so they can be sent in flush
# (entries are registered by watch_stream when it starts reading a stream)
const bufs = Dict{ASCIIString,IOBuffer}()
# seconds between periodic stream sends; used as both the initial delay and
# the repeat interval of the Timers created in watch_stdio
const stream_interval = 0.1
# buffered byte count at or above which watch_stream sends immediately
const max_bytes = 10*1024

"""Continually read from (size limited) Libuv/OS buffer into an (effectively unlimited) `IObuffer`
to avoid problems when the Libuv/OS buffer gets full (https://github.com/JuliaLang/julia/issues/8789).
Send data immediately when buffer contains more than `max_bytes` bytes. Otherwise, if data is available
it will be sent every `stream_interval` seconds (see the Timers set up in watch_stdio)"""
function watch_stream(rd::IO, name::AbstractString)
task_local_storage(:IJulia_task, "read $name task")
try
buf = IOBuffer()
bufs[name] = buf
while !eof(rd) # blocks until something is available
send_stream(rd, name)
sleep(0.1) # a little delay to accumulate output
nb = nb_available(rd)
if nb > 0
write(buf, readbytes(rd, nb))
end
if buf.size > 0
if buf.size >= max_bytes
#send immediately
send_stream(name)
end
end
end
catch e
# the IPython manager may send us a SIGINT if the user
Expand All @@ -52,6 +65,62 @@ function watch_stream(rd::IO, name::AbstractString)
end
end

# Send any buffered output for the stream called `name`. In verbose mode the
# current task is first given a "send <name> task" label, unless it already
# carries an :IJulia_task label.
function send_stdio(name)
    if verbose::Bool
        labelled = haskey(task_local_storage(), :IJulia_task)
        labelled || task_local_storage(:IJulia_task, "send $name task")
    end
    send_stream(name)
end

# Timer callbacks (installed in watch_stdio): each ignores the Timer argument
# and sends whatever is buffered for its stream via send_stdio.
send_stdout(t::Timer) = send_stdio("stdout")
send_stderr(t::Timer) = send_stdio("stderr")

"""
    send_stream(name::AbstractString)

Publish the bytes accumulated in `bufs[name]` as a "stream" message whose
`"name"` field is `name`.

Bytes belonging to an incomplete trailing UTF-8 character (as reported by
`num_utf8_trailing`) are written back to the buffer so they can be sent once
the rest of the character arrives. If the remaining data is not valid UTF-8,
all of it (including the held-back bytes) is sent base64-encoded instead.

Does nothing when no buffer is registered for `name` or the buffer is empty.
"""
function send_stream(name::AbstractString)
    # Buffers are only registered once watch_stream starts for a stream.
    # A flush or timer callback may run before that (or for a stream that is
    # never watched), so a missing buffer just means "nothing to send".
    haskey(bufs, name) || return
    buf = bufs[name]
    if buf.size > 0
        d = takebuf_array(buf)
        n = num_utf8_trailing(d)
        # split off the trailing incomplete character (empty when n == 0)
        dextra = d[end-(n-1):end]
        resize!(d, length(d) - n)
        s = UTF8String(d)
        if isvalid(s)
            write(buf, dextra) # assume that the rest of the string will be written later
            # only an incomplete character was buffered: nothing to send yet
            length(d) == 0 && return
        else
            # fallback: base64-encode non-UTF8 binary data
            sbuf = IOBuffer()
            print(sbuf, "base64 binary data: ")
            b64 = Base64EncodePipe(sbuf)
            write(b64, d)
            write(b64, dextra)
            close(b64)
            print(sbuf, '\n')
            s = takebuf_string(sbuf)
        end
        send_ipython(publish,
                     msg_pub(execute_msg, "stream",
                             @compat Dict("name" => name, "text" => s)))
    end
end

"""
    num_utf8_trailing(d::Vector{UInt8})

If `d` ends with an incomplete UTF8-encoded character, return the number of
trailing incomplete bytes. Otherwise, return `0`.

Only a genuinely truncated sequence (fewer bytes present than the lead byte
calls for) counts as incomplete. If `d` consists solely of continuation
bytes, or the last lead byte is followed by *more* continuation bytes than
its sequence allows, the data is malformed rather than incomplete and `0` is
returned, so the caller's validity check handles it instead of the bytes
being held back waiting for input that can never complete them.
"""
function num_utf8_trailing(d::Vector{UInt8})
    i = length(d)
    # find last non-continuation byte in d:
    while i >= 1 && ((d[i] & 0xc0) == 0x80)
        i -= 1
    end
    i < 1 && return 0
    c = d[i]
    # compute number of expected UTF-8 bytes starting at i:
    n = c <= 0x7f ? 1 : c < 0xe0 ? 2 : c < 0xf0 ? 3 : 4
    nend = length(d) + 1 - i # num bytes from i to end
    # nend == n: complete character; nend > n: malformed, not incomplete
    return nend < n ? nend : 0
end

# this is hacky: we overload some of the I/O functions on pipe endpoints
# in order to fix some interactions with stdio.
if VERSION < v"0.4.0-dev+6987" # JuliaLang/julia#12739
Expand Down Expand Up @@ -87,20 +156,40 @@ function readline(io::StdioPipe)
end

# Start the stdio capture machinery: one reader task per captured stream plus
# a repeating Timer that sends buffered output every `stream_interval`
# seconds. stderr is only watched when `capture_stderr` is set.
function watch_stdio()
    task_local_storage(:IJulia_task, "init task")
    # Exactly one reader per stream: watch_stream registers bufs[name], so a
    # second reader on the same stream would replace the first one's buffer.
    @async watch_stream(read_stdout, "stdout")
    #send STDOUT stream msgs every stream_interval secs (if there is output to send)
    Timer(send_stdout, stream_interval, stream_interval)
    if capture_stderr
        @async watch_stream(read_stderr, "stderr")
        #send STDERR stream msgs every stream_interval secs (if there is output to send)
        Timer(send_stderr, stream_interval, stream_interval)
    end
end

# Flush every pending output path: first writes made to stdout/stderr by
# external C code, then the Julia-level STDOUT and STDERR streams, in that
# order.
function flush_all()
    flush_cstdio()
    for stream in (STDOUT, STDERR)
        flush(stream)
    end
end

# Give up the processor briefly before buffered output is sent: on Windows
# only, call SwitchToThread; then yield to the Julia scheduler twice.
# NOTE(review): presumably this lets the OS/libuv deliver already-written
# data to the reader tasks first — see the issue comments linked below.
function oslibuv_flush()
#refs: https://github.com/JuliaLang/IJulia.jl/issues/347#issuecomment-144505862
# https://github.com/JuliaLang/IJulia.jl/issues/347#issuecomment-144605024
@windows_only ccall(:SwitchToThread, stdcall, Void, ())
yield()
yield()
end

import Base.flush
# Flush a stdio pipe and then publish whatever has been buffered for it.
# The generic flush for the pipe's supertype runs first. For STDOUT/STDERR,
# oslibuv_flush() is called before the per-stream buffer is sent through
# send_stream(name). Output goes only through that buffered path; reading
# the pipe directly here would bypass the buffer and could publish bytes out
# of order.
function flush(io::StdioPipe)
    invoke(flush, (super(StdioPipe),), io)
    if io == STDOUT
        oslibuv_flush()
        send_stream("stdout")
    elseif io == STDERR
        oslibuv_flush()
        send_stream("stderr")
    end
end

0 comments on commit 4de454c

Please sign in to comment.