Skip to content

Commit

Permalink
Merge pull request #277 from rstudio/feature/stream-output-message
Browse files Browse the repository at this point in the history
Add a new message type to separate runtime output from stderr/stdout output
  • Loading branch information
jmcphers authored Mar 14, 2023
2 parents 2790c7f + 3af6502 commit babbb3b
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 72 deletions.
11 changes: 4 additions & 7 deletions extensions/jupyter-adapter/src/LanguageRuntimeAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,10 @@ export class LanguageRuntimeAdapter
id: message.msgId,
parent_id: message.originId,
when: message.when,
type: data.name === 'stderr' ?
positron.LanguageRuntimeMessageType.Error :
positron.LanguageRuntimeMessageType.Output,
data: {
'text/plain': data.text
} as any
} as positron.LanguageRuntimeOutput);
type: positron.LanguageRuntimeMessageType.Stream,
name: data.name,
text: data.text
} as positron.LanguageRuntimeStream);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* stream_capture.rs
*
Expand Down Expand Up @@ -32,7 +31,10 @@ impl StreamCapture {
/// Does not return.
pub fn listen(&self) {
if let Err(err) = Self::output_capture(self.iopub_tx.clone()) {
warn!("Error capturing output; stdout/stderr won't be forwarded: {}", err);
warn!(
"Error capturing output; stdout/stderr won't be forwarded: {}",
err
);
};
}

Expand Down Expand Up @@ -62,7 +64,7 @@ impl StreamCapture {
}
warn!("Error polling for stream data: {}", e);
continue;
}
},
};

// No data available; likely timed out waiting for data. Try again.
Expand All @@ -72,7 +74,6 @@ impl StreamCapture {

// See which stream has data available.
for poll_fd in poll_fds.iter() {

// Skip this fd if it doesn't have any new events.
let revents = match poll_fd.revents() {
Some(r) => r,
Expand All @@ -97,7 +98,7 @@ impl StreamCapture {
Self::fd_to_iopub(fd, stream, iopub_tx.clone());
}
}
};
}
warn!("Stream capture thread exiting after interrupt");
Ok(())
}
Expand All @@ -111,7 +112,7 @@ impl StreamCapture {
Err(e) => {
warn!("Error reading stream data: {}", e);
return;
}
},
};

// No bytes read? Nothing to send.
Expand All @@ -121,7 +122,10 @@ impl StreamCapture {

// Convert the UTF-8 bytes to a string.
let data = String::from_utf8_lossy(&buf[..count]).to_string();
let output = StreamOutput{stream, text: data };
let output = StreamOutput {
name: stream,
text: data,
};

// Create and send the IOPub
let message = IOPubMessage::Stream(output);
Expand All @@ -138,7 +142,7 @@ impl StreamCapture {
Ok((read, write)) => (read, write),
Err(e) => {
return Err(Error::SysError(format!("create socket for {}", fd), e));
}
},
};

// Redirect the stream into the write end of the pipe
Expand All @@ -149,7 +153,8 @@ impl StreamCapture {
// Make reads non-blocking on the read end of the pipe
if let Err(e) = nix::fcntl::fcntl(
read,
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK)) {
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK),
) {
return Err(Error::SysError(format!("set non-blocking for {}", fd), e));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize};
/// Represents a message from the front end to indicate stream output
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamOutput {
/// The stream for which output is being emitted
pub stream: Stream,
/// The name of the stream for which output is being emitted
pub name: Stream,

/// The output emitted on the stream
pub text: String,
Expand Down
51 changes: 19 additions & 32 deletions extensions/positron-r/amalthea/crates/ark/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ static mut CONSOLE_RECV: Option<Mutex<Receiver<Option<String>>>> = None;
static INIT: Once = Once::new();

pub unsafe fn process_events() {

// Process regular R events.
R_ProcessEvents();

Expand All @@ -97,11 +96,9 @@ pub unsafe fn process_events() {

// Render pending plots.
graphics_device::on_process_events();

}

fn on_console_input(buf: *mut c_uchar, buflen: c_int, mut input: String) {

// TODO: What if the input is too large for the buffer?
input.push_str("\n");
if input.len() > buflen as usize {
Expand All @@ -113,7 +110,6 @@ fn on_console_input(buf: *mut c_uchar, buflen: c_int, mut input: String) {
unsafe {
libc::strcpy(buf as *mut c_char, src.as_ptr());
}

}

/// Invoked by R to read console input from the user.
Expand Down Expand Up @@ -159,14 +155,11 @@ pub extern "C" fn r_read_console(
// descriptors that R has open and select() on those for
// available data?
loop {

// Release the R runtime lock while we're waiting for input.
unsafe { R_RUNTIME_LOCK_GUARD = None };

match receiver.recv_timeout(Duration::from_millis(200)) {

Ok(response) => {

// Take back the lock after we've received some console input.
unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) };

Expand All @@ -178,37 +171,28 @@ pub extern "C" fn r_read_console(
}

return 1;

}
},

Err(error) => {

unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) };

use RecvTimeoutError::*;
match error {

Timeout => {

// Process events.
unsafe { process_events() };

// Keep waiting for console input.
continue;

}
},

Disconnected => {

return 1;

}
},
}
}
},
}

}

}

