Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/
Multiplexer.rs

//! Bidirectional streaming multiplexer for the Vine gRPC bus.
//!
//! Owns one bidirectional h2 stream per sidecar. Inbound notifications
//! fan out to the process-wide broadcast
//! (`Vine::Client::SubscribeNotifications`); inbound responses are routed to
//! the matching pending-request `oneshot` sender. Inbound reverse-RPC requests
//! and cancellations are TODO for a follow-up phase.
//!
//! This is the P14.1 foundation of Patch 14: it lands the `Open()`,
//! `Notify()`, `Request()`, and `ReadPump` skeleton so that subsequent phases
//! can wire `SendNotification` / `SendRequest` to consult the multiplexer
//! when `LAND_VINE_STREAMING=1` is set.
13
14use std::{
15	collections::HashMap,
16	sync::{
17		Arc,
18		atomic::{AtomicBool, AtomicU64, Ordering},
19	},
20	time::Duration,
21};
22
23use dashmap::DashMap;
24use lazy_static::lazy_static;
25use parking_lot::Mutex;
26use serde_json::Value;
27use tokio::sync::{mpsc, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29use tonic::Streaming;
30
31use super::{
32	Error::VineError,
33	Generated::{
34		CancelOperationRequest,
35		Envelope,
36		GenericNotification,
37		GenericRequest,
38		GenericResponse,
39		RpcError,
40		cocoon_service_client::CocoonServiceClient,
41		envelope::Payload,
42	},
43};
44use crate::dev_log;
45
/// Maximum number of outbound frames that may be queued per multiplexer.
///
/// The queue is deliberately bounded: once it is full, senders wait on
/// the channel instead of letting a stalled sidecar grow the heap
/// without limit.
const SINK_CAPACITY:usize = 1024;
50
51/// One multiplexer per sidecar connection. Holds the outbound sink,
52/// the pending-request correlation map, and a shared-state shutdown
53/// flag.
54pub struct Multiplexer {
55	SideCarIdentifier:String,
56
57	Sink:mpsc::Sender<Envelope>,
58
59	Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
60
61	NextRequestIdentifier:AtomicU64,
62
63	Closed:AtomicBool,
64}
65
66lazy_static! {
67
68	/// Process-wide registry, one entry per sidecar identifier.
69	/// Lookup site for `SendNotification` / `SendRequest` to consult
70	/// when `LAND_VINE_STREAMING=1`.
71	static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
72}
73
74impl Multiplexer {
75	/// Open a bidirectional streaming channel against an existing
76	/// `CocoonServiceClient`. Spawns the read pump as a detached
77	/// tokio task and registers the multiplexer in the global
78	/// registry. Returns once the stream is established.
79	pub async fn Open(
80		SideCarIdentifier:String,
81
82		mut Client:CocoonServiceClient<tonic::transport::Channel>,
83	) -> Result<Arc<Self>, VineError> {
84		let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
85
86		let OutboundStream = ReceiverStream::new(OutboundReceiver);
87
88		let Response = Client
89			.open_channel_from_mountain(OutboundStream)
90			.await
91			.map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
92
93		let InboundStream:Streaming<Envelope> = Response.into_inner();
94
95		let SelfReference = Arc::new(Self {
96			SideCarIdentifier:SideCarIdentifier.clone(),
97			Sink,
98			Pending:Arc::new(DashMap::new()),
99			NextRequestIdentifier:AtomicU64::new(1),
100			Closed:AtomicBool::new(false),
101		});
102
103		// Spawn the read pump.
104		let SelfForReadPump = SelfReference.clone();
105
106		tokio::spawn(async move {
107			ReadPump(InboundStream, SelfForReadPump).await;
108		});
109
110		// Register globally so consumers can look us up.
111		MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
112
113		Ok(SelfReference)
114	}
115
116	/// Look up the multiplexer for a sidecar. Returns `None` if no
117	/// streaming connection has been opened for that sidecar (the
118	/// caller should fall back to the unary path).
119	pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
120
121	/// Drop the registry entry. Called by the read-pump when the
122	/// stream closes.
123	pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
124
125	/// Send a notification frame (fire-and-forget). Non-blocking
126	/// modulo Sink backpressure (capacity `SINK_CAPACITY`).
127	pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
128		if self.Closed.load(Ordering::Relaxed) {
129			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
130		}
131
132		let Bytes = serde_json::to_vec(&Parameters)?;
133
134		let Frame = Envelope {
135			payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
136		};
137
138		self.Sink
139			.send(Frame)
140			.await
141			.map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
142	}
143
144	/// Send a request and await the matching response. Cancels the
145	/// pending entry on timeout. The future is `Send + 'static`-clean
146	/// so callers can drive it inside `tokio::select!` for finer-
147	/// grained cancellation.
148	pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
149		if self.Closed.load(Ordering::Relaxed) {
150			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
151		}
152
153		let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
154
155		let (Tx, Rx) = oneshot::channel();
156
157		self.Pending.insert(Identifier, Tx);
158
159		let Bytes = serde_json::to_vec(&Parameters)?;
160
161		let MethodForError = Method.clone();
162
163		let Frame = Envelope {
164			payload:Some(Payload::Request(GenericRequest {
165				request_identifier:Identifier,
166				method:Method,
167				parameter:Bytes,
168			})),
169		};
170
171		if self.Sink.send(Frame).await.is_err() {
172			self.Pending.remove(&Identifier);
173
174			return Err(VineError::RPCError(format!(
175				"Sink closed for sidecar {}",
176				self.SideCarIdentifier
177			)));
178		}
179
180		match tokio::time::timeout(Timeout, Rx).await {
181			Ok(Ok(Response)) => {
182				if let Some(Error) = Response.error {
183					return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
184				}
185
186				if Response.result.is_empty() {
187					return Ok(Value::Null);
188				}
189
190				serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
191			},
192
193			Ok(Err(_)) => {
194				self.Pending.remove(&Identifier);
195
196				Err(VineError::RPCError(
197					"response sender closed (peer disconnect mid-request)".into(),
198				))
199			},
200
201			Err(_) => {
202				self.Pending.remove(&Identifier);
203
204				Err(VineError::RequestTimeout {
205					SideCarIdentifier:self.SideCarIdentifier.clone(),
206					MethodName:MethodForError,
207					TimeoutMilliseconds:Timeout.as_millis() as u64,
208				})
209			},
210		}
211	}
212
213	/// Send a Cancel frame asking the peer to abort an in-flight
214	/// request matching `RequestIdentifier`. Best-effort; the peer
215	/// chooses whether to honour it.
216	pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
217		if self.Closed.load(Ordering::Relaxed) {
218			return Ok(());
219		}
220
221		let Frame = Envelope {
222			payload:Some(Payload::Cancel(CancelOperationRequest {
223				request_identifier_to_cancel:RequestIdentifier,
224			})),
225		};
226
227		let _ = self.Sink.send(Frame).await;
228
229		Ok(())
230	}
231
232	pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
233
234	pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
235}
236
237/// Drain the inbound side of the bidirectional stream. Notifications
238/// fan out to the process-wide broadcast; responses wake the parked
239/// `Request` future. Reverse-RPC requests and cancellations are
240/// recorded for a follow-up phase.
241async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
242	use futures_util::StreamExt;
243
244	while let Some(FrameResult) = Stream.next().await {
245		let Frame = match FrameResult {
246			Ok(F) => F,
247
248			Err(Status) => {
249				dev_log!(
250					"grpc",
251					"[Vine::Multiplexer] read err on {}: {}",
252					State.SideCarIdentifier,
253					Status
254				);
255
256				break;
257			},
258		};
259
260		let Payload = match Frame.payload {
261			Some(P) => P,
262
263			None => continue,
264		};
265
266		match Payload {
267			Payload::Notification(N) => {
268				let Parameters:Value = if N.parameter.is_empty() {
269					Value::Null
270				} else {
271					serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
272				};
273
274				super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
275			},
276
277			Payload::Response(R) => {
278				let Identifier = R.request_identifier;
279
280				if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
281					let _ = Sender.send(R);
282				}
283
284				// A Response with no matching pending entry is a
285				// duplicate or post-cancel arrival; drop silently.
286			},
287
288			Payload::Request(_) => {
289
290				// TODO P14.1.1: dispatch the inbound (reverse-RPC)
291				// request to the same handler tree the unary path
292				// uses, then enqueue the GenericResponse onto Sink.
293				// For now we drop, which is safe: the unary path is
294				// still authoritative until phase P14.4 lands the
295				// streaming handler tree on Cocoon side.
296			},
297
298			Payload::Cancel(_) => {
299
300				// TODO P14.1.2: signal abort for the in-flight
301				// handler. For now no-op (the unary path doesn't
302				// support cancel either).
303			},
304		}
305	}
306
307	State.Closed.store(true, Ordering::Relaxed);
308
309	// Drain pending senders with disconnect errors so awaiting
310	// fibers don't hang forever.
311	let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
312
313	for Key in Keys {
314		if let Some((_, Sender)) = State.Pending.remove(&Key) {
315			let _ = Sender.send(GenericResponse {
316				request_identifier:Key,
317				result:Vec::new(),
318				error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
319			});
320		}
321	}
322
323	Multiplexer::Deregister(&State.SideCarIdentifier);
324
325	dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
326}