@@ -31,32 +31,37 @@ impl PayloadQueuer for OutboundQueue {
3131}
3232
3333impl SocketDescriptorFlusher for OutboundQueue {
34- fn try_flush_one ( & mut self , descriptor : & mut impl SocketDescriptor ) -> bool {
34+ fn try_flush ( & mut self , descriptor : & mut impl SocketDescriptor ) -> bool {
3535 // Exit early if a previous full write failed and haven't heard that there may be more
3636 // room available
3737 if self . blocked {
3838 return false ;
3939 }
4040
41- let full_write_succeeded = match self . buffer . front ( ) {
42- None => true ,
43- Some ( next_buff) => {
44- let should_be_reading = self . buffer . len ( ) < self . soft_limit ;
45- let pending = & next_buff[ self . buffer_first_msg_offset ..] ;
46- let data_sent = descriptor. send_data ( pending, should_be_reading) ;
47- self . buffer_first_msg_offset += data_sent;
48- self . buffer_first_msg_offset == next_buff. len ( )
41+ loop {
42+ if self . buffer . is_empty ( ) {
43+ return true ;
4944 }
50- } ;
5145
52- if full_write_succeeded {
53- self . buffer_first_msg_offset = 0 ;
54- self . buffer . pop_front ( ) ;
55- } else {
56- self . blocked = true ;
46+ let full_write_succeeded = match self . buffer . front ( ) {
47+ None => true ,
48+ Some ( next_buff) => {
49+ let should_be_reading = self . buffer . len ( ) < self . soft_limit ;
50+ let pending = & next_buff[ self . buffer_first_msg_offset ..] ;
51+ let data_sent = descriptor. send_data ( pending, should_be_reading) ;
52+ self . buffer_first_msg_offset += data_sent;
53+ self . buffer_first_msg_offset == next_buff. len ( )
54+ }
55+ } ;
56+
57+ if full_write_succeeded {
58+ self . buffer_first_msg_offset = 0 ;
59+ self . buffer . pop_front ( ) ;
60+ } else {
61+ self . blocked = true ;
62+ return false ;
63+ }
5764 }
58-
59- full_write_succeeded
6065 }
6166
6267 fn unblock ( & mut self ) {
@@ -87,40 +92,40 @@ mod tests {
8792 use super :: * ;
8893 use ln:: peers:: test_util:: * ;
8994
90- // Test that a try_flush_one () call with no queued data doesn't write anything
95+ // Test that a try_flush () call with no queued data doesn't write anything
9196 #[ test]
9297 fn empty_does_not_write ( ) {
9398 let mut descriptor = SocketDescriptorMock :: new ( ) ;
9499 let mut empty = OutboundQueue :: new ( 10 ) ;
95100
96- assert ! ( empty. try_flush_one ( & mut descriptor) ) ;
101+ assert ! ( empty. try_flush ( & mut descriptor) ) ;
97102 descriptor. assert_called_with ( vec ! [ ] ) ;
98103
99104 }
100105
101- // Test that try_flush_one () sends the push_back
106+ // Test that try_flush () sends the push_back
102107 #[ test]
103108 fn push_back_drain ( ) {
104109 let mut descriptor = SocketDescriptorMock :: new ( ) ;
105110 let mut queue = OutboundQueue :: new ( 10 ) ;
106111
107112 queue. push_back ( vec ! [ 1 ] ) ;
108- assert ! ( queue. try_flush_one ( & mut descriptor) ) ;
113+ assert ! ( queue. try_flush ( & mut descriptor) ) ;
109114
110115 descriptor. assert_called_with ( vec ! [ ( vec![ 1 ] , true ) ] ) ;
111116 }
112117
113- // Test that try_flush_one () sends just first push_back
118+ // Test that try_flush () sends all
114119 #[ test]
115120 fn push_back_push_back_drain_drain ( ) {
116121 let mut descriptor = SocketDescriptorMock :: new ( ) ;
117122 let mut queue = OutboundQueue :: new ( 10 ) ;
118123
119124 queue. push_back ( vec ! [ 1 ] ) ;
120125 queue. push_back ( vec ! [ 2 ] ) ;
121- assert ! ( queue. try_flush_one ( & mut descriptor) ) ;
126+ assert ! ( queue. try_flush ( & mut descriptor) ) ;
122127
123- descriptor. assert_called_with ( vec ! [ ( vec![ 1 ] , true ) ] ) ;
128+ descriptor. assert_called_with ( vec ! [ ( vec![ 1 ] , true ) , ( vec! [ 2 ] , true ) ] ) ;
124129 }
125130
126131 // Test that descriptor that can't write all bytes returns valid response
@@ -130,27 +135,40 @@ mod tests {
130135 let mut queue = OutboundQueue :: new ( 10 ) ;
131136
132137 queue. push_back ( vec ! [ 1 , 2 , 3 ] ) ;
133- assert ! ( !queue. try_flush_one ( & mut descriptor) ) ;
138+ assert ! ( !queue. try_flush ( & mut descriptor) ) ;
134139
135140 descriptor. assert_called_with ( vec ! [ ( vec![ 1 , 2 , 3 ] , true ) ] ) ;
136141 }
137142
143+ // Test that descriptor that can't write all bytes (in second pushed item) returns valid response
144+ #[ test]
145+ fn push_back_drain_partial_multiple_push ( ) {
146+ let mut descriptor = SocketDescriptorMock :: with_fixed_size ( 2 ) ;
147+ let mut queue = OutboundQueue :: new ( 10 ) ;
148+
149+ queue. push_back ( vec ! [ 1 ] ) ;
150+ queue. push_back ( vec ! [ 2 , 3 ] ) ;
151+ assert ! ( !queue. try_flush( & mut descriptor) ) ;
152+
153+ descriptor. assert_called_with ( vec ! [ ( vec![ 1 ] , true ) , ( vec![ 2 , 3 ] , true ) ] ) ;
154+ }
155+
138156 // Test the bookkeeping for multiple partial writes
139157 #[ test]
140- fn push_back_drain_partial_drain_partial_try_flush_one ( ) {
158+ fn push_back_drain_partial_drain_partial_try_flush ( ) {
141159 let mut descriptor = SocketDescriptorMock :: with_fixed_size ( 1 ) ;
142160 let mut queue = OutboundQueue :: new ( 10 ) ;
143161
144162 queue. push_back ( vec ! [ 1 , 2 , 3 ] ) ;
145- assert ! ( !queue. try_flush_one ( & mut descriptor) ) ;
163+ assert ! ( !queue. try_flush ( & mut descriptor) ) ;
146164
147165 descriptor. make_room ( 1 ) ;
148166 queue. unblock ( ) ;
149- assert ! ( !queue. try_flush_one ( & mut descriptor) ) ;
167+ assert ! ( !queue. try_flush ( & mut descriptor) ) ;
150168
151169 descriptor. make_room ( 1 ) ;
152170 queue. unblock ( ) ;
153- assert ! ( queue. try_flush_one ( & mut descriptor) ) ;
171+ assert ! ( queue. try_flush ( & mut descriptor) ) ;
154172
155173 descriptor. assert_called_with ( vec ! [ ( vec![ 1 , 2 , 3 ] , true ) , ( vec![ 2 , 3 ] , true ) , ( vec![ 3 ] , true ) ] ) ;
156174 }
@@ -162,27 +180,27 @@ mod tests {
162180
163181 // Fail write and move to blocked state
164182 queue. push_back ( vec ! [ 1 , 2 ] ) ;
165- assert ! ( !queue. try_flush_one ( & mut descriptor) ) ;
183+ assert ! ( !queue. try_flush ( & mut descriptor) ) ;
166184 descriptor. assert_called_with ( vec ! [ ( vec![ 1 , 2 ] , true ) ] ) ;
167185
168186 // Make room but don't signal
169187 descriptor. make_room ( 1 ) ;
170- assert ! ( !queue. try_flush_one ( & mut descriptor) ) ;
188+ assert ! ( !queue. try_flush ( & mut descriptor) ) ;
171189 assert ! ( queue. is_blocked( ) ) ;
172190 descriptor. assert_called_with ( vec ! [ ( vec![ 1 , 2 ] , true ) ] ) ;
173191
174192 // Unblock and try again
175193 queue. unblock ( ) ;
176194
177195 // Partial write will succeed, but still move to blocked
178- assert ! ( !queue. try_flush_one ( & mut descriptor) ) ;
196+ assert ! ( !queue. try_flush ( & mut descriptor) ) ;
179197 assert ! ( queue. is_blocked( ) ) ;
180198 descriptor. assert_called_with ( vec ! [ ( vec![ 1 , 2 ] , true ) , ( vec![ 1 , 2 ] , true ) ] ) ;
181199
182200 // Make room and signal which will succeed in writing the final piece
183201 descriptor. make_room ( 1 ) ;
184202 queue. unblock ( ) ;
185- assert ! ( queue. try_flush_one ( & mut descriptor) ) ;
203+ assert ! ( queue. try_flush ( & mut descriptor) ) ;
186204 assert ! ( !queue. is_blocked( ) ) ;
187205 descriptor. assert_called_with ( vec ! [ ( vec![ 1 , 2 ] , true ) , ( vec![ 1 , 2 ] , true ) , ( vec![ 2 ] , true ) ] ) ;
188206 }
@@ -194,7 +212,7 @@ mod tests {
194212 let mut queue = OutboundQueue :: new ( 1 ) ;
195213
196214 queue. push_back ( vec ! [ 1 ] ) ;
197- assert ! ( queue. try_flush_one ( & mut descriptor) ) ;
215+ assert ! ( queue. try_flush ( & mut descriptor) ) ;
198216 descriptor. assert_called_with ( vec ! [ ( vec![ 1 ] , false ) ] ) ;
199217 }
200218
@@ -207,9 +225,7 @@ mod tests {
207225 queue. push_back ( vec ! [ 1 ] ) ;
208226 queue. push_back ( vec ! [ 2 ] ) ;
209227 queue. push_back ( vec ! [ 3 ] ) ;
210- assert ! ( queue. try_flush_one( & mut descriptor) ) ;
211- assert ! ( queue. try_flush_one( & mut descriptor) ) ;
212- assert ! ( queue. try_flush_one( & mut descriptor) ) ;
228+ assert ! ( queue. try_flush( & mut descriptor) ) ;
213229 descriptor. assert_called_with ( vec ! [ ( vec![ 1 ] , false ) , ( vec![ 2 ] , false ) , ( vec![ 3 ] , true ) ] ) ;
214230 }
215231
@@ -223,7 +239,7 @@ mod tests {
223239 queue. push_back ( vec ! [ 1 ] ) ;
224240 assert ! ( !queue. is_empty( ) ) ;
225241
226- assert ! ( queue. try_flush_one ( & mut descriptor) ) ;
242+ assert ! ( queue. try_flush ( & mut descriptor) ) ;
227243 assert ! ( queue. is_empty( ) ) ;
228244 }
229245
@@ -244,12 +260,8 @@ mod tests {
244260 queue. push_back ( vec ! [ 2 ] ) ;
245261 assert_eq ! ( queue. queue_space( ) , 0 ) ;
246262
247- // at soft limit
248- assert ! ( queue. try_flush_one( & mut descriptor) ) ;
249- assert_eq ! ( queue. queue_space( ) , 0 ) ;
250-
251263 // below soft limt
252- assert ! ( queue. try_flush_one ( & mut descriptor) ) ;
264+ assert ! ( queue. try_flush ( & mut descriptor) ) ;
253265 assert_eq ! ( queue. queue_space( ) , 1 ) ;
254266 }
255267}
0 commit comments