Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs

1//! `Pool::Struct` - bounded connection pool with health
2//! monitoring, idle/lifetime cleanup, wait-queue timeouts, and
3//! statistics. Acquire via `get_connection` (drops a permit on
4//! the inner `Semaphore`); return via `release_connection`.
5//! The struct + 18-method impl + Clone + tests stay in one
6//! file - tightly coupled cluster.
7
8use std::{
9	collections::HashMap,
10	sync::Arc,
11	time::{Duration, Instant},
12};
13
14use tokio::{
15	sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
16	time::{interval, timeout},
17};
18
19use crate::{
20	IPC::Enhanced::ConnectionPool::{
21		ConnectionHandle::Struct as ConnectionHandle,
22		HealthChecker::Struct as HealthChecker,
23		PoolConfig::Struct as PoolConfig,
24		PoolStats::Struct as PoolStats,
25	},
26	dev_log,
27};
28
29pub struct Struct {
30	pub config:PoolConfig,
31
32	pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
33
34	pub semaphore:Arc<Semaphore>,
35
36	pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
37
38	pub stats:Arc<RwLock<PoolStats>>,
39
40	pub health_checker:Arc<AsyncMutex<HealthChecker>>,
41
42	pub is_running:Arc<AsyncMutex<bool>>,
43}
44
45impl Struct {
46	pub fn new(config:PoolConfig) -> Self {
47		let max_connections = config.max_connections;
48
49		let min_connections = config.min_connections;
50
51		let pool = Self {
52			config:config.clone(),
53
54			connections:Arc::new(AsyncMutex::new(HashMap::new())),
55
56			semaphore:Arc::new(Semaphore::new(max_connections)),
57
58			wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
59
60			stats:Arc::new(RwLock::new(PoolStats {
61				total_connections:0,
62				active_connections:0,
63				idle_connections:0,
64				healthy_connections:0,
65				max_connections,
66				min_connections,
67				wait_queue_size:0,
68				average_wait_time_ms:0.0,
69				total_operations:0,
70				successful_operations:0,
71				error_rate:0.0,
72			})),
73
74			health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
75
76			is_running:Arc::new(AsyncMutex::new(false)),
77		};
78
79		dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
80
81		pool
82	}
83
84	pub async fn start(&self) -> Result<(), String> {
85		{
86			let mut running = self.is_running.lock().await;
87
88			if *running {
89				return Ok(());
90			}
91
92			*running = true;
93		}
94
95		self.start_health_monitoring().await;
96
97		self.start_connection_cleanup().await;
98
99		self.initialize_min_connections().await;
100
101		dev_log!("ipc", "[ConnectionPool] Started connection pool");
102
103		Ok(())
104	}
105
106	pub async fn stop(&self) -> Result<(), String> {
107		{
108			let mut running = self.is_running.lock().await;
109
110			if !*running {
111				return Ok(());
112			}
113
114			*running = false;
115		}
116
117		{
118			let mut connections = self.connections.lock().await;
119
120			connections.clear();
121		}
122
123		{
124			let mut wait_queue = self.wait_queue.lock().await;
125
126			for notifier in wait_queue.drain(..) {
127				notifier.notify_one();
128			}
129		}
130
131		dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
132
133		Ok(())
134	}
135
136	pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
137		let start_time = Instant::now();
138
139		let _permit = timeout(
140			Duration::from_millis(self.config.connection_timeout_ms),
141			self.semaphore.acquire(),
142		)
143		.await
144		.map_err(|_| "Connection timeout".to_string())?
145		.map_err(|e| format!("Failed to acquire connection: {}", e))?;
146
147		let wait_time = start_time.elapsed().as_millis() as f64;
148
149		{
150			let mut stats = self.stats.write().await;
151
152			stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
153				/ (stats.total_operations as f64 + 1.0);
154		}
155
156		let connection = self.find_or_create_connection().await?;
157
158		{
159			let mut stats = self.stats.write().await;
160
161			stats.active_connections += 1;
162
163			stats.total_operations += 1;
164		}
165
166		dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
167
168		Ok(connection)
169	}
170
171	pub async fn release_connection(&self, mut handle:ConnectionHandle) {
172		let connection_id = handle.id.clone();
173
174		handle.last_used = Instant::now();
175
176		{
177			let mut connections = self.connections.lock().await;
178
179			connections.insert(handle.id.clone(), handle.clone());
180		}
181
182		{
183			let mut stats = self.stats.write().await;
184
185			stats.active_connections = stats.active_connections.saturating_sub(1);
186
187			stats.idle_connections += 1;
188		}
189
190		drop(handle);
191
192		dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
193	}
194
195	async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
196		let mut connections = self.connections.lock().await;
197
198		for (_id, handle) in connections.iter_mut() {
199			if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
200				handle.last_used = Instant::now();
201
202				return Ok(handle.clone());
203			}
204		}
205
206		let new_handle = ConnectionHandle::new();
207
208		connections.insert(new_handle.id.clone(), new_handle.clone());
209
210		{
211			let mut stats = self.stats.write().await;
212
213			stats.total_connections += 1;
214
215			stats.healthy_connections += 1;
216		}
217
218		Ok(new_handle)
219	}
220
221	async fn start_health_monitoring(&self) {
222		let pool = Arc::new(self.clone());
223
224		tokio::spawn(async move {
225			let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
226
227			while *pool.is_running.lock().await {
228				interval.tick().await;
229
230				if let Err(e) = pool.check_connection_health().await {
231					dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
232				}
233			}
234		});
235	}
236
237	async fn start_connection_cleanup(&self) {
238		let pool = Arc::new(self.clone());
239
240		tokio::spawn(async move {
241			let mut interval = interval(Duration::from_secs(60));
242
243			while *pool.is_running.lock().await {
244				interval.tick().await;
245
246				let cleaned_count = pool.cleanup_stale_connections().await;
247				if cleaned_count > 0 {
248					dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
249				}
250			}
251		});
252	}
253
254	async fn initialize_min_connections(&self) {
255		let current_count = self.connections.lock().await.len();
256
257		if current_count < self.config.min_connections {
258			let needed = self.config.min_connections - current_count;
259
260			for _ in 0..needed {
261				let handle = ConnectionHandle::new();
262
263				let mut connections = self.connections.lock().await;
264
265				connections.insert(handle.id.clone(), handle);
266			}
267
268			dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
269		}
270	}
271
272	async fn check_connection_health(&self) -> Result<(), String> {
273		let mut connections = self.connections.lock().await;
274
275		let mut _health_checker = self.health_checker.lock().await;
276
277		let mut healthy_count = 0;
278
279		for (_id, handle) in connections.iter_mut() {
280			let is_healthy = _health_checker.check_connection_health(handle).await;
281
282			handle.update_health(is_healthy);
283
284			if handle.is_healthy() {
285				healthy_count += 1;
286			}
287		}
288
289		{
290			let mut stats = self.stats.write().await;
291
292			stats.healthy_connections = healthy_count;
293
294			stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
295
296			if stats.total_operations > 0 {
297				stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
298			}
299		}
300
301		Ok(())
302	}
303
304	pub async fn cleanup_stale_connections(&self) -> usize {
305		let mut connections = self.connections.lock().await;
306
307		let stale_ids:Vec<String> = connections
308			.iter()
309			.filter(|(_, handle)| {
310				handle.age().as_millis() > self.config.max_lifetime_ms as u128
311					|| handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
312					|| !handle.is_healthy()
313			})
314			.map(|(id, _)| id.clone())
315			.collect();
316
317		for id in &stale_ids {
318			connections.remove(id);
319		}
320
321		{
322			let mut stats = self.stats.write().await;
323
324			stats.total_connections = connections.len();
325
326			stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
327		}
328
329		stale_ids.len()
330	}
331
332	pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
333
334	pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
335
336	pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
337
338	pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
339
340	pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
341
342	pub fn high_performance_pool() -> Self {
343		Self::new(PoolConfig {
344			max_connections:50,
345			min_connections:10,
346			connection_timeout_ms:10000,
347			max_lifetime_ms:180000,
348			idle_timeout_ms:30000,
349			health_check_interval_ms:15000,
350		})
351	}
352
353	pub fn conservative_pool() -> Self {
354		Self::new(PoolConfig {
355			max_connections:5,
356			min_connections:1,
357			connection_timeout_ms:60000,
358			max_lifetime_ms:600000,
359			idle_timeout_ms:120000,
360			health_check_interval_ms:60000,
361		})
362	}
363
364	pub fn calculate_optimal_pool_size() -> usize {
365		let num_cpus = num_cpus::get();
366
367		(num_cpus * 2).max(4).min(50)
368	}
369}
370
371impl Clone for Struct {
372	fn clone(&self) -> Self {
373		Self {
374			config:self.config.clone(),
375
376			connections:self.connections.clone(),
377
378			semaphore:self.semaphore.clone(),
379
380			wait_queue:self.wait_queue.clone(),
381
382			stats:self.stats.clone(),
383
384			health_checker:self.health_checker.clone(),
385
386			is_running:self.is_running.clone(),
387		}
388	}
389}