DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Client/
SendRequest.rs1use std::time::Duration;
9
10use serde_json::{Value, from_slice, to_vec};
11use tokio::time::timeout;
12
13use crate::{
14 Vine::{
15 Client::{
16 IsShuttingDown,
17 Shared::{
18 DEFAULT_TIMEOUT_MS,
19 RecordSideCarFailure,
20 SIDECAR_CLIENTS,
21 UpdateSideCarActivity,
22 ValidateMessageSize,
23 },
24 },
25 Error::VineError,
26 Generated::GenericRequest,
27 },
28 dev_log,
29};
30
31pub async fn Fn(
32 SideCarIdentifier:&str,
33
34 Method:String,
35
36 Parameters:Value,
37
38 TimeoutMilliseconds:u64,
39) -> Result<Value, VineError> {
40 if IsShuttingDown::Fn() {
41 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
42 }
43
44 if Method.is_empty() || Method.len() > 128 {
45 return Err(VineError::RPCError(
46 "Method name must be between 1 and 128 characters".to_string(),
47 ));
48 }
49
50 let TimeoutDuration =
51 Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
52
53 if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
54 if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
55 if !Mux.IsClosed() {
56 match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
57 Ok(Result_) => {
58 UpdateSideCarActivity(SideCarIdentifier);
59
60 return Ok(Result_);
61 },
62
63 Err(VineError::RequestTimeout { .. }) => {
64 return Err(VineError::RequestTimeout {
65 SideCarIdentifier:SideCarIdentifier.to_string(),
66 MethodName:Method,
67 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
68 });
69 },
70
71 Err(Error) => {
72 dev_log!(
73 "grpc",
74 "warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
75 unary",
76 SideCarIdentifier,
77 Method,
78 Error
79 );
80 },
81 }
82 }
83 }
84 }
85
86 let ParameterBytes =
87 to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
88
89 ValidateMessageSize(&ParameterBytes)?;
90
91 let Client = {
92 let Pool = SIDECAR_CLIENTS.lock();
93
94 Pool.get(SideCarIdentifier).cloned()
95 };
96
97 let Some(mut Client) = Client else {
98 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
99 };
100
101 use std::sync::atomic::{AtomicU64, Ordering as AO};
102
103 static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
104
105 let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
106
107 let MethodForLog = Method.clone();
108
109 let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
110
111 let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
112
113 match Result_ {
114 Ok(Ok(Response)) => {
115 UpdateSideCarActivity(SideCarIdentifier);
116
117 dev_log!(
118 "grpc",
119 "[VineClient] Request sent successfully to sidecar '{}': method='{}'",
120 SideCarIdentifier,
121 MethodForLog
122 );
123
124 let InnerResponse = Response.into_inner();
125
126 let ResultBytes = InnerResponse.result;
127
128 let ResultValue:Value = from_slice(&ResultBytes)
129 .map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
130
131 if let Some(ErrorData) = InnerResponse.error {
132 return Err(VineError::RPCError(format!(
133 "RPC error from sidecar: code={}, message={}",
134 ErrorData.code, ErrorData.message
135 )));
136 }
137
138 Ok(ResultValue)
139 },
140
141 Ok(Err(Status)) => {
142 RecordSideCarFailure(SideCarIdentifier);
143
144 Err(VineError::RPCError(format!("gRPC error: {}", Status)))
145 },
146
147 Err(_) => {
148 RecordSideCarFailure(SideCarIdentifier);
149
150 Err(VineError::RequestTimeout {
151 SideCarIdentifier:SideCarIdentifier.to_string(),
152 MethodName:MethodForLog,
153 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
154 })
155 },
156 }
157}