@@ -42,15 +42,17 @@ pub struct InternalListenerHandle {
4242 pub name : String ,
4343 pub connection_sender : mpsc:: UnboundedSender < InternalConnectionPair > ,
4444 listener_ref : Weak < ( ) > ,
45+ buffer_size_kb : Option < u32 > ,
4546}
4647
4748impl InternalListenerHandle {
4849 pub fn new (
4950 name : String ,
5051 connection_sender : mpsc:: UnboundedSender < InternalConnectionPair > ,
5152 listener_ref : Weak < ( ) > ,
53+ buffer_size_kb : Option < u32 > ,
5254 ) -> Self {
53- Self { name, connection_sender, listener_ref }
55+ Self { name, connection_sender, listener_ref, buffer_size_kb }
5456 }
5557
5658 pub fn is_alive ( & self ) -> bool {
@@ -64,7 +66,7 @@ impl InternalListenerHandle {
6466
6567 let metadata = InternalConnectionMetadata {
6668 listener_name : self . name . clone ( ) ,
67- buffer_size_kb : None ,
69+ buffer_size_kb : self . buffer_size_kb ,
6870 created_at : Instant :: now ( ) ,
6971 endpoint_id,
7072 } ;
@@ -185,12 +187,13 @@ impl InternalConnectionFactory {
185187 pub async fn register_listener (
186188 & self ,
187189 name : String ,
190+ buffer_size_kb : Option < u32 > ,
188191 ) -> Result < ( InternalListenerHandle , mpsc:: UnboundedReceiver < InternalConnectionPair > , Arc < ( ) > ) > {
189192 let ( connection_tx, connection_rx) = mpsc:: unbounded_channel ( ) ;
190193 let listener_ref = Arc :: new ( ( ) ) ;
191194 let weak_ref = Arc :: downgrade ( & listener_ref) ;
192195
193- let handle = InternalListenerHandle :: new ( name. clone ( ) , connection_tx, weak_ref) ;
196+ let handle = InternalListenerHandle :: new ( name. clone ( ) , connection_tx, weak_ref, buffer_size_kb ) ;
194197
195198 let mut listeners = self . listeners . write ( ) . await ;
196199
@@ -337,8 +340,14 @@ impl AsyncWrite for InternalStreamWrapper {
337340 }
338341}
339342
343+ const DEFAULT_BUFFER_SIZE : usize = 1024 ;
344+
340345fn create_internal_connection_pair ( metadata : InternalConnectionMetadata ) -> ( Arc < InternalStream > , Arc < InternalStream > ) {
341- let ( upstream_io, downstream_io) = tokio:: io:: duplex ( 1024 ) ;
346+ let buffer_size = metadata. buffer_size_kb
347+ . map ( |kb| ( kb as usize ) * 1024 )
348+ . unwrap_or ( DEFAULT_BUFFER_SIZE ) ;
349+
350+ let ( upstream_io, downstream_io) = tokio:: io:: duplex ( buffer_size) ;
342351
343352 let upstream = Arc :: new ( InternalStream :: new ( metadata. clone ( ) , upstream_io) ) ;
344353 let downstream = Arc :: new ( InternalStream :: new ( metadata, downstream_io) ) ;
@@ -368,22 +377,22 @@ mod tests {
368377 async fn test_listener_registration ( ) {
369378 let factory = InternalConnectionFactory :: new ( ) ;
370379
371- let result = factory. register_listener ( "test_listener" . to_string ( ) ) . await ;
380+ let result = factory. register_listener ( "test_listener" . to_string ( ) , None ) . await ;
372381 assert ! ( result. is_ok( ) ) ;
373382 let ( _handle, _rx, _listener_ref) = result. unwrap ( ) ;
374383
375384 let stats = factory. get_stats ( ) . await ;
376385 assert_eq ! ( stats. active_listeners, 1 ) ;
377386
378- let result = factory. register_listener ( "test_listener" . to_string ( ) ) . await ;
387+ let result = factory. register_listener ( "test_listener" . to_string ( ) , None ) . await ;
379388 assert ! ( result. is_err( ) ) ;
380389 }
381390
382391 #[ tokio:: test]
383392 async fn test_listener_unregistration ( ) {
384393 let factory = InternalConnectionFactory :: new ( ) ;
385394
386- let ( _handle, _rx, _listener_ref) = factory. register_listener ( "test_listener" . to_string ( ) ) . await . unwrap ( ) ;
395+ let ( _handle, _rx, _listener_ref) = factory. register_listener ( "test_listener" . to_string ( ) , None ) . await . unwrap ( ) ;
387396 let result = factory. unregister_listener ( "test_listener" ) . await ;
388397 assert ! ( result. is_ok( ) ) ;
389398
@@ -406,7 +415,7 @@ mod tests {
406415 async fn test_listener_lifecycle ( ) {
407416 let factory = InternalConnectionFactory :: new ( ) ;
408417
409- let ( handle, _rx, _listener_ref) = factory. register_listener ( "test_listener" . to_string ( ) ) . await . unwrap ( ) ;
418+ let ( handle, _rx, _listener_ref) = factory. register_listener ( "test_listener" . to_string ( ) , None ) . await . unwrap ( ) ;
410419
411420 assert ! ( factory. is_listener_active( "test_listener" ) . await ) ;
412421 assert ! ( handle. is_alive( ) ) ;
@@ -423,8 +432,8 @@ mod tests {
423432 let listeners = factory. list_listeners ( ) . await ;
424433 assert ! ( listeners. is_empty( ) ) ;
425434
426- let ( _handle1, _rx1, _listener_ref1) = factory. register_listener ( "listener1" . to_string ( ) ) . await . unwrap ( ) ;
427- let ( _handle2, _rx2, _listener_ref2) = factory. register_listener ( "listener2" . to_string ( ) ) . await . unwrap ( ) ;
435+ let ( _handle1, _rx1, _listener_ref1) = factory. register_listener ( "listener1" . to_string ( ) , None ) . await . unwrap ( ) ;
436+ let ( _handle2, _rx2, _listener_ref2) = factory. register_listener ( "listener2" . to_string ( ) , None ) . await . unwrap ( ) ;
428437
429438 let listeners = factory. list_listeners ( ) . await ;
430439 assert_eq ! ( listeners. len( ) , 2 ) ;
@@ -438,4 +447,27 @@ mod tests {
438447 let stats = factory. get_stats ( ) . await ;
439448 assert_eq ! ( stats. max_pooled_connections, 0 ) ;
440449 }
450+
451+ #[ tokio:: test]
452+ async fn test_buffer_size_configuration ( ) {
453+ let metadata_default = InternalConnectionMetadata {
454+ listener_name : "test" . to_string ( ) ,
455+ buffer_size_kb : None ,
456+ created_at : Instant :: now ( ) ,
457+ endpoint_id : None ,
458+ } ;
459+ let ( upstream, downstream) = create_internal_connection_pair ( metadata_default) ;
460+ assert ! ( upstream. is_active( ) ) ;
461+ assert ! ( downstream. is_active( ) ) ;
462+
463+ let metadata_custom = InternalConnectionMetadata {
464+ listener_name : "test" . to_string ( ) ,
465+ buffer_size_kb : Some ( 4 ) ,
466+ created_at : Instant :: now ( ) ,
467+ endpoint_id : None ,
468+ } ;
469+ let ( upstream_custom, downstream_custom) = create_internal_connection_pair ( metadata_custom) ;
470+ assert ! ( upstream_custom. is_active( ) ) ;
471+ assert ! ( downstream_custom. is_active( ) ) ;
472+ }
441473}
0 commit comments