@@ -20,13 +20,18 @@ use std::ops::DerefMut;
20
20
use std:: pin:: Pin ;
21
21
use std:: task:: { Context , Poll } ;
22
22
use tokio:: io:: AsyncRead ;
23
+ use tokio:: stream:: Stream ;
24
+ use tokio:: time:: { delay_for, Delay , Duration , Instant } ;
25
+
26
+ const MAX_READ_AMOUNT : usize = 500 * 1000 ; // 0.5MB
27
+ const GRACE_DURATION : Duration = Duration :: from_millis ( 1 ) ;
23
28
24
29
pub struct AvailableReader < ' a , R : AsyncRead + Unpin > {
25
- // TODO: come up with a way to avoid using RefCell (not sure if possible though)
30
+ // TODO: come up with a way to avoid using RefCell (not sure if possible though due to having to
31
+ // mutably borrow both inner reader and buffer at the same time)
26
32
buf : RefCell < BytesMut > ,
27
33
inner : RefCell < & ' a mut R > ,
28
- // idea for the future: tiny delay that allows to prevent unnecessary extra fragmentation
29
- // grace_period: Option<Delay>,
34
+ grace_period : Option < Delay > ,
30
35
}
31
36
32
37
impl < ' a , R > AvailableReader < ' a , R >
@@ -39,20 +44,15 @@ where
39
44
AvailableReader {
40
45
buf : RefCell :: new ( BytesMut :: with_capacity ( Self :: BUF_INCREMENT ) ) ,
41
46
inner : RefCell :: new ( reader) ,
42
- // grace_period: None ,
47
+ grace_period : Some ( delay_for ( GRACE_DURATION ) ) ,
43
48
}
44
49
}
45
50
}
46
51
47
- // TODO: change this guy to a stream? Seems waaay more appropriate considering
48
- // we're getting new Bytes items regularly rather than calling it once.
49
-
50
- impl < ' a , R : AsyncRead + Unpin > Future for AvailableReader < ' a , R > {
51
- type Output = io:: Result < ( Bytes , bool ) > ;
52
+ impl < ' a , R : AsyncRead + Unpin > Stream for AvailableReader < ' a , R > {
53
+ type Item = io:: Result < Bytes > ;
52
54
53
- // this SHOULD stay mutable, because we rely on runtime checks inside the method
54
- #[ allow( unused_mut) ]
55
- fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
55
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
56
56
// if we have no space in buffer left - expand it
57
57
if !self . buf . borrow ( ) . has_remaining_mut ( ) {
58
58
self . buf . borrow_mut ( ) . reserve ( Self :: BUF_INCREMENT ) ;
@@ -68,19 +68,43 @@ impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> {
68
68
if self . buf . borrow ( ) . is_empty ( ) {
69
69
Poll :: Pending
70
70
} else {
71
+ // if exists - check grace period
72
+ if let Some ( grace_period) = self . grace_period . as_mut ( ) {
73
+ if Pin :: new ( grace_period) . poll ( cx) . is_pending ( ) {
74
+ return Poll :: Pending ;
75
+ }
76
+ }
77
+
71
78
let buf = self . buf . replace ( BytesMut :: new ( ) ) ;
72
- Poll :: Ready ( Ok ( ( buf. freeze ( ) , false ) ) )
79
+ Poll :: Ready ( Some ( Ok ( buf. freeze ( ) ) ) )
73
80
}
74
81
}
75
- Poll :: Ready ( Err ( err) ) => Poll :: Ready ( Err ( err) ) ,
82
+ Poll :: Ready ( Err ( err) ) => Poll :: Ready ( Some ( Err ( err) ) ) ,
76
83
Poll :: Ready ( Ok ( n) ) => {
84
+ // if exists - reset grace period
85
+ if let Some ( grace_period) = self . grace_period . as_mut ( ) {
86
+ let now = Instant :: now ( ) ;
87
+ grace_period. reset ( now + GRACE_DURATION ) ;
88
+ }
89
+
77
90
// if we read a non-0 amount, we're not done yet!
78
91
if n == 0 {
79
92
let buf = self . buf . replace ( BytesMut :: new ( ) ) ;
80
- Poll :: Ready ( Ok ( ( buf. freeze ( ) , true ) ) )
93
+ if buf. len ( ) > 0 {
94
+ Poll :: Ready ( Some ( Ok ( buf. freeze ( ) ) ) )
95
+ } else {
96
+ Poll :: Ready ( None )
97
+ }
81
98
} else {
82
99
// tell the waker we should be polled again!
83
100
cx. waker ( ) . wake_by_ref ( ) ;
101
+
102
+ // if we reached our maximum amount - return it
103
+ let read_bytes_len = self . buf . borrow ( ) . len ( ) ;
104
+ if read_bytes_len >= MAX_READ_AMOUNT {
105
+ let buf = self . buf . replace ( BytesMut :: new ( ) ) ;
106
+ return Poll :: Ready ( Some ( Ok ( buf. freeze ( ) ) ) ) ;
107
+ }
84
108
Poll :: Pending
85
109
}
86
110
}
@@ -91,31 +115,34 @@ impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> {
91
115
#[ cfg( test) ]
92
116
mod tests {
93
117
use super :: * ;
118
+ use futures:: poll;
94
119
use std:: io:: Cursor ;
95
120
use std:: time:: Duration ;
121
+ use tokio:: stream:: StreamExt ;
122
+ use tokio_test:: assert_pending;
96
123
97
124
#[ tokio:: test]
98
125
async fn available_reader_reads_all_available_data_smaller_than_its_buf ( ) {
99
126
let data = vec ! [ 42u8 ; 100 ] ;
100
127
let mut reader = Cursor :: new ( data. clone ( ) ) ;
101
128
102
- let available_reader = AvailableReader :: new ( & mut reader) ;
103
- let ( read_data, is_finished ) = available_reader. await . unwrap ( ) ;
129
+ let mut available_reader = AvailableReader :: new ( & mut reader) ;
130
+ let read_data = available_reader. next ( ) . await . unwrap ( ) . unwrap ( ) ;
104
131
105
132
assert_eq ! ( read_data, data) ;
106
- assert ! ( is_finished )
133
+ assert ! ( available_reader . next ( ) . await . is_none ( ) )
107
134
}
108
135
109
136
#[ tokio:: test]
110
137
async fn available_reader_reads_all_available_data_bigger_than_its_buf ( ) {
111
138
let data = vec ! [ 42u8 ; AvailableReader :: <Cursor <Vec <u8 >>>:: BUF_INCREMENT + 100 ] ;
112
139
let mut reader = Cursor :: new ( data. clone ( ) ) ;
113
140
114
- let available_reader = AvailableReader :: new ( & mut reader) ;
115
- let ( read_data, is_finished ) = available_reader. await . unwrap ( ) ;
141
+ let mut available_reader = AvailableReader :: new ( & mut reader) ;
142
+ let read_data = available_reader. next ( ) . await . unwrap ( ) . unwrap ( ) ;
116
143
117
144
assert_eq ! ( read_data, data) ;
118
- assert ! ( is_finished )
145
+ assert ! ( available_reader . next ( ) . await . is_none ( ) )
119
146
}
120
147
121
148
#[ tokio:: test]
@@ -129,11 +156,11 @@ mod tests {
129
156
. read ( & second_data_chunk)
130
157
. build ( ) ;
131
158
132
- let available_reader = AvailableReader :: new ( & mut reader_mock) ;
133
- let ( read_data, is_finished ) = available_reader. await . unwrap ( ) ;
159
+ let mut available_reader = AvailableReader :: new ( & mut reader_mock) ;
160
+ let read_data = available_reader. next ( ) . await . unwrap ( ) . unwrap ( ) ;
134
161
135
162
assert_eq ! ( read_data, first_data_chunk) ;
136
- assert ! ( !is_finished )
163
+ assert_pending ! ( poll! ( available_reader . next ( ) ) ) ;
137
164
}
138
165
139
166
#[ tokio:: test]
@@ -145,10 +172,40 @@ mod tests {
145
172
. read ( & data)
146
173
. build ( ) ;
147
174
148
- let available_reader = AvailableReader :: new ( & mut reader_mock) ;
149
- let ( read_data, is_finished ) = available_reader. await . unwrap ( ) ;
175
+ let mut available_reader = AvailableReader :: new ( & mut reader_mock) ;
176
+ let read_data = available_reader. next ( ) . await . unwrap ( ) . unwrap ( ) ;
150
177
151
178
assert_eq ! ( read_data, data) ;
152
- assert ! ( is_finished )
179
+ assert ! ( available_reader . next ( ) . await . is_none ( ) )
153
180
}
181
+
182
+ // perhaps the issue of tokio io builder will be resolved in tokio 0.3?
183
+ // #[tokio::test]
184
+ // async fn available_reader_will_wait_for_more_data_if_its_within_grace_period() {
185
+ // let first_data_chunk = vec![42u8; 100];
186
+ // let second_data_chunk = vec![123u8; 100];
187
+ //
188
+ // let combined_chunks: Vec<_> = first_data_chunk
189
+ // .iter()
190
+ // .cloned()
191
+ // .chain(second_data_chunk.iter().cloned())
192
+ // .collect();
193
+ //
194
+ // let mut reader_mock = tokio_test::io::Builder::new()
195
+ // .read(&first_data_chunk)
196
+ // .wait(Duration::from_millis(2))
197
+ // .read(&second_data_chunk)
198
+ // .build();
199
+ //
200
+ // let mut available_reader = AvailableReader {
201
+ // buf: RefCell::new(BytesMut::with_capacity(4096)),
202
+ // inner: RefCell::new(&mut reader_mock),
203
+ // grace_period: Some(delay_for(Duration::from_millis(5))),
204
+ // };
205
+ //
206
+ // let read_data = available_reader.next().await.unwrap().unwrap();
207
+ //
208
+ // assert_eq!(read_data, combined_chunks);
209
+ // assert!(available_reader.next().await.is_none())
210
+ // }
154
211
}
0 commit comments