dmsc/observability/
metrics.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Metrics Collection and Aggregation Module
21//! 
22//! This module provides a comprehensive metrics collection and aggregation system for DMSC.
23//! It supports various metric types, sliding window aggregation, and Prometheus-compatible export.
24//! 
25//! ## Key Components
26//! 
27//! - **DMSCMetricType**: Enum defining supported metric types (Counter, Gauge, Histogram, Summary)
28//! - **DMSCMetricSample**: Represents a single metric sample with timestamp, value, and labels
29//! - **DMSCMetricConfig**: Configuration for creating metrics
30//! - **DMSCSlidingWindow**: Internal sliding time window for metric aggregation
31//! - **DMSCWindowStats**: Aggregated statistics from the sliding window
32//! - **DMSCMetric**: Individual metric with sliding window aggregation
33//! - **DMSCMetricsRegistry**: Registry for managing multiple metrics
34//! 
35//! ## Design Principles
36//! 
37//! 1. **Multiple Metric Types**: Supports Counter, Gauge, Histogram, and Summary metrics
38//! 2. **Sliding Window Aggregation**: Efficiently aggregates metrics over configurable time windows
39//! 3. **Thread Safety**: Uses Arc and RwLock for safe concurrent access
40//! 4. **Prometheus Compatible**: Exports metrics in Prometheus format
41//! 5. **Label Support**: Allows adding custom labels to metric samples
42//! 6. **Configurable**: Supports custom window sizes, bucket sizes, and other parameters
43//! 7. **Type Safety**: Strongly typed metrics with compile-time checks
44//! 8. **Efficient Memory Usage**: Automatically rotates and prunes old metric data
45//! 
46//! ## Usage
47//! 
//! ```rust
//! use dmsc::prelude::*;
//! use std::sync::Arc;
//! use std::time::Duration;
//!
//! fn example() -> DMSCResult<()> {
//!     // Create a metrics registry
//!     let registry = DMSCMetricsRegistry::new();
//!
//!     // Configure a counter metric
//!     let counter_config = DMSCMetricConfig {
//!         metric_type: DMSCMetricType::Counter,
//!         name: "http_requests_total".to_string(),
//!         help: "Total number of HTTP requests".to_string(),
//!         buckets: Vec::new(),
//!         quantiles: Vec::new(),
//!         max_age: Duration::from_secs(300),
//!         age_buckets: 5,
//!     };
//!
//!     // Create and register the metric
//!     let counter = Arc::new(DMSCMetric::new(counter_config));
//!     registry.register(counter.clone())?;
//!
//!     // Record some metrics
//!     counter.record(1.0, vec![("method".to_string(), "GET".to_string())])?;
//!     counter.record(1.0, vec![("method".to_string(), "POST".to_string())])?;
//!
//!     // Export metrics in Prometheus format (requires the `observability` feature)
//!     let prometheus_output = registry.export_prometheus();
//!     println!("{}", prometheus_output);
//!
//!     Ok(())
//! }
//! ```
82
83use std::collections::{VecDeque, HashMap};
84use std::sync::{Arc, RwLock};
85use std::time::{SystemTime, UNIX_EPOCH, Duration};
86use serde::{Serialize, Deserialize};
87
88#[cfg(feature = "pyo3")]
89use pyo3::prelude::*;
90
91use crate::core::DMSCResult;
92use crate::core::lock::RwLockExtensions;
93
94/// Metric types supported
95#[derive(Debug, Clone, Serialize, Deserialize)]
96#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
97pub enum DMSCMetricType {
98    Counter,
99    Gauge,
100    Histogram,
101    Summary,
102}
103
104/// A single metric sample
105#[derive(Debug, Clone, Serialize, Deserialize)]
106#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
107pub struct DMSCMetricSample {
108    pub timestamp: u64, // seconds since epoch
109    pub value: f64,
110    pub labels: Vec<(String, String)>,
111}
112
113/// Metric configuration
114#[derive(Debug, Clone)]
115#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
116pub struct DMSCMetricConfig {
117    pub metric_type: DMSCMetricType,
118    pub name: String,
119    pub help: String,
120    pub buckets: Vec<f64>, // for histogram
121    pub quantiles: Vec<f64>, // for summary
122    pub max_age: Duration, // for summary
123    pub age_buckets: usize, // for summary
124}
125
126/// Sliding time window for metric aggregation
127#[allow(dead_code)]
128struct DMSCSlidingWindow {
129    #[allow(dead_code)]
130    window_size: Duration,
131    #[allow(dead_code)]
132    bucket_size: Duration,
133    buckets: VecDeque<Vec<DMSCMetricSample>>,
134    current_bucket: Vec<DMSCMetricSample>,
135    #[allow(dead_code)]
136    last_rotation: u64,
137}
138
139impl DMSCSlidingWindow {
140    fn new(window_size: Duration, bucket_size: Duration) -> Self {
141        let bucket_count = window_size.as_secs().div_ceil(bucket_size.as_secs());
142        
143        Self {
144            window_size,
145            bucket_size,
146            buckets: VecDeque::with_capacity(bucket_count as usize),
147            current_bucket: Vec::new(),
148            last_rotation: Self::current_timestamp(),
149        }
150    }
151    
152    #[allow(dead_code)]
153    fn current_timestamp() -> u64 {
154        SystemTime::now()
155            .duration_since(UNIX_EPOCH)
156            .unwrap_or(Duration::from_secs(0))
157            .as_secs()
158    }
159    
160    #[allow(dead_code)]
161    fn rotate_if_needed(&mut self) {
162        let now = Self::current_timestamp();
163        let elapsed = now.saturating_sub(self.last_rotation);
164        
165        if elapsed >= self.bucket_size.as_secs() {
166            let rotations = elapsed / self.bucket_size.as_secs();
167            
168            for _ in 0..rotations {
169                self.buckets.push_back(std::mem::take(&mut self.current_bucket));
170                
171                // Remove old buckets outside window
172                let max_buckets = self.window_size.as_secs().div_ceil(self.bucket_size.as_secs());
173                while self.buckets.len() > max_buckets as usize {
174                    self.buckets.pop_front();
175                }
176            }
177            
178            self.last_rotation = now;
179        }
180    }
181    
182    #[allow(dead_code)]
183    fn add_sample(&mut self, sample: DMSCMetricSample) {
184        self.rotate_if_needed();
185        self.current_bucket.push(sample);
186    }
187    
188    #[allow(dead_code)]
189    fn get_samples(&self) -> Vec<DMSCMetricSample> {
190        let mut all_samples = Vec::new();
191        
192        for bucket in &self.buckets {
193            all_samples.extend(bucket.iter().cloned());
194        }
195        all_samples.extend(self.current_bucket.iter().cloned());
196        
197        all_samples
198    }
199    
200    #[allow(dead_code)]
201    fn get_window_stats(&self) -> DMSCWindowStats {
202        let samples = self.get_samples();
203        
204        if samples.is_empty() {
205            return DMSCWindowStats::default();
206        }
207        
208        let mut sorted_values: Vec<f64> = samples.iter().map(|s| s.value).collect();
209        sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
210        
211        let count = sorted_values.len();
212        let sum: f64 = sorted_values.iter().sum();
213        let min = sorted_values[0];
214        let max = sorted_values[count - 1];
215        let mean = sum / count as f64;
216        
217        // Calculate variance and standard deviation
218        let variance: f64 = sorted_values
219            .iter()
220            .map(|x| (x - mean).powi(2))
221            .sum::<f64>() / count as f64;
222        let stddev = variance.sqrt();
223        
224        // Calculate quantiles
225        let p50 = Self::quantile(&sorted_values, 0.50);
226        let p90 = Self::quantile(&sorted_values, 0.90);
227        let p95 = Self::quantile(&sorted_values, 0.95);
228        let p99 = Self::quantile(&sorted_values, 0.99);
229        
230        DMSCWindowStats {
231            count: count as u64,
232            sum,
233            min,
234            max,
235            mean,
236            stddev,
237            p50,
238            p90,
239            p95,
240            p99,
241        }
242    }
243    
244    #[allow(dead_code)]
245    fn quantile(sorted_values: &[f64], q: f64) -> f64 {
246        if sorted_values.is_empty() {
247            return 0.0;
248        }
249        
250        let index = q * (sorted_values.len() - 1) as f64;
251        let lower = index.floor() as usize;
252        let upper = index.ceil() as usize;
253        
254        if lower == upper {
255            sorted_values[lower]
256        } else {
257            let weight = index - lower as f64;
258            sorted_values[lower] * (1.0 - weight) + sorted_values[upper] * weight
259        }
260    }
261}
262
263/// Window statistics
264#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct DMSCWindowStats {
266    pub count: u64,
267    pub sum: f64,
268    pub min: f64,
269    pub max: f64,
270    pub mean: f64,
271    pub stddev: f64,
272    pub p50: f64,
273    pub p90: f64,
274    pub p95: f64,
275    pub p99: f64,
276}
277
278impl Default for DMSCWindowStats {
279    fn default() -> Self {
280        Self {
281            count: 0,
282            sum: 0.0,
283            min: 0.0,
284            max: 0.0,
285            mean: 0.0,
286            stddev: 0.0,
287            p50: 0.0,
288            p90: 0.0,
289            p95: 0.0,
290            p99: 0.0,
291        }
292    }
293}
294
295/// A single metric with sliding window aggregation
296#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
297pub struct DMSCMetric {
298    config: DMSCMetricConfig,
299    sliding_window: RwLock<DMSCSlidingWindow>,
300    total_count: RwLock<u64>,
301    #[allow(dead_code)]
302    total_sum: RwLock<f64>,
303}
304
305impl DMSCMetric {
306    pub fn new(config: DMSCMetricConfig) -> Self {
307        let sliding_window = DMSCSlidingWindow::new(
308            Duration::from_secs(300), // 5 minute window
309            Duration::from_secs(10),  // 10 second buckets
310        );
311        
312        Self {
313            config,
314            sliding_window: RwLock::new(sliding_window),
315            total_count: RwLock::new(0),
316            total_sum: RwLock::new(0.0),
317        }
318    }
319    
320    #[allow(dead_code)]
321    fn record(&self, value: f64, labels: Vec<(String, String)>) -> DMSCResult<()> {
322        let sample = DMSCMetricSample {
323            timestamp: Self::current_timestamp(),
324            value,
325            labels,
326        };
327        
328        {
329            let mut window = self.sliding_window.write_safe("sliding window")?;
330            window.add_sample(sample);
331        }
332        
333        {
334            let mut count = self.total_count.write_safe("total count")?;
335            *count += 1;
336        }
337        
338        {
339            let mut sum = self.total_sum.write_safe("total sum")?;
340            *sum += value;
341        }
342        
343        Ok(())
344    }
345    
346    #[allow(dead_code)]
347    fn get_stats(&self) -> DMSCWindowStats {
348        match self.sliding_window.read_safe("sliding window stats") {
349            Ok(window) => window.get_window_stats(),
350            Err(_) => DMSCWindowStats::default(),
351        }
352    }
353    
354    #[allow(dead_code)]
355    fn get_total_count(&self) -> u64 {
356        match self.total_count.read_safe("total count") {
357            Ok(count) => *count,
358            Err(_) => 0,
359        }
360    }
361    
362    #[allow(dead_code)]
363    fn get_total_sum(&self) -> f64 {
364        match self.total_sum.read_safe("total sum") {
365            Ok(sum) => *sum,
366            Err(_) => 0.0,
367        }
368    }
369    
370    fn get_config(&self) -> &DMSCMetricConfig {
371        &self.config
372    }
373
374    pub fn get_value(&self) -> f64 {
375        match self.total_count.read_safe("total count value") {
376            Ok(count) => *count as f64,
377            Err(_) => 0.0,
378        }
379    }
380
381    #[allow(dead_code)]
382    fn current_timestamp() -> u64 {
383        SystemTime::now()
384            .duration_since(UNIX_EPOCH)
385            .unwrap_or_default()
386            .as_secs()
387    }
388}
389
390/// Metrics registry to manage multiple metrics
391#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
392#[derive(Clone)]
393pub struct DMSCMetricsRegistry {
394    metrics: Arc<RwLock<HashMap<String, Arc<DMSCMetric>>>>,
395}
396
397impl Default for DMSCMetricsRegistry {
398    fn default() -> Self {
399        Self::new()
400    }
401}
402
403impl DMSCMetricsRegistry {
404    pub fn new() -> Self {
405        Self {
406            metrics: Arc::new(RwLock::new(HashMap::new())),
407        }
408    }
409    
410    pub fn register(&self, metric: Arc<DMSCMetric>) -> DMSCResult<()> {
411        let name = metric.get_config().name.clone();
412        let mut metrics = self.metrics.write_safe("metrics registry")?;
413        metrics.insert(name, metric);
414        Ok(())
415    }
416    
417    pub fn get_metric(&self, name: &str) -> Option<Arc<DMSCMetric>> {
418        match self.metrics.read_safe("metrics registry") {
419            Ok(metrics) => metrics.get(name).cloned(),
420            Err(_) => None,
421        }
422    }
423    
424    pub fn get_all_metrics(&self) -> HashMap<String, Arc<DMSCMetric>> {
425        match self.metrics.read_safe("metrics registry") {
426            Ok(metrics) => metrics.clone(),
427            Err(_) => HashMap::new(),
428        }
429    }
430    
431    /// Export metrics in Prometheus format
432    #[cfg(feature = "observability")]
433    pub fn export_prometheus(&self) -> String {
434        let mut output = String::new();
435        let metrics = match self.metrics.read_safe("metrics registry for export") {
436            Ok(m) => m,
437            Err(_) => return "# Error: Failed to acquire metrics registry lock".to_string(),
438        };
439        
440        for (name, metric) in metrics.iter() {
441            let config = metric.get_config();
442            
443            // Write help and type
444            output.push_str(&format!("# HELP {} {}\n", name, config.help));
445            output.push_str(&format!("# TYPE {} {:?}\n", name, config.metric_type));
446            
447            // Write metric value
448            let stats = metric.get_stats();
449            match config.metric_type {
450                DMSCMetricType::Counter => {
451                    output.push_str(&format!("{} {}\n", name, metric.get_total_count()));
452                }
453                DMSCMetricType::Gauge => {
454                    output.push_str(&format!("{} {}\n", name, stats.mean));
455                }
456                DMSCMetricType::Histogram => {
457                    output.push_str(&format!("{}_count {}\n", name, stats.count));
458                    output.push_str(&format!("{}_sum {}\n", name, stats.sum));
459                    output.push_str(&format!("{}_min {}\n", name, stats.min));
460                    output.push_str(&format!("{}_max {}\n", name, stats.max));
461                    output.push_str(&format!("{}_avg {}\n", name, stats.mean));
462                    output.push_str(&format!("{}_p50 {}\n", name, stats.p50));
463                    output.push_str(&format!("{}_p90 {}\n", name, stats.p90));
464                    output.push_str(&format!("{}_p95 {}\n", name, stats.p95));
465                    output.push_str(&format!("{}_p99 {}\n", name, stats.p99));
466                }
467                DMSCMetricType::Summary => {
468                    output.push_str(&format!("{} {}\n", name, stats.mean));
469                }
470            }
471            
472            output.push('\n');
473        }
474        
475        output
476    }
477}
478
#[cfg(feature = "pyo3")]
/// Python-facing methods for `DMSCMetricsRegistry`.
#[pyo3::prelude::pymethods]
impl DMSCMetricsRegistry {
    /// Python constructor: returns an empty registry.
    #[new]
    fn py_new() -> Self {
        DMSCMetricsRegistry::new()
    }

