Skip to content

Commit 39bd798

Browse files
committed
Binary serialization over pipes
We use `UnixStream::pair()` a few times and reimplement binary ser/de each time. Add an abstraction that does the serde steps (`DeSerialize`) and buffer ceremony (`BinPipe`) for us. Fixes #471.
1 parent 6d40763 commit 39bd798

File tree

5 files changed

+226
-85
lines changed

5 files changed

+226
-85
lines changed

src/common/bin_serde.rs

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
//! Binary serialization, and an implementation over Unix pipes.
2+
use sealed::DeSerializeBytes;
3+
use std::{
4+
io::{self, Read, Write},
5+
marker::PhantomData,
6+
os::{fd::AsRawFd, unix::net::UnixStream},
7+
};
8+
9+
mod sealed {
10+
pub trait DeSerializeBytes {
11+
fn zero_init() -> Self;
12+
fn as_mut_ref(&mut self) -> &mut [u8];
13+
}
14+
15+
impl<const N: usize> DeSerializeBytes for [u8; N] {
16+
fn zero_init() -> [u8; N] {
17+
[0; N]
18+
}
19+
fn as_mut_ref(&mut self) -> &mut [u8] {
20+
self.as_mut_slice()
21+
}
22+
}
23+
}
24+
25+
/// Serialization/deserialization trait using a byte array as storage.
26+
pub trait DeSerialize {
27+
/// Usually `[u8; std::mem::size_of::<Self>()]`.
28+
type Bytes: sealed::DeSerializeBytes;
29+
fn serialize(&self) -> Self::Bytes;
30+
fn deserialize(bytes: Self::Bytes) -> Self;
31+
}
32+
33+
/// A binary pipe that can send and recieve typed messages.
34+
///
35+
/// By default, if only one generic is included,
36+
/// the types of the [BinPipe::write()] and [BinPipe::read()] messages
37+
/// are the same.
38+
pub struct BinPipe<R: DeSerialize, W: DeSerialize = R> {
39+
sock: UnixStream,
40+
_read_marker: PhantomData<R>,
41+
_write_marker: PhantomData<W>,
42+
}
43+
44+
impl<R: DeSerialize, W: DeSerialize> BinPipe<R, W> {
45+
/// A pipe abstracting over a [UnixStream] with easier
46+
/// binary serialization, to help with the buffer sizes and ser/de steps.
47+
/// Uses [UnixStream::pair()].
48+
pub fn pair() -> io::Result<(BinPipe<R, W>, BinPipe<W, R>)> {
49+
let (first, second) = UnixStream::pair()?;
50+
Ok((
51+
BinPipe {
52+
sock: first,
53+
_read_marker: PhantomData::<R>,
54+
_write_marker: PhantomData::<W>,
55+
},
56+
// R and W are inverted here since the type of what's written in one
57+
// pipe is read in the other, and vice versa.
58+
BinPipe {
59+
sock: second,
60+
_read_marker: PhantomData::<W>,
61+
_write_marker: PhantomData::<R>,
62+
},
63+
))
64+
}
65+
66+
/// Read a `R` from the pipe.
67+
pub fn read(&mut self) -> io::Result<R> {
68+
let mut bytes = R::Bytes::zero_init();
69+
self.sock.read_exact(bytes.as_mut_ref())?;
70+
Ok(R::deserialize(bytes))
71+
}
72+
73+
/// Write a `W` to the pipe.
74+
pub fn write(&mut self, bytes: &W) -> io::Result<()> {
75+
self.sock.write_all(bytes.serialize().as_mut_ref())?;
76+
Ok(())
77+
}
78+
79+
/// Calls [std::net::TcpStream::set_nonblocking] on the underlying socket.
80+
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
81+
self.sock.set_nonblocking(nonblocking)
82+
}
83+
}
84+
85+
impl<R: DeSerialize, W: DeSerialize> AsRawFd for BinPipe<R, W> {
86+
fn as_raw_fd(&self) -> std::os::fd::RawFd {
87+
self.sock.as_raw_fd()
88+
}
89+
}
90+
91+
impl DeSerialize for i32 {
92+
type Bytes = [u8; std::mem::size_of::<Self>()];
93+
94+
fn serialize(&self) -> Self::Bytes {
95+
self.to_ne_bytes()
96+
}
97+
fn deserialize(bytes: Self::Bytes) -> Self {
98+
Self::from_ne_bytes(bytes)
99+
}
100+
}
101+
102+
#[cfg(test)]
103+
mod tests {
104+
use super::*;
105+
106+
#[test]
107+
pub fn single_type() {
108+
let (mut tx, mut rx) = BinPipe::pair().unwrap();
109+
tx.write(&42i32).unwrap();
110+
assert_eq!(rx.read().unwrap(), 42);
111+
rx.write(&23i32).unwrap();
112+
assert_eq!(tx.read().unwrap(), 23);
113+
}
114+
115+
#[test]
116+
pub fn different_types() {
117+
impl DeSerialize for u8 {
118+
type Bytes = [u8; std::mem::size_of::<Self>()];
119+
fn serialize(&self) -> [u8; 1] {
120+
self.to_ne_bytes()
121+
}
122+
fn deserialize(bytes: [u8; 1]) -> Self {
123+
Self::from_ne_bytes(bytes)
124+
}
125+
}
126+
127+
let (mut tx, mut rx) = BinPipe::pair().unwrap();
128+
tx.write(&42i32).unwrap();
129+
assert_eq!(rx.read().unwrap(), 42);
130+
rx.write(&23u8).unwrap();
131+
assert_eq!(tx.read().unwrap(), 23);
132+
}
133+
}

