Skip to content

Commit

Permalink
Optimize event serialization with pre-allocated buffer
Browse files Browse the repository at this point in the history
- Added event_buffer field to LlmpEventManager
- Used to_slice instead of to_allocvec
- Pre-allocated buffer size is 4KB

Fixes AFLplusplus#1082
  • Loading branch information
mzfr committed Dec 28, 2024
1 parent 5324799 commit c016c7f
Showing 1 changed file with 49 additions and 18 deletions.
67 changes: 49 additions & 18 deletions libafl/src/events/llmp/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ where
should_serialize_cnt: usize,
pub(crate) time_ref: Option<Handle<TimeObserver>>,
phantom: PhantomData<S>,
event_buffer: Vec<u8>,
}

impl LlmpEventManager<(), NopState<NopInput>, NopShMemProvider> {
Expand Down Expand Up @@ -165,6 +166,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(1024 * 4),
})
}

Expand Down Expand Up @@ -199,6 +201,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(1024 * 4),
})
}

Expand Down Expand Up @@ -233,6 +236,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(1024 * 4),
})
}

Expand Down Expand Up @@ -265,6 +269,7 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
time_ref,
phantom: PhantomData,
custom_buf_handlers: vec![],
event_buffer: Vec::with_capacity(1024 * 4),
})
}
}
Expand Down Expand Up @@ -410,6 +415,7 @@ where
+ EvaluatorObservers<E, Self, <S::Corpus as Corpus>::Input, S>
+ Evaluator<E, Self, <S::Corpus as Corpus>::Input, S>,
{
println!("Got event in client: {} from {:?}", event.name(), client_id);
if !self.hooks.pre_exec_all(state, client_id, &event)? {
return Ok(());
}
Expand Down Expand Up @@ -520,24 +526,37 @@ where
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
let flags = LLMP_FLAG_INITIALIZED;

match self.compressor.maybe_compress(&serialized) {
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
self.event_buffer.clear();
self.event_buffer.resize(self.event_buffer.capacity(), 0);

match postcard::to_slice(&event, &mut self.event_buffer) {
Ok(written) => {
let written_len = written.len();
match self
.compressor
.maybe_compress(&self.event_buffer[..written_len])
{
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
}
None => {
self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?;
}
}
self.last_sent = current_time();
Ok(())
}
None => {
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
Err(postcard::Error::SerializeBufferFull) => {
return Err(Error::serialize("Buffer full"));
}
Err(e) => Err(Error::from(e)),
}
self.last_sent = current_time();

Ok(())
}

#[cfg(not(feature = "llmp_compression"))]
Expand All @@ -546,11 +565,23 @@ where
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
Ok(())
}
self.event_buffer.clear();
self.event_buffer.resize(self.event_buffer.capacity(), 0);

match postcard::to_slice(&event, &mut self.event_buffer) {
Ok(written) => {
let written_len = written.len();

self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?;

self.last_sent = current_time();
Ok(())
}
Err(postcard::Error::SerializeBufferFull) => Err(Error::serialize("Buffer full")),
Err(e) => Err(Error::from(e)),
}
}
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
where
OT: ObserversTuple<Self::Input, Self::State> + Serialize,
Expand Down

0 comments on commit c016c7f

Please sign in to comment.