DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs1use std::{
9 collections::HashMap,
10 sync::Arc,
11 time::{Duration, Instant},
12};
13
14use tokio::{
15 sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
16 time::{interval, timeout},
17};
18
19use crate::{
20 IPC::Enhanced::ConnectionPool::{
21 ConnectionHandle::Struct as ConnectionHandle,
22 HealthChecker::Struct as HealthChecker,
23 PoolConfig::Struct as PoolConfig,
24 PoolStats::Struct as PoolStats,
25 },
26 dev_log,
27};
28
29pub struct Struct {
30 pub config:PoolConfig,
31
32 pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
33
34 pub semaphore:Arc<Semaphore>,
35
36 pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
37
38 pub stats:Arc<RwLock<PoolStats>>,
39
40 pub health_checker:Arc<AsyncMutex<HealthChecker>>,
41
42 pub is_running:Arc<AsyncMutex<bool>>,
43}
44
45impl Struct {
46 pub fn new(config:PoolConfig) -> Self {
47 let max_connections = config.max_connections;
48
49 let min_connections = config.min_connections;
50
51 let pool = Self {
52 config:config.clone(),
53
54 connections:Arc::new(AsyncMutex::new(HashMap::new())),
55
56 semaphore:Arc::new(Semaphore::new(max_connections)),
57
58 wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
59
60 stats:Arc::new(RwLock::new(PoolStats {
61 total_connections:0,
62 active_connections:0,
63 idle_connections:0,
64 healthy_connections:0,
65 max_connections,
66 min_connections,
67 wait_queue_size:0,
68 average_wait_time_ms:0.0,
69 total_operations:0,
70 successful_operations:0,
71 error_rate:0.0,
72 })),
73
74 health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
75
76 is_running:Arc::new(AsyncMutex::new(false)),
77 };
78
79 dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
80
81 pool
82 }
83
84 pub async fn start(&self) -> Result<(), String> {
85 {
86 let mut running = self.is_running.lock().await;
87
88 if *running {
89 return Ok(());
90 }
91
92 *running = true;
93 }
94
95 self.start_health_monitoring().await;
96
97 self.start_connection_cleanup().await;
98
99 self.initialize_min_connections().await;
100
101 dev_log!("ipc", "[ConnectionPool] Started connection pool");
102
103 Ok(())
104 }
105
106 pub async fn stop(&self) -> Result<(), String> {
107 {
108 let mut running = self.is_running.lock().await;
109
110 if !*running {
111 return Ok(());
112 }
113
114 *running = false;
115 }
116
117 {
118 let mut connections = self.connections.lock().await;
119
120 connections.clear();
121 }
122
123 {
124 let mut wait_queue = self.wait_queue.lock().await;
125
126 for notifier in wait_queue.drain(..) {
127 notifier.notify_one();
128 }
129 }
130
131 dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
132
133 Ok(())
134 }
135
136 pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
137 let start_time = Instant::now();
138
139 let _permit = timeout(
140 Duration::from_millis(self.config.connection_timeout_ms),
141 self.semaphore.acquire(),
142 )
143 .await
144 .map_err(|_| "Connection timeout".to_string())?
145 .map_err(|e| format!("Failed to acquire connection: {}", e))?;
146
147 let wait_time = start_time.elapsed().as_millis() as f64;
148
149 {
150 let mut stats = self.stats.write().await;
151
152 stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
153 / (stats.total_operations as f64 + 1.0);
154 }
155
156 let connection = self.find_or_create_connection().await?;
157
158 {
159 let mut stats = self.stats.write().await;
160
161 stats.active_connections += 1;
162
163 stats.total_operations += 1;
164 }
165
166 dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
167
168 Ok(connection)
169 }
170
171 pub async fn release_connection(&self, mut handle:ConnectionHandle) {
172 let connection_id = handle.id.clone();
173
174 handle.last_used = Instant::now();
175
176 {
177 let mut connections = self.connections.lock().await;
178
179 connections.insert(handle.id.clone(), handle.clone());
180 }
181
182 {
183 let mut stats = self.stats.write().await;
184
185 stats.active_connections = stats.active_connections.saturating_sub(1);
186
187 stats.idle_connections += 1;
188 }
189
190 drop(handle);
191
192 dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
193 }
194
195 async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
196 let mut connections = self.connections.lock().await;
197
198 for (_id, handle) in connections.iter_mut() {
199 if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
200 handle.last_used = Instant::now();
201
202 return Ok(handle.clone());
203 }
204 }
205
206 let new_handle = ConnectionHandle::new();
207
208 connections.insert(new_handle.id.clone(), new_handle.clone());
209
210 {
211 let mut stats = self.stats.write().await;
212
213 stats.total_connections += 1;
214
215 stats.healthy_connections += 1;
216 }
217
218 Ok(new_handle)
219 }
220
221 async fn start_health_monitoring(&self) {
222 let pool = Arc::new(self.clone());
223
224 tokio::spawn(async move {
225 let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
226
227 while *pool.is_running.lock().await {
228 interval.tick().await;
229
230 if let Err(e) = pool.check_connection_health().await {
231 dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
232 }
233 }
234 });
235 }
236
237 async fn start_connection_cleanup(&self) {
238 let pool = Arc::new(self.clone());
239
240 tokio::spawn(async move {
241 let mut interval = interval(Duration::from_secs(60));
242
243 while *pool.is_running.lock().await {
244 interval.tick().await;
245
246 let cleaned_count = pool.cleanup_stale_connections().await;
247 if cleaned_count > 0 {
248 dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
249 }
250 }
251 });
252 }
253
254 async fn initialize_min_connections(&self) {
255 let current_count = self.connections.lock().await.len();
256
257 if current_count < self.config.min_connections {
258 let needed = self.config.min_connections - current_count;
259
260 for _ in 0..needed {
261 let handle = ConnectionHandle::new();
262
263 let mut connections = self.connections.lock().await;
264
265 connections.insert(handle.id.clone(), handle);
266 }
267
268 dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
269 }
270 }
271
272 async fn check_connection_health(&self) -> Result<(), String> {
273 let mut connections = self.connections.lock().await;
274
275 let mut _health_checker = self.health_checker.lock().await;
276
277 let mut healthy_count = 0;
278
279 for (_id, handle) in connections.iter_mut() {
280 let is_healthy = _health_checker.check_connection_health(handle).await;
281
282 handle.update_health(is_healthy);
283
284 if handle.is_healthy() {
285 healthy_count += 1;
286 }
287 }
288
289 {
290 let mut stats = self.stats.write().await;
291
292 stats.healthy_connections = healthy_count;
293
294 stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
295
296 if stats.total_operations > 0 {
297 stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
298 }
299 }
300
301 Ok(())
302 }
303
304 pub async fn cleanup_stale_connections(&self) -> usize {
305 let mut connections = self.connections.lock().await;
306
307 let stale_ids:Vec<String> = connections
308 .iter()
309 .filter(|(_, handle)| {
310 handle.age().as_millis() > self.config.max_lifetime_ms as u128
311 || handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
312 || !handle.is_healthy()
313 })
314 .map(|(id, _)| id.clone())
315 .collect();
316
317 for id in &stale_ids {
318 connections.remove(id);
319 }
320
321 {
322 let mut stats = self.stats.write().await;
323
324 stats.total_connections = connections.len();
325
326 stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
327 }
328
329 stale_ids.len()
330 }
331
332 pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
333
334 pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
335
336 pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
337
338 pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
339
340 pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
341
342 pub fn high_performance_pool() -> Self {
343 Self::new(PoolConfig {
344 max_connections:50,
345 min_connections:10,
346 connection_timeout_ms:10000,
347 max_lifetime_ms:180000,
348 idle_timeout_ms:30000,
349 health_check_interval_ms:15000,
350 })
351 }
352
353 pub fn conservative_pool() -> Self {
354 Self::new(PoolConfig {
355 max_connections:5,
356 min_connections:1,
357 connection_timeout_ms:60000,
358 max_lifetime_ms:600000,
359 idle_timeout_ms:120000,
360 health_check_interval_ms:60000,
361 })
362 }
363
364 pub fn calculate_optimal_pool_size() -> usize {
365 let num_cpus = num_cpus::get();
366
367 (num_cpus * 2).max(4).min(50)
368 }
369}
370
371impl Clone for Struct {
372 fn clone(&self) -> Self {
373 Self {
374 config:self.config.clone(),
375
376 connections:self.connections.clone(),
377
378 semaphore:self.semaphore.clone(),
379
380 wait_queue:self.wait_queue.clone(),
381
382 stats:self.stats.clone(),
383
384 health_checker:self.health_checker.clone(),
385
386 is_running:self.is_running.clone(),
387 }
388 }
389}