1#![allow(non_snake_case)]
2
3use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU64, Ordering},
21 },
22 time::Duration,
23};
24
25use dashmap::DashMap;
26use lazy_static::lazy_static;
27use parking_lot::Mutex;
28use serde_json::Value;
29use tokio::sync::{mpsc, oneshot};
30use tokio_stream::wrappers::ReceiverStream;
31use tonic::Streaming;
32
33use super::{
34 Error::VineError,
35 Generated::{
36 CancelOperationRequest,
37 Envelope,
38 GenericNotification,
39 GenericRequest,
40 GenericResponse,
41 RpcError,
42 cocoon_service_client::CocoonServiceClient,
43 envelope::Payload,
44 },
45};
46use crate::dev_log;
47
/// Number of outbound `Envelope` frames the bounded channel created in
/// `Multiplexer::Open` can buffer before `send` has to wait for space.
const SINK_CAPACITY:usize = 1024;
52
53pub struct Multiplexer {
57 SideCarIdentifier:String,
58
59 Sink:mpsc::Sender<Envelope>,
60
61 Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
62
63 NextRequestIdentifier:AtomicU64,
64
65 Closed:AtomicBool,
66}
67
68lazy_static! {
69
70 static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
74}
75
76impl Multiplexer {
77 pub async fn Open(
82 SideCarIdentifier:String,
83
84 mut Client:CocoonServiceClient<tonic::transport::Channel>,
85 ) -> Result<Arc<Self>, VineError> {
86 let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
87
88 let OutboundStream = ReceiverStream::new(OutboundReceiver);
89
90 let Response = Client
91 .open_channel_from_mountain(OutboundStream)
92 .await
93 .map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
94
95 let InboundStream:Streaming<Envelope> = Response.into_inner();
96
97 let SelfReference = Arc::new(Self {
98 SideCarIdentifier:SideCarIdentifier.clone(),
99 Sink,
100 Pending:Arc::new(DashMap::new()),
101 NextRequestIdentifier:AtomicU64::new(1),
102 Closed:AtomicBool::new(false),
103 });
104
105 let SelfForReadPump = SelfReference.clone();
107
108 tokio::spawn(async move {
109 ReadPump(InboundStream, SelfForReadPump).await;
110 });
111
112 MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
114
115 Ok(SelfReference)
116 }
117
118 pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
122
123 pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
126
127 pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
130 if self.Closed.load(Ordering::Relaxed) {
131 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
132 }
133
134 let Bytes = serde_json::to_vec(&Parameters)?;
135
136 let Frame = Envelope {
137 payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
138 };
139
140 self.Sink
141 .send(Frame)
142 .await
143 .map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
144 }
145
146 pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
151 if self.Closed.load(Ordering::Relaxed) {
152 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
153 }
154
155 let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
156
157 let (Tx, Rx) = oneshot::channel();
158
159 self.Pending.insert(Identifier, Tx);
160
161 let Bytes = serde_json::to_vec(&Parameters)?;
162
163 let MethodForError = Method.clone();
164
165 let Frame = Envelope {
166 payload:Some(Payload::Request(GenericRequest {
167 request_identifier:Identifier,
168 method:Method,
169 parameter:Bytes,
170 })),
171 };
172
173 if self.Sink.send(Frame).await.is_err() {
174 self.Pending.remove(&Identifier);
175
176 return Err(VineError::RPCError(format!(
177 "Sink closed for sidecar {}",
178 self.SideCarIdentifier
179 )));
180 }
181
182 match tokio::time::timeout(Timeout, Rx).await {
183 Ok(Ok(Response)) => {
184 if let Some(Error) = Response.error {
185 return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
186 }
187
188 if Response.result.is_empty() {
189 return Ok(Value::Null);
190 }
191
192 serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
193 },
194
195 Ok(Err(_)) => {
196 self.Pending.remove(&Identifier);
197
198 Err(VineError::RPCError(
199 "response sender closed (peer disconnect mid-request)".into(),
200 ))
201 },
202
203 Err(_) => {
204 self.Pending.remove(&Identifier);
205
206 Err(VineError::RequestTimeout {
207 SideCarIdentifier:self.SideCarIdentifier.clone(),
208 MethodName:MethodForError,
209 TimeoutMilliseconds:Timeout.as_millis() as u64,
210 })
211 },
212 }
213 }
214
215 pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
219 if self.Closed.load(Ordering::Relaxed) {
220 return Ok(());
221 }
222
223 let Frame = Envelope {
224 payload:Some(Payload::Cancel(CancelOperationRequest {
225 request_identifier_to_cancel:RequestIdentifier,
226 })),
227 };
228
229 let _ = self.Sink.send(Frame).await;
230
231 Ok(())
232 }
233
234 pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
235
236 pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
237}
238
239async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
244 use futures_util::StreamExt;
245
246 while let Some(FrameResult) = Stream.next().await {
247 let Frame = match FrameResult {
248 Ok(F) => F,
249
250 Err(Status) => {
251 dev_log!(
252 "grpc",
253 "[Vine::Multiplexer] read err on {}: {}",
254 State.SideCarIdentifier,
255 Status
256 );
257
258 break;
259 },
260 };
261
262 let Payload = match Frame.payload {
263 Some(P) => P,
264
265 None => continue,
266 };
267
268 match Payload {
269 Payload::Notification(N) => {
270 let Parameters:Value = if N.parameter.is_empty() {
271 Value::Null
272 } else {
273 serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
274 };
275
276 super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
277 },
278
279 Payload::Response(R) => {
280 let Identifier = R.request_identifier;
281
282 if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
283 let _ = Sender.send(R);
284 }
285
286 },
289
290 Payload::Request(_) => {
291
292 },
299
300 Payload::Cancel(_) => {
301
302 },
306 }
307 }
308
309 State.Closed.store(true, Ordering::Relaxed);
310
311 let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
314
315 for Key in Keys {
316 if let Some((_, Sender)) = State.Pending.remove(&Key) {
317 let _ = Sender.send(GenericResponse {
318 request_identifier:Key,
319 result:Vec::new(),
320 error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
321 });
322 }
323 }
324
325 Multiplexer::Deregister(&State.SideCarIdentifier);
326
327 dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
328}