Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Client/
SendRequest.rs

1//! Send a request and await a response. Validates method-name length
2//! and message size, prefers the streaming multiplexer when
3//! `LAND_VINE_STREAMING=1` is on (falls through to unary on any failure
4//! except the authoritative streaming-path timeout), enforces a per-call
5//! timeout via `tokio::time::timeout`, and updates per-connection
6//! activity / failure metadata on completion.
7
8use std::time::Duration;
9
10use serde_json::{Value, from_slice, to_vec};
11use tokio::time::timeout;
12
13use crate::{
14	Vine::{
15		Client::{
16			IsShuttingDown,
17			Shared::{
18				DEFAULT_TIMEOUT_MS,
19				RecordSideCarFailure,
20				SIDECAR_CLIENTS,
21				UpdateSideCarActivity,
22				ValidateMessageSize,
23			},
24		},
25		Error::VineError,
26		Generated::GenericRequest,
27	},
28	dev_log,
29};
30
31pub async fn Fn(
32	SideCarIdentifier:&str,
33
34	Method:String,
35
36	Parameters:Value,
37
38	TimeoutMilliseconds:u64,
39) -> Result<Value, VineError> {
40	if IsShuttingDown::Fn() {
41		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
42	}
43
44	if Method.is_empty() || Method.len() > 128 {
45		return Err(VineError::RPCError(
46			"Method name must be between 1 and 128 characters".to_string(),
47		));
48	}
49
50	let TimeoutDuration =
51		Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
52
53	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
54		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
55			if !Mux.IsClosed() {
56				match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
57					Ok(Result_) => {
58						UpdateSideCarActivity(SideCarIdentifier);
59
60						return Ok(Result_);
61					},
62
63					Err(VineError::RequestTimeout { .. }) => {
64						return Err(VineError::RequestTimeout {
65							SideCarIdentifier:SideCarIdentifier.to_string(),
66							MethodName:Method,
67							TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
68						});
69					},
70
71					Err(Error) => {
72						dev_log!(
73							"grpc",
74							"warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
75							 unary",
76							SideCarIdentifier,
77							Method,
78							Error
79						);
80					},
81				}
82			}
83		}
84	}
85
86	let ParameterBytes =
87		to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
88
89	ValidateMessageSize(&ParameterBytes)?;
90
91	let Client = {
92		let Pool = SIDECAR_CLIENTS.lock();
93
94		Pool.get(SideCarIdentifier).cloned()
95	};
96
97	let Some(mut Client) = Client else {
98		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
99	};
100
101	use std::sync::atomic::{AtomicU64, Ordering as AO};
102
103	static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
104
105	let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
106
107	let MethodForLog = Method.clone();
108
109	let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
110
111	let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
112
113	match Result_ {
114		Ok(Ok(Response)) => {
115			UpdateSideCarActivity(SideCarIdentifier);
116
117			dev_log!(
118				"grpc",
119				"[VineClient] Request sent successfully to sidecar '{}': method='{}'",
120				SideCarIdentifier,
121				MethodForLog
122			);
123
124			let InnerResponse = Response.into_inner();
125
126			let ResultBytes = InnerResponse.result;
127
128			let ResultValue:Value = from_slice(&ResultBytes)
129				.map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
130
131			if let Some(ErrorData) = InnerResponse.error {
132				return Err(VineError::RPCError(format!(
133					"RPC error from sidecar: code={}, message={}",
134					ErrorData.code, ErrorData.message
135				)));
136			}
137
138			Ok(ResultValue)
139		},
140
141		Ok(Err(Status)) => {
142			RecordSideCarFailure(SideCarIdentifier);
143
144			Err(VineError::RPCError(format!("gRPC error: {}", Status)))
145		},
146
147		Err(_) => {
148			RecordSideCarFailure(SideCarIdentifier);
149
150			Err(VineError::RequestTimeout {
151				SideCarIdentifier:SideCarIdentifier.to_string(),
152				MethodName:MethodForLog,
153				TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
154			})
155		},
156	}
157}