Skip to content

Commit 6bbd0de

Browse files
committed
Call unblock functions using JNI in CExtInterrupter
* We cannot enter TruffleContext on a system thread, so that leaves us with two options: * Calling the unblock function with JNI, and expect the unblock function does not call back to Ruby (not the case for all unblock functions I saw). * Calling the unblock function on a new Ruby Thread per context and communicate via a queue, but this seems very high overhead for every guest safepoint. * Fixes #3013
1 parent dd46ed9 commit 6bbd0de

File tree

14 files changed

+144
-56
lines changed

14 files changed

+144
-56
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Bug fixes:
1313
* Fix some C API functions which were failing when called with Ruby values represented as Java primitives (#3352, @eregon).
1414
* Fix `IO.select([io], nil, [io])` on macOS, it was hanging due to a bug in macOS `poll(2)` (#3346, @eregon, @andrykonchin).
1515
* Run context cleanup such as showing the output of tools when `SignalException` and `Interrupt` escape (@eregon).
16+
* Fix using the `--cpusampler` profiler when there are custom unblock functions for `rb_thread_call_without_gvl()` (#3013, @eregon).
1617

1718
Compatibility:
1819

lib/cext/ABI_check.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
7
1+
8

lib/truffle/truffle/cext.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -1933,9 +1933,14 @@ def rb_thread_call_without_gvl(function, data1, unblock, data2)
19331933
end
19341934

19351935
private def rb_thread_call_without_gvl_inner(function, data1, unblock, data2)
1936+
if SULONG
1937+
Truffle::Interop.to_native(unblock)
1938+
Truffle::Interop.to_native(data2)
1939+
end
1940+
19361941
Primitive.call_with_unblocking_function(Thread.current,
19371942
POINTER_TO_POINTER_WRAPPER, function, data1,
1938-
POINTER_TO_VOID_WRAPPER, unblock, data2)
1943+
Truffle::Interop.as_pointer(unblock), Truffle::Interop.as_pointer(data2))
19391944
end
19401945

19411946
def rb_iterate(iteration, iterated_object, callback, callback_arg)
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sulong(would need to enter the context to call the unblock function):C-API Thread function rb_thread_call_without_gvl runs a C function with the global lock unlocked and can be woken by a signal
+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
slow:TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with RUBY_UBF_IO when using CPUSampler
2+
slow:TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with a custom unblock function when using CPUSampler
3+
sulong(safepoint inside C code instead):TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with RUBY_UBF_IO when using CPUSampler
4+
sulong(would need to enter the context to call the unblock function):TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with a custom unblock function when using CPUSampler
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
slow:Unimplemented functions in the C-API abort the process and show an error including the function name
2+
sulong(different behavior):Unimplemented functions in the C-API abort the process and show an error including the function name

spec/truffle/capi/ext/truffleruby_thread_spec.c

+85
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
#include "ruby/thread.h"
1212
#include "rubyspec.h"
1313

14+
#include <errno.h>
15+
#include <time.h>
16+
#include <stdio.h>
17+
1418
#ifdef __cplusplus
1519
extern "C" {
1620
#endif
@@ -21,9 +25,90 @@ static VALUE thread_spec_rb_thread_call_without_gvl_native_function(VALUE self)
2125
return LONG2FIX(ret);
2226
}
2327

28+
static void* call_check_ints(void* arg) {
29+
rb_thread_check_ints();
30+
return NULL;
31+
}
32+
33+
static void* block_sleep(void* arg) {
34+
struct timespec remaining = { .tv_sec = 1, .tv_nsec = 0 };
35+
while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR) {
36+
// Similar to how ossl_pkey.c does it
37+
rb_thread_call_with_gvl(call_check_ints, NULL);
38+
}
39+
return (void*) Qtrue;
40+
}
41+
42+
static VALUE thread_spec_rb_thread_call_without_gvl_unblock_signal(VALUE self) {
43+
return (VALUE) rb_thread_call_without_gvl(block_sleep, NULL, RUBY_UBF_IO, NULL);
44+
}
45+
46+
static void* block(void* arg) {
47+
int fd = *(int*)arg;
48+
char buffer = ' ';
49+
ssize_t r;
50+
51+
while (true) {
52+
ssize_t r = read(fd, &buffer, 1);
53+
if (r == 1) {
54+
if (buffer == 'D') { // done
55+
return (void*) Qtrue;
56+
} else if (buffer == 'U') { // unblock
57+
// Similar to how ossl_pkey.c does it
58+
rb_thread_call_with_gvl(call_check_ints, NULL);
59+
continue;
60+
} else {
61+
return (void*) rb_str_new(&buffer, 1);
62+
}
63+
} else {
64+
perror("read() in blocking function returned != 1");
65+
return (void*) Qfalse;
66+
}
67+
}
68+
}
69+
70+
static void unblock(void* arg) {
71+
int fd = *(int*)arg;
72+
char buffer = 'U';
73+
while (write(fd, &buffer, 1) == -1 && errno == EINTR) {
74+
// retry
75+
}
76+
}
77+
78+
static VALUE finish(void* arg) {
79+
int fd = *(int*)arg;
80+
81+
// Wait 1 second
82+
struct timespec remaining = { .tv_sec = 1, .tv_nsec = 0 };
83+
while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR) {
84+
// Sleep the remaining amount
85+
}
86+
87+
char buffer = 'D';
88+
while (write(fd, &buffer, 1) == -1 && errno == EINTR) {
89+
// retry
90+
}
91+
return Qtrue;
92+
}
93+
94+
static VALUE thread_spec_rb_thread_call_without_gvl_unblock_custom_function(VALUE self) {
95+
int fds[2];
96+
if (pipe(fds) == -1) {
97+
rb_raise(rb_eRuntimeError, "could not create pipe");
98+
}
99+
100+
VALUE thread = rb_funcall(rb_block_proc(), rb_intern("call"), 1, INT2FIX(fds[1]));
101+
102+
rb_thread_call_without_gvl(block, &fds[0], unblock, &fds[1]);
103+
104+
return rb_funcall(thread, rb_intern("join"), 0);
105+
}
106+
24107
void Init_truffleruby_thread_spec(void) {
25108
VALUE cls = rb_define_class("CApiTruffleRubyThreadSpecs", rb_cObject);
26109
rb_define_method(cls, "rb_thread_call_without_gvl_native_function", thread_spec_rb_thread_call_without_gvl_native_function, 0);
110+
rb_define_method(cls, "rb_thread_call_without_gvl_unblock_signal", thread_spec_rb_thread_call_without_gvl_unblock_signal, 0);
111+
rb_define_method(cls, "rb_thread_call_without_gvl_unblock_custom_function", thread_spec_rb_thread_call_without_gvl_unblock_custom_function, 0);
27112
}
28113

