1use std::{collections::HashMap, sync::Arc};
2
3use tokio::{
4 sync::{Mutex as AsyncMutex, Semaphore},
5 time::{Duration, timeout},
6};
7
8use super::{
9 Health::HealthChecker,
10 Types::{ConnectionHandle, ConnectionStats},
11};
12use crate::dev_log;
13
/// Alias that exposes [`ConnectionPool`] under the name `ConnectionManager`.
pub type ConnectionManager = ConnectionPool;
19
/// Registry of IPC connections guarded by a semaphore and a per-connection
/// health monitor.
///
/// All shared state lives behind `Arc`s so the background health-monitoring
/// tasks spawned by the pool can hold their own references to it.
pub struct ConnectionPool {
    /// Number of permits the semaphore is created with; reported by
    /// `max_connections()` and in `ConnectionStats::max_connections`.
    MaxConnections:usize,

    /// Longest time `GetConnection` waits for a semaphore permit before
    /// returning a timeout error.
    ConnectionTimeout:Duration,

    /// Semaphore from which `GetConnection` acquires a permit before
    /// registering a new connection.
    Semaphore:Arc<Semaphore>,

    /// Registered connections keyed by `ConnectionHandle::id`. Entries are
    /// inserted by `GetConnection` and removed by `ReleaseConnection` and
    /// `CleanUpStaleConnections`.
    ActiveConnection:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,

    /// Health checker shared with the monitoring tasks, which lock it to call
    /// `check_connection_health` on a registered handle.
    HealthChecker:Arc<AsyncMutex<HealthChecker>>,
}
86
87impl ConnectionPool {
88 pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
100 dev_log!(
101 "ipc",
102 "[ConnectionPool] Creating pool with max: {}, timeout: {:?}",
103 MaxConnections,
104 ConnectionTimeout
105 );
106
107 Self {
108 MaxConnections,
109
110 ConnectionTimeout,
111
112 Semaphore:Arc::new(Semaphore::new(MaxConnections)),
113
114 ActiveConnection:Arc::new(AsyncMutex::new(HashMap::new())),
115
116 HealthChecker:Arc::new(AsyncMutex::new(HealthChecker::new())),
117 }
118 }
119
120 pub fn default() -> Self { Self::new(10, Duration::from_secs(30)) }
124
125 pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
141 dev_log!("ipc", "[ConnectionPool] Acquiring connection permit");
142
143 let permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
145 .await
146 .map_err(|_| "Connection timeout - pool may be at capacity".to_string())?
147 .map_err(|e| format!("Failed to acquire connection permit: {}", e))?;
148
149 let Handle = ConnectionHandle::new();
151
152 {
154 let mut connections = self.ActiveConnection.lock().await;
155
156 connections.insert(Handle.id.clone(), Handle.clone());
157 }
158
159 dev_log!(
160 "ipc",
161 "[ConnectionPool] Connection {} acquired (permit released on drop)",
162 Handle.id
163 );
164
165 self.StartHealthMonitoring(&Handle.id).await;
167
168 drop(permit);
170
171 Ok(Handle)
172 }
173
174 pub async fn ReleaseConnection(&self, Handle:ConnectionHandle) {
188 dev_log!("ipc", "[ConnectionPool] Releasing connection {}", Handle.id);
189
190 {
191 let mut connections = self.ActiveConnection.lock().await;
192
193 connections.remove(&Handle.id);
194 }
195
196 dev_log!("ipc", "[ConnectionPool] Connection {} released", Handle.id);
197 }
198
199 pub async fn GetStats(&self) -> ConnectionStats {
215 let connections = self.ActiveConnection.lock().await;
216
217 let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
218
219 ConnectionStats {
220 total_connections:connections.len(),
221
222 healthy_connections,
223
224 max_connections:self.MaxConnections,
225
226 available_permits:self.Semaphore.available_permits(),
227
228 connection_timeout:self.ConnectionTimeout,
229 }
230 }
231
232 pub async fn CleanUpStaleConnections(&self) -> usize {
251 let mut connections = self.ActiveConnection.lock().await;
252
253 let now = std::time::SystemTime::now();
254
255 let stale_threshold = Duration::from_secs(300); let stale_ids:Vec<String> = connections
258 .iter()
259 .filter(|(_, Handle)| {
260 let is_stale_by_time = match now.duration_since(Handle.last_used) {
262 Ok(idle_time) => idle_time > stale_threshold,
263 Err(_) => true, };
265 is_stale_by_time || !Handle.is_healthy()
266 })
267 .map(|(id, _)| id.clone())
268 .collect();
269
270 let stale_count = stale_ids.len();
271
272 for id in stale_ids {
273 dev_log!("ipc", "[ConnectionPool] Removing stale connection {}", id);
274
275 connections.remove(&id);
276 }
277
278 if stale_count > 0 {
279 dev_log!("ipc", "[ConnectionPool] Cleaned up {} stale connection(s)", stale_count);
280 }
281
282 stale_count
283 }
284
285 async fn StartHealthMonitoring(&self, connection_id:&str) {
293 let health_checker = self.HealthChecker.clone();
294
295 let active_connection = self.ActiveConnection.clone();
296
297 let connection_id = connection_id.to_string();
298
299 tokio::spawn(async move {
300 let mut interval = tokio::time::interval(Duration::from_secs(30));
301
302 loop {
303 interval.tick().await;
304
305 let checker = health_checker.lock().await;
306 let mut connections = match active_connection.try_lock() {
307 Ok(conns) => conns,
308 Err(_) => continue,
309 };
310
311 if let Some(Handle) = connections.get_mut(&connection_id) {
312 let is_healthy = checker.check_connection_health(Handle).await;
313 Handle.update_health(is_healthy);
314
315 if !Handle.is_healthy() {
316 dev_log!(
317 "ipc",
318 "[ConnectionPool] Connection {} marked as unhealthy (score: {:.1}, errors: {})",
319 Handle.id,
320 Handle.health_score,
321 Handle.error_count
322 );
323 }
324 } else {
325 dev_log!(
327 "ipc",
328 "[ConnectionPool] Connection {} removed from pool, stopping health monitoring",
329 connection_id
330 );
331 break;
332 }
333 }
334 });
335 }
336
337 pub fn max_connections(&self) -> usize { self.MaxConnections }
339
340 pub fn connection_timeout(&self) -> Duration { self.ConnectionTimeout }
342
343 pub fn available_permits(&self) -> usize { self.Semaphore.available_permits() }
345
346 pub async fn active_connection(&self) -> usize {
348 let connections = self.ActiveConnection.lock().await;
349
350 connections.len()
351 }
352}
353
#[cfg(test)]
mod tests {

