Skip to content

Commit f7f2cde

Browse files
author
Stjepan Glavina
authored
Clean up Stream wrapper (#21)
Clean up Stream wrapper
2 parents 431fe43 + 85c6518 commit f7f2cde

File tree

2 files changed

+42
-89
lines changed

2 files changed

+42
-89
lines changed

examples/client.rs

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
1+
use std::pin::Pin;
2+
use std::sync::Arc;
3+
14
use async_h1::client;
2-
use async_std::{io, net, task};
5+
use async_std::io::{self, Read, Write};
6+
use async_std::net::{self, TcpStream};
7+
use async_std::task::{self, Context, Poll};
38
use http_types::{Method, Request, Url};
49

510
fn main() -> Result<(), async_h1::Exception> {
611
task::block_on(async {
7-
let tcp_stream = net::TcpStream::connect("127.0.0.1:8080").await?;
8-
println!("connecting to {}", tcp_stream.peer_addr()?);
12+
let stream = net::TcpStream::connect("127.0.0.1:8080").await?;
13+
println!("connecting to {}", stream.peer_addr()?);
14+
15+
// TODO: Delete this line when we implement `Clone` for `TcpStream`.
16+
let stream = Stream(Arc::new(stream));
917

10-
let stream = Stream::new(tcp_stream);
1118
for i in 0usize..2 {
1219
println!("making request {}/2", i + 1);
1320

@@ -23,59 +30,29 @@ fn main() -> Result<(), async_h1::Exception> {
2330
})
2431
}
2532

26-
use async_std::{
27-
io::{Read, Write},
28-
net::TcpStream,
29-
task::{Context, Poll},
30-
};
31-
use std::{
32-
pin::Pin,
33-
sync::{Arc, Mutex},
34-
};
35-
36-
struct Stream {
37-
internal: Arc<Mutex<TcpStream>>,
38-
}
39-
40-
impl Stream {
41-
fn new(internal: TcpStream) -> Self {
42-
Stream {
43-
internal: Arc::new(Mutex::new(internal)),
44-
}
45-
}
46-
}
47-
48-
impl Clone for Stream {
49-
fn clone(&self) -> Self {
50-
Stream {
51-
internal: self.internal.clone(),
52-
}
53-
}
54-
}
33+
#[derive(Clone)]
34+
struct Stream(Arc<TcpStream>);
5535

5636
impl Read for Stream {
5737
fn poll_read(
58-
mut self: Pin<&mut Self>,
38+
self: Pin<&mut Self>,
5939
cx: &mut Context,
6040
buf: &mut [u8],
6141
) -> Poll<io::Result<usize>> {
62-
<TcpStream as Read>::poll_read(Pin::new(&mut self.internal.lock().unwrap()), cx, buf)
42+
Pin::new(&mut &*self.0).poll_read(cx, buf)
6343
}
6444
}
45+
6546
impl Write for Stream {
66-
fn poll_write(
67-
mut self: Pin<&mut Self>,
68-
cx: &mut Context,
69-
buf: &[u8],
70-
) -> Poll<io::Result<usize>> {
71-
<TcpStream as Write>::poll_write(Pin::new(&mut self.internal.lock().unwrap()), cx, buf)
47+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
48+
Pin::new(&mut &*self.0).poll_write(cx, buf)
7249
}
7350

74-
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
75-
<TcpStream as Write>::poll_flush(Pin::new(&mut self.internal.lock().unwrap()), cx)
51+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
52+
Pin::new(&mut &*self.0).poll_flush(cx)
7653
}
7754

78-
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
79-
<TcpStream as Write>::poll_close(Pin::new(&mut self.internal.lock().unwrap()), cx)
55+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
56+
Pin::new(&mut &*self.0).poll_close(cx)
8057
}
8158
}

examples/server.rs

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
use std::pin::Pin;
2+
use std::sync::Arc;
3+
14
use async_h1::server;
2-
use async_std::net;
5+
use async_std::io::{self, Read, Write};
6+
use async_std::net::{self, TcpStream};
37
use async_std::prelude::*;
4-
use async_std::task;
8+
use async_std::task::{self, Context, Poll};
59
use http_types::{Response, StatusCode};
610

711
fn main() -> Result<(), async_h1::Exception> {
@@ -15,7 +19,9 @@ fn main() -> Result<(), async_h1::Exception> {
1519
let stream = stream?;
1620
println!("starting new connection from {}", stream.peer_addr()?);
1721

18-
let stream = Stream::new(stream);
22+
// TODO: Delete this line when we implement `Clone` for `TcpStream`.
23+
let stream = Stream(Arc::new(stream));
24+
1925
server::connect(stream.clone(), stream, |_| {
2026
async { Ok(Response::new(StatusCode::Ok)) }
2127
})
@@ -26,59 +32,29 @@ fn main() -> Result<(), async_h1::Exception> {
2632
})
2733
}
2834

29-
use async_std::{
30-
io::{self, Read, Write},
31-
net::TcpStream,
32-
task::{Context, Poll},
33-
};
34-
use std::{
35-
pin::Pin,
36-
sync::{Arc, Mutex},
37-
};
38-
39-
struct Stream {
40-
internal: Arc<Mutex<TcpStream>>,
41-
}
42-
43-
impl Stream {
44-
fn new(internal: TcpStream) -> Self {
45-
Stream {
46-
internal: Arc::new(Mutex::new(internal)),
47-
}
48-
}
49-
}
50-
51-
impl Clone for Stream {
52-
fn clone(&self) -> Self {
53-
Stream {
54-
internal: self.internal.clone(),
55-
}
56-
}
57-
}
35+
#[derive(Clone)]
36+
struct Stream(Arc<TcpStream>);
5837

5938
impl Read for Stream {
6039
fn poll_read(
61-
mut self: Pin<&mut Self>,
40+
self: Pin<&mut Self>,
6241
cx: &mut Context,
6342
buf: &mut [u8],
6443
) -> Poll<io::Result<usize>> {
65-
<TcpStream as Read>::poll_read(Pin::new(&mut self.internal.lock().unwrap()), cx, buf)
44+
Pin::new(&mut &*self.0).poll_read(cx, buf)
6645
}
6746
}
47+
6848
impl Write for Stream {
69-
fn poll_write(
70-
mut self: Pin<&mut Self>,
71-
cx: &mut Context,
72-
buf: &[u8],
73-
) -> Poll<io::Result<usize>> {
74-
<TcpStream as Write>::poll_write(Pin::new(&mut self.internal.lock().unwrap()), cx, buf)
49+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
50+
Pin::new(&mut &*self.0).poll_write(cx, buf)
7551
}
7652

77-
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
78-
<TcpStream as Write>::poll_flush(Pin::new(&mut self.internal.lock().unwrap()), cx)
53+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
54+
Pin::new(&mut &*self.0).poll_flush(cx)
7955
}
8056

81-
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
82-
<TcpStream as Write>::poll_close(Pin::new(&mut self.internal.lock().unwrap()), cx)
57+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
58+
Pin::new(&mut &*self.0).poll_close(cx)
8359
}
8460
}

0 commit comments

Comments
 (0)