29114
#ifdef __cplusplus

spec/truffle/capi/thread_spec.rb

+17-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
require_relative '../../ruby/optional/capi/spec_helper'
1010

11-
load_extension("truffleruby_thread")
11+
extension_path = load_extension("truffleruby_thread")
1212

1313
describe "TruffleRuby C-API Thread function" do
1414
before :each do
@@ -19,5 +19,21 @@
1919
it "runs a native function with the global lock unlocked" do
2020
@t.rb_thread_call_without_gvl_native_function.should == Process.pid
2121
end
22+
23+
it "is unblocked with RUBY_UBF_IO when using CPUSampler" do
24+
code = "require #{extension_path.dump}; CApiTruffleRubyThreadSpecs.new.rb_thread_call_without_gvl_unblock_signal"
25+
out = ruby_exe(code, options: '--cpusampler')
26+
out.should.include?('rb_thread_call_without_gvl_unblock_signal')
27+
out.should.include?('rb_thread_call_without_gvl')
28+
out.should.include?('rb_thread_call_with_gvl') # which checks guest safepoints
29+
end
30+
31+
it "is unblocked with a custom unblock function when using CPUSampler" do
32+
code = "require #{extension_path.dump}; CApiTruffleRubyThreadSpecs.new.rb_thread_call_without_gvl_unblock_custom_function { |fd| Thread.new { sleep 1; IO.for_fd(fd, autoclose: false).write 'D' } }"
33+
out = ruby_exe(code, options: '--cpusampler')
34+
out.should.include?('rb_thread_call_without_gvl_unblock_custom_function')
35+
out.should.include?('rb_thread_call_without_gvl')
36+
out.should.include?('rb_thread_call_with_gvl') # which checks guest safepoints
37+
end
2238
end
2339
end