    use super::*;

    /// A freshly built pool reports its configuration and holds no connections.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let pool = ConnectionPool::new(10, Duration::from_secs(30));

        assert_eq!(pool.max_connections(), 10);

        assert_eq!(pool.connection_timeout(), Duration::from_secs(30));

        assert_eq!(pool.available_permits(), 10);

        assert_eq!(pool.active_connection().await, 0);
    }

    /// `default()` yields 10 slots and a 30 second timeout.
    #[tokio::test]
    async fn test_default_connection_pool() {
        let pool = ConnectionPool::default();

        assert_eq!(pool.max_connections(), 10);

        assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
    }

    /// Acquiring occupies one slot; releasing frees it again.
    #[tokio::test]
    async fn test_get_and_release_connection() {
        let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

        let Handle = pool.GetConnection().await.unwrap();

        assert_eq!(pool.active_connection().await, 1);

        assert_eq!(pool.available_permits(), 4);

        pool.ReleaseConnection(Handle).await;

        assert_eq!(pool.active_connection().await, 0);

        assert_eq!(pool.available_permits(), 5);
    }

    /// A full pool blocks further acquisitions until a slot is released.
    #[tokio::test]
    async fn test_multiple_connections() {
        let pool = Arc::new(ConnectionPool::new(3, Duration::from_secs(5)));

        let mut handles = Vec::new();

        for _ in 0..3 {
            handles.push(pool.GetConnection().await.unwrap());
        }

        assert_eq!(pool.active_connection().await, 3);

        assert_eq!(pool.available_permits(), 0);

        // The outer 1 second timeout fires before the pool's own 5 second one.
        let result = timeout(Duration::from_secs(1), pool.GetConnection()).await;

        assert!(result.is_err());

        pool.ReleaseConnection(handles[0].clone()).await;

        assert_eq!(pool.available_permits(), 1);

        let Handle = pool.GetConnection().await.unwrap();

        assert_eq!(pool.available_permits(), 0);

        // `handles[0]` is released a second time here; that must be harmless.
        for Handle in handles {
            pool.ReleaseConnection(Handle).await;
        }

        pool.ReleaseConnection(Handle).await;

        assert_eq!(pool.active_connection().await, 0);

        assert_eq!(pool.available_permits(), 3);
    }

    /// Statistics follow the number of registered connections.
    #[tokio::test]
    async fn test_connection_stats() {
        let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(30)));

        let stats = pool.GetStats().await;

        assert_eq!(stats.total_connections, 0);

        assert_eq!(stats.healthy_connections, 0);

        assert_eq!(stats.max_connections, 5);

        assert_eq!(stats.utilization(), 0.0);

        for _ in 0..3 {
            let _ = pool.GetConnection().await.unwrap();
        }

        let stats = pool.GetStats().await;

        assert_eq!(stats.total_connections, 3);

        assert!(stats.healthy_connections > 0);

        assert!(stats.utilization() > 0.0);
    }

    /// A connection idle for longer than the stale threshold is reaped.
    #[tokio::test]
    async fn test_cleanup_stale_connections() {
        let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

        let Handle = pool.GetConnection().await.unwrap();

        // Age the pooled entry itself; the handle returned to the caller is
        // only a clone, so mutating it would not affect the registry.
        {
            let mut connections = pool.ActiveConnection.lock().await;

            let entry = connections.get_mut(&Handle.id).expect("connection is registered");

            entry.last_used = std::time::SystemTime::now()
                .checked_sub(Duration::from_secs(360))
                .expect("a timestamp six minutes in the past is representable");

            entry.health_score = 25.0;
        }

        let cleaned = pool.CleanUpStaleConnections().await;

        assert_eq!(cleaned, 1);

        assert_eq!(pool.active_connection().await, 0);

        // Releasing a handle that was already reaped must be harmless.
        pool.ReleaseConnection(Handle).await;

        assert_eq!(pool.active_connection().await, 0);
    }

    /// Five of ten slots in use is reported as 50% utilization.
    #[tokio::test]
    async fn test_pool_utilization() {
        let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));

        assert_eq!(pool.GetStats().await.utilization(), 0.0);

        for _ in 0..5 {
            let _ = pool.GetConnection().await.unwrap();
        }

        let stats = pool.GetStats().await;

        assert_eq!(stats.utilization(), 50.0);
    }
}