Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
Proxy protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Dec 13, 2018
1 parent d1c10ca commit 0418a05
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 59 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion distributed-fly/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ rusoto_kms = "0.35"
rusoto_credential = "0.14"
base64 = "0.10"
config = "0.9"
sha2 = "0.8"
sha2 = "0.8"
bytes = "*"
79 changes: 21 additions & 58 deletions distributed-fly/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ extern crate lazy_static;
#[macro_use]
extern crate log;

#[macro_use]
extern crate futures;

// use fly::dns_server::DnsServer;

use std::time::Duration;
Expand Down Expand Up @@ -42,6 +45,7 @@ use rusoto_credential::{AwsCredentials, EnvironmentProvider, ProvideAwsCredentia
use tokio_openssl::SslAcceptorExt;

mod cert;
mod proxy;

use r2d2_redis::RedisConnectionManager;

Expand Down Expand Up @@ -76,16 +80,6 @@ fn main() {

release::start_new_release_check();

// let port: u16 = match GLOBAL_SETTINGS.read().unwrap().proxy_dns_port {
// Some(port) => port,
// None => 8053,
// };

// let dns_server = DnsServer::new(
// SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port),
// &*SELECTOR,
// );

let tls_addr = {
let s = GLOBAL_SETTINGS.read().unwrap();
format!(
Expand All @@ -97,26 +91,14 @@ fn main() {
.parse()
.unwrap();

// let http_listener = tokio::net::TcpListener::bind(&addr).unwrap();
let http_listener = tokio::net::TcpListener::bind(&addr).unwrap();
let tls_listener = tokio::net::TcpListener::bind(&tls_addr).unwrap();

let mut tls_builder =
openssl::ssl::SslAcceptor::mozilla_intermediate(openssl::ssl::SslMethod::tls()).unwrap();

tls_builder.set_servername_callback(move |ssl_ref: &mut openssl::ssl::SslRef, _ssl_alert| {
let name = ssl_ref.servername(openssl::ssl::NameType::HOST_NAME);
println!("GOT NAME: {:?}", name);
println!("version: {:?}", ssl_ref.version_str());
match ssl_ref.current_cipher() {
Some(cipher) => println!(
"current cipher: {}, version: {}, desc: {}",
cipher.name(),
cipher.version(),
cipher.description()
),
None => println!("no cipher"),
};
match name {
match ssl_ref.servername(openssl::ssl::NameType::HOST_NAME) {
None => Err(openssl::ssl::SniError::NOACK),
Some(name) => match cert::get_ctx(name) {
Err(e) => {
Expand All @@ -142,27 +124,6 @@ fn main() {
});
tls_builder.set_cipher_list(CURVES).unwrap();

// let mut f = File::open("certs/default.crt").expect("could not open certificate file");
// let mut rsabuf: Vec<u8> = vec![];
// f.read_to_end(&mut rsabuf).unwrap();

// let mut f = File::open("certs/default.pem").expect("could not open private key file");
// let mut rsapembuf: Vec<u8> = vec![];
// f.read_to_end(&mut rsapembuf).unwrap();

// let cert_builder = openssl::x509::X509::builder()

// cert_builder.

// let certs = openssl::x509::X509::stack_from_pem(rsabuf.as_slice()).unwrap();
// println!("certs count: {}", certs.len());
// println!(
// "cert pem: {}",
// String::from_utf8(certs[0].to_pem().unwrap()).unwrap()
// );

// let pk = openssl::pkey::PKey::private_key_from_pem(rsabuf.as_slice()).unwrap();

let certs_path = {
match GLOBAL_SETTINGS.read().unwrap().certs_path {
Some(ref cp) => cp.clone(),
Expand Down Expand Up @@ -194,16 +155,15 @@ fn main() {
openssl::ssl::SslFiletype::PEM,
)
.unwrap();
// tls_builder.set_certificate(&certs[0]).unwrap();

let tls_acceptor = tls_builder.build();

let tls_stream = tls_listener
.incoming()
.map_err(|e| error!("error accepting conn: {}", e))
.and_then(|stream| proxy::ProxyTcpStream::peek(stream))
.map_err(|e| error!("error in stream: {}", e))
.for_each(move |stream| {
println!("got a conn.");
let remote_addr = stream.peer_addr().unwrap_or("0.0.0.0:0".parse().unwrap());
let remote_addr = stream.peer_addr().unwrap();
tokio::spawn(
tls_acceptor
.accept_async(stream)
Expand All @@ -220,6 +180,15 @@ fn main() {
Ok(())
});

let make_svc = make_service_fn(|conn: &proxy::ProxyTcpStream| {
let remote_addr = conn.peer_addr().unwrap_or("0.0.0.0:0".parse().unwrap());
service_fn(move |req| serve_http(false, req, &*SELECTOR, remote_addr))
});

let http_stream = http_listener
.incoming()
.and_then(|stream| proxy::ProxyTcpStream::peek(stream));

tokio::run(future::lazy(move || {
tokio::spawn(
Interval::new_interval(Duration::from_secs(30))
Expand All @@ -239,15 +208,9 @@ fn main() {

tokio::spawn(tls_stream);

let server = Server::bind(&addr)
.serve(make_service_fn(|conn: &AddrStream| {
let remote_addr = conn.remote_addr();
service_fn(move |req| serve_http(false, req, &*SELECTOR, remote_addr))
}))
.map_err(|e| eprintln!("server error: {}", e));

info!("Listening on http://{}", addr);

server
Server::builder(http_stream)
.serve(make_svc)
.map_err(|e| error!("error in hyper server: {}", e))
}));
}
86 changes: 86 additions & 0 deletions distributed-fly/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use bytes::{Buf, BufMut};
use futures::{future, Async, Future, Poll};
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;

use std::io::BufRead;

#[derive(Debug)]
pub struct ProxyTcpStream {
stream: TcpStream,
remote_addr: SocketAddr,
}

impl ProxyTcpStream {
pub fn peek(stream: TcpStream) -> impl Future<Item = Self, Error = io::Error> {
let mut bytes = [0; 107];
let mut stream = Some(stream);
future::poll_fn(move || {
let n = try_ready!(stream.as_mut().unwrap().poll_peek(&mut bytes));
// TODO: check bytes[..n] for PROXY line
let mut stream = stream.take().unwrap();
let mut remote_addr: SocketAddr = stream.peer_addr().unwrap();
let mut s = String::new();
match bytes.as_ref().read_line(&mut s) {
Ok(ln) => {
if s.starts_with("PROXY TCP") {
// read that line for real (no peeking!)
let mut v = vec![0; ln];
stream.read_exact(&mut v).unwrap();
let mut split = s.split(" ").skip(2);
let ip = split.next().unwrap();
let port = split.skip(1).next().unwrap();
remote_addr = format!("{}:{}", ip, port).parse().unwrap();
debug!("using proxy proto, remote addr: {}", remote_addr);
}
}
Err(e) => error!("error reading PROXY line: {}", e),
};
Ok(Async::Ready(ProxyTcpStream {
stream,
remote_addr,
}))
})
}

pub fn peer_addr(&self) -> io::Result<SocketAddr> {
Ok(self.remote_addr.clone())
}
}

impl Read for ProxyTcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stream.read(buf)
}
}

impl Write for ProxyTcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.stream.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.stream.flush()
}
}

impl AsyncRead for ProxyTcpStream {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.stream.prepare_uninitialized_buffer(buf)
}

fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&TcpStream>::read_buf(&mut &self.stream, buf)
}
}

impl AsyncWrite for ProxyTcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
<&TcpStream>::shutdown(&mut &self.stream)
}

fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&TcpStream>::write_buf(&mut &self.stream, buf)
}
}

0 comments on commit 0418a05

Please sign in to comment.