Mountain/IPC/Enhanced/PerformanceDashboard/Dashboard.rs

#![allow(non_snake_case)]

//! `PerformanceDashboard` aggregator + 25-method impl. Holds
//! the metric ring-buffer, trace store, alert ring, statistics
//! cell, and the `is_running` lifecycle flag. Method bodies
//! tightly couple with the sibling DTOs so the impl stays a
//! single file (per the "tightly-coupled cluster" exception).
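//!
//! A minimal usage sketch (hypothetical call site; assumes a running Tokio
//! runtime and the sibling `DashboardConfig::default()`):
//!
//! ```rust,ignore
//! let dashboard = Struct::default_dashboard();
//! dashboard.start().await?;
//!
//! let span = dashboard.start_trace_span("handle_request".into()).await;
//! // ... traced work ...
//! dashboard.end_trace_span(&span.span_id).await?;
//!
//! dashboard.stop().await?;
//! ```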

use std::{
	collections::{HashMap, VecDeque},
	sync::Arc,
	time::{Duration, SystemTime},
};

use tokio::{
	sync::{Mutex as AsyncMutex, RwLock},
	time::interval,
};

use crate::{
	IPC::Enhanced::PerformanceDashboard::{
		AlertSeverity::Enum as AlertSeverity,
		DashboardConfig::Struct as DashboardConfig,
		DashboardStatistics::Struct as DashboardStatistics,
		LogLevel::Enum as LogLevel,
		MetricType::Enum as MetricType,
		PerformanceAlert::Struct as PerformanceAlert,
		PerformanceMetric::Struct as PerformanceMetric,
		TraceLog::Struct as TraceLog,
		TraceSpan::Struct as TraceSpan,
	},
	dev_log,
};

pub struct Struct {
	pub(super) config:DashboardConfig,

	pub(super) metrics:Arc<RwLock<VecDeque<PerformanceMetric>>>,

	pub(super) traces:Arc<RwLock<HashMap<String, TraceSpan>>>,

	pub(super) alerts:Arc<RwLock<VecDeque<PerformanceAlert>>>,

	pub(super) statistics:Arc<RwLock<DashboardStatistics>>,

	pub(super) is_running:Arc<AsyncMutex<bool>>,
}

impl Struct {
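	/// Builds a dashboard around empty metric/trace/alert stores. Background
	/// collection does not begin until [`Self::start`] is called.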
	pub fn new(config:DashboardConfig) -> Self {
		// Capture before `config` moves into the struct; avoids cloning the
		// whole config just for one log field.
		let update_interval_ms = config.update_interval_ms;

		let dashboard = Self {
			config,

			metrics:Arc::new(RwLock::new(VecDeque::new())),

			traces:Arc::new(RwLock::new(HashMap::new())),

			alerts:Arc::new(RwLock::new(VecDeque::new())),

			statistics:Arc::new(RwLock::new(DashboardStatistics {
				total_metrics_collected:0,
				total_traces_collected:0,
				total_alerts_triggered:0,
				average_processing_time_ms:0.0,
				peak_processing_time_ms:0,
				error_rate_percentage:0.0,
				throughput_messages_per_second:0.0,
				memory_usage_mb:0.0,
				last_update:SystemTime::now()
					.duration_since(SystemTime::UNIX_EPOCH)
					.unwrap_or_default()
					.as_secs(),
			})),

			is_running:Arc::new(AsyncMutex::new(false)),
		};

		dev_log!(
			"ipc",
			"[PerformanceDashboard] Created dashboard with {}ms update interval",
			update_interval_ms
		);

		dashboard
	}

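	/// Flips the lifecycle flag and spawns the metric-collection,
	/// alert-monitoring, and data-cleanup loops. Idempotent: returns `Ok(())`
	/// without respawning if the dashboard is already running.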
	pub async fn start(&self) -> Result<(), String> {
		{
			let mut running = self.is_running.lock().await;

			if *running {
				return Ok(());
			}

			*running = true;
		}

		self.start_metrics_collection().await;

		self.start_alert_monitoring().await;

		self.start_data_cleanup().await;

		dev_log!("ipc", "[PerformanceDashboard] Performance dashboard started");

		Ok(())
	}

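	/// Clears the lifecycle flag (the background loops exit on their next tick)
	/// and drains all stored metrics, traces, and alerts. Idempotent.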
	pub async fn stop(&self) -> Result<(), String> {
		{
			let mut running = self.is_running.lock().await;

			if !*running {
				return Ok(());
			}

			*running = false;
		}

		{
			let mut metrics = self.metrics.write().await;

			metrics.clear();
		}

		{
			let mut traces = self.traces.write().await;

			traces.clear();
		}

		{
			let mut alerts = self.alerts.write().await;

			alerts.clear();
		}

		dev_log!("ipc", "[PerformanceDashboard] Performance dashboard stopped");

		Ok(())
	}

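	/// Appends a metric to the buffer, refreshes the aggregate statistics, and
	/// evaluates the metric against its alert threshold.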
	pub async fn record_metric(&self, metric:PerformanceMetric) {
		{
			let mut metrics = self.metrics.write().await;

			metrics.push_back(metric.clone());
		}

		{
			// Keep the lifetime counter in step with the other `total_*`
			// statistics; it was previously never incremented.
			let mut stats = self.statistics.write().await;

			stats.total_metrics_collected += 1;
		}

		self.update_statistics().await;

		self.check_alerts(&metric).await;

		dev_log!("ipc", "[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
	}

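	/// Opens a root span (no parent) for `operation_name` and stores it keyed
	/// by `span_id`. The returned copy carries the IDs needed to later end the
	/// span or attach logs to it.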
	pub async fn start_trace_span(&self, operation_name:String) -> TraceSpan {
		let trace_id = Self::generate_trace_id();

		let span_id = Self::generate_span_id();

		let span = TraceSpan {
			trace_id:trace_id.clone(),

			span_id:span_id.clone(),

			parent_span_id:None,

			operation_name,

			start_time:SystemTime::now()
				.duration_since(SystemTime::UNIX_EPOCH)
				.unwrap_or_default()
				.as_millis() as u64,

			end_time:None,

			duration_ms:None,

			tags:HashMap::new(),

			logs:Vec::new(),
		};

		{
			let mut traces = self.traces.write().await;

			traces.insert(span_id.clone(), span.clone());
		}

		{
			let mut stats = self.statistics.write().await;

			stats.total_traces_collected += 1;
		}

		span
	}

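	/// Stamps the end time and duration on an open span, or errors if `span_id`
	/// is unknown.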
	pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
		let mut traces = self.traces.write().await;

		if let Some(span) = traces.get_mut(span_id) {
			let end_time = SystemTime::now()
				.duration_since(SystemTime::UNIX_EPOCH)
				.unwrap_or_default()
				.as_millis() as u64;

			span.end_time = Some(end_time);

			span.duration_ms = Some(end_time.saturating_sub(span.start_time));

			dev_log!(
				"ipc",
				"[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
				span.operation_name,
				span.duration_ms.unwrap_or(0)
			);

			Ok(())
		} else {
			Err(format!("Trace span not found: {}", span_id))
		}
	}

	pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
		let mut traces = self.traces.write().await;

		if let Some(span) = traces.get_mut(span_id) {
			span.logs.push(log);

			Ok(())
		} else {
			Err(format!("Trace span not found: {}", span_id))
		}
	}

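	/// Spawns the periodic collection loop. The clone shares the `Arc`-backed
	/// stores and the `is_running` flag, so the loop observes `stop` and exits
	/// on its next tick.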
	async fn start_metrics_collection(&self) {
		// `Clone` only copies `Arc` handles, so no extra `Arc::new` is needed.
		let dashboard = self.clone();

		tokio::spawn(async move {
			let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));

			while *dashboard.is_running.lock().await {
				interval.tick().await;
				dashboard.collect_system_metrics().await;
				dashboard.update_statistics().await;
			}
		});
	}

	async fn start_alert_monitoring(&self) {
		let dashboard = self.clone();

		tokio::spawn(async move {
			let mut interval = interval(Duration::from_secs(10));

			while *dashboard.is_running.lock().await {
				interval.tick().await;
				dashboard.check_performance_alerts().await;
			}
		});
	}

	async fn start_data_cleanup(&self) {
		let dashboard = self.clone();

		tokio::spawn(async move {
			let mut interval = interval(Duration::from_secs(3600));

			while *dashboard.is_running.lock().await {
				interval.tick().await;
				dashboard.cleanup_old_data().await;
			}
		});
	}

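	/// Samples memory and CPU via the (currently stubbed) probes and records
	/// each reading as a metric.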
	async fn collect_system_metrics(&self) {
		if let Ok(memory_usage) = Self::get_memory_usage() {
			let metric = PerformanceMetric {
				metric_type:MetricType::MemoryUsage,

				value:memory_usage,

				timestamp:SystemTime::now()
					.duration_since(SystemTime::UNIX_EPOCH)
					.unwrap_or_default()
					.as_millis() as u64,

				channel:None,

				tags:HashMap::new(),
			};

			self.record_metric(metric).await;
		}

		if let Ok(cpu_usage) = Self::get_cpu_usage() {
			let metric = PerformanceMetric {
				metric_type:MetricType::CpuUsage,

				value:cpu_usage,

				timestamp:SystemTime::now()
					.duration_since(SystemTime::UNIX_EPOCH)
					.unwrap_or_default()
					.as_millis() as u64,

				channel:None,

				tags:HashMap::new(),
			};

			self.record_metric(metric).await;
		}
	}

	async fn update_statistics(&self) {
		let metrics = self.metrics.read().await;

		let mut stats = self.statistics.write().await;

		let processing_metrics:Vec<&PerformanceMetric> = metrics
			.iter()
			.filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
			.collect();

		if !processing_metrics.is_empty() {
			let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();

			stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;

			stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
		}

		let error_metrics:Vec<&PerformanceMetric> = metrics
			.iter()
			.filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
			.collect();

		if !error_metrics.is_empty() {
			let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();

			stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
		}

		let throughput_metrics:Vec<&PerformanceMetric> = metrics
			.iter()
			.filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
			.collect();

		if !throughput_metrics.is_empty() {
			let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();

			stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
		}

		let memory_metrics:Vec<&PerformanceMetric> = metrics
			.iter()
			.filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
			.collect();

		if !memory_metrics.is_empty() {
			let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();

			stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
		}

		stats.last_update = SystemTime::now()
			.duration_since(SystemTime::UNIX_EPOCH)
			.unwrap_or_default()
			.as_secs();
	}

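	/// Compares a metric against its per-type threshold, grades severity by how
	/// far the value overshoots (>5x critical, >3x high, >2x medium), and
	/// records a `PerformanceAlert` when exceeded.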
	async fn check_alerts(&self, metric:&PerformanceMetric) {
		let threshold = match metric.metric_type {
			MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,

			MetricType::ErrorRate => 5.0,

			MetricType::MemoryUsage => 1024.0,

			MetricType::CpuUsage => 90.0,

			_ => return,
		};

		if metric.value > threshold {
			let severity = match metric.value / threshold {
				ratio if ratio > 5.0 => AlertSeverity::Critical,

				ratio if ratio > 3.0 => AlertSeverity::High,

				ratio if ratio > 2.0 => AlertSeverity::Medium,

				_ => AlertSeverity::Low,
			};

			let alert = PerformanceAlert {
				alert_id:Self::generate_alert_id(),

				metric_type:metric.metric_type.clone(),

				threshold,

				current_value:metric.value,

				timestamp:metric.timestamp,

				channel:metric.channel.clone(),

				severity,

				message:format!(
					"{} exceeded threshold: {} > {}",
					Self::metric_type_name(&metric.metric_type),
					metric.value,
					threshold
				),
			};

			{
				let mut alerts = self.alerts.write().await;

				alerts.push_back(alert.clone());
			}

			{
				let mut stats = self.statistics.write().await;

				stats.total_alerts_triggered += 1;
			}

			dev_log!("ipc", "warn: [PerformanceDashboard] Alert triggered: {}", alert.message);
		}
	}

	async fn check_performance_alerts(&self) {
		// Placeholder sweep: per-metric alerting happens in `check_alerts`; a
		// periodic re-evaluation of stored metrics could live here later.
		dev_log!("ipc", "[PerformanceDashboard] Checking performance alerts");
	}

	async fn cleanup_old_data(&self) {
		// Timestamps on metrics, spans, and alerts are stored in milliseconds
		// (`as_millis`), so the retention cutoff must be in milliseconds too;
		// the previous seconds-based cutoff never expired anything.
		// `saturating_sub` guards against underflow on long retention windows.
		let now_ms = SystemTime::now()
			.duration_since(SystemTime::UNIX_EPOCH)
			.unwrap_or_default()
			.as_millis() as u64;

		let retention_threshold = now_ms.saturating_sub(self.config.metrics_retention_hours * 3_600_000);

		{
			let mut metrics = self.metrics.write().await;

			metrics.retain(|m| m.timestamp >= retention_threshold);
		}

		{
			let mut traces = self.traces.write().await;

			traces.retain(|_, span| span.start_time >= retention_threshold);

			if traces.len() > self.config.max_traces_stored {
				let excess = traces.len() - self.config.max_traces_stored;

				// Eviction order is arbitrary (HashMap iteration); acceptable
				// for a soft cap.
				let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();

				for key in keys_to_remove {
					traces.remove(&key);
				}
			}
		}

		{
			let mut alerts = self.alerts.write().await;

			alerts.retain(|a| a.timestamp >= retention_threshold);
		}

		dev_log!("ipc", "[PerformanceDashboard] Cleaned up old data");
	}

	// Placeholder probes: real memory/CPU sampling is not wired up yet, so
	// these return fixed values.
	fn get_memory_usage() -> Result<f64, String> { Ok(100.0) }

	fn get_cpu_usage() -> Result<f64, String> { Ok(25.0) }

	fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }

	fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }

	fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }

	fn metric_type_name(metric_type:&MetricType) -> &'static str {
		match metric_type {
			MetricType::MessageProcessingTime => "Message Processing Time",

			MetricType::ConnectionLatency => "Connection Latency",

			MetricType::MemoryUsage => "Memory Usage",

			MetricType::CpuUsage => "CPU Usage",

			MetricType::NetworkThroughput => "Network Throughput",

			MetricType::ErrorRate => "Error Rate",

			MetricType::QueueSize => "Queue Size",
		}
	}

	pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }

	pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
		let metrics = self.metrics.read().await;

		metrics.iter().rev().take(limit).cloned().collect()
	}

	pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
		let alerts = self.alerts.read().await;

		alerts.iter().rev().cloned().collect()
	}

	pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
		let traces = self.traces.read().await;

		traces.values().find(|span| span.trace_id == trace_id).cloned()
	}

	pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }

	pub fn high_frequency_dashboard() -> Self {
		Self::new(DashboardConfig {
			update_interval_ms:1000,
			metrics_retention_hours:1,
			alert_threshold_ms:500,
			trace_sampling_rate:1.0,
			max_traces_stored:5000,
		})
	}

	pub fn create_metric(
		metric_type:MetricType,

		value:f64,

		channel:Option<String>,

		tags:HashMap<String, String>,
	) -> PerformanceMetric {
		PerformanceMetric {
			metric_type,

			value,

			timestamp:SystemTime::now()
				.duration_since(SystemTime::UNIX_EPOCH)
				.unwrap_or_default()
				.as_millis() as u64,

			channel,

			tags,
		}
	}

	pub fn create_trace_log(message:String, level:LogLevel, fields:HashMap<String, String>) -> TraceLog {
		TraceLog {
			timestamp:SystemTime::now()
				.duration_since(SystemTime::UNIX_EPOCH)
				.unwrap_or_default()
				.as_millis() as u64,

			message,

			level,

			fields,
		}
	}

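	/// Blends latency, error rate, and throughput into a 0-100 score with
	/// weights 0.4 / 0.4 / 0.2. Worked example: 50ms average, 1% errors, and
	/// 500 msg/s give `66.67 * 0.4 + 99.0 * 0.4 + 0.5 * 0.2`, about 66.4.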
	pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
		let time_score = 100.0 / (1.0 + average_processing_time / 100.0);

		let error_score = 100.0 * (1.0 - error_rate / 100.0);

		let throughput_score = throughput / 1000.0;

		(time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2).clamp(0.0, 100.0)
	}

	pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
		match metric_type {
			MetricType::MessageProcessingTime => format!("{:.2}ms", value),

			MetricType::ConnectionLatency => format!("{:.2}ms", value),

			MetricType::MemoryUsage => format!("{:.2}MB", value),

			MetricType::CpuUsage => format!("{:.2}%", value),

			MetricType::NetworkThroughput => format!("{:.2} msg/s", value),

			MetricType::ErrorRate => format!("{:.2}%", value),

			MetricType::QueueSize => format!("{:.0}", value),
		}
	}
}

impl Clone for Struct {
	fn clone(&self) -> Self {
		Self {
			config:self.config.clone(),

			metrics:self.metrics.clone(),

			traces:self.traces.clone(),

			alerts:self.alerts.clone(),

			statistics:self.statistics.clone(),

			// Share the lifecycle flag: the loops spawned in `start` clone the
			// dashboard and poll `is_running`, so handing each clone a fresh
			// `false` flag made those loops exit immediately.
			is_running:self.is_running.clone(),
		}
	}
}