Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DTLS encryption over UDP support to server #57

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde = { version= "1.0.88", features= [ "derive" ] }
url = "1.7.2"
num-derive = "0.2.4"
num-traits = "0.2.6"
openssl = "0.10"
log = "0.4.6"
regex = "1.1.0"
tokio = {version = "0.2", features = ["full"]}
Expand Down
100 changes: 94 additions & 6 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
net::{self, SocketAddr, ToSocketAddrs},
task::Context,
future::Future,
time::Duration,
};
use log::{debug, error};
use futures::{SinkExt, Stream, StreamExt, select, stream::FusedStream, task::Poll};
Expand All @@ -13,6 +14,11 @@ use tokio::{
net::UdpSocket
};
use tokio_util::udp::{UdpFramed};
use openssl::{
error::ErrorStack,
sha::Sha256,
ssl::{self, HandshakeError, Ssl, SslContextBuilder, SslMethod, SslStream, SslVerifyMode}
};

use super::message::{
packet::Packet,
Expand Down Expand Up @@ -44,6 +50,12 @@ pub enum Message {
Received(Packet, SocketAddr),
}

pub enum Encryption {
DTLS,
TLS,
SSL,
}

pub struct Server<'a, HandlerRet> where HandlerRet: Future<Output=Option<CoAPResponse>> {
server: CoAPServer,
observer: Observer,
Expand All @@ -52,10 +64,10 @@ pub struct Server<'a, HandlerRet> where HandlerRet: Future<Output=Option<CoAPRes

impl<'a, HandlerRet> Server<'a, HandlerRet> where HandlerRet: Future<Output=Option<CoAPResponse>> {
/// Creates a CoAP server listening on the given address.
pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Server<'a, HandlerRet>, io::Error> {
pub fn new<A: ToSocketAddrs + Copy>(addr: A, method: Option<Encryption>) -> Result<Server<'a, HandlerRet>, io::Error> {
let (tx, rx) = mpsc::unbounded_channel();
Ok(Server {
server: CoAPServer::new(addr, rx)?,
server: CoAPServer::new(addr, rx, method.unwrap())?,
observer: Observer::new(tx),
handler: None,
})
Expand Down Expand Up @@ -117,21 +129,89 @@ impl<'a, HandlerRet> Server<'a, HandlerRet> where HandlerRet: Future<Output=Opti
}
}

#[derive(Debug)]
struct UdpStream {
socket: std::net::UdpSocket,
connected: bool,
}

impl std::io::Read for UdpStream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.connected {
self.socket.recv(buf)
} else {
match self.socket.recv_from(buf) {
Ok((bytes, addr)) => {
self.connect(addr)?;
Ok(bytes)
}
Err(e) => Err(e),
}
}
}

}

impl std::io::Write for UdpStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.socket.send(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

impl UdpStream {
/// Binds to the given address and sets timeouts to reasonable values.
pub fn new<A: ToSocketAddrs>(addr: A) -> std::io::Result<UdpStream> {
let socket = std::net::UdpSocket::bind(addr).unwrap();
let duration = Duration::from_millis(300);
socket.set_read_timeout(Some(duration))?;
socket.set_write_timeout(Some(duration))?;
Ok(UdpStream {
socket: socket,
connected: false,
})
}

/// Connects the UDP socket to the given address.
pub fn connect<A: ToSocketAddrs + Copy>(&mut self, addr: A) -> std::io::Result<()> {
self.socket.connect(addr)?;
self.connected = true;
Ok(())
}
}

pub struct CoAPServer {
receiver: MessageReceiver,
is_terminated: bool,
socket: UdpFramed<Codec>,
ssl_stream: Option<Result<SslStream<UdpStream>, HandshakeError<UdpStream>>>,
}

impl CoAPServer {
/// Creates a CoAP server listening on the given address.
pub fn new<A: ToSocketAddrs>(addr: A, receiver: MessageReceiver) -> Result<CoAPServer, io::Error> {
pub fn new<A: ToSocketAddrs + Copy>(addr: A, receiver: MessageReceiver, method: Encryption) -> Result<CoAPServer, io::Error> {
let socket = UdpSocket::from_std(net::UdpSocket::bind(addr).unwrap())?;

Ok(CoAPServer {
receiver,
is_terminated: false,
socket: UdpFramed::new(socket, Codec::new()),
ssl_stream: match method {
Encryption::DTLS => {
let mut context_builder = SslContextBuilder::new(SslMethod::dtls()).unwrap();
context_builder.set_verify(SslVerifyMode::PEER | SslVerifyMode::FAIL_IF_NO_PEER_CERT);
let ssl_context = context_builder.build();
let ssl = Ssl::new(&ssl_context).unwrap();
let socket = UdpStream::new(addr).unwrap();
match ssl.accept(socket) {
Ok(stream) => Some(Ok(stream)),
Err(e) => Some(Err(e))
}
}
Encryption::TLS => None,
Encryption::SSL => None
},
})
}

Expand Down Expand Up @@ -195,12 +275,12 @@ pub mod test {
use super::super::*;
use super::*;

pub fn spawn_server<F: FnMut(CoAPRequest) -> HandlerRet + Send + 'static, HandlerRet>(request_handler: F) -> mpsc::Receiver<u16> where HandlerRet: Future<Output=Option<CoAPResponse>> {
fn spawn_server<F: FnMut(CoAPRequest) -> HandlerRet + Send + 'static, HandlerRet>(request_handler: F, method: Encryption) -> mpsc::Receiver<u16> where HandlerRet: Future<Output=Option<CoAPResponse>> {
let (tx, rx) = mpsc::channel();

std::thread::Builder::new().name(String::from("server")).spawn(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async move {
let mut server = server::Server::new("127.0.0.1:0").unwrap();
let mut server = server::Server::new("127.0.0.1:0", Some(method)).unwrap();

tx.send(server.socket_addr().unwrap().port()).unwrap();

Expand All @@ -210,6 +290,14 @@ pub mod test {

rx
}

pub fn spawn_udp(request_handler: F) {
spawn_server(request_handler, Encryption::DTLS);
}

pub fn spawn_dtls_over_udp(request_handler: F) {
spawn_server(request_handler, None);
}

async fn request_handler(req: CoAPRequest) -> Option<CoAPResponse> {
let uri_path_list = req.get_option(CoAPOption::UriPath).unwrap().clone();
Expand Down