DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/
Multiplexer.rs1use std::{
15 collections::HashMap,
16 sync::{
17 Arc,
18 atomic::{AtomicBool, AtomicU64, Ordering},
19 },
20 time::Duration,
21};
22
23use dashmap::DashMap;
24use lazy_static::lazy_static;
25use parking_lot::Mutex;
26use serde_json::Value;
27use tokio::sync::{mpsc, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29use tonic::Streaming;
30
31use super::{
32 Error::VineError,
33 Generated::{
34 CancelOperationRequest,
35 Envelope,
36 GenericNotification,
37 GenericRequest,
38 GenericResponse,
39 RpcError,
40 cocoon_service_client::CocoonServiceClient,
41 envelope::Payload,
42 },
43};
44use crate::dev_log;
45
/// Number of outbound `Envelope` frames the bounded channel created in
/// `Multiplexer::Open` can buffer before `send` has to wait for room.
const SINK_CAPACITY:usize = 1024;
50
51pub struct Multiplexer {
55 SideCarIdentifier:String,
56
57 Sink:mpsc::Sender<Envelope>,
58
59 Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
60
61 NextRequestIdentifier:AtomicU64,
62
63 Closed:AtomicBool,
64}
65
66lazy_static! {
67
68 static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
72}
73
74impl Multiplexer {
75 pub async fn Open(
80 SideCarIdentifier:String,
81
82 mut Client:CocoonServiceClient<tonic::transport::Channel>,
83 ) -> Result<Arc<Self>, VineError> {
84 let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
85
86 let OutboundStream = ReceiverStream::new(OutboundReceiver);
87
88 let Response = Client
89 .open_channel_from_mountain(OutboundStream)
90 .await
91 .map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
92
93 let InboundStream:Streaming<Envelope> = Response.into_inner();
94
95 let SelfReference = Arc::new(Self {
96 SideCarIdentifier:SideCarIdentifier.clone(),
97 Sink,
98 Pending:Arc::new(DashMap::new()),
99 NextRequestIdentifier:AtomicU64::new(1),
100 Closed:AtomicBool::new(false),
101 });
102
103 let SelfForReadPump = SelfReference.clone();
105
106 tokio::spawn(async move {
107 ReadPump(InboundStream, SelfForReadPump).await;
108 });
109
110 MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
112
113 Ok(SelfReference)
114 }
115
116 pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
120
121 pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
124
125 pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
128 if self.Closed.load(Ordering::Relaxed) {
129 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
130 }
131
132 let Bytes = serde_json::to_vec(&Parameters)?;
133
134 let Frame = Envelope {
135 payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
136 };
137
138 self.Sink
139 .send(Frame)
140 .await
141 .map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
142 }
143
144 pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
149 if self.Closed.load(Ordering::Relaxed) {
150 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
151 }
152
153 let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
154
155 let (Tx, Rx) = oneshot::channel();
156
157 self.Pending.insert(Identifier, Tx);
158
159 let Bytes = serde_json::to_vec(&Parameters)?;
160
161 let MethodForError = Method.clone();
162
163 let Frame = Envelope {
164 payload:Some(Payload::Request(GenericRequest {
165 request_identifier:Identifier,
166 method:Method,
167 parameter:Bytes,
168 })),
169 };
170
171 if self.Sink.send(Frame).await.is_err() {
172 self.Pending.remove(&Identifier);
173
174 return Err(VineError::RPCError(format!(
175 "Sink closed for sidecar {}",
176 self.SideCarIdentifier
177 )));
178 }
179
180 match tokio::time::timeout(Timeout, Rx).await {
181 Ok(Ok(Response)) => {
182 if let Some(Error) = Response.error {
183 return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
184 }
185
186 if Response.result.is_empty() {
187 return Ok(Value::Null);
188 }
189
190 serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
191 },
192
193 Ok(Err(_)) => {
194 self.Pending.remove(&Identifier);
195
196 Err(VineError::RPCError(
197 "response sender closed (peer disconnect mid-request)".into(),
198 ))
199 },
200
201 Err(_) => {
202 self.Pending.remove(&Identifier);
203
204 Err(VineError::RequestTimeout {
205 SideCarIdentifier:self.SideCarIdentifier.clone(),
206 MethodName:MethodForError,
207 TimeoutMilliseconds:Timeout.as_millis() as u64,
208 })
209 },
210 }
211 }
212
213 pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
217 if self.Closed.load(Ordering::Relaxed) {
218 return Ok(());
219 }
220
221 let Frame = Envelope {
222 payload:Some(Payload::Cancel(CancelOperationRequest {
223 request_identifier_to_cancel:RequestIdentifier,
224 })),
225 };
226
227 let _ = self.Sink.send(Frame).await;
228
229 Ok(())
230 }
231
232 pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
233
234 pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
235}
236
237async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
242 use futures_util::StreamExt;
243
244 while let Some(FrameResult) = Stream.next().await {
245 let Frame = match FrameResult {
246 Ok(F) => F,
247
248 Err(Status) => {
249 dev_log!(
250 "grpc",
251 "[Vine::Multiplexer] read err on {}: {}",
252 State.SideCarIdentifier,
253 Status
254 );
255
256 break;
257 },
258 };
259
260 let Payload = match Frame.payload {
261 Some(P) => P,
262
263 None => continue,
264 };
265
266 match Payload {
267 Payload::Notification(N) => {
268 let Parameters:Value = if N.parameter.is_empty() {
269 Value::Null
270 } else {
271 serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
272 };
273
274 super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
275 },
276
277 Payload::Response(R) => {
278 let Identifier = R.request_identifier;
279
280 if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
281 let _ = Sender.send(R);
282 }
283
284 },
287
288 Payload::Request(_) => {
289
290 },
297
298 Payload::Cancel(_) => {
299
300 },
304 }
305 }
306
307 State.Closed.store(true, Ordering::Relaxed);
308
309 let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
312
313 for Key in Keys {
314 if let Some((_, Sender)) = State.Pending.remove(&Key) {
315 let _ = Sender.send(GenericResponse {
316 request_identifier:Key,
317 result:Vec::new(),
318 error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
319 });
320 }
321 }
322
323 Multiplexer::Deregister(&State.SideCarIdentifier);
324
325 dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
326}