Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to nom7 #13

Merged
merged 3 commits into from
Dec 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ anyhow = "1.0"
futures = "0.3"
tokio = { version = "1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec"] }
nom = "4"
nom = "7"

[dev-dependencies]
tokio = { version = "1", features = ["time", "macros", "rt-multi-thread"] }
8 changes: 7 additions & 1 deletion examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use tokio_stomp_2::*;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let conn = client::connect("127.0.0.1:61613", None, None).await?;
let conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await?;

tokio::time::sleep(Duration::from_millis(200)).await;

Expand Down
8 changes: 7 additions & 1 deletion examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ use tokio_stomp_2::*;
// `docker run -p 61613:61613 rmohr/activemq:latest`

async fn client(listens: &str, sends: &str, msg: &[u8]) -> Result<(), anyhow::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None).await?;
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await?;
conn.send(client::subscribe(listens, "myid")).await?;

loop {
Expand Down
11 changes: 8 additions & 3 deletions examples/receive_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ use tokio_stomp_2::FromServer;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None)
.await
.unwrap();
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await
.unwrap();

conn.send(client::subscribe("queue.test", "custom-subscriber-id"))
.await
Expand Down
11 changes: 8 additions & 3 deletions examples/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ use tokio_stomp_2::ToServer;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None)
.await
.unwrap();
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await
.unwrap();

conn.send(
ToServer::Send {
Expand Down
19 changes: 10 additions & 9 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::net::ToSocketAddrs;

use bytes::{Buf, BytesMut};
use futures::prelude::*;
use futures::sink::SinkExt;
Expand All @@ -16,29 +14,32 @@
/// Connect to a STOMP server via TCP, including the connection handshake.
/// If successful, returns a tuple of a message stream and a sender,
/// which may be used to receive and send messages respectively.
///
/// `virtualhost` If no specific virtualhost is desired, it is recommended
/// to set this to the same as the host name that the socket
/// was established against (i.e, the same as the server address).
pub async fn connect(
address: impl Into<String>,
server: impl tokio::net::ToSocketAddrs,
virtualhost: impl Into<String>,

Check warning on line 23 in src/client.rs

View check run for this annotation

Codecov / codecov/patch

src/client.rs#L22-L23

Added lines #L22 - L23 were not covered by tests
login: Option<String>,
passcode: Option<String>,
) -> Result<ClientTransport> {
let address = address.into();
let addr = address.as_str().to_socket_addrs().unwrap().next().unwrap();
let tcp = TcpStream::connect(&addr).await?;
let tcp = TcpStream::connect(server).await?;

Check warning on line 27 in src/client.rs

View check run for this annotation

Codecov / codecov/patch

src/client.rs#L27

Added line #L27 was not covered by tests
let mut transport = ClientCodec.framed(tcp);
client_handshake(&mut transport, address, login, passcode).await?;
client_handshake(&mut transport, virtualhost.into(), login, passcode).await?;

Check warning on line 29 in src/client.rs

View check run for this annotation

Codecov / codecov/patch

src/client.rs#L29

Added line #L29 was not covered by tests
Ok(transport)
}

async fn client_handshake(
transport: &mut ClientTransport,
host: String,
virtualhost: String,

Check warning on line 35 in src/client.rs

View check run for this annotation

Codecov / codecov/patch

src/client.rs#L35

Added line #L35 was not covered by tests
login: Option<String>,
passcode: Option<String>,
) -> Result<()> {
let connect = Message {
content: ToServer::Connect {
accept_version: "1.2".into(),
host,
host: virtualhost,

Check warning on line 42 in src/client.rs

View check run for this annotation

Codecov / codecov/patch

src/client.rs#L42

Added line #L42 was not covered by tests
login,
passcode,
heartbeat: None,
Expand Down
90 changes: 46 additions & 44 deletions src/frame.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
use anyhow::{anyhow, bail};
use bytes::{BufMut, BytesMut};

use nom::{
bytes::streaming::{is_not, tag, take, take_until},
character::streaming::{alpha1, line_ending, not_line_ending},
combinator::{complete, opt},
multi::{count, many0, many_till},
sequence::{delimited, separated_pair, terminated, tuple},
IResult, Parser,
};

use std::borrow::Cow;

use crate::{AckMode, FromServer, Message, Result, ToServer};

type HeaderTuple<'a> = (&'a [u8], Option<Cow<'a, [u8]>>);
type Header<'a> = (&'a [u8], Cow<'a, [u8]>);

#[derive(Debug)]
pub(crate) struct Frame<'a> {
command: &'a [u8],
// TODO use ArrayVec to keep headers on the stack
// (makes this object zero-allocation)
headers: Vec<(&'a [u8], Cow<'a, [u8]>)>,
headers: Vec<Header<'a>>,
body: Option<&'a [u8]>,
}

Expand Down Expand Up @@ -75,22 +84,6 @@ impl<'a> Frame<'a> {
}

// Nom definitions

named!(eol, preceded!(opt!(tag!("\r")), tag!("\n")));

named!(
parse_header<(&[u8], Cow<[u8]>)>,
pair!(
take_until_either!(":\n"),
preceded!(
tag!(":"),
map!(take_until_and_consume1!("\n"), |bytes| Cow::Borrowed(
strip_cr(bytes)
))
)
)
);

fn get_content_length(headers: &[(&[u8], Cow<[u8]>)]) -> Option<u32> {
for h in headers {
if h.0 == b"content-length" {
Expand All @@ -110,33 +103,42 @@ fn is_empty_slice(s: &[u8]) -> Option<&[u8]> {
}
}

named!(
pub(crate) parse_frame<Frame>,
do_parse!(
many0!(eol)
>> command: map!(take_until_and_consume!("\n"), strip_cr)
>> headers: many0!(parse_header)
>> eol
>> body: switch!(value!(get_content_length(&headers)),
Some(v) => map!(take!(v), Some) |
None => map!(take_until!("\x00"), is_empty_slice)
)
>> tag!("\x00")
>> many0!(complete!(eol))
>> (Frame {
command,
headers,
body,
})
)
);
pub(crate) fn parse_frame(input: &[u8]) -> IResult<&[u8], Frame> {
// read stream until header end
many_till(take(1_usize), count(line_ending, 2))(input)?;

fn strip_cr(buf: &[u8]) -> &[u8] {
if let Some(&b'\r') = buf.last() {
&buf[..buf.len() - 1]
} else {
buf
}
let (input, (command, headers)) = tuple((
delimited(opt(complete(line_ending)), alpha1, line_ending), // command
terminated(
many0(parse_header), // header
line_ending,
),
))(input)?;

let (input, body) = match get_content_length(&headers) {
None => take_until("\x00").map(is_empty_slice).parse(input)?,
Some(length) => take(length).map(Some).parse(input)?,
};

let (input, _) = tuple((tag("\x00"), opt(complete(line_ending))))(input)?;

Ok((
input,
Frame {
command,
headers,
body,
},
))
}

fn parse_header(input: &[u8]) -> IResult<&[u8], Header> {
complete(separated_pair(
is_not(":\r\n"),
tag(":"),
terminated(not_line_ending, line_ending).map(Cow::Borrowed),
))
.parse(input)
}

fn fetch_header<'a>(headers: &'a [(&'a [u8], Cow<'a, [u8]>)], key: &'a str) -> Option<String> {
Expand Down
3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! tokio-stomp - A library for asynchronous streaming of STOMP messages

#[macro_use]
extern crate nom;

use custom_debug_derive::CustomDebug;
use frame::Frame;

Expand Down