src/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub use command::CommandAndArguments;
55
pub use context::Context;
66
pub use error::Error;
77

8+
pub mod bin_serde;
89
pub mod command;
910
pub mod context;
1011
pub mod error;

src/exec/no_pty.rs

+13-17
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
use std::{
2-
ffi::c_int,
3-
io::{self, Read, Write},
4-
os::unix::{net::UnixStream, process::CommandExt},
5-
process::Command,
6-
};
1+
use std::{ffi::c_int, io, os::unix::process::CommandExt, process::Command};
72

83
use super::{
94
event::PollEvent,
105
event::{EventRegistry, Process, StopReason},
116
io_util::was_interrupted,
127
terminate_process, ExitReason, HandleSigchld, ProcessOutput,
138
};
14-
use crate::system::signal::{
15-
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber, SignalSet,
16-
SignalStream,
9+
use crate::{
10+
common::bin_serde::BinPipe,
11+
system::signal::{
12+
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber,
13+
SignalSet, SignalStream,
14+
},
1715
};
1816
use crate::{
1917
exec::{handle_sigchld, opt_fmt, signal_fmt},
@@ -46,7 +44,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
4644
// FIXME (ogsudo): Some extra config happens here if selinux is available.
4745

4846
// Use a pipe to get the IO error if `exec` fails.
49-
let (mut errpipe_tx, errpipe_rx) = UnixStream::pair()?;
47+
let (mut errpipe_tx, errpipe_rx) = BinPipe::pair()?;
5048

5149
// Don't close the error pipe as we need it to retrieve the error code if the command execution
5250
// fails.
@@ -72,7 +70,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
7270
// If `exec` returns, it means that executing the command failed. Send the error to the
7371
// monitor using the pipe.
7472
if let Some(error_code) = err.raw_os_error() {
75-
errpipe_tx.write_all(&error_code.to_ne_bytes()).ok();
73+
errpipe_tx.write(&error_code).ok();
7674
}
7775

7876
return Ok(ProcessOutput::ChildExit);
@@ -108,7 +106,7 @@ struct ExecClosure {
108106
command_pid: Option<ProcessId>,
109107
sudo_pid: ProcessId,
110108
parent_pgrp: ProcessId,
111-
errpipe_rx: UnixStream,
109+
errpipe_rx: BinPipe<i32>,
112110
signal_stream: &'static SignalStream,
113111
signal_handlers: [SignalHandler; ExecClosure::SIGNALS.len()],
114112
}
@@ -122,7 +120,7 @@ impl ExecClosure {
122120
fn new(
123121
command_pid: ProcessId,
124122
sudo_pid: ProcessId,
125-
errpipe_rx: UnixStream,
123+
errpipe_rx: BinPipe<i32>,
126124
registry: &mut EventRegistry<Self>,
127125
) -> io::Result<Self> {
128126
registry.register_event(&errpipe_rx, PollEvent::Readable, |_| ExecEvent::ErrPipe);
@@ -287,13 +285,11 @@ impl Process for ExecClosure {
287285
match event {
288286
ExecEvent::Signal => self.on_signal(registry),
289287
ExecEvent::ErrPipe => {
290-
let mut buf = 0i32.to_ne_bytes();
291-
match self.errpipe_rx.read_exact(&mut buf) {
288+
match self.errpipe_rx.read() {
292289
Err(err) if was_interrupted(&err) => { /* Retry later */ }
293290
Err(err) => registry.set_break(err),
294-
Ok(_) => {
291+
Ok(error_code) => {
295292
// Received error code from the command, forward it to the parent.
296-
let error_code = i32::from_ne_bytes(buf);
297293
registry.set_break(io::Error::from_raw_os_error(error_code));
298294
}
299295
}

0 commit comments

Comments
 (0)