    /// Registers a metric under its configured name from Python.
    ///
    /// NOTE(review): this stores a *new* `DMSCMetric` built from a clone of
    /// `metric.config`, not the object that was passed in. Samples recorded later on the
    /// Python-side `metric` are therefore not visible through this registry. Any metric
    /// already registered under that name is also replaced by an empty one.
    #[pyo3(name = "register")]
    fn register_py(&self, metric: &DMSCMetric) -> PyResult<()> {
        let key = metric.config.name.clone();
        let mut registry = match self.metrics.write_safe("metrics registry") {
            Ok(guard) => guard,
            Err(err) => return Err(pyo3::exceptions::PyRuntimeError::new_err(err.to_string())),
        };
        let copy = DMSCMetric::new(metric.config.clone());
        registry.insert(key, Arc::new(copy));
        Ok(())
    }

    /// Returns the current value of the metric registered as `name`, or `None` if absent.
    #[pyo3(name = "get_metric_value")]
    fn get_metric_value_impl(&self, name: &str) -> Option<f64> {
        let metric = self.get_metric(name)?;
        Some(metric.get_value())
    }

    /// Returns the names of all registered metrics (empty if the lock cannot be acquired).
    #[pyo3(name = "get_all_metric_names")]
    fn get_all_metric_names_impl(&self) -> Vec<String> {
        self.metrics
            .read_safe("metrics registry for names")
            .map(|registry| registry.keys().cloned().collect())
            .unwrap_or_default()
    }

