Mountain/IPC/Enhanced/MessageCompressor/
Compressor.rs1#![allow(non_snake_case)]
2
3use std::{
10 collections::VecDeque,
11 io::{Read, Write},
12};
13
14use bincode::serde::{decode_from_slice, encode_to_vec};
15use brotli::{CompressorReader, CompressorWriter, enc::BrotliEncoderParams};
16use flate2::{
17 Compression,
18 write::{GzEncoder, ZlibEncoder},
19};
20use tokio::time::Instant;
21
22use crate::IPC::Enhanced::MessageCompressor::{
23 BatchConfig::Struct as BatchConfig,
24 BatchStats::Struct as BatchStats,
25 CompressedBatch::Struct as CompressedBatch,
26 CompressionAlgorithm::Enum as CompressionAlgorithm,
27 CompressionInfo::Struct as CompressionInfo,
28 CompressionLevel::Enum as CompressionLevel,
29};
30
31pub struct Struct {
32 pub(super) Config:BatchConfig,
33
34 pub(super) CurrentBatch:VecDeque<Vec<u8>>,
35
36 pub(super) BatchStartTime:Option<Instant>,
37
38 pub(super) BatchSizeBytes:usize,
39}
40
41impl Struct {
42 pub fn new(config:BatchConfig) -> Self {
43 Self {
44 Config:config,
45
46 CurrentBatch:VecDeque::new(),
47
48 BatchStartTime:None,
49
50 BatchSizeBytes:0,
51 }
52 }
53
54 pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
55 let MessageSize = MessageData.len();
56
57 let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
58
59 if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
60 return false;
61 }
62
63 self.CurrentBatch.push_back(MessageData.to_vec());
64
65 self.BatchSizeBytes += MessageSize;
66
67 if self.BatchStartTime.is_none() {
68 self.BatchStartTime = Some(Instant::now());
69 }
70
71 true
72 }
73
74 pub fn should_flush(&self) -> bool {
75 if self.CurrentBatch.is_empty() {
76 return false;
77 }
78
79 if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
80 return true;
81 }
82
83 if let Some(start_time) = self.BatchStartTime {
84 let elapsed = start_time.elapsed();
85
86 if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
87 return true;
88 }
89 }
90
91 false
92 }
93
94 pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
95 if self.CurrentBatch.is_empty() {
96 return Err("No messages in batch to flush".to_string());
97 }
98
99 let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
100
101 let total_size = self.BatchSizeBytes;
102
103 self.BatchStartTime = None;
104
105 self.BatchSizeBytes = 0;
106
107 let config = bincode::config::standard();
108
109 let serialized_batch =
110 encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
111
112 let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
113 self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
114 } else {
115 Ok((None, CompressionInfo::none()))
116 }?;
117
118 Ok(CompressedBatch {
119 messages_count:BatchMessages.len(),
120 original_size:total_size,
121 compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size),
122 compressed_data:CompressedData,
123 compression_info,
124 timestamp:std::time::SystemTime::now()
125 .duration_since(std::time::UNIX_EPOCH)
126 .unwrap_or_default()
127 .as_millis() as u64,
128 })
129 }
130
131 fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
132 match self.Config.Algorithm {
133 CompressionAlgorithm::Brotli => self.compress_brotli(data),
134
135 CompressionAlgorithm::Gzip => self.compress_gzip(data),
136
137 CompressionAlgorithm::Zlib => self.compress_zlib(data),
138 }
139 }
140
141 fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
142 let mut params = BrotliEncoderParams::default();
143
144 params.quality = self.Config.CompressionLevel as i32;
145
146 let mut compressed = Vec::new();
147
148 {
149 let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), ¶ms);
150
151 std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
152
153 writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
154 }
155
156 let ratio = data.len() as f64 / compressed.len() as f64;
157
158 Ok((
159 compressed,
160 CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
161 ))
162 }
163
164 fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
165 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
166
167 encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
168
169 let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
170
171 let ratio = data.len() as f64 / compressed.len() as f64;
172
173 Ok((
174 compressed,
175 CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
176 ))
177 }
178
179 fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
180 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
181
182 encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
183
184 let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
185
186 let ratio = data.len() as f64 / compressed.len() as f64;
187
188 Ok((
189 compressed,
190 CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
191 ))
192 }
193
194 pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
195 let data = if let Some(ref compressed_data) = batch.compressed_data {
196 self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
197 } else {
198 encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
199 };
200
201 let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
202 .map_err(|e| format!("Failed to deserialize batch: {}", e))?;
203
204 Ok(decoded)
205 }
206
207 fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
208 match algorithm {
209 "brotli" => self.decompress_brotli(data),
210
211 "gzip" => self.decompress_gzip(data),
212
213 "zlib" => self.decompress_zlib(data),
214
215 _ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
216 }
217 }
218
219 fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
220 let mut decompressed = Vec::new();
221
222 let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
223
224 std::io::Read::read_to_end(&mut reader, &mut decompressed)
225 .map_err(|e| format!("Brotli decompression failed: {}", e))?;
226
227 Ok(decompressed)
228 }
229
230 fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
231 use flate2::read::GzDecoder;
232
233 let mut decoder = GzDecoder::new(data);
234
235 let mut decompressed = Vec::new();
236
237 decoder
238 .read_to_end(&mut decompressed)
239 .map_err(|e| format!("Gzip decompression failed: {}", e))?;
240
241 Ok(decompressed)
242 }
243
244 fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
245 use flate2::read::ZlibDecoder;
246
247 let mut decoder = ZlibDecoder::new(data);
248
249 let mut decompressed = Vec::new();
250
251 decoder
252 .read_to_end(&mut decompressed)
253 .map_err(|e| format!("Zlib decompression failed: {}", e))?;
254
255 Ok(decompressed)
256 }
257
258 pub fn get_batch_stats(&self) -> BatchStats {
259 BatchStats {
260 messages_count:self.CurrentBatch.len(),
261
262 total_size_bytes:self.BatchSizeBytes,
263
264 batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
265 }
266 }
267
268 pub fn clear_batch(&mut self) {
269 self.CurrentBatch.clear();
270
271 self.BatchStartTime = None;
272
273 self.BatchSizeBytes = 0;
274 }
275
276 pub fn compress_single_message(
277 message_data:&[u8],
278
279 algorithm:CompressionAlgorithm,
280
281 level:CompressionLevel,
282 ) -> Result<(Vec<u8>, CompressionInfo), String> {
283 let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
284
285 let compressor = Self::new(config);
286
287 compressor.compress_data(message_data)
288 }
289
290 pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
291 if compressed_size == 0 {
292 return 0.0;
293 }
294
295 original_size as f64 / compressed_size as f64
296 }
297
298 pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
299 (original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
300 }
301}