1#![allow(non_snake_case)]
19
20use std::collections::{VecDeque, HashMap};
84use std::sync::{Arc, RwLock};
85use std::time::{SystemTime, UNIX_EPOCH, Duration};
86use serde::{Serialize, Deserialize};
87
88#[cfg(feature = "pyo3")]
89use pyo3::prelude::*;
90
91use crate::core::DMSCResult;
92use crate::core::lock::RwLockExtensions;
93
/// Kind of metric tracked by a [`DMSCMetric`].
///
/// `DMSCMetricsRegistry::export_prometheus` branches on this value to decide
/// which series it writes for a metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub enum DMSCMetricType {
    /// Exported as the lifetime number of recorded samples.
    Counter,
    /// Exported as the mean of the samples currently held in the sliding window.
    Gauge,
    /// Exported as count, sum, min, max, mean and p50/p90/p95/p99 of the window.
    Histogram,
    /// Exported as the mean of the samples currently held in the sliding window.
    Summary,
}
103
/// A single recorded observation of a metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCMetricSample {
    /// When the sample was taken. `DMSCMetric::record` fills this with whole
    /// seconds since the Unix epoch.
    pub timestamp: u64,
    /// The observed value.
    pub value: f64,
    /// Label name/value pairs attached to the sample.
    pub labels: Vec<(String, String)>,
}
112
/// Static description of a metric: its kind, its name and its help text.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCMetricConfig {
    /// Kind of metric; selects what the Prometheus export writes.
    pub metric_type: DMSCMetricType,
    /// Metric name; the registry uses it as the lookup key.
    pub name: String,
    /// Human-readable description written on the `# HELP` line of the export.
    pub help: String,
    /// Bucket boundaries.
    /// NOTE(review): not read by any code visible in this file — confirm intended use.
    pub buckets: Vec<f64>,
    /// Quantiles of interest.
    /// NOTE(review): not read by any code visible in this file; the window
    /// statistics always report p50/p90/p95/p99 — confirm intended use.
    pub quantiles: Vec<f64>,
    /// Maximum sample age.
    /// NOTE(review): not read by any code visible in this file; `DMSCMetric::new`
    /// uses a fixed 300-second window — confirm intended use.
    pub max_age: Duration,
    /// Number of age buckets.
    /// NOTE(review): not read by any code visible in this file — confirm intended use.
    pub age_buckets: usize,
}
125
/// Time-bucketed store of recent samples.
///
/// Samples are appended to `current_bucket`; when enough time has passed the
/// current bucket is closed and moved to the back of `buckets`, and the oldest
/// closed buckets are dropped from the front.
#[allow(dead_code)]
struct DMSCSlidingWindow {
    /// Total time span the closed buckets are meant to cover.
    #[allow(dead_code)]
    window_size: Duration,
    /// Time span of a single bucket.
    #[allow(dead_code)]
    bucket_size: Duration,
    /// Closed buckets, oldest at the front.
    buckets: VecDeque<Vec<DMSCMetricSample>>,
    /// Bucket that currently receives new samples.
    current_bucket: Vec<DMSCMetricSample>,
    /// Unix timestamp (whole seconds) of the last rotation, or of construction.
    #[allow(dead_code)]
    last_rotation: u64,
}
138
139impl DMSCSlidingWindow {
140 fn new(window_size: Duration, bucket_size: Duration) -> Self {
141 let bucket_count = window_size.as_secs().div_ceil(bucket_size.as_secs());
142
143 Self {
144 window_size,
145 bucket_size,
146 buckets: VecDeque::with_capacity(bucket_count as usize),
147 current_bucket: Vec::new(),
148 last_rotation: Self::current_timestamp(),
149 }
150 }
151
152 #[allow(dead_code)]
153 fn current_timestamp() -> u64 {
154 SystemTime::now()
155 .duration_since(UNIX_EPOCH)
156 .unwrap_or(Duration::from_secs(0))
157 .as_secs()
158 }
159
160 #[allow(dead_code)]
161 fn rotate_if_needed(&mut self) {
162 let now = Self::current_timestamp();
163 let elapsed = now.saturating_sub(self.last_rotation);
164
165 if elapsed >= self.bucket_size.as_secs() {
166 let rotations = elapsed / self.bucket_size.as_secs();
167
168 for _ in 0..rotations {
169 self.buckets.push_back(std::mem::take(&mut self.current_bucket));
170
171 let max_buckets = self.window_size.as_secs().div_ceil(self.bucket_size.as_secs());
173 while self.buckets.len() > max_buckets as usize {
174 self.buckets.pop_front();
175 }
176 }
177
178 self.last_rotation = now;
179 }
180 }
181
182 #[allow(dead_code)]
183 fn add_sample(&mut self, sample: DMSCMetricSample) {
184 self.rotate_if_needed();
185 self.current_bucket.push(sample);
186 }
187
188 #[allow(dead_code)]
189 fn get_samples(&self) -> Vec<DMSCMetricSample> {
190 let mut all_samples = Vec::new();
191
192 for bucket in &self.buckets {
193 all_samples.extend(bucket.iter().cloned());
194 }
195 all_samples.extend(self.current_bucket.iter().cloned());
196
197 all_samples
198 }
199
200 #[allow(dead_code)]
201 fn get_window_stats(&self) -> DMSCWindowStats {
202 let samples = self.get_samples();
203
204 if samples.is_empty() {
205 return DMSCWindowStats::default();
206 }
207
208 let mut sorted_values: Vec<f64> = samples.iter().map(|s| s.value).collect();
209 sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
210
211 let count = sorted_values.len();
212 let sum: f64 = sorted_values.iter().sum();
213 let min = sorted_values[0];
214 let max = sorted_values[count - 1];
215 let mean = sum / count as f64;
216
217 let variance: f64 = sorted_values
219 .iter()
220 .map(|x| (x - mean).powi(2))
221 .sum::<f64>() / count as f64;
222 let stddev = variance.sqrt();
223
224 let p50 = Self::quantile(&sorted_values, 0.50);
226 let p90 = Self::quantile(&sorted_values, 0.90);
227 let p95 = Self::quantile(&sorted_values, 0.95);
228 let p99 = Self::quantile(&sorted_values, 0.99);
229
230 DMSCWindowStats {
231 count: count as u64,
232 sum,
233 min,
234 max,
235 mean,
236 stddev,
237 p50,
238 p90,
239 p95,
240 p99,
241 }
242 }
243
244 #[allow(dead_code)]
245 fn quantile(sorted_values: &[f64], q: f64) -> f64 {
246 if sorted_values.is_empty() {
247 return 0.0;
248 }
249
250 let index = q * (sorted_values.len() - 1) as f64;
251 let lower = index.floor() as usize;
252 let upper = index.ceil() as usize;
253
254 if lower == upper {
255 sorted_values[lower]
256 } else {
257 let weight = index - lower as f64;
258 sorted_values[lower] * (1.0 - weight) + sorted_values[upper] * weight
259 }
260 }
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct DMSCWindowStats {
266 pub count: u64,
267 pub sum: f64,
268 pub min: f64,
269 pub max: f64,
270 pub mean: f64,
271 pub stddev: f64,
272 pub p50: f64,
273 pub p90: f64,
274 pub p95: f64,
275 pub p99: f64,
276}
277
278impl Default for DMSCWindowStats {
279 fn default() -> Self {
280 Self {
281 count: 0,
282 sum: 0.0,
283 min: 0.0,
284 max: 0.0,
285 mean: 0.0,
286 stddev: 0.0,
287 p50: 0.0,
288 p90: 0.0,
289 p95: 0.0,
290 p99: 0.0,
291 }
292 }
293}
294
/// A single metric: its configuration, a sliding window of recent samples and
/// running totals over everything recorded.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCMetric {
    /// Static description (type, name, help text) of this metric.
    config: DMSCMetricConfig,
    /// Recent samples, used for windowed statistics.
    sliding_window: RwLock<DMSCSlidingWindow>,
    /// Number of samples recorded since the metric was created.
    total_count: RwLock<u64>,
    /// Sum of all values recorded since the metric was created.
    #[allow(dead_code)]
    total_sum: RwLock<f64>,
}
304
305impl DMSCMetric {
306 pub fn new(config: DMSCMetricConfig) -> Self {
307 let sliding_window = DMSCSlidingWindow::new(
308 Duration::from_secs(300), Duration::from_secs(10), );
311
312 Self {
313 config,
314 sliding_window: RwLock::new(sliding_window),
315 total_count: RwLock::new(0),
316 total_sum: RwLock::new(0.0),
317 }
318 }
319
320 #[allow(dead_code)]
321 fn record(&self, value: f64, labels: Vec<(String, String)>) -> DMSCResult<()> {
322 let sample = DMSCMetricSample {
323 timestamp: Self::current_timestamp(),
324 value,
325 labels,
326 };
327
328 {
329 let mut window = self.sliding_window.write_safe("sliding window")?;
330 window.add_sample(sample);
331 }
332
333 {
334 let mut count = self.total_count.write_safe("total count")?;
335 *count += 1;
336 }
337
338 {
339 let mut sum = self.total_sum.write_safe("total sum")?;
340 *sum += value;
341 }
342
343 Ok(())
344 }
345
346 #[allow(dead_code)]
347 fn get_stats(&self) -> DMSCWindowStats {
348 match self.sliding_window.read_safe("sliding window stats") {
349 Ok(window) => window.get_window_stats(),
350 Err(_) => DMSCWindowStats::default(),
351 }
352 }
353
354 #[allow(dead_code)]
355 fn get_total_count(&self) -> u64 {
356 match self.total_count.read_safe("total count") {
357 Ok(count) => *count,
358 Err(_) => 0,
359 }
360 }
361
362 #[allow(dead_code)]
363 fn get_total_sum(&self) -> f64 {
364 match self.total_sum.read_safe("total sum") {
365 Ok(sum) => *sum,
366 Err(_) => 0.0,
367 }
368 }
369
370 fn get_config(&self) -> &DMSCMetricConfig {
371 &self.config
372 }
373
374 pub fn get_value(&self) -> f64 {
375 match self.total_count.read_safe("total count value") {
376 Ok(count) => *count as f64,
377 Err(_) => 0.0,
378 }
379 }
380
381 #[allow(dead_code)]
382 fn current_timestamp() -> u64 {
383 SystemTime::now()
384 .duration_since(UNIX_EPOCH)
385 .unwrap_or_default()
386 .as_secs()
387 }
388}
389
390#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
392#[derive(Clone)]
393pub struct DMSCMetricsRegistry {
394 metrics: Arc<RwLock<HashMap<String, Arc<DMSCMetric>>>>,
395}
396
397impl Default for DMSCMetricsRegistry {
398 fn default() -> Self {
399 Self::new()
400 }
401}
402
403impl DMSCMetricsRegistry {
404 pub fn new() -> Self {
405 Self {
406 metrics: Arc::new(RwLock::new(HashMap::new())),
407 }
408 }
409
410 pub fn register(&self, metric: Arc<DMSCMetric>) -> DMSCResult<()> {
411 let name = metric.get_config().name.clone();
412 let mut metrics = self.metrics.write_safe("metrics registry")?;
413 metrics.insert(name, metric);
414 Ok(())
415 }
416
417 pub fn get_metric(&self, name: &str) -> Option<Arc<DMSCMetric>> {
418 match self.metrics.read_safe("metrics registry") {
419 Ok(metrics) => metrics.get(name).cloned(),
420 Err(_) => None,
421 }
422 }
423
424 pub fn get_all_metrics(&self) -> HashMap<String, Arc<DMSCMetric>> {
425 match self.metrics.read_safe("metrics registry") {
426 Ok(metrics) => metrics.clone(),
427 Err(_) => HashMap::new(),
428 }
429 }
430
431 #[cfg(feature = "observability")]
433 pub fn export_prometheus(&self) -> String {
434 let mut output = String::new();
435 let metrics = match self.metrics.read_safe("metrics registry for export") {
436 Ok(m) => m,
437 Err(_) => return "# Error: Failed to acquire metrics registry lock".to_string(),
438 };
439
440 for (name, metric) in metrics.iter() {
441 let config = metric.get_config();
442
443 output.push_str(&format!("# HELP {} {}\n", name, config.help));
445 output.push_str(&format!("# TYPE {} {:?}\n", name, config.metric_type));
446
447 let stats = metric.get_stats();
449 match config.metric_type {
450 DMSCMetricType::Counter => {
451 output.push_str(&format!("{} {}\n", name, metric.get_total_count()));
452 }
453 DMSCMetricType::Gauge => {
454 output.push_str(&format!("{} {}\n", name, stats.mean));
455 }
456 DMSCMetricType::Histogram => {
457 output.push_str(&format!("{}_count {}\n", name, stats.count));
458 output.push_str(&format!("{}_sum {}\n", name, stats.sum));
459 output.push_str(&format!("{}_min {}\n", name, stats.min));
460 output.push_str(&format!("{}_max {}\n", name, stats.max));
461 output.push_str(&format!("{}_avg {}\n", name, stats.mean));
462 output.push_str(&format!("{}_p50 {}\n", name, stats.p50));
463 output.push_str(&format!("{}_p90 {}\n", name, stats.p90));
464 output.push_str(&format!("{}_p95 {}\n", name, stats.p95));
465 output.push_str(&format!("{}_p99 {}\n", name, stats.p99));
466 }
467 DMSCMetricType::Summary => {
468 output.push_str(&format!("{} {}\n", name, stats.mean));
469 }
470 }
471
472 output.push('\n');
473 }
474
475 output
476 }
477}
478
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCMetricsRegistry {
    /// Python constructor: creates an empty registry.
    #[new]
    fn py_new() -> Self {
        Self::new()
    }

    /// Registers a metric under the name from its configuration.
    ///
    /// NOTE(review): the registry stores a freshly constructed metric built
    /// from a copy of `metric`'s configuration, not the passed object itself,
    /// so samples recorded on the Python-side object are not visible through
    /// the registry. Confirm whether this is intended.
    ///
    /// Raises `RuntimeError` when the registry lock cannot be acquired.
    #[pyo3(name = "register")]
    fn register_py(&self, metric: &DMSCMetric) -> PyResult<()> {
        let key = metric.config.name.clone();
        let mut registered = self
            .metrics
            .write_safe("metrics registry")
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
        let fresh_metric = Arc::new(DMSCMetric::new(metric.config.clone()));
        registered.insert(key, fresh_metric);
        Ok(())
    }

    /// Returns the value of the named metric, or `None` when it is not
    /// registered.
    #[pyo3(name = "get_metric_value")]
    fn get_metric_value_impl(&self, name: &str) -> Option<f64> {
        let metric = self.get_metric(name)?;
        Some(metric.get_value())
    }

    /// Returns the names of all registered metrics, or an empty list when the
    /// registry lock cannot be acquired.
    #[pyo3(name = "get_all_metric_names")]
    fn get_all_metric_names_impl(&self) -> Vec<String> {
        self.metrics
            .read_safe("metrics registry for names")
            .map(|registered| registered.keys().cloned().collect::<Vec<_>>())
            .unwrap_or_default()
    }

    /// Returns the Prometheus text export, or a one-line comment when the
    /// `observability` feature is not compiled in.
    #[pyo3(name = "export_prometheus")]
    fn export_prometheus_impl(&self) -> String {
        #[cfg(feature = "observability")]
        {
            self.export_prometheus()
        }
        #[cfg(not(feature = "observability"))]
        {
            "# Observability feature not enabled".to_string()
        }
    }

    /// Returns the number of registered metrics, or 0 when the registry lock
    /// cannot be acquired.
    #[pyo3(name = "get_metric_count")]
    fn get_metric_count_impl(&self) -> usize {
        self.metrics
            .read_safe("metrics registry for count")
            .map(|registered| registered.len())
            .unwrap_or(0)
    }
}
538
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCMetricConfig {
    /// Python constructor.
    ///
    /// `help` defaults to an empty string. When `buckets` or `quantiles` is
    /// omitted, a built-in default list is used. `max_age` is always 600
    /// seconds and `age_buckets` is always 5.
    #[new]
    #[pyo3(signature = (name, metric_type, help="", buckets=None, quantiles=None))]
    fn py_new(
        name: String,
        metric_type: DMSCMetricType,
        help: &str,
        buckets: Option<Vec<f64>>,
        quantiles: Option<Vec<f64>>,
    ) -> Self {
        let buckets = buckets.unwrap_or_else(|| {
            vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
        });
        let quantiles = quantiles.unwrap_or_else(|| vec![0.5, 0.9, 0.95, 0.99]);

        Self {
            metric_type,
            name,
            help: help.to_owned(),
            buckets,
            quantiles,
            max_age: Duration::from_secs(600),
            age_buckets: 5,
        }
    }
}
562
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCMetric {
    /// Python constructor: builds a metric from its configuration.
    #[new]
    fn py_new(config: DMSCMetricConfig) -> Self {
        DMSCMetric::new(config)
    }

    /// Records one observation without labels.
    ///
    /// Raises `RuntimeError` when recording fails.
    #[pyo3(name = "record")]
    fn record_py(&self, value: f64) -> PyResult<()> {
        match self.record(value, Vec::new()) {
            Ok(()) => Ok(()),
            Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(e.to_string())),
        }
    }

    /// Returns the metric's value as reported by `get_value`.
    #[pyo3(name = "get_value")]
    fn get_value_py(&self) -> f64 {
        DMSCMetric::get_value(self)
    }

    /// Returns the number of samples recorded so far.
    #[pyo3(name = "get_total_count")]
    fn get_total_count_py(&self) -> u64 {
        DMSCMetric::get_total_count(self)
    }
}