Skip to content

Commit

Permalink
Resolve native thread ID through SafeAccess
Browse files Browse the repository at this point in the history
  • Loading branch information
jbachorik committed Jan 9, 2025
1 parent fe6aba3 commit 6e8a436
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 40 deletions.
2 changes: 1 addition & 1 deletion ddprof-lib/src/main/cpp/vmStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class VMThread : VMStructs {

int osThreadId() {
const char *osthread = *(const char **)at(_thread_osthread_offset);
return *(int *)(osthread + _osthread_id_offset);
return (int)SafeAccess::load32((u32*)(osthread + _osthread_id_offset), (u32)0xffffffff);
}

int state() {
Expand Down
36 changes: 21 additions & 15 deletions ddprof-lib/src/main/cpp/wallClock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,26 @@ void WallClockJVMTI::timerLoop() {

jint threads_count = 0;
jthread* threads_ptr = nullptr;
jvmti->GetAllThreads(&threads_count, &threads_ptr);
jvmtiError err = jvmti->GetAllThreads(&threads_count, &threads_ptr);
if (err == JVMTI_ERROR_NONE && threads_ptr != nullptr) {
bool do_filter = Profiler::instance()->threadFilter()->enabled();
int self = OS::threadId();

bool do_filter = Profiler::instance()->threadFilter()->enabled();
int self = OS::threadId();

for (int i = 0; i < threads_count; i++) {
jthread thread = threads_ptr[i];
if (thread != nullptr) {
VMThread* nThread = VMThread::fromJavaThread(jni, thread);
if (nThread == nullptr) {
continue;
}
int tid = nThread->osThreadId();
if (tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) {
threads.push_back({nThread, thread});
for (int i = 0; i < threads_count; i++) {
jthread thread = threads_ptr[i];
if (thread != nullptr) {
VMThread* nThread = VMThread::fromJavaThread(jni, thread);
if (nThread == nullptr) {
continue;
}
int tid = VMThread::nativeThreadId(jni, thread);
if (tid != -1 && tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) {
threads.push_back({nThread, thread});
}
}
}
jvmti->Deallocate((unsigned char*)threads_ptr);
}
jvmti->Deallocate((unsigned char*)threads_ptr);
};

auto sampleThreads = [&](ThreadEntry& thread_entry, int& num_failures, int& threads_already_exited, int& permission_denied) {
Expand All @@ -206,6 +207,11 @@ void WallClockJVMTI::timerLoop() {
}
ExecutionEvent event;
VMThread* vm_thread = thread_entry.native;
if (vm_thread == nullptr) {
num_failures++;
return false;
}

int raw_thread_state = vm_thread->state();
bool is_initialized = raw_thread_state >= JVMJavaThreadState::_thread_in_native &&
raw_thread_state < JVMJavaThreadState::_thread_max_state;
Expand Down
51 changes: 27 additions & 24 deletions ddprof-lib/src/main/cpp/wallClock.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,35 +75,38 @@ class BaseWallClock : public Engine {
ReservoirSampler<ThreadType> reservoir(reservoirSize);

while (_running.load(std::memory_order_relaxed)) {
u64 start_ts = OS::nanotime();
collectThreads(threads);
int size = threads.size();
if (threads.size() > 0) {
int num_failures = 0;
int threads_already_exited = 0;
int permission_denied = 0;
std::vector<ThreadType> sample = reservoir.sample(threads);
for (ThreadType thread : sample) {
if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) {
continue;
}
}

int num_failures = 0;
int threads_already_exited = 0;
int permission_denied = 0;
std::vector<ThreadType> sample = reservoir.sample(threads);
for (ThreadType thread : sample) {
if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) {
continue;
epoch.updateNumSamplableThreads(threads.size());
epoch.updateNumFailedSamples(num_failures);
epoch.updateNumSuccessfulSamples(sample.size() - num_failures);
epoch.updateNumExitedThreads(threads_already_exited);
epoch.updateNumPermissionDenied(permission_denied);
u64 endTime = TSC::ticks();
u64 duration = TSC::ticks_to_millis(endTime - startTime);
if (epoch.hasChanged() || duration >= 1000) {
epoch.endEpoch(duration);
Profiler::instance()->recordWallClockEpoch(self, &epoch);
epoch.newEpoch(endTime);
startTime = endTime;
} else {
epoch.clean();
}
}

epoch.updateNumSamplableThreads(threads.size());
epoch.updateNumFailedSamples(num_failures);
epoch.updateNumSuccessfulSamples(sample.size() - num_failures);
epoch.updateNumExitedThreads(threads_already_exited);
epoch.updateNumPermissionDenied(permission_denied);
u64 endTime = TSC::ticks();
u64 duration = TSC::ticks_to_millis(endTime - startTime);
if (epoch.hasChanged() || duration >= 1000) {
epoch.endEpoch(duration);
Profiler::instance()->recordWallClockEpoch(self, &epoch);
epoch.newEpoch(endTime);
startTime = endTime;
} else {
epoch.clean();
threads.clear();
}

threads.clear();
// Get a random sleep duration
// clamp the random interval to <1,2N-1>
// the probability of clamping is extremely small, close to zero
Expand Down

0 comments on commit 6e8a436

Please sign in to comment.