    /// Returns the Prometheus export. If the `observability` feature is disabled, returns
    /// a placeholder comment instead.
    #[pyo3(name = "export_prometheus")]
    fn export_prometheus_impl(&self) -> String {
        #[cfg(feature = "observability")]
        {
            DMSCMetricsRegistry::export_prometheus(self)
        }
        #[cfg(not(feature = "observability"))]
        {
            String::from("# Observability feature not enabled")
        }
    }

    /// Returns the number of registered metrics (0 if the lock cannot be acquired).
    #[pyo3(name = "get_metric_count")]
    fn get_metric_count_impl(&self) -> usize {
        self.metrics
            .read_safe("metrics registry for count")
            .map(|registry| registry.len())
            .unwrap_or(0)
    }
}
538
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCMetricConfig {
    /// Python constructor.
    ///
    /// Defaults:
    /// - `buckets`: eleven boundaries from 0.005 to 10.0;
    /// - `quantiles`: 0.5 / 0.9 / 0.95 / 0.99.
    ///
    /// `max_age` is fixed at 600 seconds and `age_buckets` at 5.
    #[new]
    #[pyo3(signature = (name, metric_type, help="", buckets=None, quantiles=None))]
    fn py_new(
        name: String,
        metric_type: DMSCMetricType,
        help: &str,
        buckets: Option<Vec<f64>>,
        quantiles: Option<Vec<f64>>,
    ) -> Self {
        let buckets = buckets.unwrap_or_else(|| {
            vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
        });
        let quantiles = quantiles.unwrap_or_else(|| vec![0.5, 0.9, 0.95, 0.99]);

        DMSCMetricConfig {
            metric_type,
            name,
            help: help.to_owned(),
            buckets,
            quantiles,
            max_age: Duration::from_secs(600),
            age_buckets: 5,
        }
    }
}
562
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCMetric {
    /// Python constructor: builds a metric from `config`.
    #[new]
    fn py_new(config: DMSCMetricConfig) -> Self {
        DMSCMetric::new(config)
    }

    /// Records `value` with no labels. A lock failure is raised as `RuntimeError`.
    #[pyo3(name = "record")]
    fn record_py(&self, value: f64) -> PyResult<()> {
        let unlabeled: Vec<(String, String)> = Vec::new();
        match self.record(value, unlabeled) {
            Ok(()) => Ok(()),
            Err(err) => Err(pyo3::exceptions::PyRuntimeError::new_err(err.to_string())),
        }
    }

    /// Returns the metric's current value, as reported by the Rust `get_value`.
    #[pyo3(name = "get_value")]
    fn get_value_py(&self) -> f64 {
        DMSCMetric::get_value(self)
    }

    /// Returns the number of samples recorded since creation.
    #[pyo3(name = "get_total_count")]
    fn get_total_count_py(&self) -> u64 {
        Self::get_total_count(self)
    }
}