Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
WIP local queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
chkeita committed Feb 6, 2021
1 parent c31f2ff commit aa003eb
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
P: Processor + Send,
{
fn parse(&mut self, msg: &Message) -> Result<Url> {
let url= msg.parse(|data| {
let url = msg.parse(|data| {
let data = std::str::from_utf8(data)?;
Ok(Url::parse(data)?)
})?;
Expand Down
4 changes: 2 additions & 2 deletions src/agent/onefuzz-agent/src/tasks/merge/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
verbose!("tmp dir reset");
utils::reset_tmp_dir(&tmp_dir).await?;
config.unique_inputs.sync_pull().await?;
let mut queue = QueueClient::new(config.input_queue.clone());
let queue = QueueClient::new(config.input_queue.clone());
if let Some(msg) = queue.pop().await? {
let input_url = msg.parse(utils::parse_url_data);
let input_url = match input_url {
Expand Down Expand Up @@ -89,7 +89,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
} else {
warn!("no new candidate inputs found, sleeping");
delay_with_jitter(EMPTY_QUEUE_DELAY).await;
}
};
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn process_message(config: Arc<Config>, mut input_queue: QueueClient) -> R
utils::reset_tmp_dir(tmp_dir).await?;

if let Some(msg) = input_queue.pop().await? {
let input_url= msg.parse(|data| {
let input_url = msg.parse(|data| {
let data = std::str::from_utf8(data)?;
Ok(Url::parse(data)?)
});
Expand Down
20 changes: 10 additions & 10 deletions src/agent/onefuzz-supervisor/src/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,16 @@ impl WorkQueue {
}

pub async fn poll(&mut self) -> Result<Option<Message>> {
let mut msg = self.queue.pop().await;

// If we had an auth err, renew our registration and retry once, in case
// it was just due to a stale SAS URL.
if let Err(err) = &msg {
if is_auth_error(err) {
self.renew().await?;
msg = self.queue.pop().await;
}
}
let msg = self.queue.pop().await;

// // If we had an auth err, renew our registration and retry once, in case
// // it was just due to a stale SAS URL.
// if let Err(err) = &msg {
// if is_auth_error(err) {
// self.renew().await?;
// msg = self.queue.pop().await;
// }
// }

// Now we've had a chance to ensure our SAS URL is fresh. For any other
// error, including another auth error, bail.
Expand Down
2 changes: 1 addition & 1 deletion src/agent/storage-queue/src/azure_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl AzureQueueClient {
Ok(())
}

pub async fn pop(&mut self) -> Result<Option<AzureQueueMessage>> {
pub async fn pop(&self) -> Result<Option<AzureQueueMessage>> {
let response = self
.http
.get(self.messages_url.clone())
Expand Down
42 changes: 20 additions & 22 deletions src/agent/storage-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
// Licensed under the MIT License.

use anyhow::{bail, Result};
use reqwest::{Client, Url};
use reqwest_retry::SendRetry;
use reqwest::Url;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::time::Duration;
use uuid::Uuid;
Expand All @@ -13,9 +12,11 @@ pub mod azure_queue;
pub mod local_queue;

use azure_queue::{AzureQueueClient, AzureQueueMessage};
use local_queue::{LocalQueueClient, LocalQueueMessage};

pub enum QueueClient {
AzureQueue(AzureQueueClient),
LocalQueue(LocalQueueClient),
}

impl QueueClient {
Expand All @@ -26,23 +27,28 @@ impl QueueClient {
pub async fn enqueue(&self, data: impl Serialize) -> Result<()> {
match self {
QueueClient::AzureQueue(queue_client) => queue_client.enqueue(data).await,
QueueClient::LocalQueue(queue_client) => queue_client.enqueue(data).await,
}
}

pub async fn pop(&mut self) -> Result<Option<Message>> {
pub async fn pop(&self) -> Result<Option<Message>> {
match self {
QueueClient::AzureQueue(queue_client) => {
let message = queue_client.pop().await?;
Ok(message.map(Message::QueueMessage))
}
QueueClient::LocalQueue(queue_client) => {
let message = queue_client.pop().await?;
Ok(message.map(Message::LocalQueueMessage))
}
}
}
}

// #[derive(Clone)]
pub enum Message {
QueueMessage(AzureQueueMessage),
// LocalQueueMessage(LocalQueueMessage<'a, T>)
LocalQueueMessage(LocalQueueMessage),
}

#[derive(Clone, Debug, Eq, PartialEq)]
Expand All @@ -61,39 +67,31 @@ impl Message {
let data = message.get()?;
Ok(data)
}
// Message::LocalQueueMessage(message) => {
// Ok(serde_json::from_slice(&*message.data)?)
// }
Message::LocalQueueMessage(message) => Ok(serde_json::from_slice(&*message.data)?),
}
}

pub async fn claim<T: DeserializeOwned>(self) -> Result<T> {
match self {
Message::QueueMessage(message) => Ok(message.claim().await?), // Message::LocalQueueMessage(message) => {
// let value = message.data.into_inner();
// Ok(serde_json::from_slice(value))

// }
Message::QueueMessage(message) => Ok(message.claim().await?),
Message::LocalQueueMessage(message) => Ok(serde_json::from_slice(&message.data)?),
}
}

pub async fn delete(self) -> Result<()> {
match self {
Message::QueueMessage(message) => Ok(message.delete().await?), // Message::LocalQueueMessage(message) => {
// let value = message.data.into_inner();
// Ok(serde_json::from_slice(value))

// }
Message::QueueMessage(message) => Ok(message.delete().await?),
Message::LocalQueueMessage(_) => {
// message.data.commit();
Ok(())
}
}
}

pub fn parse<T>(&self, parser: impl FnOnce(&[u8]) -> Result<T>) -> Result<T> {
match self {
Message::QueueMessage(message) => message.parse(parser), // Message::LocalQueueMessage(message) => {
// let value = message.data.into_inner();
// Ok(serde_json::from_slice(value))

// }
Message::QueueMessage(message) => message.parse(parser),
Message::LocalQueueMessage(message) => parser(&*message.data),
}
}

Expand Down
46 changes: 31 additions & 15 deletions src/agent/storage-queue/src/local_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,58 @@
use anyhow::{bail, Result};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::{borrow::Borrow, path::Path};
use std::{io::Read, time::Duration};
use tokio::sync::Mutex;
use tokio::time::delay_for;
use uuid::Uuid;

use yaque::{self, channel, queue::RecvGuard, Sender};

pub const EMPTY_QUEUE_DELAY: Duration = Duration::from_secs(10);

pub struct LocalQueueMessage<'a> {
pub data: RecvGuard<'a, Vec<u8>>,
pub struct LocalQueueMessage {
pub data: Vec<u8>,
}

pub struct LocalQueueClient {
sender: yaque::Sender,
receiver: yaque::Receiver,
sender: Mutex<yaque::Sender>,
receiver: Mutex<yaque::Receiver>,
}

impl LocalQueueClient {
pub fn new(queue_url: impl AsRef<Path>) -> Result<Self> {
let (sender, receiver) = yaque::channel(queue_url)?;
Ok(LocalQueueClient { sender, receiver })
Ok(LocalQueueClient {
sender: Mutex::new(sender),
receiver: Mutex::new(receiver),
})
}

pub async fn enqueue(&mut self, data: impl Serialize) -> Result<()> {
pub async fn enqueue(&self, data: impl Serialize) -> Result<()> {
let body = serde_xml_rs::to_string(&data).unwrap();
self.sender.send(body.as_bytes())?;
Ok(())
match self.sender.try_lock() {
Ok(ref mut sender) => {
sender.send(body.as_bytes())?;
Ok(())
}
Err(_) => bail!("cant enqueue"),
}
}

pub async fn pop(&mut self) -> Result<Option<RecvGuard<'_, Vec<u8>>>> {
let data = self
.receiver
.recv_timeout(tokio::time::delay_for(Duration::from_secs(1)))
.await?;
pub async fn pop(&self) -> Result<Option<LocalQueueMessage>> {
match self.receiver.try_lock() {
Ok(ref mut receiver) => {
let data = receiver
.recv_timeout(tokio::time::delay_for(Duration::from_secs(1)))
.await?;

Ok(data)
Ok(data.map(|data| LocalQueueMessage {
data: data.into_inner(),
}))
}
Err(_) => bail!("cant enqueue"),
}
}

// pub async fn pop(&mut self) -> Result<Option<Message>> {
Expand Down

0 comments on commit aa003eb

Please sign in to comment.