Skip to main content

Mountain/Environment/
FileWatcherProvider.rs

1//! # FileWatcherProvider (Environment)
2//!
3//! Backing implementation of
4//! [`FileWatcherProvider`](CommonLibrary::FileSystem::FileWatcherProvider)
5//! for [`MountainEnvironment`].
6//!
7//! Native filesystem notifications are delegated to the `notify` crate, which
8//! picks up inotify on Linux, FSEvents on macOS, and ReadDirectoryChangesW
9//! on Windows. Events from the watcher thread flow through an unbounded
10//! channel into a tokio task that forwards them back to Cocoon over the
11//! reverse-RPC channel as `$fileWatcher:event` notifications.
12//!
13//! # Concurrency notes
14//!
//! - `notify::recommended_watcher` executes callbacks on its own native thread,
//!   so we tunnel events through an unbounded channel before touching async
//!   code. The forwarder task is spawned once, on first access to the shared
//!   watcher state, and lives for the entire process lifetime.
19//! - macOS FSEvents may emit duplicate Create/Change events for the same path
20//!   in very short succession. We debounce by path within a 100 ms window
21//!   per-handle, keyed on `(handle, path, kind)`.
//! - Linux inotify has a small per-user watcher cap
//!   (`fs.inotify.max_user_watches`); hitting it surfaces as
//!   `notify::ErrorKind::MaxFilesWatch`. Its message is passed through to the
//!   caller (wrapped in `CommonError::Unknown`) so the UI can show a guidance
//!   message.
26
27use std::{
28	collections::HashMap,
29	path::PathBuf,
30	sync::{Arc, Mutex as StandardMutex},
31	time::{Duration, Instant},
32};
33
34use CommonLibrary::{
35	Environment::Requires::Requires,
36	Error::CommonError::CommonError,
37	FileSystem::FileWatcherProvider::{FileWatcherProvider, WatchEvent, WatchEventKind},
38	IPC::{IPCProvider::IPCProvider, SkyEvent::SkyEvent},
39};
40use async_trait::async_trait;
41use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
42use serde_json::json;
43use tokio::sync::mpsc as TokioMPSC;
44
45use super::MountainEnvironment::MountainEnvironment;
46use crate::dev_log;
47
/// Minimum spacing between two forwarded events with the same (path, kind)
/// for one watcher handle; a repeat arriving sooner is discarded. Sized for
/// the way FSEvents coalesces bursts.
const DebounceWindow:Duration = Duration::from_micros(100_000);
51
52/// Internal entry tracked per registered watcher. The `Watcher` handle must
53/// be kept alive for the lifetime of the registration; dropping it releases
54/// the OS resources.
55pub struct WatcherEntry {
56	#[allow(dead_code)]
57	Watcher:RecommendedWatcher,
58
59	LastSeen:HashMap<(PathBuf, &'static str), Instant>,
60}
61
/// Identity of a watch request, used to spot duplicate registrations:
/// `(root, recursive, glob pattern)`.
///
/// Different extensions - or one extension activated twice - routinely ask
/// for the very same triple within milliseconds (typescript-language-features
/// and git are the usual culprits). Left alone, every such request would get
/// its own notify::Watcher and its own kqueue/inotify subscription tree,
/// multiplying FS-event traffic and consuming kernel handles.
type DedupKey = (PathBuf /* root */, bool /* recursive */, Option<String> /* pattern */);
70
71/// Lazily-initialised process-wide state for file watching. Instances of the
72/// event-forwarder task are singletons keyed on the MountainEnvironment
73/// handle. Access through `WatcherState::Get`.
74pub struct WatcherState {
75	pub Entries:Arc<StandardMutex<HashMap<String, WatcherEntry>>>,
76
77	pub EventSender:TokioMPSC::UnboundedSender<WatchEvent>,
78
79	/// Maps `(root, recursive, pattern)` to the primary handle that owns
80	/// the live OS watcher. Subsequent registrations matching the same
81	/// triple are aliased to the primary; only the primary creates a
82	/// notify::Watcher.
83	pub DedupIndex:Arc<StandardMutex<HashMap<DedupKey, String>>>,
84
85	/// Reverse index: primary handle → all aliased handles. When the
86	/// forwarder task gets an event for a primary, it fans the same
87	/// event out to every aliased handle so each extension's
88	/// `vscode.workspace.createFileSystemWatcher` callback fires once.
89	pub Aliases:Arc<StandardMutex<HashMap<String, Vec<String>>>>,
90
91	/// Reverse lookup for unregister: any handle (primary or alias) →
92	/// its primary. Lets `UnregisterWatcher` clean up alias entries
93	/// without scanning the entire `Aliases` map.
94	pub HandleToPrimary:Arc<StandardMutex<HashMap<String, String>>>,
95}
96
97impl WatcherState {
98	/// Obtain (or create) the global WatcherState. The forwarder task is
99	/// spawned on first access. Must be called from within a tokio runtime.
100	pub fn Get(env:&MountainEnvironment) -> Arc<WatcherState> {
101		use std::sync::OnceLock;
102
103		// One WatcherState per process - the backing notify watchers are
104		// cheap and multiplex fine, and we want a single forwarder task.
105		static GLOBAL:OnceLock<Arc<WatcherState>> = OnceLock::new();
106
107		GLOBAL
108			.get_or_init(|| {
109				let (tx, mut rx) = TokioMPSC::unbounded_channel::<WatchEvent>();
110				let state = Arc::new(WatcherState {
111					Entries:Arc::new(StandardMutex::new(HashMap::new())),
112					EventSender:tx,
113					DedupIndex:Arc::new(StandardMutex::new(HashMap::new())),
114					Aliases:Arc::new(StandardMutex::new(HashMap::new())),
115					HandleToPrimary:Arc::new(StandardMutex::new(HashMap::new())),
116				});
117
118				// The forwarder task holds a weak ref to the environment so
119				// it unwinds cleanly if the env is ever torn down. State is
120				// captured by Arc clone for the alias fan-out lookup.
121				let env_clone = env.clone();
122				let state_clone = state.clone();
123				tokio::spawn(async move {
124					use tauri::Emitter;
125					while let Some(WatchEvent { Handle, Kind, Path }) = rx.recv().await {
126						let ipc_provider:Arc<dyn IPCProvider> = env_clone.Require();
127						// Fan events to the primary handle plus every alias
128						// registered against it. Without this, the second
129						// extension to register a duplicate watcher would
130						// silently miss every event.
131						let mut Recipients:Vec<String> = vec![Handle.clone()];
132						if let Ok(AliasGuard) = state_clone.Aliases.lock() {
133							if let Some(AliasList) = AliasGuard.get(&Handle) {
134								Recipients.extend(AliasList.iter().cloned());
135							}
136						}
137						for RecipientHandle in Recipients {
138							let payload = json!({
139								"handle": RecipientHandle,
140								"kind": Kind.AsString(),
141								"path": Path.to_string_lossy().to_string(),
142							});
143							if let Err(error) = ipc_provider
144								.SendNotificationToSideCar(
145									"cocoon-main".to_string(),
146									"$fileWatcher:event".to_string(),
147									payload.clone(),
148								)
149								.await
150							{
151								dev_log!(
152									"filewatcher",
153									"warn: [FileWatcherProvider] Failed to forward event handle={} kind={} path={:?}: \
154									 {:?}",
155									RecipientHandle,
156									Kind.AsString(),
157									Path,
158									error
159								);
160							}
161							// Dual-emit to Wind/Sky so the Explorer tree,
162							// search index, and any other webview-side
163							// consumer can react to disk mutations without
164							// going through Cocoon. Wind's `TauriChannel`
165							// subscribes to `sky://vfs/fileChange` under
166							// the localFilesystem channel. Aliased handles
167							// each get their own emit so per-handle
168							// listeners on the Sky side fire correctly.
169							if let Err(Error) =
170								env_clone.ApplicationHandle.emit(SkyEvent::VFSFileChange.AsStr(), &payload)
171							{
172								dev_log!(
173									"filewatcher",
174									"warn: [FileWatcherProvider] sky://vfs/fileChange emit failed: {}",
175									Error
176								);
177							}
178						}
179					}
180				});
181
182				state
183			})
184			.clone()
185	}
186}
187
188fn MapEventKind(raw:&EventKind) -> Option<WatchEventKind> {
189	match raw {
190		EventKind::Create(_) => Some(WatchEventKind::Create),
191
192		EventKind::Modify(_) => Some(WatchEventKind::Change),
193
194		EventKind::Remove(_) => Some(WatchEventKind::Delete),
195
196		// Access / Any / Other events are not exposed to extensions.
197		_ => None,
198	}
199}
200
201/// Translate a VS Code glob pattern into a `regex::Regex` so the native
202/// watcher can apply the caller's filter before paying for an IPC hop. A
203/// small subset of the glob grammar is supported (`**`, `*`, `?`, `[…]`,
204/// `{…,…}` alternation) - exactly what TypeScript-language-features and
205/// the other ship-time extensions rely on.
206fn CompileGlobToRegex(Pattern:&str) -> Option<regex::Regex> {
207	let mut Regex = String::with_capacity(Pattern.len() * 2 + 4);
208
209	// Case-insensitive on macOS + Windows where the OS is typically
210	// case-insensitive; on case-sensitive Linux filesystems extensions commonly
211	// still use lowercase patterns, so the flag is safe across all three targets.
212	if cfg!(any(target_os = "macos", target_os = "windows")) {
213		Regex.push_str("(?i)");
214	}
215
216	Regex.push('^');
217
218	let mut Chars = Pattern.chars().peekable();
219
220	let mut InClass = false;
221
222	while let Some(C) = Chars.next() {
223		if InClass {
224			if C == ']' {
225				InClass = false;
226			}
227
228			Regex.push(C);
229
230			continue;
231		}
232
233		match C {
234			'*' => {
235				if Chars.peek() == Some(&'*') {
236					Chars.next();
237
238					if Chars.peek() == Some(&'/') {
239						Chars.next();
240
241						Regex.push_str("(?:.*/)?");
242					} else {
243						Regex.push_str(".*");
244					}
245				} else {
246					Regex.push_str("[^/]*");
247				}
248			},
249
250			'?' => Regex.push_str("[^/]"),
251
252			'[' => {
253				Regex.push('[');
254
255				InClass = true;
256			},
257
258			'{' => Regex.push_str("(?:"),
259
260			'}' => Regex.push(')'),
261
262			',' => Regex.push('|'),
263
264			'.' | '+' | '(' | ')' | '^' | '$' | '|' | '\\' => {
265				Regex.push('\\');
266
267				Regex.push(C);
268			},
269
270			_ => Regex.push(C),
271		}
272	}
273
274	Regex.push('$');
275
276	regex::Regex::new(&Regex).ok()
277}
278
279#[async_trait]
280impl FileWatcherProvider for MountainEnvironment {
281	async fn RegisterWatcher(
282		&self,
283
284		Handle:String,
285
286		Root:PathBuf,
287
288		IsRecursive:bool,
289
290		Pattern:Option<String>,
291	) -> Result<(), CommonError> {
292		let state = WatcherState::Get(self);
293
294		// De-dup pass 1: same handle re-registered (cheap idempotency).
295		{
296			let guard = state
297				.Entries
298				.lock()
299				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
300
301			if guard.contains_key(&Handle) {
302				dev_log!(
303					"filewatcher",
304					"[FileWatcherProvider] handle={} already registered; skipping duplicate",
305					Handle
306				);
307
308				return Ok(());
309			}
310		}
311
312		// De-dup pass 2: same (root, recursive, pattern) triple already has
313		// a primary watcher. The git extension, typescript-language-features,
314		// and several `composer.*` extensions all hit this path during boot
315		// (observed: `**/composer.json`, `**/composer.lock`, `**/*.md`,
316		// `**/package.json` registered twice each within ~50ms). Aliasing
317		// avoids the duplicate notify::Watcher / kqueue subscription tree
318		// while still fanning events to every aliased handle.
319		let DedupKeyValue:DedupKey = (Root.clone(), IsRecursive, Pattern.clone());
320
321		{
322			let DedupGuard = state
323				.DedupIndex
324				.lock()
325				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
326
327			if let Some(PrimaryHandle) = DedupGuard.get(&DedupKeyValue).cloned() {
328				drop(DedupGuard);
329
330				let mut AliasGuard = state
331					.Aliases
332					.lock()
333					.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
334
335				AliasGuard
336					.entry(PrimaryHandle.clone())
337					.or_insert_with(Vec::new)
338					.push(Handle.clone());
339
340				let mut H2PGuard = state
341					.HandleToPrimary
342					.lock()
343					.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
344
345				H2PGuard.insert(Handle.clone(), PrimaryHandle.clone());
346
347				dev_log!(
348					"filewatcher",
349					"[FileWatcherProvider] dedup hit; handle={} aliased to primary={} root={} pattern={:?}",
350					Handle,
351					PrimaryHandle,
352					Root.display(),
353					Pattern
354				);
355
356				return Ok(());
357			}
358		}
359
360		// First registration for this triple. The DedupIndex insert
361		// happens AFTER successful OS-watcher creation below so an
362		// errored or benign-absent registration doesn't leave a stale
363		// dedup entry pointing at a non-existent primary.
364
365		let CompiledPattern = Pattern.as_deref().and_then(CompileGlobToRegex);
366
367		let pattern_for_callback = CompiledPattern.clone();
368
369		// Prepare the per-event callback. It owns clones of the handle and
370		// the forwarder channel; debouncing state lives in the entry under
371		// the global mutex (fine - the callback is not hot).
372		let handle_for_callback = Handle.clone();
373
374		let sender = state.EventSender.clone();
375
376		let entries = state.Entries.clone();
377
378		let mut watcher = notify::recommended_watcher(move |event_result:notify::Result<notify::Event>| {
379			let Ok(event) = event_result else { return };
380			let Some(kind) = MapEventKind(&event.kind) else { return };
381			let kind_tag = kind.AsString();
382
383			// Pattern filter - reject early so the event never crosses IPC.
384			let matched_paths:Vec<PathBuf> = event
385				.paths
386				.into_iter()
387				.filter(|path| {
388					match &pattern_for_callback {
389						Some(re) => re.is_match(&path.to_string_lossy()),
390						None => true,
391					}
392				})
393				.collect();
394			if matched_paths.is_empty() {
395				return;
396			}
397
398			// Debounce per (handle, path, kind). Lock is uncontested for
399			// single-path events; bursts from FSEvents coalesce cleanly.
400			let mut final_paths:Vec<PathBuf> = Vec::with_capacity(matched_paths.len());
401			if let Ok(mut guard) = entries.lock() {
402				if let Some(entry) = guard.get_mut(&handle_for_callback) {
403					let now = Instant::now();
404					entry
405						.LastSeen
406						.retain(|_, instant| now.duration_since(*instant) < Duration::from_secs(10));
407					for path in matched_paths {
408						let key = (path.clone(), kind_tag);
409						let keep = match entry.LastSeen.get(&key) {
410							Some(previous) if now.duration_since(*previous) < DebounceWindow => false,
411							_ => {
412								entry.LastSeen.insert(key, now);
413								true
414							},
415						};
416						if keep {
417							final_paths.push(path);
418						}
419					}
420				} else {
421					return;
422				}
423			} else {
424				return;
425			}
426
427			for path in final_paths {
428				let _ = sender.send(WatchEvent { Handle:handle_for_callback.clone(), Kind:kind, Path:path });
429			}
430		})
431		.map_err(|error| CommonError::Unknown { Description:format!("FileWatcher create failed: {}", error) })?;
432
433		let mode = if IsRecursive { RecursiveMode::Recursive } else { RecursiveMode::NonRecursive };
434
435		// Watching a non-existent path is a common pattern: extensions
436		// register watchers on optional config dirs (`~/.roo/skills-*`,
437		// `.vscode/settings.json` in fresh workspaces, …) that may appear
438		// later. `notify` returns `Error::PathNotFound` / "No path was
439		// found"; failing the gRPC call counts against Cocoon's circuit
440		// breaker - 5 such probes at boot trip the breaker open and
441		// cascade into 60s of rejected reads. Record a "deferred" entry
442		// without a live OS watcher so Unregister still works; future
443		// events for that path won't fire, but the extension can re-
444		// register once the directory appears, just like in stock VS Code.
445		let WatchResult = watcher.watch(&Root, mode);
446
447		let mut guard = state
448			.Entries
449			.lock()
450			.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
451
452		let _ = CompiledPattern;
453
454		match WatchResult {
455			Ok(()) => {
456				guard.insert(Handle.clone(), WatcherEntry { Watcher:watcher, LastSeen:HashMap::new() });
457
458				// Drop the Entries lock before grabbing DedupIndex to
459				// avoid lock-order divergence vs the alias path (which
460				// takes DedupIndex first). Re-acquire is cheap.
461				drop(guard);
462
463				if let Ok(mut DedupGuard) = state.DedupIndex.lock() {
464					DedupGuard.entry(DedupKeyValue.clone()).or_insert_with(|| Handle.clone());
465				}
466
467				dev_log!(
468					"filewatcher",
469					"[FileWatcherProvider] Registered watcher handle={} root={} recursive={} pattern={:?}",
470					Handle,
471					Root.display(),
472					IsRecursive,
473					Pattern
474				);
475
476				return Ok(());
477			},
478
479			Err(error) => {
480				let ErrorString = error.to_string().to_lowercase();
481
482				let IsBenignAbsent = ErrorString.contains("no path was found")
483					|| ErrorString.contains("no such file or directory")
484					|| ErrorString.contains("entity not found")
485					|| ErrorString.contains("path not found")
486					|| ErrorString.contains("os error 2")
487					|| !Root.exists();
488
489				if IsBenignAbsent {
490					dev_log!(
491						"filewatcher",
492						"[FileWatcherProvider] watch path absent (deferred) handle={} root={} err={}",
493						Handle,
494						Root.display(),
495						error
496					);
497
498					// Drop watcher (no live subscription); record handle so
499					// Unregister still finds something to remove. We do NOT
500					// reuse the closure's notify::Watcher here.
501					drop(watcher);
502				} else {
503					return Err(CommonError::Unknown {
504						Description:format!("FileWatcher watch failed for {}: {}", Root.display(), error),
505					});
506				}
507			},
508		}
509
510		dev_log!(
511			"filewatcher",
512			"[FileWatcherProvider] Registered watcher handle={} root={} recursive={} pattern={:?}",
513			Handle,
514			Root.display(),
515			IsRecursive,
516			Pattern
517		);
518
519		Ok(())
520	}
521
522	async fn UnregisterWatcher(&self, Handle:String) -> Result<(), CommonError> {
523		let state = WatcherState::Get(self);
524
525		// Step 1: alias removal. If the handle was aliased to a primary,
526		// just remove it from the alias list and the lookup map. The OS
527		// watcher stays alive because the primary still owns it.
528		let MaybePrimary = {
529			let mut H2PGuard = state
530				.HandleToPrimary
531				.lock()
532				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
533
534			H2PGuard.remove(&Handle)
535		};
536
537		if let Some(PrimaryHandle) = MaybePrimary {
538			let mut AliasGuard = state
539				.Aliases
540				.lock()
541				.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
542
543			if let Some(AliasList) = AliasGuard.get_mut(&PrimaryHandle) {
544				AliasList.retain(|EntryHandle| EntryHandle != &Handle);
545
546				if AliasList.is_empty() {
547					AliasGuard.remove(&PrimaryHandle);
548				}
549			}
550
551			dev_log!(
552				"filewatcher",
553				"[FileWatcherProvider] Unregistered alias handle={} primary={}",
554				Handle,
555				PrimaryHandle
556			);
557
558			return Ok(());
559		}
560
561		// Step 2: primary removal. Drop the OS watcher and clear the
562		// dedup index entry. Any still-aliased handles are left dangling -
563		// callers requesting a primary unregister while aliases still
564		// exist is unusual but not fatal; the alias entries simply
565		// stop receiving events.
566		let mut Guard = state
567			.Entries
568			.lock()
569			.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
570
571		if Guard.remove(&Handle).is_some() {
572			dev_log!("filewatcher", "[FileWatcherProvider] Unregistered watcher handle={}", Handle);
573		}
574
575		drop(Guard);
576
577		// Clear the dedup-index entry pointing at this primary so a
578		// future registration for the same triple opens a fresh OS
579		// watcher rather than aliasing to a removed handle.
580		let mut DedupGuard = state
581			.DedupIndex
582			.lock()
583			.map_err(|error| CommonError::StateLockPoisoned { Context:error.to_string() })?;
584
585		DedupGuard.retain(|_, PrimaryHandle| PrimaryHandle != &Handle);
586
587		Ok(())
588	}
589}