Skip to content

Commit 33e8c58

Browse files
committed
Address Copilot feedback: remove dead code, make buffers configurable, add TODOs
Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
1 parent b6abbbf commit 33e8c58

File tree

4 files changed

+101
-33
lines changed

4 files changed

+101
-33
lines changed

orion-internal/src/connection.rs

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,17 @@ pub struct InternalListenerHandle {
4242
pub name: String,
4343
pub connection_sender: mpsc::UnboundedSender<InternalConnectionPair>,
4444
listener_ref: Weak<()>,
45+
buffer_size_kb: Option<u32>,
4546
}
4647

4748
impl InternalListenerHandle {
4849
pub fn new(
4950
name: String,
5051
connection_sender: mpsc::UnboundedSender<InternalConnectionPair>,
5152
listener_ref: Weak<()>,
53+
buffer_size_kb: Option<u32>,
5254
) -> Self {
53-
Self { name, connection_sender, listener_ref }
55+
Self { name, connection_sender, listener_ref, buffer_size_kb }
5456
}
5557

5658
pub fn is_alive(&self) -> bool {
@@ -64,7 +66,7 @@ impl InternalListenerHandle {
6466

6567
let metadata = InternalConnectionMetadata {
6668
listener_name: self.name.clone(),
67-
buffer_size_kb: None,
69+
buffer_size_kb: self.buffer_size_kb,
6870
created_at: Instant::now(),
6971
endpoint_id,
7072
};
@@ -185,12 +187,13 @@ impl InternalConnectionFactory {
185187
pub async fn register_listener(
186188
&self,
187189
name: String,
190+
buffer_size_kb: Option<u32>,
188191
) -> Result<(InternalListenerHandle, mpsc::UnboundedReceiver<InternalConnectionPair>, Arc<()>)> {
189192
let (connection_tx, connection_rx) = mpsc::unbounded_channel();
190193
let listener_ref = Arc::new(());
191194
let weak_ref = Arc::downgrade(&listener_ref);
192195

193-
let handle = InternalListenerHandle::new(name.clone(), connection_tx, weak_ref);
196+
let handle = InternalListenerHandle::new(name.clone(), connection_tx, weak_ref, buffer_size_kb);
194197

195198
let mut listeners = self.listeners.write().await;
196199

@@ -337,8 +340,12 @@ impl AsyncWrite for InternalStreamWrapper {
337340
}
338341
}
339342

343+
const DEFAULT_BUFFER_SIZE: usize = 8192;
344+
340345
fn create_internal_connection_pair(metadata: InternalConnectionMetadata) -> (Arc<InternalStream>, Arc<InternalStream>) {
341-
let (upstream_io, downstream_io) = tokio::io::duplex(1024);
346+
let buffer_size = metadata.buffer_size_kb.map(|kb| (kb as usize) * 1024).unwrap_or(DEFAULT_BUFFER_SIZE);
347+
348+
let (upstream_io, downstream_io) = tokio::io::duplex(buffer_size);
342349

343350
let upstream = Arc::new(InternalStream::new(metadata.clone(), upstream_io));
344351
let downstream = Arc::new(InternalStream::new(metadata, downstream_io));
@@ -368,22 +375,22 @@ mod tests {
368375
async fn test_listener_registration() {
369376
let factory = InternalConnectionFactory::new();
370377

371-
let result = factory.register_listener("test_listener".to_string()).await;
378+
let result = factory.register_listener("test_listener".to_string(), None).await;
372379
assert!(result.is_ok());
373380
let (_handle, _rx, _listener_ref) = result.unwrap();
374381

375382
let stats = factory.get_stats().await;
376383
assert_eq!(stats.active_listeners, 1);
377384

378-
let result = factory.register_listener("test_listener".to_string()).await;
385+
let result = factory.register_listener("test_listener".to_string(), None).await;
379386
assert!(result.is_err());
380387
}
381388

382389
#[tokio::test]
383390
async fn test_listener_unregistration() {
384391
let factory = InternalConnectionFactory::new();
385392

386-
let (_handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string()).await.unwrap();
393+
let (_handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string(), None).await.unwrap();
387394
let result = factory.unregister_listener("test_listener").await;
388395
assert!(result.is_ok());
389396

@@ -406,7 +413,7 @@ mod tests {
406413
async fn test_listener_lifecycle() {
407414
let factory = InternalConnectionFactory::new();
408415

409-
let (handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string()).await.unwrap();
416+
let (handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string(), None).await.unwrap();
410417

411418
assert!(factory.is_listener_active("test_listener").await);
412419
assert!(handle.is_alive());
@@ -423,8 +430,8 @@ mod tests {
423430
let listeners = factory.list_listeners().await;
424431
assert!(listeners.is_empty());
425432

426-
let (_handle1, _rx1, _listener_ref1) = factory.register_listener("listener1".to_string()).await.unwrap();
427-
let (_handle2, _rx2, _listener_ref2) = factory.register_listener("listener2".to_string()).await.unwrap();
433+
let (_handle1, _rx1, _listener_ref1) = factory.register_listener("listener1".to_string(), None).await.unwrap();
434+
let (_handle2, _rx2, _listener_ref2) = factory.register_listener("listener2".to_string(), None).await.unwrap();
428435

429436
let listeners = factory.list_listeners().await;
430437
assert_eq!(listeners.len(), 2);
@@ -438,4 +445,27 @@ mod tests {
438445
let stats = factory.get_stats().await;
439446
assert_eq!(stats.max_pooled_connections, 0);
440447
}
448+
449+
#[tokio::test]
450+
async fn test_buffer_size_configuration() {
451+
let metadata_default = InternalConnectionMetadata {
452+
listener_name: "test".to_string(),
453+
buffer_size_kb: None,
454+
created_at: Instant::now(),
455+
endpoint_id: None,
456+
};
457+
let (upstream, downstream) = create_internal_connection_pair(metadata_default);
458+
assert!(upstream.is_active());
459+
assert!(downstream.is_active());
460+
461+
let metadata_custom = InternalConnectionMetadata {
462+
listener_name: "test".to_string(),
463+
buffer_size_kb: Some(4),
464+
created_at: Instant::now(),
465+
endpoint_id: None,
466+
};
467+
let (upstream_custom, downstream_custom) = create_internal_connection_pair(metadata_custom);
468+
assert!(upstream_custom.is_active());
469+
assert!(downstream_custom.is_active());
470+
}
441471
}

orion-internal/tests/integration_tests.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async fn test_complete_internal_connection_flow() {
2323

2424
let listener_name = "test_integration_listener_global";
2525
let (_handle, mut connection_receiver, _listener_ref) =
26-
factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener");
26+
factory.register_listener(listener_name.to_string(), None).await.expect("Failed to register listener");
2727

2828
assert!(factory.is_listener_active(listener_name).await);
2929
let listeners = factory.list_listeners().await;
@@ -67,7 +67,7 @@ async fn test_connection_pooling() {
6767

6868
let listener_name = "test_pooling_listener_global";
6969
let (_handle, mut connection_receiver, _listener_ref) =
70-
factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener");
70+
factory.register_listener(listener_name.to_string(), None).await.expect("Failed to register listener");
7171

7272
let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None);
7373

@@ -94,11 +94,11 @@ async fn test_error_scenarios() {
9494
assert!(result.is_err());
9595

9696
let listener_name = "test_error_listener_global";
97-
let result1 = factory.register_listener(listener_name.to_string()).await;
97+
let result1 = factory.register_listener(listener_name.to_string(), None).await;
9898
assert!(result1.is_ok());
9999
let (_handle1, _rx1, _listener_ref1) = result1.unwrap();
100100

101-
let result2 = factory.register_listener(listener_name.to_string()).await;
101+
let result2 = factory.register_listener(listener_name.to_string(), None).await;
102102
assert!(result2.is_err());
103103

104104
let result = factory.unregister_listener("non_existent").await;
@@ -124,7 +124,7 @@ async fn test_global_factory() {
124124
let factory = global_internal_connection_factory();
125125

126126
let listener_name = "test_global_listener";
127-
let (_handle, _rx, _listener_ref) = factory.register_listener(listener_name.to_string()).await.unwrap();
127+
let (_handle, _rx, _listener_ref) = factory.register_listener(listener_name.to_string(), None).await.unwrap();
128128

129129
assert!(factory.is_listener_active(listener_name).await);
130130

@@ -141,7 +141,7 @@ async fn test_statistics_and_monitoring() {
141141
let listener1 = "test_stats_listener1_global";
142142

143143
let (_handle1, _rx1, _listener_ref1) =
144-
factory.register_listener(listener1.to_string()).await.expect("Failed to register listener1");
144+
factory.register_listener(listener1.to_string(), None).await.expect("Failed to register listener1");
145145

146146
let stats = factory.get_stats().await;
147147
assert!(stats.active_listeners >= 1);
@@ -172,7 +172,7 @@ async fn test_cluster_helpers() {
172172

173173
let factory = global_internal_connection_factory();
174174
let (_handle, _rx, _listener_ref) =
175-
factory.register_listener("test_cluster_helpers_listener".to_string()).await.unwrap();
175+
factory.register_listener("test_cluster_helpers_listener".to_string(), None).await.unwrap();
176176

177177
assert!(is_internal_listener_available("test_cluster_helpers_listener").await);
178178

@@ -181,3 +181,29 @@ async fn test_cluster_helpers() {
181181

182182
factory.unregister_listener("test_cluster_helpers_listener").await.unwrap();
183183
}
184+
185+
#[tokio::test]
186+
async fn test_buffer_size_configuration() {
187+
let factory = global_internal_connection_factory();
188+
189+
let listener_name = "test_buffer_size_listener";
190+
let buffer_size_kb = Some(8);
191+
192+
let (_handle, mut connection_receiver, _listener_ref) =
193+
factory.register_listener(listener_name.to_string(), buffer_size_kb).await.unwrap();
194+
195+
let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None);
196+
197+
let connection_future = connector.connect();
198+
let listener_future = connection_receiver.recv();
199+
let (_cluster_conn, listener_pair) = tokio::join!(connection_future, listener_future);
200+
201+
assert!(listener_pair.is_some());
202+
let connection_pair = listener_pair.unwrap();
203+
204+
let metadata = connection_pair.downstream.metadata();
205+
assert_eq!(metadata.buffer_size_kb, Some(8));
206+
assert_eq!(metadata.listener_name, listener_name);
207+
208+
factory.unregister_listener(listener_name).await.unwrap();
209+
}

orion-lib/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub use clusters::{
4444
load_assignment::PartialClusterLoadAssignment,
4545
ClusterLoadAssignmentBuilder,
4646
};
47-
pub use listeners::listener::ListenerFactory;
47+
pub use listeners::listener::{init_internal_worker_pool, ListenerFactory};
4848
pub use listeners_manager::{ListenerConfigurationChange, ListenersManager, RouteConfigurationChange};
4949
pub use orion_configuration::config::network_filters::http_connection_manager::RouteConfiguration;
5050
use orion_configuration::config::{

orion-lib/src/listeners/listener.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ impl InternalConnectionWorkerPool {
8484
while let Some(task) = receiver.recv().await {
8585
if let Err(e) =
8686
handle_internal_connection_static(task.listener_name, task.connection_pair, task.filter_chains)
87-
.await
8887
{
8988
warn!("Error handling internal connection task: {}", e);
9089
}
@@ -112,8 +111,19 @@ impl InternalConnectionWorkerPool {
112111

113112
static INTERNAL_WORKER_POOL: std::sync::OnceLock<InternalConnectionWorkerPool> = std::sync::OnceLock::new();
114113

114+
pub fn init_internal_worker_pool(num_workers: Option<usize>) {
115+
let workers = num_workers
116+
.or_else(|| std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE").ok().and_then(|s| s.parse::<usize>().ok()))
117+
.unwrap_or(4);
118+
let _ = INTERNAL_WORKER_POOL.set(InternalConnectionWorkerPool::new(workers));
119+
}
120+
115121
fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool {
116-
INTERNAL_WORKER_POOL.get_or_init(|| InternalConnectionWorkerPool::new(4)) // 4 workers by default
122+
INTERNAL_WORKER_POOL.get_or_init(|| {
123+
let workers =
124+
std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE").ok().and_then(|s| s.parse::<usize>().ok()).unwrap_or(4);
125+
InternalConnectionWorkerPool::new(workers)
126+
})
117127
}
118128

119129
#[derive(Debug, Clone)]
@@ -135,7 +145,6 @@ enum ListenerAddress {
135145

136146
#[derive(Debug, Clone)]
137147
struct InternalListenerConfig {
138-
#[allow(dead_code)]
139148
buffer_size_kb: Option<u32>,
140149
}
141150
#[derive(Debug, Clone)]
@@ -395,7 +404,7 @@ impl Listener {
395404

396405
async fn run_internal_listener(
397406
name: &'static str,
398-
_internal_config: InternalListenerConfig,
407+
internal_config: InternalListenerConfig,
399408
filter_chains: HashMap<FilterChainMatch, FilterchainType>,
400409
_with_tls_inspector: bool,
401410
_with_tlv_listener_filter: bool,
@@ -408,13 +417,14 @@ impl Listener {
408417
let filter_chains = Arc::new(filter_chains);
409418
let factory = global_internal_connection_factory();
410419

411-
let (_handle, mut connection_receiver, _listener_ref) = match factory.register_listener(name.to_owned()).await {
412-
Ok(result) => result,
413-
Err(e) => {
414-
error!("Failed to register internal listener '{}': {}", name, e);
415-
return e;
416-
},
417-
};
420+
let (_handle, mut connection_receiver, _listener_ref) =
421+
match factory.register_listener(name.to_owned(), internal_config.buffer_size_kb).await {
422+
Ok(result) => result,
423+
Err(e) => {
424+
error!("Failed to register internal listener '{}': {}", name, e);
425+
return e;
426+
},
427+
};
418428

419429
info!("Internal listener '{}' registered to connection factory", name);
420430

@@ -983,7 +993,7 @@ filter_chains:
983993
}
984994
}
985995

986-
async fn handle_internal_connection_static(
996+
fn handle_internal_connection_static(
987997
listener_name: String,
988998
connection_pair: orion_internal::InternalConnectionPair,
989999
filter_chains: Arc<HashMap<FilterChainMatch, FilterchainType>>,
@@ -1004,17 +1014,19 @@ async fn handle_internal_connection_static(
10041014
return Err(crate::Error::new("No matching filter chain"));
10051015
};
10061016

1007-
let _downstream_stream = connection_pair.downstream;
1017+
let downstream_stream = connection_pair.downstream;
10081018

10091019
match &filter_chain.handler {
10101020
crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => {
10111021
info!("Processing internal connection through HTTP filter chain");
1012-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1022+
// TODO: Implement HTTP connection processing
1023+
let _ = downstream_stream;
10131024
Ok(())
10141025
},
10151026
crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => {
10161027
info!("Processing internal connection through TCP filter chain");
1017-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1028+
// TODO: Implement TCP connection processing
1029+
let _ = downstream_stream;
10181030
Ok(())
10191031
},
10201032
}

0 commit comments

Comments
 (0)