Skip to main content

Mountain/Vine/
Multiplexer.rs

1#![allow(non_snake_case)]
2
3//! Bidirectional streaming multiplexer for the Vine gRPC bus.
4//!
5//! Owns one bidirectional h2 stream per sidecar. Inbound notifications
6//! fan out to the process-wide broadcast
7//! (`Vine::Client::SubscribeNotifications`); inbound responses route to the
8//! matching pending-request `oneshot` sender. Inbound reverse-RPC requests and
9//! cancellations are TODO for a follow-up phase.
10//!
11//! This is the P14.1 foundation of Patch 14 - it lands the open(),
12//! Notify(), Request(), and ReadPump skeleton so subsequent phases can
13//! wire `SendNotification` / `SendRequest` to consult the multiplexer
14//! when `LAND_VINE_STREAMING=1` is set.
15
16use std::{
17	collections::HashMap,
18	sync::{
19		Arc,
20		atomic::{AtomicBool, AtomicU64, Ordering},
21	},
22	time::Duration,
23};
24
25use dashmap::DashMap;
26use lazy_static::lazy_static;
27use parking_lot::Mutex;
28use serde_json::Value;
29use tokio::sync::{mpsc, oneshot};
30use tokio_stream::wrappers::ReceiverStream;
31use tonic::Streaming;
32
33use super::{
34	Error::VineError,
35	Generated::{
36		CancelOperationRequest,
37		Envelope,
38		GenericNotification,
39		GenericRequest,
40		GenericResponse,
41		RpcError,
42		cocoon_service_client::CocoonServiceClient,
43		envelope::Payload,
44	},
45};
46use crate::dev_log;
47
48/// Outbound queue capacity per multiplexer. Bounded so a stalled
49/// sidecar applies backpressure to the producer side instead of
50/// burning unbounded heap.
51const SINK_CAPACITY:usize = 1024;
52
53/// One multiplexer per sidecar connection. Holds the outbound sink,
54/// the pending-request correlation map, and a shared-state shutdown
55/// flag.
56pub struct Multiplexer {
57	SideCarIdentifier:String,
58
59	Sink:mpsc::Sender<Envelope>,
60
61	Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
62
63	NextRequestIdentifier:AtomicU64,
64
65	Closed:AtomicBool,
66}
67
68lazy_static! {
69
70	/// Process-wide registry, one entry per sidecar identifier.
71	/// Lookup site for `SendNotification` / `SendRequest` to consult
72	/// when `LAND_VINE_STREAMING=1`.
73	static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
74}
75
76impl Multiplexer {
77	/// Open a bidirectional streaming channel against an existing
78	/// `CocoonServiceClient`. Spawns the read pump as a detached
79	/// tokio task and registers the multiplexer in the global
80	/// registry. Returns once the stream is established.
81	pub async fn Open(
82		SideCarIdentifier:String,
83
84		mut Client:CocoonServiceClient<tonic::transport::Channel>,
85	) -> Result<Arc<Self>, VineError> {
86		let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
87
88		let OutboundStream = ReceiverStream::new(OutboundReceiver);
89
90		let Response = Client
91			.open_channel_from_mountain(OutboundStream)
92			.await
93			.map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
94
95		let InboundStream:Streaming<Envelope> = Response.into_inner();
96
97		let SelfReference = Arc::new(Self {
98			SideCarIdentifier:SideCarIdentifier.clone(),
99			Sink,
100			Pending:Arc::new(DashMap::new()),
101			NextRequestIdentifier:AtomicU64::new(1),
102			Closed:AtomicBool::new(false),
103		});
104
105		// Spawn the read pump.
106		let SelfForReadPump = SelfReference.clone();
107
108		tokio::spawn(async move {
109			ReadPump(InboundStream, SelfForReadPump).await;
110		});
111
112		// Register globally so consumers can look us up.
113		MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
114
115		Ok(SelfReference)
116	}
117
118	/// Look up the multiplexer for a sidecar. Returns `None` if no
119	/// streaming connection has been opened for that sidecar (the
120	/// caller should fall back to the unary path).
121	pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
122
123	/// Drop the registry entry. Called by the read-pump when the
124	/// stream closes.
125	pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
126
127	/// Send a notification frame (fire-and-forget). Non-blocking
128	/// modulo Sink backpressure (capacity `SINK_CAPACITY`).
129	pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
130		if self.Closed.load(Ordering::Relaxed) {
131			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
132		}
133
134		let Bytes = serde_json::to_vec(&Parameters)?;
135
136		let Frame = Envelope {
137			payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
138		};
139
140		self.Sink
141			.send(Frame)
142			.await
143			.map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
144	}
145
146	/// Send a request and await the matching response. Cancels the
147	/// pending entry on timeout. The future is `Send + 'static`-clean
148	/// so callers can drive it inside `tokio::select!` for finer-
149	/// grained cancellation.
150	pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
151		if self.Closed.load(Ordering::Relaxed) {
152			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
153		}
154
155		let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
156
157		let (Tx, Rx) = oneshot::channel();
158
159		self.Pending.insert(Identifier, Tx);
160
161		let Bytes = serde_json::to_vec(&Parameters)?;
162
163		let MethodForError = Method.clone();
164
165		let Frame = Envelope {
166			payload:Some(Payload::Request(GenericRequest {
167				request_identifier:Identifier,
168				method:Method,
169				parameter:Bytes,
170			})),
171		};
172
173		if self.Sink.send(Frame).await.is_err() {
174			self.Pending.remove(&Identifier);
175
176			return Err(VineError::RPCError(format!(
177				"Sink closed for sidecar {}",
178				self.SideCarIdentifier
179			)));
180		}
181
182		match tokio::time::timeout(Timeout, Rx).await {
183			Ok(Ok(Response)) => {
184				if let Some(Error) = Response.error {
185					return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
186				}
187
188				if Response.result.is_empty() {
189					return Ok(Value::Null);
190				}
191
192				serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
193			},
194
195			Ok(Err(_)) => {
196				self.Pending.remove(&Identifier);
197
198				Err(VineError::RPCError(
199					"response sender closed (peer disconnect mid-request)".into(),
200				))
201			},
202
203			Err(_) => {
204				self.Pending.remove(&Identifier);
205
206				Err(VineError::RequestTimeout {
207					SideCarIdentifier:self.SideCarIdentifier.clone(),
208					MethodName:MethodForError,
209					TimeoutMilliseconds:Timeout.as_millis() as u64,
210				})
211			},
212		}
213	}
214
215	/// Send a Cancel frame asking the peer to abort an in-flight
216	/// request matching `RequestIdentifier`. Best-effort; the peer
217	/// chooses whether to honour it.
218	pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
219		if self.Closed.load(Ordering::Relaxed) {
220			return Ok(());
221		}
222
223		let Frame = Envelope {
224			payload:Some(Payload::Cancel(CancelOperationRequest {
225				request_identifier_to_cancel:RequestIdentifier,
226			})),
227		};
228
229		let _ = self.Sink.send(Frame).await;
230
231		Ok(())
232	}
233
234	pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
235
236	pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
237}
238
239/// Drain the inbound side of the bidirectional stream. Notifications
240/// fan out to the process-wide broadcast; responses wake the parked
241/// `Request` future. Reverse-RPC requests and cancellations are
242/// recorded for a follow-up phase.
243async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
244	use futures_util::StreamExt;
245
246	while let Some(FrameResult) = Stream.next().await {
247		let Frame = match FrameResult {
248			Ok(F) => F,
249
250			Err(Status) => {
251				dev_log!(
252					"grpc",
253					"[Vine::Multiplexer] read err on {}: {}",
254					State.SideCarIdentifier,
255					Status
256				);
257
258				break;
259			},
260		};
261
262		let Payload = match Frame.payload {
263			Some(P) => P,
264
265			None => continue,
266		};
267
268		match Payload {
269			Payload::Notification(N) => {
270				let Parameters:Value = if N.parameter.is_empty() {
271					Value::Null
272				} else {
273					serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
274				};
275
276				super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
277			},
278
279			Payload::Response(R) => {
280				let Identifier = R.request_identifier;
281
282				if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
283					let _ = Sender.send(R);
284				}
285
286				// A Response with no matching pending entry is a
287				// duplicate or post-cancel arrival; drop silently.
288			},
289
290			Payload::Request(_) => {
291
292				// TODO P14.1.1: dispatch the inbound (reverse-RPC)
293				// request to the same handler tree the unary path
294				// uses, then enqueue the GenericResponse onto Sink.
295				// For now we drop, which is safe: the unary path is
296				// still authoritative until phase P14.4 lands the
297				// streaming handler tree on Cocoon side.
298			},
299
300			Payload::Cancel(_) => {
301
302				// TODO P14.1.2: signal abort for the in-flight
303				// handler. For now no-op (the unary path doesn't
304				// support cancel either).
305			},
306		}
307	}
308
309	State.Closed.store(true, Ordering::Relaxed);
310
311	// Drain pending senders with disconnect errors so awaiting
312	// fibers don't hang forever.
313	let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
314
315	for Key in Keys {
316		if let Some((_, Sender)) = State.Pending.remove(&Key) {
317			let _ = Sender.send(GenericResponse {
318				request_identifier:Key,
319				result:Vec::new(),
320				error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
321			});
322		}
323	}
324
325	Multiplexer::Deregister(&State.SideCarIdentifier);
326
327	dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
328}