1use std::{
277 collections::HashMap,
278 sync::{Arc, Mutex},
279 time::{Duration, SystemTime},
280};
281
282use serde::{Deserialize, Serialize};
283use tokio::time::interval;
284use tauri::{Emitter, Manager};
285
286use crate::{
287 IPC::AdvancedFeatures::PerformanceStats::Struct as PerformanceStats,
288 RunTime::ApplicationRunTime::ApplicationRunTime,
289 dev_log,
290};
291
292#[derive(Clone, Serialize, Deserialize, Debug)]
299pub struct SyncStatus {
300 pub total_documents:u32,
301
302 pub synced_documents:u32,
303
304 pub conflicted_documents:u32,
305
306 pub offline_documents:u32,
307
308 pub last_sync_duration_ms:u64,
309}
310
311#[derive(Clone, Copy, PartialEq, Debug)]
313pub enum SyncState {
314 Modified,
315
316 Synced,
317
318 Conflicted,
319
320 Offline,
321}
322
323#[derive(Clone, Copy, Debug)]
325pub enum ChangeType {
326 Update,
327
328 Insert,
329
330 Delete,
331
332 Move,
333
334 Other,
335}
336
337#[derive(Clone, Debug)]
339pub struct SynchronizedDocument {
340 pub document_id:String,
341
342 pub file_path:String,
343
344 pub last_modified:u64,
345
346 pub content_hash:String,
347
348 pub sync_state:SyncState,
349
350 pub version:u32,
351}
352
353#[derive(Clone, Debug)]
355pub struct DocumentChange {
356 pub change_id:String,
357
358 pub document_id:String,
359
360 pub change_type:ChangeType,
361
362 pub content:Option<String>,
363
364 pub applied:bool,
365}
366
367pub struct DocumentSynchronization {
369 pub synchronized_documents:HashMap<String, SynchronizedDocument>,
370
371 pub pending_changes:HashMap<String, Vec<DocumentChange>>,
372
373 pub last_sync_time:u64,
374
375 pub sync_status:SyncStatus,
376}
377
378#[derive(Clone, Serialize, Deserialize, Debug)]
380pub struct RealTimeUpdate {
381 pub target:String,
382
383 pub data:String,
384}
385
386pub struct RealTimeUpdateManager {
388 pub Updates:Vec<RealTimeUpdate>,
389
390 pub Subscribers:HashMap<String, Vec<String>>,
391
392 pub UpdateQueue:Vec<RealTimeUpdate>,
393
394 pub LastBroadcast:u64,
395}
396
397#[derive(Clone, Debug)]
399pub struct ViewState {
400 pub zoom_level:f32,
401
402 pub sidebar_visible:bool,
403
404 pub panel_visible:bool,
405
406 pub status_bar_visible:bool,
407}
408
409#[derive(Clone, Debug)]
411pub struct GridLayout {
412 pub rows:u32,
413
414 pub columns:u32,
415
416 pub cell_width:u32,
417
418 pub cell_height:u32,
419}
420
421#[derive(Clone, Debug)]
423pub struct LayoutState {
424 pub editor_groups:Vec<String>,
425
426 pub active_group:u32,
427
428 pub grid_layout:GridLayout,
429}
430
431#[derive(Clone, Debug)]
433pub struct UIStateSynchronization {
434 pub active_editor:Option<String>,
435
436 pub cursor_positions:HashMap<String, (u32, u32)>,
437
438 pub selection_ranges:HashMap<String, (u32, u32)>,
439
440 pub view_state:ViewState,
441
442 pub theme:String,
443
444 pub layout:LayoutState,
445}
446
447#[derive(Clone)]
449pub struct WindAdvancedSync {
450 runtime:Arc<ApplicationRunTime>,
451
452 document_sync:Arc<Mutex<DocumentSynchronization>>,
453
454 ui_state_sync:Arc<Mutex<UIStateSynchronization>>,
455
456 real_time_updates:Arc<Mutex<RealTimeUpdateManager>>,
457
458 performance_stats:Arc<Mutex<PerformanceStats>>,
459 }
461
462impl WindAdvancedSync {
463 pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
465 Self {
466 runtime:runtime.clone(),
467
468 document_sync:Arc::new(Mutex::new(DocumentSynchronization {
469 synchronized_documents:HashMap::new(),
470 pending_changes:HashMap::new(),
471 last_sync_time:0,
472 sync_status:SyncStatus {
473 total_documents:0,
474 synced_documents:0,
475 conflicted_documents:0,
476 offline_documents:0,
477 last_sync_duration_ms:0,
478 },
479 })),
480
481 ui_state_sync:Arc::new(Mutex::new(UIStateSynchronization {
482 active_editor:None,
483 cursor_positions:HashMap::new(),
484 selection_ranges:HashMap::new(),
485 view_state:ViewState {
486 zoom_level:1.0,
487 sidebar_visible:true,
488 panel_visible:true,
489 status_bar_visible:true,
490 },
491 theme:"default".to_string(),
492 layout:LayoutState {
493 editor_groups:Vec::new(),
494 active_group:0,
495 grid_layout:GridLayout { rows:1, columns:1, cell_width:100, cell_height:100 },
496 },
497 })),
498
499 real_time_updates:Arc::new(Mutex::new(RealTimeUpdateManager {
500 Updates:Vec::new(),
501 Subscribers:HashMap::new(),
502 UpdateQueue:Vec::new(),
503 LastBroadcast:0,
504 })),
505
506 performance_stats:Arc::new(Mutex::new(PerformanceStats {
507 total_messages_sent:0,
508 total_messages_received:0,
509 average_processing_time_ms:0.0,
510 peak_message_rate:0,
511 error_count:0,
512 last_update:0,
513 connection_uptime:0,
514 })),
515 }
517 }
518
519 pub async fn initialize(&self) -> Result<(), String> {
521 dev_log!("ipc", "Initializing Wind Advanced Sync service");
522
523 self.start_sync_task().await;
525
526 self.start_performance_monitoring().await;
528
529 dev_log!("ipc", "Wind Advanced Sync service initialized successfully");
530
531 Ok(())
532 }
533
534 async fn start_sync_task(&self) {
536 let document_sync = self.document_sync.clone();
537
538 let runtime = self.runtime.clone();
539
540 tokio::spawn(async move {
541 let mut interval = interval(Duration::from_secs(5));
542
543 loop {
544 interval.tick().await;
545
546 if let Ok(mut sync) = document_sync.lock() {
548 let modified_docs:Vec<String> = sync
549 .synchronized_documents
550 .iter()
551 .filter(|(_, document)| document.sync_state == SyncState::Modified)
552 .map(|(doc_id, _)| doc_id.clone())
553 .collect();
554
555 if !modified_docs.is_empty() {
556 dev_log!("ipc", "Synchronizing {} documents", modified_docs.len());
557
558 sync.last_sync_time =
560 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
561
562 sync.sync_status = Self::calculate_sync_status(&sync.synchronized_documents);
564
565 if std::env::var("LAND_SYNC_STATUS_EMIT").is_ok() {
572 let _ = runtime
573 .Environment
574 .ApplicationHandle
575 .emit("mountain_sync_status_update", sync.sync_status.clone());
576 }
577 }
578 }
579 }
580 });
581 }
582
583 async fn start_performance_monitoring(&self) {
585 let performance_stats = self.performance_stats.clone();
586
587 let runtime = self.runtime.clone();
588
589 tokio::spawn(async move {
590 let mut interval = interval(Duration::from_secs(10));
591
592 loop {
593 interval.tick().await;
594
595 if let Ok(mut stats) = performance_stats.lock() {
596 stats.last_update =
597 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
598 stats.connection_uptime += 10;
599
600 if std::env::var("LAND_PERF_EMIT").is_ok() {
605 let _ = runtime
606 .Environment
607 .ApplicationHandle
608 .emit("mountain_performance_update", stats.clone());
609 }
610 }
611 }
612 });
613 }
614
615 fn calculate_sync_status(documents:&HashMap<String, SynchronizedDocument>) -> SyncStatus {
617 let total = documents.len() as u32;
618
619 let synced = documents.values().filter(|d| d.sync_state == SyncState::Synced).count() as u32;
620
621 let conflicted = documents.values().filter(|d| d.sync_state == SyncState::Conflicted).count() as u32;
622
623 let offline = documents.values().filter(|d| d.sync_state == SyncState::Offline).count() as u32;
624
625 SyncStatus {
626 total_documents:total,
627
628 synced_documents:synced,
629
630 conflicted_documents:conflicted,
631
632 offline_documents:offline,
633
634 last_sync_duration_ms:0,
635 }
636 }
637
638 pub fn register_commands(_app:&mut tauri::App) -> Result<(), Box<dyn std::error::Error>> {
640 dev_log!("ipc", "Registering Wind Advanced Sync IPC commands");
641
642 Ok(())
643 }
644}
645
646impl WindAdvancedSync {
647 pub async fn start_synchronization(self: Arc<Self>) -> Result<(), String> {
649 dev_log!("lifecycle", "Starting advanced synchronization");
650
651 let sync1 = self.clone();
653
654 tokio::spawn(async move {
655 sync1.synchronize_documents().await;
656 });
657
658 let sync2 = self.clone();
660
661 tokio::spawn(async move {
662 sync2.synchronize_ui_state().await;
663 });
664
665 let sync3 = self.clone();
667
668 tokio::spawn(async move {
669 sync3.broadcast_real_time_updates().await;
670 });
671
672 Ok(())
673 }
674
675 async fn synchronize_documents(&self) {
677 let mut interval = interval(Duration::from_secs(5));
678
679 let mut consecutive_failures = 0;
680
681 let max_consecutive_failures = 3;
682
683 loop {
684 interval.tick().await;
685
686 dev_log!("lifecycle", "Synchronizing documents");
687
688 let sync_start = std::time::Instant::now();
690
691 let mut success_count = 0;
692
693 let mut error_count = 0;
694
695 let changes = self.get_pending_changes().await;
697
698 for change in changes {
700 match self.apply_document_change(change).await {
701 Ok(_) => success_count += 1,
702
703 Err(e) => {
704 error_count += 1;
705
706 dev_log!("ipc", "error: [WindAdvancedSync] Failed to apply document change: {}", e);
707
708 consecutive_failures += 1;
710
711 if consecutive_failures >= max_consecutive_failures {
712 dev_log!("lifecycle", "Too many consecutive failures, slowing sync interval");
713
714 interval = tokio::time::interval(Duration::from_secs(30));
717 }
718 },
719 }
720 }
721
722 if success_count > 0 {
724 consecutive_failures = 0;
725
726 interval = tokio::time::interval(Duration::from_secs(5));
728 }
729
730 self.update_sync_status().await;
732
733 let sync_duration = sync_start.elapsed();
735
736 dev_log!(
737 "ipc",
738 "[WindAdvancedSync] Document sync completed: {} success, {} errors, {:.2}ms",
739 success_count,
740 error_count,
741 sync_duration.as_millis()
742 );
743 }
744 }
745
746 async fn synchronize_ui_state(&self) {
748 let mut interval = interval(Duration::from_secs(1));
749
750 loop {
751 interval.tick().await;
752
753 dev_log!("ipc", "[WindAdvancedSync] Synchronizing UI state");
754
755 let ui_state = self.get_ui_state().await;
757
758 if let Err(e) = self.update_ui_state(ui_state).await {
760 dev_log!("ipc", "error: [WindAdvancedSync] Failed to update UI state: {}", e);
761 }
762 }
763 }
764
765 async fn broadcast_real_time_updates(&self) {
767 let mut interval = interval(Duration::from_millis(100));
768
769 loop {
770 interval.tick().await;
771
772 {
778 let rt = self.real_time_updates.lock().unwrap();
779
780 if rt.Subscribers.is_empty() {
781 continue;
782 }
783 }
784
785 let updates = self.get_pending_updates().await;
786
787 if !updates.is_empty() {
788 if let Err(e) = self.broadcast_updates(updates).await {
790 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast updates: {}", e);
791 }
792 }
793 }
794 }
795
796 async fn get_pending_changes(&self) -> Vec<DocumentChange> {
798 let sync = self.document_sync.lock().unwrap();
799
800 sync.pending_changes.values().flatten().cloned().collect()
801 }
802
803 async fn apply_document_change(&self, change:DocumentChange) -> Result<(), String> {
805 dev_log!("lifecycle", "Applying document change: {}", change.change_id);
806
807 let change_start = std::time::Instant::now();
809
810 if let Err(conflict) = self.check_for_conflicts(&change).await {
812 dev_log!("lifecycle", "Conflict detected: {}", conflict);
813
814 return Err(format!("Conflict detected: {}", conflict));
815 }
816
817 match change.change_type {
819 ChangeType::Update => {
820 if let Some(_content) = &change.content {
822
823 }
832 },
833
834 ChangeType::Insert => {
835 if let Some(_content) = &change.content {
837
838 }
847 },
848
849 ChangeType::Delete => {
850
851 },
860
861 _ => {
862 dev_log!("lifecycle", "Unsupported change type: {:?}", change.change_type);
863 },
864 }
865
866 let mut sync = self.document_sync.lock().unwrap();
868
869 if let Some(changes) = sync.pending_changes.get_mut(&change.document_id) {
870 if let Some(change_idx) = changes.iter().position(|c| c.change_id == change.change_id) {
871 changes[change_idx].applied = true;
872 }
873 }
874
875 let change_duration = change_start.elapsed();
877
878 dev_log!(
879 "ipc",
880 "[WindAdvancedSync] Change applied successfully in {:.2}ms: {}",
881 change_duration.as_millis(),
882 change.change_id
883 );
884
885 Ok(())
886 }
887
888 async fn check_for_conflicts(&self, change:&DocumentChange) -> Result<(), String> {
890 let sync = self.document_sync.lock().unwrap();
891
892 if let Some(document) = sync.synchronized_documents.get(&change.document_id) {
894 let current_time = SystemTime::now()
895 .duration_since(SystemTime::UNIX_EPOCH)
896 .unwrap_or_default()
897 .as_secs();
898
899 if current_time - document.last_modified < 10 {
902 return Err(format!(
903 "Document {} was modified recently ({}s ago)",
904 document.document_id,
905 current_time - document.last_modified
906 ));
907 }
908
909 if matches!(document.sync_state, SyncState::Conflicted) {
911 return Err(format!("Document {} is in conflicted state", document.document_id));
912 }
913 }
914
915 Ok(())
916 }
917
918 async fn update_sync_status(&self) {
920 let mut sync = self.document_sync.lock().unwrap();
921
922 sync.sync_status.total_documents = sync.synchronized_documents.len() as u32;
923
924 sync.sync_status.synced_documents = sync
925 .synchronized_documents
926 .values()
927 .filter(|doc| matches!(doc.sync_state, SyncState::Synced))
928 .count() as u32;
929
930 sync.sync_status.conflicted_documents = sync
931 .synchronized_documents
932 .values()
933 .filter(|doc| matches!(doc.sync_state, SyncState::Conflicted))
934 .count() as u32;
935
936 sync.sync_status.offline_documents = sync
937 .synchronized_documents
938 .values()
939 .filter(|doc| matches!(doc.sync_state, SyncState::Offline))
940 .count() as u32;
941
942 sync.last_sync_time = SystemTime::now()
943 .duration_since(SystemTime::UNIX_EPOCH)
944 .unwrap_or_default()
945 .as_secs();
946 }
947
948 async fn get_ui_state(&self) -> UIStateSynchronization {
950 let sync = self.ui_state_sync.lock().unwrap();
951
952 sync.clone()
953 }
954
955 async fn update_ui_state(&self, ui_state:UIStateSynchronization) -> Result<(), String> {
957 let mut sync = self.ui_state_sync.lock().unwrap();
958
959 *sync = ui_state;
960
961 Ok(())
967 }
968
969 async fn get_pending_updates(&self) -> Vec<RealTimeUpdate> {
971 let mut updates = self.real_time_updates.lock().unwrap();
972
973 let pending = updates.UpdateQueue.clone();
974
975 updates.UpdateQueue.clear();
976
977 pending
978 }
979
980 async fn broadcast_updates(&self, updates:Vec<RealTimeUpdate>) -> Result<(), String> {
982 for update in updates {
983 let subscribers = {
985 let rt = self.real_time_updates.lock().unwrap();
986
987 rt.Subscribers.get(&update.target).cloned()
988 };
989
990 if let Some(subscriber_list) = subscribers {
992 for subscriber in subscriber_list {
993 if let Err(e) = self
994 .runtime
995 .Environment
996 .ApplicationHandle
997 .emit(&format!("real-time-update-{}", subscriber), &update)
998 {
999 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast to {}: {}", subscriber, e);
1000 }
1001 }
1002 }
1003 }
1004
1005 Ok(())
1006 }
1007
1008 pub async fn add_document(&self, document_id:String, file_path:String) -> Result<(), String> {
1010 let mut sync = self.document_sync.lock().unwrap();
1011
1012 let document = SynchronizedDocument {
1013 document_id:document_id.clone(),
1014
1015 file_path,
1016
1017 last_modified:SystemTime::now()
1018 .duration_since(SystemTime::UNIX_EPOCH)
1019 .unwrap_or_default()
1020 .as_secs(),
1021
1022 content_hash:"".to_string(),
1023
1024 sync_state:SyncState::Synced,
1025
1026 version:1,
1027 };
1028
1029 sync.synchronized_documents.insert(document_id, document);
1030
1031 dev_log!("lifecycle", "Document added for synchronization");
1032
1033 Ok(())
1034 }
1035
1036 pub async fn subscribe_to_updates(&self, target:String, subscriber:String) -> Result<(), String> {
1038 let mut updates = self.real_time_updates.lock().unwrap();
1039
1040 let target_clone = target.clone();
1041
1042 updates
1043 .Subscribers
1044 .entry(target_clone.clone())
1045 .or_insert_with(Vec::new)
1046 .push(subscriber);
1047
1048 dev_log!("lifecycle", "Subscriber added for target: {}", target_clone);
1049
1050 Ok(())
1051 }
1052
1053 pub async fn queue_update(&self, update:RealTimeUpdate) -> Result<(), String> {
1055 let mut updates = self.real_time_updates.lock().unwrap();
1056
1057 updates.UpdateQueue.push(update);
1058
1059 updates.LastBroadcast = SystemTime::now()
1060 .duration_since(SystemTime::UNIX_EPOCH)
1061 .unwrap_or_default()
1062 .as_secs();
1063
1064 dev_log!("ipc", "[WindAdvancedSync] Update queued");
1065
1066 Ok(())
1067 }
1068
1069 pub async fn get_sync_status(&self) -> SyncStatus {
1071 let sync = self.document_sync.lock().unwrap();
1072
1073 sync.sync_status.clone()
1074 }
1075
1076 pub async fn get_current_ui_state(&self) -> UIStateSynchronization { self.get_ui_state().await }
1078
1079 #[allow(dead_code)]
1081 fn clone_sync(&self) -> WindAdvancedSync {
1082 WindAdvancedSync {
1083 runtime:self.runtime.clone(),
1084
1085 document_sync:self.document_sync.clone(),
1086
1087 ui_state_sync:self.ui_state_sync.clone(),
1088
1089 real_time_updates:self.real_time_updates.clone(),
1090
1091 performance_stats:self.performance_stats.clone(),
1092 }
1094 }
1095}
1096
1097#[tauri::command]
1099pub async fn mountain_add_document_for_sync(
1100 app_handle:tauri::AppHandle,
1101
1102 document_id:String,
1103
1104 file_path:String,
1105) -> Result<(), String> {
1106 dev_log!("lifecycle", "Tauri command: add_document_for_sync");
1107
1108 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1109 sync.add_document(document_id, file_path).await
1110 } else {
1111 Err("WindAdvancedSync not found in application state".to_string())
1112 }
1113}
1114
1115#[tauri::command]
1117pub async fn mountain_get_sync_status(app_handle:tauri::AppHandle) -> Result<SyncStatus, String> {
1118 dev_log!("lifecycle", "Tauri command: get_sync_status");
1119
1120 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1121 Ok(sync.get_sync_status().await)
1122 } else {
1123 Err("WindAdvancedSync not found in application state".to_string())
1124 }
1125}
1126
1127#[tauri::command]
1129pub async fn mountain_subscribe_to_updates(
1130 app_handle:tauri::AppHandle,
1131
1132 target:String,
1133
1134 subscriber:String,
1135) -> Result<(), String> {
1136 dev_log!("lifecycle", "Tauri command: subscribe_to_updates");
1137
1138 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
1139 sync.subscribe_to_updates(target, subscriber).await
1140 } else {
1141 Err("WindAdvancedSync not found in application state".to_string())
1142 }
1143}
1144
1145pub fn initialize_wind_advanced_sync(
1147 app_handle:&tauri::AppHandle,
1148
1149 runtime:Arc<ApplicationRunTime>,
1150) -> Result<(), String> {
1151 dev_log!("lifecycle", "Initializing Wind advanced synchronization");
1152
1153 let sync = Arc::new(WindAdvancedSync::new(runtime));
1154
1155 app_handle.manage(sync.clone());
1157
1158 let sync_clone = sync.clone();
1160
1161 tokio::spawn(async move {
1162 if let Err(e) = sync_clone.start_synchronization().await {
1163 dev_log!("ipc", "error: [WindAdvancedSync] Failed to start synchronization: {}", e);
1164 }
1165 });
1166
1167 Ok(())
1168}