Skip to main content

Mountain/IPC/DevLog/
EmitOTLPSpan.rs

1#![allow(non_snake_case)]
2
3//! Fire-and-forget OTLP span exporter. Sends a single
4//! `resourceSpans` payload over plain HTTP to the collector at
5//! `OTLPEndpoint` (default `127.0.0.1:4318`, configurable via
6//! `.env.Land.PostHog`). Stops trying after the first failure
7//! (`OTLP_AVAILABLE` flips to `false`) so a missing collector
8//! doesn't tax every IPC call. Release builds are compiled out
9//! via `cfg!(debug_assertions)`. Honors the `Capture` master
10//! telemetry kill switch and the per-pipe `OTLPEnabled` toggle.
11
12use std::{
13	collections::hash_map::DefaultHasher,
14	hash::{Hash, Hasher},
15	sync::{
16		OnceLock,
17		atomic::{AtomicBool, Ordering},
18	},
19};
20
21use crate::{Binary::Build::PostHogPlugin::Constants, IPC::DevLog::NowNano};
22
23static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
24
25static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
26
27fn GetTraceId() -> &'static str {
28	OTLP_TRACE_ID.get_or_init(|| {
29		let mut H = DefaultHasher::new();
30		std::process::id().hash(&mut H);
31		NowNano::Fn().hash(&mut H);
32		format!("{:032x}", H.finish() as u128)
33	})
34}
35
36fn RandU64() -> u64 {
37	let mut H = DefaultHasher::new();
38
39	std::thread::current().id().hash(&mut H);
40
41	NowNano::Fn().hash(&mut H);
42
43	H.finish()
44}
45
46pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
47	if !cfg!(debug_assertions) {
48		return;
49	}
50
51	if matches!(Constants::TELEMETRY_CAPTURE, "false" | "0" | "off") {
52		return;
53	}
54
55	if matches!(Constants::OTLP_ENABLED, "false" | "0" | "off") {
56		return;
57	}
58
59	if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
60		return;
61	}
62
63	let SpanId = format!("{:016x}", RandU64());
64
65	let TraceId = GetTraceId().to_string();
66
67	let SpanName = Name.to_string();
68
69	let AttributesJson:Vec<String> = Attributes
70		.iter()
71		.map(|(K, V)| {
72			format!(
73				r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
74				K,
75				V.replace('\\', "\\\\").replace('"', "\\\"")
76			)
77		})
78		.collect();
79
80	let IsError = SpanName.contains("error");
81
82	let StatusCode = if IsError { 2 } else { 1 };
83
84	let Payload = format!(
85		concat!(
86			r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
87			r#"{{"key":"service.name","value":{{"stringValue":"land-editor-mountain"}}}},"#,
88			r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}}"#,
89			r#"]}},"scopeSpans":[{{"scope":{{"name":"mountain.ipc","version":"1.0.0"}},"#,
90			r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
91			r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
92			r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
93		),
94		TraceId,
95		SpanId,
96		SpanName,
97		StartNano,
98		EndNano,
99		AttributesJson.join(","),
100		StatusCode,
101	);
102
103	// Resolve `OTLPEndpoint` (e.g. `http://127.0.0.1:4318`) → host:port + path.
104	// Strip scheme, split on `/` for the path component if any, default to
105	// `/v1/traces`.
106	let (HostAddress, PathSegment) = ParseEndpoint(Constants::OTLP_ENDPOINT);
107
108	std::thread::spawn(move || {
109		use std::{
110			io::{Read as IoRead, Write as IoWrite},
111			net::TcpStream,
112			time::Duration,
113		};
114
115		let Ok(SocketAddress) = HostAddress.parse() else {
116			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
117			return;
118		};
119		let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
120			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
121			return;
122		};
123		let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
124		let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
125
126		let HttpReq = format!(
127			"POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
128			 close\r\n\r\n",
129			PathSegment,
130			HostAddress,
131			Payload.len()
132		);
133		if Stream.write_all(HttpReq.as_bytes()).is_err() {
134			return;
135		}
136		if Stream.write_all(Payload.as_bytes()).is_err() {
137			return;
138		}
139		let mut Buf = [0u8; 32];
140		let _ = Stream.read(&mut Buf);
141		if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
142			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
143		}
144	});
145}
146
/// Splits an endpoint such as `http://host:port/path` into
/// `(host:port, /path)`.
///
/// - Leading and trailing whitespace is ignored.
/// - A leading `http://` or `https://` is removed; the scheme is matched
///   case-insensitively, so `HTTP://` is handled too.
/// - Everything before the first `/` is returned as the host part.
/// - The remainder becomes the path with exactly one leading `/`. An absent
///   or empty path falls back to `/v1/traces`.
///
/// Both parts are returned as owned `String`s so the caller can move them
/// into a spawned thread.
fn ParseEndpoint(Endpoint:&str) -> (String, String) {
	const DEFAULT_PATH:&str = "/v1/traces";

	let Trimmed = Endpoint.trim();

	// `get(..len)` yields `None` when the input is shorter than the scheme
	// or the cut would fall inside a multi-byte character, so the slice
	// taken below can never panic.
	let mut WithoutScheme = Trimmed;

	for Scheme in ["http://", "https://"].iter() {
		if let Some(Head) = Trimmed.get(..Scheme.len()) {
			if Head.eq_ignore_ascii_case(Scheme) {
				WithoutScheme = &Trimmed[Scheme.len()..];

				break;
			}
		}
	}

	let (HostPort, Rest) = WithoutScheme.split_once('/').unwrap_or((WithoutScheme, ""));

	let Path = match Rest.trim_start_matches('/') {
		"" => DEFAULT_PATH.to_string(),
		Tail => format!("/{}", Tail),
	};

	(HostPort.to_string(), Path)
}