Skip to main content

Mountain/IPC/
TauriIPCServer.rs

1//! # TauriIPCServer - Mountain-Wind IPC Bridge
2//!
3//! **File Responsibilities:**
4//! This module serves as the core IPC (Inter-Process Communication) server for
5//! Mountain, establishing and managing the bidirectional communication bridge
6//! between Mountain's Rust backend and Wind's TypeScript frontend. It
7//! implements the Mountain counterpart to Wind's TauriIPCServer.ts, ensuring
8//! seamless integration across the language boundary.
9//!
10//! **Architectural Role in Wind-Mountain Connection:**
11//! The TauriIPCServer acts as the central Message router and communication
12//! orchestrator:
13//!
14//! 1. **Connection Management:**
15//!    - Establishes secure connections between Wind and Mountain
16//!    - Maintains connection health and auto-reconnects on failure
17//!    - Manages connection pooling for optimal resource usage
18//!    - Tracks connection state for monitoring and debugging
19//!
20//! 2. **Message Routing:**
21//!    - Routes incoming messages from Wind to appropriate handlers
22//!    - Broadcasts messages from Mountain to Wind subscribers
23//!    - Implements Message filtering and prioritization
24//!    - Supports point-to-point and publish-subscribe patterns
25//!
26//! 3. **Security Layer:**
27//!    - Validates all incoming messages for security
28//!    - Implements permission-based access control (RBAC)
29//!    - Provides AES-256-GCM encryption for sensitive data
30//!    - Logs all security events for audit trails
31//!
32//! 4. **Reliability Features:**
33//!    - Message queuing for offline scenarios
34//!    - Automatic retry with exponential backoff
35//!    - Graceful degradation when services unavailable
36//!    - Circuit breaker pattern for cascading failure prevention
37//!
38//! **Communication Patterns:**
39//!
40//! **1. Request-Response Pattern:**
41//! ```text
42//! // Wind sends request
43//! let result = app_handle.invoke_handler("command", args).await?;
44//!
45//! // Mountain processes and responds
46//! let response = handle_request().await;
47//! ipc.emit(response_channel, response).await;
48//! ```
49//!
50//! **2. Event Emission Pattern:**
51//! ```text
52//! // Mountain emits events to Wind subscribers
53//! app.emit("configuration-updated", new_config).await;
54//! app.emit("file-changed", file_event).await;
55//! ```
56//!
57//! **3. Broadcast Pattern:**
58//! ```rust
59//! // Broadcast to all subscribers on a channel
60//! for listener in listeners.get(channel) {
61//! 	listener(Message.clone()).await;
62//! }
63//! ```
64//!
65//! **Message Flow:**
66//! ```text
67//! Wind Frontend
68//! |
69//! | 4. Response
70//! v
71//! Tauri Bridge (JS Bridge)
72//! |
73//! | 1. IPC Invoke
74//! v
75//! TauriIPCServer (Rust)
76//! |
77//! | 2. Route & Validate
78//! v
79//! WindServiceHandlers
80//! |
81//! | 3. Execute
82//! v
83//! Mountain Services
84//! ```
85//!
86//! **Key Structures:**
87//!
88//! - **TauriIPCMessage:** Standard Message format for all IPC communication
89//! - **ConnectionStatus:** Tracks connection health and uptime
90//! - **ConnectionPool:** Manages concurrent IPC connections efficiently
91//! - **PermissionManager:** Implements role-based access control
92//! - **SecureMessageChannel:** Provides encryption for sensitive data
93//! - **MessageCompressor:** Gzip compression for large payloads
94//!
95//! **Defensive Coding Practices:**
96//!
97//! 1. **Input Validation:**
98//!    - All messages validated before processing
99//!    - Type checking for all serialized data
100//!    - Schema validation for complex payloads
101//!
102//! 2. **Error Handling:**
103//!    - Comprehensive error messages with context
104//!    - Error logging at appropriate levels
105//!    - Graceful handling of transient failures
106//!    - Automatic retry with backoff
107//!
108//! 3. **Timeout Management:**
109//!    - Configurable timeouts for all operations
110//!    - Timeout-based circuit breaking
111//!    - Graceful degradation on timeout
112//!
113//! 4. **Resource Management:**
114//!    - Connection pooling to prevent exhaustion
115//!    - Automatic cleanup of stale resources
116//!    - Memory-efficient Message queuing
117//!
118//! **Security Architecture:**
119//!
120//! - **Authentication:** User identity verification
121//! - **Authorization:** Permission-based access control (RBAC)
122//! - **Encryption:** AES-256-GCM for sensitive data
123//! - **Auditing:** Complete security event logging
124//! - **Threat Detection:** Anomaly monitoring and alerts
125//!
126//! **Performance Optimizations:**
127//!
128//! - **Message Compression:** Gzip for large payloads
129//! - **Connection Pooling:** Reuse connections efficiently
130//! - **Caching:** Cache frequently used data
131//! - **Batching:** Batch multiple messages for efficiency
132//! - **Async/Await:** Non-blocking I/O operations
133//!
134//! **Monitoring & Observability:**
135//!
136//! - **Connection Status:** Real-time health monitoring
137//! - **Performance Metrics:** Latency, throughput, error rates
138//! - **Audit Logs:** Complete Message and security event logging
139//! - **Health Checks:** Periodic health assessments
140//!
141//! **VSCode RPC Patterns (Study Reference):**
142//! This implementation draws inspiration from VSCode's RPC/IPC architecture:
143//! - Channel-based Message routing
144//! - Request-response correlation
145//! - Cancellation token support
146//! - Binary protocol Message serialization
147//! - Protocol versioning for compatibility
148
149use std::{
150	collections::HashMap,
151	io::{Read, Write},
152	sync::{Arc, Mutex},
153	time::Duration,
154};
155
156use base64::{Engine, engine::general_purpose};
157use flate2::{Compression, read::GzDecoder, write::GzEncoder};
158use ring::{
159	aead::{self, AES_256_GCM, LessSafeKey, UnboundKey},
160	hmac,
161	rand::{SecureRandom, SystemRandom},
162};
163use serde::{Deserialize, Serialize};
164use tauri::{AppHandle, Emitter, Manager};
165use tokio::{
166	sync::{Mutex as AsyncMutex, RwLock, Semaphore},
167	time::timeout,
168};
169
170use crate::dev_log;
171
172/// IPC Message structure matching Wind's ITauriIPCMessage interface
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct TauriIPCMessage {
175	pub channel:String,
176
177	pub data:serde_json::Value,
178
179	pub sender:Option<String>,
180
181	pub timestamp:u64,
182}
183
184/// Connection status Message
185#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct ConnectionStatus {
187	pub connected:bool,
188}
189
190/// Listener callback type
191type ListenerCallback = Box<dyn Fn(serde_json::Value) -> Result<(), String> + Send + Sync>;
192
193/// Mountain's IPC Server counterpart to Wind's TauriIPCServer
194#[derive(Clone)]
195pub struct TauriIPCServer {
196	app_handle:AppHandle,
197
198	listeners:Arc<Mutex<HashMap<String, Vec<ListenerCallback>>>>,
199
200	is_connected:Arc<Mutex<bool>>,
201
202	message_queue:Arc<Mutex<Vec<TauriIPCMessage>>>,
203}
204
205/// Message compression utility for optimizing IPC Message transfer
206pub struct MessageCompressor {
207	CompressionLevel:u32,
208
209	BatchSize:usize,
210}
211
212impl MessageCompressor {
213	/// Create a new Message compressor with specified parameters
214	pub fn new(CompressionLevel:u32, BatchSize:usize) -> Self { Self { CompressionLevel, BatchSize } }
215
216	/// Compress messages using Gzip for efficient transfer
217	pub fn compress_messages(&self, Messages:Vec<TauriIPCMessage>) -> Result<Vec<u8>, String> {
218		let SerializedMessages =
219			serde_json::to_vec(&Messages).map_err(|e| format!("Failed to serialize messages: {}", e))?;
220
221		let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.CompressionLevel));
222
223		encoder
224			.write_all(&SerializedMessages)
225			.map_err(|e| format!("Failed to compress messages: {}", e))?;
226
227		encoder.finish().map_err(|e| format!("Failed to finish compression: {}", e))
228	}
229
230	/// Decompress messages from compressed data
231	pub fn decompress_messages(&self, CompressedData:&[u8]) -> Result<Vec<TauriIPCMessage>, String> {
232		let mut decoder = GzDecoder::new(CompressedData);
233
234		let mut DecompressedData = Vec::new();
235
236		decoder
237			.read_to_end(&mut DecompressedData)
238			.map_err(|e| format!("Failed to decompress data: {}", e))?;
239
240		serde_json::from_slice(&DecompressedData).map_err(|e| format!("Failed to deserialize messages: {}", e))
241	}
242
243	/// Check if messages should be batched for compression
244	pub fn should_batch(&self, MessagesCount:usize) -> bool { MessagesCount >= self.BatchSize }
245}
246
247impl TauriIPCServer {
248	/// Create a new Tauri IPC Server instance
249	pub fn new(app_handle:AppHandle) -> Self {
250		dev_log!("ipc", "[TauriIPCServer] Initializing Mountain IPC Server");
251
252		Self {
253			app_handle,
254
255			listeners:Arc::new(Mutex::new(HashMap::new())),
256
257			is_connected:Arc::new(Mutex::new(false)),
258
259			message_queue:Arc::new(Mutex::new(Vec::new())),
260		}
261	}
262
263	/// Initialize the IPC server and set up event listeners
264	pub async fn initialize(&self) -> Result<(), String> {
265		dev_log!("ipc", "[TauriIPCServer] Setting up IPC listeners");
266
267		// Set up connection status
268		{
269			let mut is_connected = self
270				.is_connected
271				.lock()
272				.map_err(|e| format!("Failed to lock connection status: {}", e))?;
273
274			*is_connected = true;
275		}
276
277		// Notify Wind that Mountain is ready
278		self.send_connection_status(true)
279			.await
280			.map_err(|e| format!("Failed to send connection status: {}", e))?;
281
282		dev_log!("ipc", "[TauriIPCServer] IPC Server initialized successfully");
283
284		// Process any queued messages
285		self.process_message_queue().await;
286
287		Ok(())
288	}
289
290	/// Send a Message to the Wind frontend
291	pub async fn send(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
292		let Message = TauriIPCMessage {
293			channel:channel.to_string(),
294
295			data,
296
297			sender:Some("mountain".to_string()),
298
299			timestamp:std::time::SystemTime::now()
300				.duration_since(std::time::UNIX_EPOCH)
301				.unwrap_or_default()
302				.as_millis() as u64,
303		};
304
305		let is_connected = {
306			let guard = self
307				.is_connected
308				.lock()
309				.map_err(|e| format!("Failed to check connection status: {}", e))?;
310
311			*guard
312		};
313
314		if !is_connected {
315			// Queue the Message for later delivery
316			let mut queue = self
317				.message_queue
318				.lock()
319				.map_err(|e| format!("Failed to access Message queue: {}", e))?;
320
321			queue.push(Message);
322
323			dev_log!(
324				"ipc",
325				"[TauriIPCServer] Message queued (channel: {}, queue size: {})",
326				channel,
327				queue.len()
328			);
329
330			return Ok(());
331		}
332
333		// Send immediately
334		self.emit_message(&Message).await
335	}
336
337	/// Register a listener for incoming messages from Wind
338	pub fn on(&self, channel:&str, callback:ListenerCallback) -> Result<(), String> {
339		let mut listeners = self
340			.listeners
341			.lock()
342			.map_err(|e| format!("Failed to access listeners: {}", e))?;
343
344		listeners.entry(channel.to_string()).or_insert_with(Vec::new).push(callback);
345
346		dev_log!("ipc", "[TauriIPCServer] Listener registered for channel: {}", channel);
347
348		Ok(())
349	}
350
351	/// Remove a listener
352	pub fn off(&self, channel:&str, callback:&ListenerCallback) -> Result<(), String> {
353		let mut listeners = self
354			.listeners
355			.lock()
356			.map_err(|e| format!("Failed to access listeners: {}", e))?;
357
358		if let Some(channel_listeners) = listeners.get_mut(channel) {
359			channel_listeners.retain(|cb| !std::ptr::eq(cb as *const _, callback as *const _));
360
361			if channel_listeners.is_empty() {
362				listeners.remove(channel);
363			}
364		}
365
366		dev_log!("ipc", "[TauriIPCServer] Listener removed from channel: {}", channel);
367
368		Ok(())
369	}
370
371	/// Handle incoming messages from Wind
372	pub async fn IncomingMessage(&self, Message:TauriIPCMessage) -> Result<(), String> {
373		dev_log!("ipc", "[TauriIPCServer] Received Message on channel: {}", Message.channel);
374
375		let listeners = self
376			.listeners
377			.lock()
378			.map_err(|e| format!("Failed to access listeners: {}", e))?;
379
380		if let Some(channel_listeners) = listeners.get(&Message.channel) {
381			for callback in channel_listeners {
382				if let Err(e) = callback(Message.data.clone()) {
383					dev_log!(
384						"ipc",
385						"error: [TauriIPCServer] Error in listener for channel {}: {}",
386						Message.channel,
387						e
388					);
389				}
390			}
391		} else {
392			dev_log!("ipc", "[TauriIPCServer] No listeners found for channel: {}", Message.channel);
393		}
394
395		Ok(())
396	}
397
398	/// Send connection status to Wind
399	async fn send_connection_status(&self, connected:bool) -> Result<(), String> {
400		let status = ConnectionStatus { connected };
401
402		self.app_handle
403			.emit("vscode-ipc-status", status)
404			.map_err(|e| format!("Failed to emit connection status: {}", e))?;
405
406		dev_log!("ipc", "[TauriIPCServer] Connection status sent: {}", connected);
407
408		Ok(())
409	}
410
411	/// Emit a Message to Wind
412	async fn emit_message(&self, Message:&TauriIPCMessage) -> Result<(), String> {
413		self.app_handle
414			.emit("vscode-ipc-Message", Message)
415			.map_err(|e| format!("Failed to emit Message: {}", e))?;
416
417		dev_log!("ipc", "[TauriIPCServer] Message emitted on channel: {}", Message.channel);
418
419		Ok(())
420	}
421
422	/// Process queued messages
423	async fn process_message_queue(&self) {
424		let mut queue = match self.message_queue.lock() {
425			Ok(queue) => queue,
426
427			Err(e) => {
428				dev_log!("ipc", "error: [TauriIPCServer] Failed to access Message queue: {}", e);
429
430				return;
431			},
432		};
433
434		while let Some(Message) = queue.pop() {
435			if let Err(e) = self.emit_message(&Message).await {
436				dev_log!("ipc", "error: [TauriIPCServer] Failed to send queued Message: {}", e);
437
438				// Put the Message back in the queue
439				queue.insert(0, Message);
440
441				break;
442			}
443		}
444
445		dev_log!(
446			"ipc",
447			"[TauriIPCServer] Message queue processed, {} messages remaining",
448			queue.len()
449		);
450	}
451
452	/// Get connection status
453	pub fn get_connection_status(&self) -> Result<bool, String> {
454		let guard = self
455			.is_connected
456			.lock()
457			.map_err(|e| format!("Failed to get connection status: {}", e))?;
458
459		Ok(*guard)
460	}
461
462	/// Get queued Message count
463	pub fn get_queue_size(&self) -> Result<usize, String> {
464		let guard = self
465			.message_queue
466			.lock()
467			.map_err(|e| format!("Failed to get queue size: {}", e))?;
468
469		Ok(guard.len())
470	}
471
472	/// Cleanup resources
473	pub fn dispose(&self) -> Result<(), String> {
474		{
475			let mut listeners = self
476				.listeners
477				.lock()
478				.map_err(|e| format!("Failed to access listeners: {}", e))?;
479
480			listeners.clear();
481		}
482
483		{
484			let mut queue = self
485				.message_queue
486				.lock()
487				.map_err(|e| format!("Failed to access Message queue: {}", e))?;
488
489			queue.clear();
490		}
491
492		{
493			let mut is_connected = self
494				.is_connected
495				.lock()
496				.map_err(|e| format!("Failed to access connection status: {}", e))?;
497
498			*is_connected = false;
499		}
500
501		dev_log!("ipc", "[TauriIPCServer] IPC Server disposed");
502
503		Ok(())
504	}
505
506	/// Advanced: Validate Message permissions
507	pub async fn validate_message_permissions(&self, Message:&TauriIPCMessage) -> Result<(), String> {
508		let permission_manager = PermissionManager::new();
509
510		permission_manager.initialize_defaults().await;
511
512		let context = self.create_security_context(Message);
513
514		// Extract operation from channel name
515		let operation = Message.channel.replace("mountain_", "");
516
517		// Validate permission
518		permission_manager.validate_permission(&operation, &context).await
519	}
520
521	/// Advanced: Create security context from Message
522	fn create_security_context(&self, Message:&TauriIPCMessage) -> SecurityContext {
523		SecurityContext {
524			user_id:Message.sender.clone().unwrap_or("unknown".to_string()),
525
526			// Default role assigned to authenticated IPC connections
527			roles:vec!["user".to_string()],
528
529			permissions:vec![],
530
531			// IPC connections use loopback address for security (localhost only)
532			ip_address:"127.0.0.1".to_string(),
533
534			timestamp:std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_millis(Message.timestamp),
535		}
536	}
537
538	/// Advanced: Log security event
539	pub async fn log_security_event(&self, event:SecurityEvent) {
540		let permission_manager = PermissionManager::new();
541
542		permission_manager.log_security_event(event).await;
543	}
544
545	/// Advanced: Record performance metrics
546	pub async fn record_performance_metrics(&self, channel:String, duration:std::time::Duration, success:bool) {
547		// This would integrate with the PerformanceDashboard
548		dev_log!(
549			"ipc",
550			"[TauriIPCServer] Performance recorded - Channel: {}, Duration: {:?}, Success: {}",
551			channel,
552			duration,
553			success
554		);
555	}
556
557	/// Advanced: Get security audit log
558	pub async fn get_security_audit_log(&self, limit:usize) -> Result<Vec<SecurityEvent>, String> {
559		let permission_manager = PermissionManager::new();
560
561		Ok(permission_manager.get_audit_log(limit).await)
562	}
563
564	/// Send compressed Message batch
565	pub async fn send_compressed_batch(&self, channel:&str, messages:Vec<TauriIPCMessage>) -> Result<(), String> {
566		// Configure compressor with balanced settings: level 6 (good compression/speed
567		// tradeoff) and batch size 10 (aggregate small messages for efficiency)
568		let compressor = MessageCompressor::new(6, 10);
569
570		let compressed_data = compressor
571			.compress_messages(messages)
572			.map_err(|e| format!("Failed to compress batch: {}", e))?;
573
574		let batch_message = TauriIPCMessage {
575			channel:"compressed_batch".to_string(),
576
577			data:serde_json::Value::String(general_purpose::STANDARD.encode(&compressed_data)),
578
579			sender:Some("mountain".to_string()),
580
581			timestamp:std::time::SystemTime::now()
582				.duration_since(std::time::UNIX_EPOCH)
583				.unwrap_or_default()
584				.as_millis() as u64,
585		};
586
587		self.send(channel, serde_json::to_value(batch_message).unwrap()).await
588	}
589
590	/// Handle compressed batch Message
591	pub async fn CompressedBatch(&self, Message:TauriIPCMessage) -> Result<(), String> {
592		let compressed_data_base64 = Message.data.as_str().ok_or("Compressed batch data must be a string")?;
593
594		let compressed_data = general_purpose::STANDARD
595			.decode(compressed_data_base64)
596			.map_err(|e| format!("Failed to decode base64: {}", e))?;
597
598		let compressor = MessageCompressor::new(6, 10);
599
600		let messages = compressor
601			.decompress_messages(&compressed_data)
602			.map_err(|e| format!("Failed to decompress batch: {}", e))?;
603
604		// Process each Message in the batch
605		for Message in messages {
606			self.IncomingMessage(Message).await?;
607		}
608
609		Ok(())
610	}
611
612	/// Send Message using connection pool
613	pub async fn send_with_pool(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
614		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
615
616		let Handle = pool
617			.GetConnection()
618			.await
619			.map_err(|e| format!("Failed to get connection: {}", e))?;
620
621		let result = self.send(channel, data).await;
622
623		pool.ReleaseConnection(Handle).await;
624
625		result
626	}
627
628	/// Get connection pool statistics
629	pub async fn get_connection_stats(&self) -> Result<ConnectionStats, String> {
630		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
631
632		Ok(pool.GetStats().await)
633	}
634
635	/// Send encrypted Message
636	pub async fn send_secure(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
637		let secure_channel =
638			SecureMessageChannel::new().map_err(|e| format!("Failed to create secure channel: {}", e))?;
639
640		let Message = TauriIPCMessage {
641			channel:channel.to_string(),
642
643			data,
644
645			sender:Some("mountain".to_string()),
646
647			timestamp:std::time::SystemTime::now()
648				.duration_since(std::time::UNIX_EPOCH)
649				.unwrap_or_default()
650				.as_millis() as u64,
651		};
652
653		let encrypted_message = secure_channel
654			.encrypt_message(&Message)
655			.map_err(|e| format!("Failed to encrypt Message: {}", e))?;
656
657		let encrypted_data = serde_json::to_value(encrypted_message)
658			.map_err(|e| format!("Failed to serialize encrypted Message: {}", e))?;
659
660		self.send("secure_message", encrypted_data).await
661	}
662
663	/// Handle encrypted Message
664	pub async fn SecureMessage(&self, encrypted_data:serde_json::Value) -> Result<(), String> {
665		let encrypted_message:EncryptedMessage = serde_json::from_value(encrypted_data)
666			.map_err(|e| format!("Failed to deserialize encrypted Message: {}", e))?;
667
668		let secure_channel =
669			SecureMessageChannel::new().map_err(|e| format!("Failed to create secure channel: {}", e))?;
670
671		let Message = secure_channel
672			.decrypt_message(&encrypted_message)
673			.map_err(|e| format!("Failed to decrypt Message: {}", e))?;
674
675		self.IncomingMessage(Message).await
676	}
677
678	/// Handle Message with permission validation
679	pub async fn MessageWithPermissions(&self, Message:TauriIPCMessage) -> Result<(), String> {
680		let permission_manager = PermissionManager::new();
681
682		let context = self.create_security_context(&Message);
683
684		// Extract operation from channel name
685		let operation = Message.channel.replace("mountain_", "");
686
687		// Validate permission
688		permission_manager.validate_permission(&operation, &context).await?;
689
690		// Process the Message
691		self.IncomingMessage(Message).await
692	}
693}
694
695/// Connection pool for IPC operations - manages concurrent connections
696/// efficiently
697///
698/// **Purpose:** Prevents connection exhaustion by pooling connections and
699/// reusing them **Features:** Health monitoring, automatic cleanup,
700/// configurable timeouts
701pub struct ConnectionPool {
702	MaxConnections:usize,
703
704	ConnectionTimeout:Duration,
705
706	Semaphore:Arc<Semaphore>,
707
708	ActiveConnection:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
709
710	HealthChecker:Arc<AsyncMutex<ConnectionHealthChecker>>,
711}
712
713/// Handle representing an active connection
714#[derive(Clone)]
715pub struct ConnectionHandle {
716	pub id:String,
717
718	pub created_at:std::time::Instant,
719
720	pub last_used:std::time::Instant,
721
722	pub health_score:f64,
723
724	pub error_count:usize,
725}
726
727impl ConnectionHandle {
728	/// Create a new connection Handle with health monitoring
729	pub fn new() -> Self {
730		Self {
731			id:uuid::Uuid::new_v4().to_string(),
732
733			created_at:std::time::Instant::now(),
734
735			last_used:std::time::Instant::now(),
736
737			health_score:100.0,
738
739			error_count:0,
740		}
741	}
742
743	/// Update health score based on operation success
744	pub fn update_health(&mut self, success:bool) {
745		if success {
746			self.health_score = (self.health_score + 10.0).min(100.0);
747
748			self.error_count = 0;
749		} else {
750			self.health_score = (self.health_score - 25.0).max(0.0);
751
752			self.error_count += 1;
753		}
754
755		self.last_used = std::time::Instant::now();
756	}
757
758	/// Check if connection is healthy
759	pub fn is_healthy(&self) -> bool { self.health_score > 50.0 && self.error_count < 5 }
760}
761
762impl ConnectionPool {
763	/// Create a new connection pool with specified parameters
764	pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
765		Self {
766			MaxConnections,
767
768			ConnectionTimeout,
769
770			Semaphore:Arc::new(Semaphore::new(MaxConnections)),
771
772			ActiveConnection:Arc::new(AsyncMutex::new(HashMap::new())),
773
774			HealthChecker:Arc::new(AsyncMutex::new(ConnectionHealthChecker::new())),
775		}
776	}
777
778	/// Get a connection Handle from the pool with timeout
779	pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
780		let _permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
781			.await
782			.map_err(|_| "Connection timeout")?
783			.map_err(|e| format!("Failed to acquire connection: {}", e))?;
784
785		let Handle = ConnectionHandle::new();
786
787		{
788			let mut connections = self.ActiveConnection.lock().await;
789
790			connections.insert(Handle.id.clone(), Handle.clone());
791		}
792
793		// Start health monitoring for this connection
794		self.StartHealthMonitoring(&Handle.id).await;
795
796		Ok(Handle)
797	}
798
799	/// Release a connection Handle back to the pool
800	pub async fn ReleaseConnection(&self, Handle:ConnectionHandle) {
801		{
802			let mut connections = self.ActiveConnection.lock().await;
803
804			connections.remove(&Handle.id);
805		}
806
807		// The permit is released when dropped
808	}
809
810	/// Get connection statistics for monitoring
811	pub async fn GetStats(&self) -> ConnectionStats {
812		let connections = self.ActiveConnection.lock().await;
813
814		let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
815
816		ConnectionStats {
817			total_connections:connections.len(),
818
819			healthy_connections,
820
821			max_connections:self.MaxConnections,
822
823			available_permits:self.Semaphore.available_permits(),
824
825			connection_timeout:self.ConnectionTimeout,
826		}
827	}
828
829	/// Clean up stale connections
830	pub async fn CleanUpStaleConnections(&self) -> usize {
831		let mut connections = self.ActiveConnection.lock().await;
832
833		let now = std::time::Instant::now();
834
835		// Stale connections are those unused for 5 minutes (300 seconds)
836		let stale_threshold = Duration::from_secs(300);
837
838		let stale_ids:Vec<String> = connections
839			.iter()
840			.filter(|(_, Handle)| now.duration_since(Handle.last_used) > stale_threshold || !Handle.is_healthy())
841			.map(|(id, _)| id.clone())
842			.collect();
843
844		let stale_count = stale_ids.len();
845
846		for id in stale_ids {
847			connections.remove(&id);
848		}
849
850		stale_count
851	}
852
853	/// Start health monitoring for a connection
854	async fn StartHealthMonitoring(&self, connection_id:&str) {
855		let health_checker = self.HealthChecker.clone();
856
857		let active_connection = self.ActiveConnection.clone();
858
859		let connection_id = connection_id.to_string();
860
861		tokio::spawn(async move {
862			let mut interval = tokio::time::interval(Duration::from_secs(30));
863
864			loop {
865				interval.tick().await;
866
867				let checker = health_checker.lock().await;
868				let mut connections = match active_connection.try_lock() {
869					Ok(conns) => conns,
870					Err(_) => continue,
871				};
872
873				if let Some(Handle) = connections.get_mut(&connection_id) {
874					let is_healthy = checker.check_connection_health(Handle).await;
875					Handle.update_health(is_healthy);
876
877					if !Handle.is_healthy() {
878						dev_log!(
879							"ipc",
880							"Connection {} marked as unhealthy (score: {:.1})",
881							Handle.id,
882							Handle.health_score
883						);
884					}
885				} else {
886					// The connection has been removed from the pool, stop monitoring
887					break;
888				}
889			}
890		});
891	}
892}
893
894/// Connection health checker
895struct ConnectionHealthChecker {
896	ping_timeout:Duration,
897}
898
899impl ConnectionHealthChecker {
900	fn new() -> Self { Self { ping_timeout:Duration::from_secs(5) } }
901
902	/// Check connection health by sending a ping
903	async fn check_connection_health(&self, _handle:&mut ConnectionHandle) -> bool {
904		// Simulate health check by ensuring connection can Handle basic operations
905		// In a real implementation, this would send an actual ping Message
906		let start_time = std::time::Instant::now();
907
908		// Simulate network latency
909		tokio::time::sleep(Duration::from_millis(10)).await;
910
911		let response_time = start_time.elapsed();
912
913		// Connection is healthy if response time is reasonable
914		response_time < self.ping_timeout
915	}
916}
917
918/// Connection statistics
919#[derive(Debug, Clone, Default)]
920pub struct ConnectionStats {
921	pub total_connections:usize,
922
923	pub healthy_connections:usize,
924
925	pub max_connections:usize,
926
927	pub available_permits:usize,
928
929	pub connection_timeout:Duration,
930}
931
932/// Secure Message channel with encryption and authentication
933pub struct SecureMessageChannel {
934	encryption_key:LessSafeKey,
935
936	hmac_key:Vec<u8>,
937}
938
939impl SecureMessageChannel {
940	/// Create a new secure channel
941	pub fn new() -> Result<Self, String> {
942		let rng = SystemRandom::new();
943
944		// Generate encryption key
945		let mut encryption_key_bytes = vec![0u8; 32];
946
947		rng.fill(&mut encryption_key_bytes)
948			.map_err(|e| format!("Failed to generate encryption key: {}", e))?;
949
950		let unbound_key = UnboundKey::new(&AES_256_GCM, &encryption_key_bytes)
951			.map_err(|e| format!("Failed to create unbound key: {}", e))?;
952
953		let encryption_key = LessSafeKey::new(unbound_key);
954
955		// Generate HMAC key
956		let mut hmac_key = vec![0u8; 32];
957
958		rng.fill(&mut hmac_key)
959			.map_err(|e| format!("Failed to generate HMAC key: {}", e))?;
960
961		Ok(Self { encryption_key, hmac_key })
962	}
963
964	/// Encrypt and authenticate a Message
965	pub fn encrypt_message(&self, Message:&TauriIPCMessage) -> Result<EncryptedMessage, String> {
966		let serialized_message =
967			serde_json::to_vec(Message).map_err(|e| format!("Failed to serialize Message: {}", e))?;
968
969		// Generate nonce
970		let mut nonce = [0u8; 12];
971
972		SystemRandom::new()
973			.fill(&mut nonce)
974			.map_err(|e| format!("Failed to generate nonce: {}", e))?;
975
976		// Encrypt Message
977		let mut in_out = serialized_message.clone();
978
979		self.encryption_key
980			.seal_in_place_append_tag(aead::Nonce::assume_unique_for_key(nonce), aead::Aad::empty(), &mut in_out)
981			.map_err(|e| format!("Encryption failed: {}", e))?;
982
983		// Create HMAC
984		let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &self.hmac_key);
985
986		let hmac_tag = hmac::sign(&hmac_key, &in_out);
987
988		Ok(EncryptedMessage { nonce:nonce.to_vec(), ciphertext:in_out, hmac_tag:hmac_tag.as_ref().to_vec() })
989	}
990
991	/// Decrypt and verify a Message
992	pub fn decrypt_message(&self, encrypted:&EncryptedMessage) -> Result<TauriIPCMessage, String> {
993		// Verify HMAC
994		let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &self.hmac_key);
995
996		hmac::verify(&hmac_key, &encrypted.ciphertext, &encrypted.hmac_tag)
997			.map_err(|_| "HMAC verification failed".to_string())?;
998
999		// Decrypt Message
1000		let mut in_out = encrypted.ciphertext.clone();
1001
1002		let nonce_slice:&[u8] = &encrypted.nonce;
1003
1004		let nonce_array:[u8; 12] = nonce_slice.try_into().map_err(|_| "Invalid nonce length".to_string())?;
1005
1006		let nonce = aead::Nonce::assume_unique_for_key(nonce_array);
1007
1008		self.encryption_key
1009			.open_in_place(nonce, aead::Aad::empty(), &mut in_out)
1010			.map_err(|e| format!("Decryption failed: {}", e))?;
1011
1012		// Remove authentication tag
1013		let plaintext_len = in_out.len() - AES_256_GCM.tag_len();
1014
1015		in_out.truncate(plaintext_len);
1016
1017		// Deserialize Message
1018		serde_json::from_slice(&in_out).map_err(|e| format!("Failed to deserialize Message: {}", e))
1019	}
1020
1021	/// Rotate encryption keys
1022	pub fn rotate_keys(&mut self) -> Result<(), String> {
1023		*self = Self::new()?;
1024		Ok(())
1025	}
1026}
1027
1028/// Encrypted Message structure
1029#[derive(Debug, Clone, Serialize, Deserialize)]
1030pub struct EncryptedMessage {
1031	nonce:Vec<u8>,
1032
1033	ciphertext:Vec<u8>,
1034
1035	hmac_tag:Vec<u8>,
1036}
1037
1038/// Advanced permission-based IPC Message handler
1039#[tauri::command]
1040pub async fn mountain_ipc_receive_message(app_handle:tauri::AppHandle, Message:TauriIPCMessage) -> Result<(), String> {
1041	dev_log!(
1042		"ipc",
1043		"[TauriIPCServer] Received IPC Message from Wind on channel: {}",
1044		Message.channel
1045	);
1046
1047	// Get the IPC server instance from application state
1048	if let Some(ipc_server) = app_handle.try_state::<TauriIPCServer>() {
1049		// Advanced security: Validate permissions before processing
1050		if let Err(e) = ipc_server.validate_message_permissions(&Message).await {
1051			dev_log!(
1052				"ipc",
1053				"error: [TauriIPCServer] Permission validation failed for channel {}: {}",
1054				Message.channel,
1055				e
1056			);
1057
1058			// Log security event
1059			ipc_server
1060				.log_security_event(SecurityEvent {
1061					event_type:SecurityEventType::PermissionDenied,
1062					user_id:Message.sender.clone().unwrap_or("unknown".to_string()),
1063					operation:Message.channel.clone(),
1064					timestamp:std::time::SystemTime::now(),
1065					details:Some(format!("Permission denied: {}", e)),
1066				})
1067				.await;
1068
1069			return Err(format!("Permission denied: {}", e));
1070		}
1071
1072		// Advanced monitoring: Track Message processing time
1073		let start_time = std::time::Instant::now();
1074
1075		let result = ipc_server.IncomingMessage(Message.clone()).await;
1076
1077		let duration = start_time.elapsed();
1078
1079		// Record performance metrics
1080		ipc_server
1081			.record_performance_metrics(Message.channel, duration, result.is_ok())
1082			.await;
1083
1084		result
1085	} else {
1086		Err("IPC Server not found in application state".to_string())
1087	}
1088}
1089
1090/// Tauri command handler for Wind to check connection status
1091///
1092/// **Command Registration:** Registered in Tauri's invoke_handler
1093/// Called by Wind using: `app.Handle.invoke('mountain_ipc_get_status')`
1094///
1095/// **Response Format:**
1096/// ```json
1097/// {
1098///   "connected": true
1099/// }
1100/// ```
1101#[tauri::command]
1102pub async fn mountain_ipc_get_status(app_handle:tauri::AppHandle) -> Result<ConnectionStatus, String> {
1103	if let Some(ipc_server) = app_handle.try_state::<TauriIPCServer>() {
1104		let connected = ipc_server
1105			.get_connection_status()
1106			.map_err(|e| format!("Failed to get connection status: {}", e))?;
1107
1108		Ok(ConnectionStatus { connected })
1109	} else {
1110		Err("IPC Server not found in application state".to_string())
1111	}
1112}
1113
1114/// Security context for permission validation
1115#[derive(Debug, Clone, Serialize, Deserialize)]
1116pub struct SecurityContext {
1117	pub user_id:String,
1118
1119	pub roles:Vec<String>,
1120
1121	pub permissions:Vec<String>,
1122
1123	pub ip_address:String,
1124
1125	pub timestamp:std::time::SystemTime,
1126}
1127
1128/// Permission manager for IPC operations
1129pub struct PermissionManager {
1130	roles:Arc<RwLock<HashMap<String, Role>>>,
1131
1132	permissions:Arc<RwLock<HashMap<String, Permission>>>,
1133
1134	audit_log:Arc<RwLock<Vec<SecurityEvent>>>,
1135}
1136
1137/// Security event for auditing
1138#[derive(Debug, Clone, Serialize, Deserialize)]
1139pub struct SecurityEvent {
1140	pub event_type:SecurityEventType,
1141
1142	pub user_id:String,
1143
1144	pub operation:String,
1145
1146	pub timestamp:std::time::SystemTime,
1147
1148	pub details:Option<String>,
1149}
1150
1151#[derive(Debug, Clone, Serialize, Deserialize)]
1152pub enum SecurityEventType {
1153	PermissionDenied,
1154
1155	AccessGranted,
1156
1157	ConfigurationChange,
1158
1159	SecurityViolation,
1160
1161	PerformanceAnomaly,
1162}
1163
1164/// Role definition for RBAC
1165#[derive(Debug, Clone, Serialize, Deserialize)]
1166pub struct Role {
1167	pub name:String,
1168
1169	pub permissions:Vec<String>,
1170
1171	pub description:String,
1172}
1173
1174/// Permission definition
1175#[derive(Debug, Clone, Serialize, Deserialize)]
1176pub struct Permission {
1177	pub name:String,
1178
1179	pub description:String,
1180
1181	pub category:String,
1182}
1183
1184impl PermissionManager {
1185	pub fn new() -> Self {
1186		Self {
1187			roles:Arc::new(RwLock::new(HashMap::new())),
1188
1189			permissions:Arc::new(RwLock::new(HashMap::new())),
1190
1191			audit_log:Arc::new(RwLock::new(Vec::new())),
1192		}
1193	}
1194
1195	/// Validate permission for an operation
1196	pub async fn validate_permission(&self, operation:&str, context:&SecurityContext) -> Result<(), String> {
1197		// Check if operation requires specific permissions
1198		let required_permissions = self.get_required_permissions(operation).await;
1199
1200		if required_permissions.is_empty() {
1201			return Ok(()); // No specific permissions required
1202		}
1203
1204		// Check if user has required permissions
1205		let mut user_permissions:Vec<String> = context.permissions.iter().cloned().collect();
1206
1207		for role in context.roles.iter() {
1208			let role_perms = self.get_role_permissions(role).await;
1209
1210			user_permissions.extend(role_perms);
1211		}
1212
1213		for required in required_permissions {
1214			if !user_permissions.contains(&required) {
1215				return Err(format!("Missing permission: {}", required));
1216			}
1217		}
1218
1219		// Log successful access
1220		self.log_security_event(SecurityEvent {
1221			event_type:SecurityEventType::AccessGranted,
1222			user_id:context.user_id.clone(),
1223			operation:operation.to_string(),
1224			timestamp:std::time::SystemTime::now(),
1225			details:Some(format!("Access granted for operation: {}", operation)),
1226		})
1227		.await;
1228
1229		Ok(())
1230	}
1231
1232	/// Get required permissions for an operation
1233	async fn get_required_permissions(&self, operation:&str) -> Vec<String> {
1234		// Define operation-to-permission mapping
1235		match operation {
1236			"file:write" | "file:delete" => vec!["file.write".to_string()],
1237
1238			"configuration:update" => vec!["config.update".to_string()],
1239
1240			"storage:set" => vec!["storage.write".to_string()],
1241
1242			"native:openExternal" => vec!["system.external".to_string()],
1243
1244			// Operations not in the mapping require no special permissions by default
1245			_ => Vec::new(),
1246		}
1247	}
1248
1249	/// Get permissions for a role
1250	async fn get_role_permissions(&self, role_name:&str) -> Vec<String> {
1251		let roles = self.roles.read().await;
1252
1253		roles.get(role_name).map(|role| role.permissions.clone()).unwrap_or_default()
1254	}
1255
1256	/// Log security event
1257	pub async fn log_security_event(&self, event:SecurityEvent) {
1258		let mut audit_log = self.audit_log.write().await;
1259
1260		audit_log.push(event);
1261
1262		// Keep only last 1000 events
1263		if audit_log.len() > 1000 {
1264			audit_log.remove(0);
1265		}
1266	}
1267
1268	/// Get security audit log
1269	pub async fn get_audit_log(&self, limit:usize) -> Vec<SecurityEvent> {
1270		let audit_log = self.audit_log.read().await;
1271
1272		audit_log.iter().rev().take(limit).cloned().collect()
1273	}
1274
1275	/// Initialize default roles and permissions
1276	pub async fn initialize_defaults(&self) {
1277		let mut permissions = self.permissions.write().await;
1278
1279		let mut roles = self.roles.write().await;
1280
1281		// Define standard permissions
1282		let standard_permissions = vec![
1283			("file.read", "Read file operations"),
1284			("file.write", "Write file operations"),
1285			("config.read", "Read configuration"),
1286			("config.update", "Update configuration"),
1287			("storage.read", "Read storage"),
1288			("storage.write", "Write storage"),
1289			("system.external", "Access external system resources"),
1290		];
1291
1292		for (name, description) in standard_permissions {
1293			permissions.insert(
1294				name.to_string(),
1295				Permission {
1296					name:name.to_string(),
1297					description:description.to_string(),
1298					category:"standard".to_string(),
1299				},
1300			);
1301		}
1302
1303		// Define standard roles
1304		let standard_roles = vec![
1305			("user", vec!["file.read", "config.read", "storage.read"]),
1306			(
1307				"developer",
1308				vec!["file.read", "file.write", "config.read", "storage.read", "storage.write"],
1309			),
1310			(
1311				"admin",
1312				vec![
1313					"file.read",
1314					"file.write",
1315					"config.read",
1316					"config.update",
1317					"storage.read",
1318					"storage.write",
1319					"system.external",
1320				],
1321			),
1322		];
1323
1324		for (name, role_permissions) in standard_roles {
1325			roles.insert(
1326				name.to_string(),
1327				Role {
1328					name:name.to_string(),
1329					permissions:role_permissions.iter().map(|p| p.to_string()).collect(),
1330					description:format!("{} role with standard permissions", name),
1331				},
1332			);
1333		}
1334	}
1335}