// File: Mountain/IPC/Connection/Manager.rs

1use std::{collections::HashMap, sync::Arc};
2
3use tokio::{
4	sync::{Mutex as AsyncMutex, Semaphore},
5	time::{Duration, timeout},
6};
7
8use super::{
9	Health::HealthChecker,
10	Types::{ConnectionHandle, ConnectionStats},
11};
12use crate::dev_log;
13
14/// Connection manager (alias for ConnectionPool)
15///
16/// This is the main connection management structure, providing connection
17/// pooling with health monitoring and automatic cleanup.
18pub type ConnectionManager = ConnectionPool;
19
20/// Connection pool for IPC operations
21///
22/// This structure manages a pool of connections, preventing connection
23/// exhaustion by reusing connections and providing health monitoring.
24///
25/// ## Pool Architecture
26///
27/// ```text
28/// ConnectionPool
29///     |
30///     | Semaphore (limits max connections)
31///     v
32/// Active Connections (HashMap<id, ConnectionHandle>)
33///     |
34///     | Health Checker (background task)
35///     v
36/// Monitor health and update scores
37/// ```
38///
39/// ## Connection Lifecycle
40///
41/// 1. **Acquisition**: Get a connection from the pool (or create new)
42/// 2. **Usage**: Use the connection for operations
43/// 3. **Release**: Return the connection to the pool
44/// 4. **Cleanup**: Automatically remove stale/unhealthy connections
45///
46/// ## Health Monitoring
47///
48/// Each connection has:
49/// - Health score (0.0 to 100.0)
50/// - Error count
51/// - Last used timestamp
52/// - Background health checks every 30 seconds
53///
54/// ## Example Usage
55///
56/// ```rust,ignore
57/// let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
58///
59/// // Get a connection
60/// let Handle = pool.GetConnection().await?;
61///
62/// // Use the connection...
63///
64/// // Release the connection
65/// pool.ReleaseConnection(Handle).await;
66///
67/// // Get statistics
68/// let stats = pool.GetStats().await;
69/// ```
70pub struct ConnectionPool {
71	/// Maximum number of concurrent connections allowed
72	MaxConnections:usize,
73
74	/// Timeout for acquiring a connection from the pool
75	ConnectionTimeout:Duration,
76
77	/// Semaphore to limit concurrent connections
78	Semaphore:Arc<Semaphore>,
79
80	/// Map of active connection by ID
81	ActiveConnection:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
82
83	/// Health checker for monitoring connection health
84	HealthChecker:Arc<AsyncMutex<HealthChecker>>,
85}
86
87impl ConnectionPool {
88	/// Create a new connection pool with specified parameters
89	///
90	/// ## Parameters
91	/// - `MaxConnections`: Maximum number of concurrent connections
92	/// - `ConnectionTimeout`: Timeout for acquiring a connection
93	///
94	/// ## Example
95	///
96	/// ```rust,ignore
97	/// let pool = ConnectionPool::new(10, Duration::from_secs(30));
98	/// ```
99	pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
100		dev_log!(
101			"ipc",
102			"[ConnectionPool] Creating pool with max: {}, timeout: {:?}",
103			MaxConnections,
104			ConnectionTimeout
105		);
106
107		Self {
108			MaxConnections,
109
110			ConnectionTimeout,
111
112			Semaphore:Arc::new(Semaphore::new(MaxConnections)),
113
114			ActiveConnection:Arc::new(AsyncMutex::new(HashMap::new())),
115
116			HealthChecker:Arc::new(AsyncMutex::new(HealthChecker::new())),
117		}
118	}
119
120	/// Create a connection pool with default settings
121	///
122	/// Default settings: 10 max connections, 30s timeout
123	pub fn default() -> Self { Self::new(10, Duration::from_secs(30)) }
124
125	/// Get a connection Handle from the pool with timeout
126	///
127	/// This method acquires a semaphore permit and creates a new connection
128	/// Handle. If the pool is at capacity, it will wait until a connection
129	/// becomes available or the timeout is reached.
130	///
131	/// ## Returns
132	/// - `Ok(ConnectionHandle)`: New connection Handle
133	/// - `Err(String)`: Error Message if timeout or failure occurs
134	///
135	/// ## Example
136	///
137	/// ```rust,ignore
138	/// let Handle = pool.GetConnection().await?;
139	/// ```
140	pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
141		dev_log!("ipc", "[ConnectionPool] Acquiring connection permit");
142
143		// Acquire semaphore permit with timeout
144		let permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
145			.await
146			.map_err(|_| "Connection timeout - pool may be at capacity".to_string())?
147			.map_err(|e| format!("Failed to acquire connection permit: {}", e))?;
148
149		// Create new connection Handle
150		let Handle = ConnectionHandle::new();
151
152		// Add to active connections
153		{
154			let mut connections = self.ActiveConnection.lock().await;
155
156			connections.insert(Handle.id.clone(), Handle.clone());
157		}
158
159		dev_log!(
160			"ipc",
161			"[ConnectionPool] Connection {} acquired (permit released on drop)",
162			Handle.id
163		);
164
165		// Start health monitoring for this connection
166		self.StartHealthMonitoring(&Handle.id).await;
167
168		// The permit will be automatically released when dropped
169		drop(permit);
170
171		Ok(Handle)
172	}
173
174	/// Release a connection Handle back to the pool
175	///
176	/// This method removes the connection from the active connections map,
177	/// allowing the semaphore permit to be reused.
178	///
179	/// ## Parameters
180	/// - `Handle`: The connection Handle to release
181	///
182	/// ## Example
183	///
184	/// ```rust,ignore
185	/// pool.ReleaseConnection(Handle).await;
186	/// ```
187	pub async fn ReleaseConnection(&self, Handle:ConnectionHandle) {
188		dev_log!("ipc", "[ConnectionPool] Releasing connection {}", Handle.id);
189
190		{
191			let mut connections = self.ActiveConnection.lock().await;
192
193			connections.remove(&Handle.id);
194		}
195
196		dev_log!("ipc", "[ConnectionPool] Connection {} released", Handle.id);
197	}
198
199	/// Get connection statistics for monitoring
200	///
201	/// This method returns aggregate statistics about the connection pool,
202	/// useful for monitoring and debugging.
203	///
204	/// ## Returns
205	/// Connection statistics including total connections, healthy connections,
206	/// utilization, etc.
207	///
208	/// ## Example
209	///
210	/// ```rust,ignore
211	/// let stats = pool.GetStats().await;
212	/// println!("Pool stats: {:?}", stats.summary());
213	/// ```
214	pub async fn GetStats(&self) -> ConnectionStats {
215		let connections = self.ActiveConnection.lock().await;
216
217		let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
218
219		ConnectionStats {
220			total_connections:connections.len(),
221
222			healthy_connections,
223
224			max_connections:self.MaxConnections,
225
226			available_permits:self.Semaphore.available_permits(),
227
228			connection_timeout:self.ConnectionTimeout,
229		}
230	}
231
232	/// Clean up stale connections
233	///
234	/// This method removes connections that have not been used recently
235	/// or are unhealthy, preventing memory leaks and resource exhaustion.
236	///
237	/// Stale connection criteria:
238	/// - Unused for 5 minutes (300 seconds)
239	/// - Not healthy (health score <= 50 or error count >= 5)
240	///
241	/// ## Returns
242	/// The number of stale connections removed
243	///
244	/// ## Example
245	///
246	/// ```rust,ignore
247	/// let cleaned = pool.CleanUpStaleConnections().await;
248	/// println!("Cleaned up {} stale connections", cleaned);
249	/// ```
250	pub async fn CleanUpStaleConnections(&self) -> usize {
251		let mut connections = self.ActiveConnection.lock().await;
252
253		let now = std::time::SystemTime::now();
254
255		let stale_threshold = Duration::from_secs(300); // 5 minutes
256
257		let stale_ids:Vec<String> = connections
258			.iter()
259			.filter(|(_, Handle)| {
260				// Check if connection is stale using SystemTime
261				let is_stale_by_time = match now.duration_since(Handle.last_used) {
262					Ok(idle_time) => idle_time > stale_threshold,
263					Err(_) => true, // If time went backwards, consider it stale
264				};
265				is_stale_by_time || !Handle.is_healthy()
266			})
267			.map(|(id, _)| id.clone())
268			.collect();
269
270		let stale_count = stale_ids.len();
271
272		for id in stale_ids {
273			dev_log!("ipc", "[ConnectionPool] Removing stale connection {}", id);
274
275			connections.remove(&id);
276		}
277
278		if stale_count > 0 {
279			dev_log!("ipc", "[ConnectionPool] Cleaned up {} stale connection(s)", stale_count);
280		}
281
282		stale_count
283	}
284
285	/// Start health monitoring for a connection
286	///
287	/// This method spawns a background task that periodically checks the
288	/// health of the connection and updates its health score.
289	///
290	/// ## Parameters
291	/// - `connection_id`: The ID of the connection to monitor
292	async fn StartHealthMonitoring(&self, connection_id:&str) {
293		let health_checker = self.HealthChecker.clone();
294
295		let active_connection = self.ActiveConnection.clone();
296
297		let connection_id = connection_id.to_string();
298
299		tokio::spawn(async move {
300			let mut interval = tokio::time::interval(Duration::from_secs(30));
301
302			loop {
303				interval.tick().await;
304
305				let checker = health_checker.lock().await;
306				let mut connections = match active_connection.try_lock() {
307					Ok(conns) => conns,
308					Err(_) => continue,
309				};
310
311				if let Some(Handle) = connections.get_mut(&connection_id) {
312					let is_healthy = checker.check_connection_health(Handle).await;
313					Handle.update_health(is_healthy);
314
315					if !Handle.is_healthy() {
316						dev_log!(
317							"ipc",
318							"[ConnectionPool] Connection {} marked as unhealthy (score: {:.1}, errors: {})",
319							Handle.id,
320							Handle.health_score,
321							Handle.error_count
322						);
323					}
324				} else {
325					// Connection removed from pool, stop monitoring
326					dev_log!(
327						"ipc",
328						"[ConnectionPool] Connection {} removed from pool, stopping health monitoring",
329						connection_id
330					);
331					break;
332				}
333			}
334		});
335	}
336
337	/// Get the maximum number of connections
338	pub fn max_connections(&self) -> usize { self.MaxConnections }
339
340	/// Get the connection timeout
341	pub fn connection_timeout(&self) -> Duration { self.ConnectionTimeout }
342
343	/// Get the number of available permits
344	pub fn available_permits(&self) -> usize { self.Semaphore.available_permits() }
345
346	/// Get the number of active connections
347	pub async fn active_connection(&self) -> usize {
348		let connections = self.ActiveConnection.lock().await;
349
350		connections.len()
351	}
352}
353
354#[cfg(test)]
355mod tests {
356
357	use super::*;
358
359	#[tokio::test]
360	async fn test_connection_pool_creation() {
361		let pool = ConnectionPool::new(10, Duration::from_secs(30));
362
363		assert_eq!(pool.max_connections(), 10);
364
365		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
366
367		assert_eq!(pool.available_permits(), 10);
368
369		assert_eq!(pool.active_connections().await, 0);
370	}
371
372	#[tokio::test]
373	async fn test_default_connection_pool() {
374		let pool = ConnectionPool::default();
375
376		assert_eq!(pool.max_connections(), 10);
377
378		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
379	}
380
381	#[tokio::test]
382	async fn test_get_and_release_connection() {
383		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));
384
385		// Get a connection
386		let Handle = pool.GetConnection().await.unwrap();
387
388		assert_eq!(pool.active_connections().await, 1);
389
390		assert_eq!(pool.available_permits(), 4); // One permit used
391
392		// Release the connection
393		pool.ReleaseConnection(Handle).await;
394
395		assert_eq!(pool.active_connections().await, 0);
396
397		assert_eq!(pool.available_permits(), 5); // Permit restored
398	}
399
400	#[tokio::test]
401	async fn test_multiple_connections() {
402		let pool = Arc::new(ConnectionPool::new(3, Duration::from_secs(5)));
403
404		// Collect handles properly without await in sync closure
405		let mut handles = Vec::new();
406
407		for _ in 0..3 {
408			handles.push(pool.GetConnection().await.unwrap());
409		}
410
411		assert_eq!(pool.active_connections().await, 3);
412
413		assert_eq!(pool.available_permits(), 0);
414
415		// Try to get one more - should timeout
416		let result = timeout(Duration::from_secs(1), pool.GetConnection()).await;
417
418		assert!(result.is_err()); // Timeout
419
420		// Release one connection
421		pool.ReleaseConnection(handles[0].clone()).await;
422
423		assert_eq!(pool.available_permits(), 1);
424
425		// Now we can get another
426		let Handle = pool.GetConnection().await.unwrap();
427
428		assert_eq!(pool.available_permits(), 0);
429
430		// Release all
431		for Handle in handles {
432			pool.ReleaseConnection(Handle).await;
433		}
434
435		pool.ReleaseConnection(Handle).await;
436	}
437
438	#[tokio::test]
439	async fn test_connection_stats() {
440		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(30)));
441
442		let stats = pool.GetStats().await;
443
444		assert_eq!(stats.total_connections, 0);
445
446		assert_eq!(stats.healthy_connections, 0);
447
448		assert_eq!(stats.max_connections, 5);
449
450		assert_eq!(stats.utilization(), 0.0);
451
452		// Add some connections
453		for _ in 0..3 {
454			let _ = pool.GetConnection().await.unwrap();
455		}
456
457		let stats = pool.GetStats().await;
458
459		assert_eq!(stats.total_connections, 3);
460
461		assert!(stats.healthy_connections > 0);
462
463		assert!(stats.utilization() > 0.0);
464	}
465
466	#[tokio::test]
467	async fn test_cleanup_stale_connections() {
468		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));
469
470		// Create a connection and make it stale
471		let mut Handle = pool.GetConnection().await.unwrap();
472
473		// Manually make it stale by setting old last_used and degrading health
474		unsafe {
475			let ptr = &mut Handle as *mut ConnectionHandle;
476
477			// Set last_used to a time in the past for testing
478			(*ptr).last_used = std::time::SystemTime::now()
479				.checked_sub(Duration::from_secs(360))
480				.unwrap_or((*ptr).last_used);
481
482			(*ptr).health_score = 25.0; // Unhealthy
483		}
484
485		// Release and try to clean up
486		pool.ReleaseConnection(Handle).await;
487
488		// Clean up (will have to adjust logic for testing or add a method to force
489		// cleanup) For now, we'll just verify the method exists and runs
490		let cleaned = pool.CleanUpStaleConnections().await;
491
492		assert!(cleaned >= 0);
493	}
494
495	#[tokio::test]
496	async fn test_pool_utilization() {
497		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
498
499		assert_eq!(pool.GetStats().await.utilization(), 0.0);
500
501		// Use half the connections
502		for _ in 0..5 {
503			let _ = pool.GetConnection().await.unwrap();
504		}
505
506		let stats = pool.GetStats().await;
507
508		assert_eq!(stats.utilization(), 50.0);
509	}
510}