/**
Expand All @@ -218,7 +202,11 @@ pub extern "C" fn r_read_console(
pub extern "C" fn r_write_console(buf: *const c_char, _buflen: i32, otype: i32) {
let content = unsafe { CStr::from_ptr(buf) };
let mutex = unsafe { KERNEL.as_ref().unwrap() };
let stream = if otype == 1 { Stream::Stdout } else { Stream::Stderr };
let stream = if otype == 0 {
Stream::Stdout
} else {
Stream::Stderr
};
let mut kernel = mutex.lock().unwrap();
kernel.write_console(content.to_str().unwrap(), stream);
}
Expand All @@ -236,7 +224,7 @@ pub extern "C" fn r_show_message(buf: *const c_char) {
let kernel = mutex.lock().unwrap();

// Create an event representing the message
let event = PositronEvent::ShowMessage(ShowMessageEvent{
let event = PositronEvent::ShowMessage(ShowMessageEvent {
message: message.to_str().unwrap().to_string(),
});

Expand All @@ -255,24 +243,24 @@ pub extern "C" fn r_busy(which: i32) {
let kernel = mutex.lock().unwrap();

// Create an event representing the new busy state
let event = PositronEvent::Busy(BusyEvent{
busy: which != 0,
});
let event = PositronEvent::Busy(BusyEvent { busy: which != 0 });

// Have the kernel deliver the event to the front end
kernel.send_event(event);
}

#[no_mangle]
pub unsafe extern "C" fn r_polled_events() {

// Check for pending tasks.
let count = R_RUNTIME_LOCK_COUNT.load(std::sync::atomic::Ordering::Acquire);
if count == 0 {
return;
}

info!("{} thread(s) are waiting; the main thread is releasing the R runtime lock.", count);
info!(
"{} thread(s) are waiting; the main thread is releasing the R runtime lock.",
count
);
let now = SystemTime::now();

// Release the lock. This drops the lock, and gives other threads
Expand All @@ -282,8 +270,10 @@ pub unsafe extern "C" fn r_polled_events() {
// Take the lock back.
R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock());

info!("The main thread re-acquired the R runtime lock after {} milliseconds.", now.elapsed().unwrap().as_millis());

info!(
"The main thread re-acquired the R runtime lock after {} milliseconds.",
now.elapsed().unwrap().as_millis()
);
}

pub fn start_r(
Expand Down Expand Up @@ -313,7 +303,6 @@ pub fn start_r(
thread::spawn(move || listen(shell_request_rx, rprompt_rx));

unsafe {

let mut args = cargs!["ark", "--interactive"];
R_running_as_main_program = 1;
R_SignalHandlers = 0;
Expand Down Expand Up @@ -407,7 +396,6 @@ fn complete_execute_request(req: &Request, prompt_recv: &Receiver<String>) {
// If the current prompt doesn't match the default prompt, assume that
// we're reading use input, e.g. via 'readline()'.
if prompt != default_prompt {

trace!("Got R prompt '{}', asking user for input", prompt);
if let Request::ExecuteCode(_, originator, _) = req {
kernel.request_input(originator, &prompt);
Expand All @@ -424,7 +412,6 @@ fn complete_execute_request(req: &Request, prompt_recv: &Receiver<String>) {
// Default prompt, finishing request
trace!("Got R prompt '{}', completing execution", prompt);
return kernel.finish_request();

}

pub fn listen(exec_recv: Receiver<Request>, prompt_recv: Receiver<String>) {
Expand Down
Loading

0 comments on commit babbb3b

Please sign in to comment.