Skip to content

Commit

Permalink
move to nom7 🥳
Browse files Browse the repository at this point in the history
  • Loading branch information
zuisong committed Dec 12, 2023
1 parent f82fec4 commit a54c6fb
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 61 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ anyhow = "1.0"
futures = "0.3"
tokio = { version = "1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec"] }
nom = "4"
nom = "7"

[dev-dependencies]
tokio = { version = "1", features = ["time", "macros", "rt-multi-thread"] }
8 changes: 7 additions & 1 deletion examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use tokio_stomp_2::*;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let conn = client::connect("127.0.0.1:61613", None, None).await?;
let conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await?;

tokio::time::sleep(Duration::from_millis(200)).await;

Expand Down
8 changes: 7 additions & 1 deletion examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ use tokio_stomp_2::*;
// `docker run -p 61613:61613 rmohr/activemq:latest`

async fn client(listens: &str, sends: &str, msg: &[u8]) -> Result<(), anyhow::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None).await?;
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await?;
conn.send(client::subscribe(listens, "myid")).await?;

loop {
Expand Down
11 changes: 8 additions & 3 deletions examples/receive_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ use tokio_stomp_2::FromServer;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None)
.await
.unwrap();
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await
.unwrap();

conn.send(client::subscribe("queue.test", "custom-subscriber-id"))
.await
Expand Down
11 changes: 8 additions & 3 deletions examples/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ use tokio_stomp_2::ToServer;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None)
.await
.unwrap();
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await
.unwrap();

conn.send(
ToServer::Send {
Expand Down
11 changes: 4 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::net::ToSocketAddrs;

use bytes::{Buf, BytesMut};
use futures::prelude::*;
use futures::sink::SinkExt;
Expand All @@ -17,15 +15,14 @@ use anyhow::{anyhow, bail};
/// If successful, returns a tuple of a message stream and a sender,
/// which may be used to receive and send messages respectively.
pub async fn connect(
address: impl Into<String>,
server: impl tokio::net::ToSocketAddrs,
host: impl Into<String>,
login: Option<String>,
passcode: Option<String>,
) -> Result<ClientTransport> {
let address = address.into();
let addr = address.as_str().to_socket_addrs().unwrap().next().unwrap();
let tcp = TcpStream::connect(&addr).await?;
let tcp = TcpStream::connect(server).await?;
let mut transport = ClientCodec.framed(tcp);
client_handshake(&mut transport, address, login, passcode).await?;
client_handshake(&mut transport, host.into(), login, passcode).await?;
Ok(transport)
}

Expand Down
86 changes: 44 additions & 42 deletions src/frame.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use anyhow::{anyhow, bail};
use bytes::{BufMut, BytesMut};

use nom::{
bytes::streaming::{is_not, tag, take, take_until},
character::streaming::{alpha1, line_ending, not_line_ending},
combinator::{complete, opt},
multi::{count, many0, many_till},
sequence::{delimited, separated_pair, terminated, tuple},
IResult, Parser,
};

use std::borrow::Cow;

use crate::{AckMode, FromServer, Message, Result, ToServer};
Expand Down Expand Up @@ -75,22 +84,6 @@ impl<'a> Frame<'a> {
}

// Nom definitions

named!(eol, preceded!(opt!(tag!("\r")), tag!("\n")));

named!(
parse_header<(&[u8], Cow<[u8]>)>,
pair!(
take_until_either!(":\n"),
preceded!(
tag!(":"),
map!(take_until_and_consume1!("\n"), |bytes| Cow::Borrowed(
strip_cr(bytes)
))
)
)
);

fn get_content_length(headers: &[(&[u8], Cow<[u8]>)]) -> Option<u32> {
for h in headers {
if h.0 == b"content-length" {
Expand All @@ -110,33 +103,42 @@ fn is_empty_slice(s: &[u8]) -> Option<&[u8]> {
}
}

named!(
pub(crate) parse_frame<Frame>,
do_parse!(
many0!(eol)
>> command: map!(take_until_and_consume!("\n"), strip_cr)
>> headers: many0!(parse_header)
>> eol
>> body: switch!(value!(get_content_length(&headers)),
Some(v) => map!(take!(v), Some) |
None => map!(take_until!("\x00"), is_empty_slice)
)
>> tag!("\x00")
>> many0!(complete!(eol))
>> (Frame {
command,
headers,
body,
})
)
);
pub(crate) fn parse_frame(input: &[u8]) -> IResult<&[u8], Frame> {
// read stream until header end
many_till(take(1_usize), count(line_ending, 2))(input)?;

fn strip_cr(buf: &[u8]) -> &[u8] {
if let Some(&b'\r') = buf.last() {
&buf[..buf.len() - 1]
} else {
buf
}
let (input, (command, headers)) = tuple((
delimited(opt(complete(line_ending)), alpha1, line_ending), // command
terminated(
many0(parse_header), // header
line_ending,
),
))(input)?;

let (input, body) = match get_content_length(&headers) {
None => take_until("\x00").map(is_empty_slice).parse(input)?,
Some(length) => take(length).map(Some).parse(input)?,
};

let (input, _) = tuple((tag("\x00"), opt(complete(line_ending))))(input)?;

Ok((
input,
Frame {
command,
headers,
body,
},
))
}

fn parse_header(input: &[u8]) -> IResult<&[u8], (&[u8], Cow<[u8]>)> {
complete(separated_pair(
is_not(":\r\n"),
tag(":"),
terminated(not_line_ending, line_ending).map(Cow::Borrowed),
))
.parse(input)
}

fn fetch_header<'a>(headers: &'a [(&'a [u8], Cow<'a, [u8]>)], key: &'a str) -> Option<String> {
Expand Down
3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! tokio-stomp - A library for asynchronous streaming of STOMP messages
#[macro_use]
extern crate nom;

use custom_debug_derive::CustomDebug;
use frame::Frame;

Expand Down

0 comments on commit a54c6fb

Please sign in to comment.