spec/truffle/capi/unimplemented_spec.rb

+4-6
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@
1111
extension_path = load_extension("unimplemented")
1212

1313
describe "Unimplemented functions in the C-API" do
14-
guard_not -> { Truffle::Boot.get_option('cexts-sulong') } do
15-
it "abort the process and show an error including the function name" do
16-
expected_status = platform_is(:darwin) ? :SIGABRT : 127
17-
out = ruby_exe('require ARGV[0]; CApiRbTrErrorSpecs.new.not_implemented_function("foo")', args: "#{extension_path} 2>&1", exit_status: expected_status)
18-
out.should =~ /undefined symbol: rb_str_shared_replace|Symbol not found: _rb_str_shared_replace/
19-
end
14+
it "abort the process and show an error including the function name" do
15+
expected_status = platform_is(:darwin) ? :SIGABRT : 127
16+
out = ruby_exe('require ARGV[0]; CApiRbTrErrorSpecs.new.not_implemented_function("foo")', args: "#{extension_path} 2>&1", exit_status: expected_status)
17+
out.should =~ /undefined symbol: rb_str_shared_replace|Symbol not found: _rb_str_shared_replace/
2018
end
2119
end

spec/truffleruby.mspec

+4
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ class MSpecScript
129129
else
130130
excludes << 'jvm'
131131
end
132+
133+
if Truffle::Boot.get_option('cexts-sulong')
134+
excludes << 'sulong'
135+
end
132136
end
133137

134138
if windows?

src/main/c/cext/call.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ void *rb_thread_call_with_gvl(gvl_call *function, void *data1) {
146146

147147
void* rb_thread_call_without_gvl(gvl_call *function, void *data1, rb_unblock_function_t *unblock_function, void *data2) {
148148
if (unblock_function == RUBY_UBF_IO) {
149-
unblock_function = (rb_unblock_function_t*) rb_tr_unwrap(Qnil);
149+
unblock_function = (rb_unblock_function_t*) NULL;
150150
}
151151

152152
return polyglot_invoke(RUBY_CEXT, "rb_thread_call_without_gvl", function, data1, unblock_function, data2);

src/main/c/rubysignal/src/rubysignal.c

+9
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,12 @@ JNIEXPORT void JNICALL Java_org_truffleruby_signal_LibRubySignal_restoreSystemHa
5050
signal(signo, SIG_DFL);
5151
raise(signo);
5252
}
53+
54+
// Declaration copied from lib/cext/include/ruby/internal/intern/thread.h
55+
typedef void rb_unblock_function_t(void *);
56+
57+
JNIEXPORT void JNICALL Java_org_truffleruby_signal_LibRubySignal_executeUnblockFunction(JNIEnv *env, jclass clazz, jlong function, jlong argument) {
58+
rb_unblock_function_t* unblock_function = (rb_unblock_function_t*) function;
59+
void* arg = (void*) argument;
60+
unblock_function(arg);
61+
}

src/main/java/org/truffleruby/core/thread/ThreadNodes.java

+8-46
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.concurrent.TimeUnit;
4646

4747
import com.oracle.truffle.api.RootCallTarget;
48-
import com.oracle.truffle.api.TruffleContext;
4948
import com.oracle.truffle.api.TruffleSafepoint;
5049
import com.oracle.truffle.api.TruffleSafepoint.Interrupter;
5150
import com.oracle.truffle.api.dsl.Bind;
@@ -109,13 +108,13 @@
109108
import com.oracle.truffle.api.dsl.Cached;
110109
import com.oracle.truffle.api.dsl.ImportStatic;
111110
import com.oracle.truffle.api.dsl.Specialization;
112-
import com.oracle.truffle.api.interop.InteropException;
113111
import com.oracle.truffle.api.interop.InteropLibrary;
114112
import com.oracle.truffle.api.library.CachedLibrary;
115113
import com.oracle.truffle.api.nodes.Node;
116114
import com.oracle.truffle.api.object.Shape;
117115
import com.oracle.truffle.api.profiles.BranchProfile;
118116
import com.oracle.truffle.api.source.SourceSection;
117+
import org.truffleruby.signal.LibRubySignal;
119118

120119
import static org.truffleruby.language.SafepointPredicate.ALL_THREADS_AND_FIBERS;
121120

@@ -658,24 +657,18 @@ public abstract static class CallWithUnblockingFunctionNode extends PrimitiveArr
658657
@SuppressWarnings("truffle-neverdefault") // GR-43642
659658
@Specialization(limit = "getCacheLimit()")
660659
static Object call(
661-
RubyThread thread,
662-
Object wrapper,
663-
Object function,
664-
Object arg,
665-
Object unblockWrapper,
666-
Object unblocker,
667-
Object unblockerArg,
660+
RubyThread thread, Object wrapper, Object function, Object arg, long unblocker, long unblockerArg,
668661
@CachedLibrary("wrapper") InteropLibrary receivers,
669662
@Cached TranslateInteropExceptionNode translateInteropExceptionNode,
670663
@Bind("this") Node node,
671664
@Cached("new(node, receivers, translateInteropExceptionNode)") BlockingCallInterruptible blockingCallInterruptible) {
672665
var context = getContext(node);
673666
final ThreadManager threadManager = context.getThreadManager();
674667
final Interrupter interrupter;
675-
if (unblocker == nil) {
668+
if (unblocker == 0) {
676669
interrupter = threadManager.getNativeCallInterrupter();
677670
} else {
678-
interrupter = new CExtInterrupter(context, unblockWrapper, unblocker, unblockerArg);
671+
interrupter = new CExtInterrupter(unblocker, unblockerArg);
679672
}
680673

681674
final Object[] args = { function, arg };
@@ -690,48 +683,17 @@ static Object call(
690683

691684
private static final class CExtInterrupter implements Interrupter {
692685

693-
private final RubyContext context;
694-
private final Object wrapper;
695-
private final Object function;
696-
private final Object argument;
686+
private final long function;
687+
private final long argument;
697688

698-
public CExtInterrupter(RubyContext context, Object wrapper, Object function, Object argument) {
699-
assert InteropLibrary.getUncached().isExecutable(wrapper);
700-
this.context = context;
701-
this.wrapper = wrapper;
689+
public CExtInterrupter(long function, long argument) {
702690
this.function = function;
703691
this.argument = argument;
704692
}
705693

706694
@Override
707695
public void interrupt(Thread thread) {
708-
final TruffleContext truffleContext = context.getEnv().getContext();
709-
final boolean alreadyEntered = truffleContext.isEntered();
710-
Object prev = null;
711-
if (!alreadyEntered) {
712-
// We need to enter the context to execute this unblocking action, as it runs on Sulong.
713-
try {
714-
if (context.getOptions().SINGLE_THREADED) {
715-
throw new IllegalStateException("--single-threaded was passed");
716-
}
717-
prev = truffleContext.enter(null);
718-
} catch (IllegalStateException e) { // Multi threaded access denied from Truffle
719-
context.getLogger().severe(
720-
"could not unblock thread inside blocking call in C extension because " +
721-
"the context does not allow multithreading (" + e.getMessage() + ")");
722-
return;
723-
}
724-
}
725-
726-
try {
727-
InteropLibrary.getUncached().execute(wrapper, function, argument);
728-
} catch (InteropException e) {
729-
throw CompilerDirectives.shouldNotReachHere(e);
730-
} finally {
731-
if (!alreadyEntered) {
732-
truffleContext.leave(null, prev);
733-
}
734-
}
696+
LibRubySignal.executeUnblockFunction(function, argument);
735697
}
736698

737699
@Override

src/signal/java/org/truffleruby/signal/LibRubySignal.java

+2
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ public static void loadLibrary(String rubyHome, String libSuffix) {
2626

2727
public static native void restoreSystemHandlerAndRaise(int signalNumber);
2828

29+
public static native void executeUnblockFunction(long function, long argument);
30+
2931
}

0 commit comments

Comments
 (0)