dmsc/service_mesh/
traffic_management.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18//! # Traffic Management Module
19//! 
20//! This module provides traffic management functionality for the DMSC service mesh. It allows
21//! configuring and managing traffic routes, traffic splits, circuit breakers, rate limits,
22//! and fault injection for services in the mesh.
23//! 
24//! ## Key Components
25//! 
26//! - **DMSCTrafficRoute**: Configuration for routing traffic between services
27//! - **DMSCMatchCriteria**: Criteria for matching requests to routes
28//! - **DMSCRouteAction**: Action to take for matched requests
29//! - **DMSCWeightedDestination**: Weighted destination for traffic splitting
30//! - **DMSCRetryPolicy**: Configuration for request retries
31//! - **DMSCFaultInjection**: Configuration for fault injection
32//! - **DMSCTrafficSplit**: Configuration for splitting traffic between service subsets
33//! - **DMSCSubset**: Service subset definition for traffic splitting
34//! - **DMSCTrafficManager**: Main traffic management service
35//! - **DMSCCircuitBreakerConfig**: Configuration for circuit breakers
36//! - **DMSCRateLimitConfig**: Configuration for rate limiting
37//! 
38//! ## Design Principles
39//! 
40//! 1. **Declarative Configuration**: Traffic rules are defined declaratively
41//! 2. **Flexible Routing**: Supports multiple routing actions (route, redirect, direct response)
42//! 3. **Traffic Splitting**: Weighted traffic splitting between service subsets
43//! 4. **Resilience**: Built-in retry policies and circuit breakers
44//! 5. **Fault Injection**: Support for fault injection for testing resilience
45//! 6. **Rate Limiting**: Protection against excessive traffic
46//! 7. **Timeout Management**: Configurable request timeouts
47//! 8. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
48//! 9. **Graceful Shutdown**: Proper cleanup of background tasks
49//! 10. **Extensible**: Easy to add new traffic management features
50//! 
51//! ## Usage
52//! 
53//! ```rust
54//! use dmsc::prelude::*;
55//! use std::time::Duration;
56//! 
57//! async fn example() -> DMSCResult<()> {
58//!     // Create a traffic manager
59//!     let traffic_manager = DMSCTrafficManager::new(true);
60//!     
61//!     // Create a traffic route
62//!     let route = DMSCTrafficRoute {
63//!         name: "http-route".to_string(),
64//!         source_service: "gateway".to_string(),
65//!         destination_service: "backend".to_string(),
66//!         match_criteria: DMSCMatchCriteria {
67//!             path_prefix: Some("/api".to_string()),
68//!             headers: HashMap::new(),
69//!             method: Some("GET".to_string()),
70//!             query_parameters: HashMap::new(),
71//!         },
72//!         route_action: DMSCRouteAction::Route(vec![DMSCWeightedDestination {
73//!             service: "backend-v1".to_string(),
74//!             weight: 80,
75//!             subset: None,
76//!         }, DMSCWeightedDestination {
77//!             service: "backend-v2".to_string(),
78//!             weight: 20,
79//!             subset: None,
80//!         }]),
81//!         retry_policy: Some(DMSCRetryPolicy {
82//!             attempts: 3,
83//!             per_try_timeout: Duration::from_secs(1),
84//!             retry_on: vec!["5xx".to_string()],
85//!         }),
86//!         timeout: Some(Duration::from_secs(5)),
87//!         fault_injection: None,
88//!     };
89//!     
90//!     // Add the route
91//!     traffic_manager.add_traffic_route(route).await?;
92//!     
93//!     // Set a circuit breaker
94//!     let cb_config = DMSCCircuitBreakerConfig {
95//!         consecutive_errors: 5,
96//!         interval: Duration::from_secs(10),
97//!         base_ejection_time: Duration::from_secs(30),
98//!         max_ejection_percent: 50.0,
99//!     };
100//!     traffic_manager.set_circuit_breaker_config("backend", cb_config).await?;
101//!     
102//!     Ok(())
103//! }
104//! ```
105
106use serde::{Deserialize, Serialize};
107use std::collections::HashMap;
108use std::sync::Arc;
109use std::time::Duration;
110use tokio::sync::RwLock;
111use tokio::task::JoinHandle;
112#[cfg(feature = "http_client")]
113use reqwest;
114
115#[cfg(feature = "pyo3")]
116use pyo3::PyResult;
117
118use crate::core::{DMSCResult, DMSCError};
119use crate::observability::{DMSCTracer, DMSCSpanKind, DMSCSpanStatus};
120#[cfg(feature = "http_client")]
121use crate::observability::DMSCContextCarrier;
122
/// A named traffic route registered with [`DMSCTrafficManager`].
///
/// Routes are stored per `source_service`; `name` identifies a route within that list
/// when it is removed again.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCTrafficRoute {
    /// Route name; `remove_traffic_route` deletes every route with this name.
    pub name: String,
    /// Service the route belongs to; used as the key of the manager's route table.
    pub source_service: String,
    /// Destination service; the route matcher compares the request URL host against this value.
    pub destination_service: String,
    /// Request matching criteria.
    /// NOTE(review): the matching code visible in this file does not evaluate these yet.
    pub match_criteria: DMSCMatchCriteria,
    /// What to do with a request that matched this route.
    pub route_action: DMSCRouteAction,
    /// Optional retry policy applied when the action forwards the request.
    pub retry_policy: Option<DMSCRetryPolicy>,
    /// Optional request timeout.
    /// NOTE(review): not consulted by the routing code visible in this file.
    pub timeout: Option<Duration>,
    /// Optional fault-injection settings.
    /// NOTE(review): not consulted by the routing code visible in this file.
    pub fault_injection: Option<DMSCFaultInjection>,
}
135
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCTrafficRoute {
    /// Python constructor: builds a route with empty match criteria, an empty
    /// `Route` action and no retry policy, timeout or fault injection.
    #[new]
    fn py_new(name: String, source_service: String, destination_service: String) -> Self {
        let match_criteria = DMSCMatchCriteria {
            path_prefix: None,
            headers: HashMap::new(),
            method: None,
            query_parameters: HashMap::new(),
        };
        let route_action = DMSCRouteAction::Route(Vec::new());

        Self {
            name,
            source_service,
            destination_service,
            match_criteria,
            route_action,
            retry_policy: None,
            timeout: None,
            fault_injection: None,
        }
    }

    /// Returns a copy of the route name.
    fn get_name(&self) -> String {
        self.name.to_owned()
    }

    /// Returns a copy of the source service name.
    fn get_source_service(&self) -> String {
        self.source_service.to_owned()
    }

    /// Returns a copy of the destination service name.
    fn get_destination_service(&self) -> String {
        self.destination_service.to_owned()
    }
}
170
/// Criteria describing which requests a [`DMSCTrafficRoute`] applies to.
///
/// NOTE(review): none of these fields is read by the route-matching code visible in
/// this file; confirm where (or whether) they are enforced.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCMatchCriteria {
    /// Optional request path prefix.
    pub path_prefix: Option<String>,
    /// Header name/value pairs.
    pub headers: HashMap<String, String>,
    /// Optional HTTP method name (the module example uses `"GET"`).
    pub method: Option<String>,
    /// Query parameter name/value pairs.
    pub query_parameters: HashMap<String, String>,
}
179
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCMatchCriteria {
    /// Python constructor: criteria with no path prefix, no method and empty
    /// header / query-parameter maps.
    #[new]
    fn py_new() -> Self {
        let headers = HashMap::new();
        let query_parameters = HashMap::new();

        Self {
            path_prefix: None,
            headers,
            method: None,
            query_parameters,
        }
    }

    /// Returns a copy of the configured path prefix, if any.
    fn get_path_prefix(&self) -> Option<String> {
        self.path_prefix.as_ref().cloned()
    }

    /// Returns a copy of the configured HTTP method, if any.
    fn get_method(&self) -> Option<String> {
        self.method.as_ref().cloned()
    }
}
201
/// Action taken for a request that matched a [`DMSCTrafficRoute`].
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DMSCRouteAction {
    /// Forward the request to one of the weighted destinations.
    Route(Vec<DMSCWeightedDestination>),
    /// Redirect to the given URI. The manager currently reports this as a
    /// `ServiceMesh` error carrying the text `Redirect to: <uri>`.
    Redirect(String),
    /// Answer directly with `(status, body)` without a network call. The manager
    /// returns the body bytes; the status code is not propagated to the caller.
    DirectResponse(u16, String),
}
209
/// One candidate destination of a `DMSCRouteAction::Route`, with its selection weight.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCWeightedDestination {
    /// Destination service name; also the key used to look up a traffic split.
    pub service: String,
    /// Relative weight used for the weighted random choice between destinations.
    pub weight: u32,
    /// Optional subset name.
    /// NOTE(review): not read by the routing code visible in this file.
    pub subset: Option<String>,
}
217
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCWeightedDestination {
    /// Python constructor: a destination with the given weight and no subset.
    #[new]
    fn py_new(service: String, weight: u32) -> Self {
        let subset = None;
        Self { service, weight, subset }
    }

    /// Returns a copy of the destination service name.
    fn get_service(&self) -> String {
        self.service.to_owned()
    }

    /// Returns the selection weight.
    fn get_weight(&self) -> u32 {
        let Self { weight, .. } = self;
        *weight
    }
}
238
/// Retry configuration attached to a [`DMSCTrafficRoute`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCRetryPolicy {
    /// Maximum number of tries the manager performs for a routed request.
    pub attempts: u32,
    /// Timeout for a single try.
    /// NOTE(review): not applied by the retry loop visible in this file.
    pub per_try_timeout: Duration,
    /// Retry conditions. The manager currently retries when the list contains
    /// `"5xx"` or `"all"`, regardless of the concrete error.
    pub retry_on: Vec<String>,
}
245
/// Fault-injection settings: an optional delay and an optional abort.
///
/// When both are set, the manager evaluates the delay first and the abort second.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCFaultInjection {
    /// Optional artificial delay.
    pub delay: Option<DMSCDelayFault>,
    /// Optional artificial failure.
    pub abort: Option<DMSCAbortFault>,
}
251
/// Delay fault: sleeps for `fixed_delay` with the configured probability.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCDelayFault {
    /// Probability of injecting the delay. Despite the name, the manager passes this
    /// value to `Rng::gen_bool`, i.e. it is treated as a fraction in `0.0..=1.0`.
    pub percentage: f64,
    /// Duration to sleep when the fault fires.
    pub fixed_delay: Duration,
}
257
/// Abort fault: fails the request with the configured probability.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCAbortFault {
    /// Probability of aborting. Despite the name, the manager passes this value to
    /// `Rng::gen_bool`, i.e. it is treated as a fraction in `0.0..=1.0`.
    pub percentage: f64,
    /// HTTP status reported in the resulting error message.
    pub http_status: u16,
}
263
/// Weighted split of one service's traffic across named subsets.
#[derive(Debug, Clone)]
pub struct DMSCTrafficSplit {
    /// Service the split applies to; key of the manager's split table.
    pub service: String,
    /// Candidate subsets. The manager picks among the map's values by weight and
    /// returns the chosen subset's `name` field (not the map key).
    pub subsets: HashMap<String, DMSCSubset>,
    /// Subset name returned when all weights are zero.
    pub default_subset: String,
}
270
/// One subset of a service inside a [`DMSCTrafficSplit`].
#[derive(Debug, Clone)]
pub struct DMSCSubset {
    /// Subset name; this is the value returned when the subset is selected.
    pub name: String,
    /// Label key/value pairs.
    /// NOTE(review): not read by the code visible in this file.
    pub labels: HashMap<String, String>,
    /// Relative weight used for the weighted random choice between subsets.
    pub weight: u32,
}
277
/// Central traffic-management service: holds routes, traffic splits, circuit-breaker
/// and rate-limit configuration, and routes requests accordingly.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCTrafficManager {
    // When false, the mutating configuration APIs return an error and request
    // routing returns the request bytes unchanged.
    enabled: bool,
    // Routes grouped by their `source_service`.
    routes: Arc<RwLock<HashMap<String, Vec<DMSCTrafficRoute>>>>,
    // Traffic splits keyed by service name.
    traffic_splits: Arc<RwLock<HashMap<String, DMSCTrafficSplit>>>,
    // Circuit-breaker configuration keyed by service name. Stored and returned by the
    // accessors; NOTE(review): not enforced by the routing code visible in this file.
    circuit_breakers: Arc<RwLock<HashMap<String, DMSCCircuitBreakerConfig>>>,
    // Rate-limit configuration; the request path looks entries up by endpoint string.
    rate_limits: Arc<RwLock<HashMap<String, DMSCRateLimitConfig>>>,
    // Handles aborted by `stop_background_tasks`. Nothing visible here pushes to it yet.
    background_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
    // Shared HTTP client used for outgoing requests.
    #[cfg(feature = "http_client")]
    http_client: reqwest::Client,
    // Optional tracer; when set, `route_request` records a client span per request.
    tracer: Option<Arc<DMSCTracer>>,
}
290
/// Circuit-breaker settings stored per service by [`DMSCTrafficManager`].
///
/// NOTE(review): the code visible in this file only stores and returns this
/// configuration; the field meanings below are inferred from the names — confirm
/// against the component that enforces them.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCCircuitBreakerConfig {
    /// Presumably the number of consecutive errors that trips the breaker.
    pub consecutive_errors: u32,
    /// Presumably the evaluation interval.
    pub interval: Duration,
    /// Presumably the base duration a host stays ejected.
    pub base_ejection_time: Duration,
    /// Presumably the maximum share of hosts that may be ejected (the example uses `50.0`).
    pub max_ejection_percent: f64,
}
299
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCCircuitBreakerConfig {
    /// Python constructor: uses a 10 second interval and a 30 second base ejection
    /// time together with the supplied thresholds.
    #[new]
    fn py_new(consecutive_errors: u32, max_ejection_percent: f64) -> Self {
        let interval = Duration::from_secs(10);
        let base_ejection_time = Duration::from_secs(30);

        Self {
            consecutive_errors,
            interval,
            base_ejection_time,
            max_ejection_percent,
        }
    }

    /// Returns the configured consecutive-error threshold.
    fn get_consecutive_errors(&self) -> u32 {
        let Self { consecutive_errors, .. } = self;
        *consecutive_errors
    }

    /// Returns the configured maximum ejection percentage.
    fn get_max_ejection_percent(&self) -> f64 {
        let Self { max_ejection_percent, .. } = self;
        *max_ejection_percent
    }
}
321
/// Rate-limit settings stored by [`DMSCTrafficManager`].
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCRateLimitConfig {
    /// Token refill rate used by the manager's rate limiter.
    pub requests_per_second: u32,
    /// Bucket capacity; also the number of tokens a fresh limiter starts with.
    pub burst_size: u32,
    /// Window duration.
    /// NOTE(review): not read by the rate limiter visible in this file.
    pub window: Duration,
}
329
330impl DMSCTrafficManager {
331    pub fn new(enabled: bool) -> Self {
332        Self {
333            enabled,
334            routes: Arc::new(RwLock::new(HashMap::new())),
335            traffic_splits: Arc::new(RwLock::new(HashMap::new())),
336            circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
337            rate_limits: Arc::new(RwLock::new(HashMap::new())),
338            background_tasks: Arc::new(RwLock::new(Vec::new())),
339            #[cfg(feature = "http_client")]
340            http_client: reqwest::Client::builder()
341                .timeout(Duration::from_secs(30))
342                .connect_timeout(Duration::from_secs(10))
343                .build()
344                .unwrap_or_else(|_| reqwest::Client::new()),
345            tracer: None,
346        }
347    }
348    
349    pub fn with_tracer(mut self, tracer: Arc<DMSCTracer>) -> Self {
350        self.tracer = Some(tracer);
351        self
352    }
353    
354    pub fn set_tracer(&mut self, tracer: Arc<DMSCTracer>) {
355        self.tracer = Some(tracer);
356    }
357
358    pub async fn add_traffic_route(&self, route: DMSCTrafficRoute) -> DMSCResult<()> {
359        if !self.enabled {
360            return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
361        }
362
363        let mut routes = self.routes.write().await;
364        routes.entry(route.source_service.clone())
365            .or_insert_with(Vec::new)
366            .push(route);
367
368        Ok(())
369    }
370
371    pub async fn remove_traffic_route(&self, source_service: &str, route_name: &str) -> DMSCResult<()> {
372        if !self.enabled {
373            return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
374        }
375
376        let mut routes = self.routes.write().await;
377        if let Some(service_routes) = routes.get_mut(source_service) {
378            service_routes.retain(|r| r.name != route_name);
379            
380            if service_routes.is_empty() {
381                routes.remove(source_service);
382            }
383        }
384
385        Ok(())
386    }
387
388    pub async fn get_traffic_routes(&self, source_service: &str) -> DMSCResult<Vec<DMSCTrafficRoute>> {
389        if !self.enabled {
390            return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
391        }
392
393        let routes = self.routes.read().await;
394        let service_routes = routes.get(source_service)
395            .cloned()
396            .unwrap_or_default();
397
398        Ok(service_routes)
399    }
400
401    pub async fn create_traffic_split(&self, split: DMSCTrafficSplit) -> DMSCResult<()> {
402        if !self.enabled {
403            return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
404        }
405
406        let mut traffic_splits = self.traffic_splits.write().await;
407        traffic_splits.insert(split.service.clone(), split);
408
409        Ok(())
410    }
411
412    pub async fn get_traffic_split(&self, service: &str) -> DMSCResult<Option<DMSCTrafficSplit>> {
413        if !self.enabled {
414            return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
415        }
416
417        let traffic_splits = self.traffic_splits.read().await;
418        Ok(traffic_splits.get(service).cloned())
419    }
420
421    pub async fn route_request(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
422        let span_id = if let Some(tracer) = &self.tracer {
423            let span_id = tracer.start_span_from_context(
424                format!("route_request:{}", endpoint),
425                DMSCSpanKind::Client,
426            );
427            if let Some(ref sid) = span_id {
428                let _ = tracer.span_mut(sid, |span| {
429                    span.set_attribute("endpoint".to_string(), endpoint.to_string());
430                    span.set_attribute("request_size".to_string(), request_data.len().to_string());
431                });
432            }
433            span_id
434        } else {
435            None
436        };
437
438        let result = self.route_request_internal(endpoint, request_data).await;
439
440        if let (Some(tracer), Some(sid)) = (&self.tracer, span_id) {
441            let status = match &result {
442                Ok(_) => DMSCSpanStatus::Ok,
443                Err(e) => DMSCSpanStatus::Error(e.to_string()),
444            };
445            let _ = tracer.end_span(&sid, status);
446        }
447
448        result
449    }
450    
451    async fn route_request_internal(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
452        if !self.enabled {
453            return Ok(request_data);
454        }
455
456        if let Some(fault_injection) = self.should_inject_fault() {
457            self.inject_fault(&fault_injection).await?;
458        }
459
460        if self.should_rate_limit(endpoint).await? {
461            return Err(DMSCError::ServiceMesh("Rate limit exceeded".to_string()));
462        }
463
464        let transformed_request = self.apply_traffic_policies(request_data).await;
465        
466        if let Some(matching_route) = self.find_matching_route(endpoint).await {
467            return self.apply_route(&matching_route, endpoint, transformed_request).await;
468        }
469        
470        self.make_http_request(endpoint, transformed_request).await
471    }
472    
473    /// Finds a matching traffic route for the given endpoint
474    async fn find_matching_route(&self, endpoint: &str) -> Option<DMSCTrafficRoute> {
475        let routes = self.routes.read().await;
476        
477        // Iterate through all routes to find a match
478        for (_source_service, service_routes) in &*routes {
479            for route in service_routes {
480                if self.is_route_match(route, endpoint) {
481                    return Some(route.clone());
482                }
483            }
484        }
485        
486        None
487    }
488    
489    /// Checks if a route matches the given endpoint
490    fn is_route_match(&self, _route: &DMSCTrafficRoute, _endpoint: &str) -> bool {
491        #[cfg(feature = "http_client")]
492        if let Ok(url) = _endpoint.parse::<reqwest::Url>() {
493            let host = url.host_str().unwrap_or("");
494            if _route.destination_service.contains(host) {
495                return true;
496            }
497        }
498        false
499    }
500    
501    /// Applies a matched route to the request
502    async fn apply_route(&self, route: &DMSCTrafficRoute, original_endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
503        // Handle different route actions
504        match &route.route_action {
505            DMSCRouteAction::Route(destinations) => {
506                // Select destination index based on weights
507                let selected_index = self.select_destination_index(destinations).await;
508                let mut selected_destination = destinations[selected_index].clone();
509                
510                // Apply traffic splitting if configured
511                if let Some(split_destination) = self.apply_traffic_split(&selected_destination.service).await {
512                    // Override service name with split destination
513                    selected_destination.service = split_destination;
514                }
515                
516                // Replace endpoint with selected destination
517                let new_endpoint = self.replace_endpoint(original_endpoint, &selected_destination).await;
518                
519                // Apply retry policy if configured
520                if let Some(retry_policy) = &route.retry_policy {
521                    self.retry_request(new_endpoint.as_str(), request_data, retry_policy).await
522                } else {
523                    // Perform HTTP call with selected destination
524                    self.make_http_request(new_endpoint.as_str(), request_data).await
525                }
526            },
527            DMSCRouteAction::Redirect(redirect_uri) => {
528                // Handle redirect action
529                Err(DMSCError::ServiceMesh(format!("Redirect to: {}", redirect_uri)))
530            },
531            DMSCRouteAction::DirectResponse(_status, body) => {
532                // Return direct response without making a network call
533                Ok(body.clone().into())
534            }
535        }
536    }
537    
538    /// Selects a destination index based on weights
539    async fn select_destination_index(&self, destinations: &[DMSCWeightedDestination]) -> usize {
540        if destinations.len() == 1 {
541            return 0;
542        }
543        
544        // Calculate total weight
545        let total_weight: u32 = destinations.iter().map(|d| d.weight).sum();
546        
547        // Select random destination based on weights
548        use rand::Rng;
549        let mut rng = rand::thread_rng();
550        let mut current_weight = 0;
551        let random_weight = rng.gen_range(0..total_weight);
552        
553        for (index, destination) in destinations.iter().enumerate() {
554            current_weight += destination.weight;
555            if random_weight < current_weight {
556                return index;
557            }
558        }
559        
560        // Fallback to first destination
561        0
562    }
563    
564    /// Replaces the original endpoint with the selected destination
565    async fn replace_endpoint(&self, original_endpoint: &str, _destination: &DMSCWeightedDestination) -> String {
566        // Simple replacement logic for demonstration
567        // In a full implementation, this would use a more sophisticated approach
568        original_endpoint.to_string()
569    }
570    
571    /// Retries a request according to the retry policy
572    async fn retry_request(&self, endpoint: &str, request_data: Vec<u8>, retry_policy: &DMSCRetryPolicy) -> DMSCResult<Vec<u8>> {
573        let max_attempts = retry_policy.attempts;
574        
575        for attempt in 1..=max_attempts {
576            let result = self.make_http_request(endpoint, request_data.clone()).await;
577            
578            match result {
579                Ok(response) => return Ok(response),
580                Err(e) => {
581                    // Check if retry should be attempted
582                    if attempt < max_attempts && self.should_retry(&e, retry_policy) {
583                        // Wait before retry (exponential backoff)
584                        let delay = Duration::from_millis(100 * 2u64.pow(attempt - 1));
585                        tokio::time::sleep(delay).await;
586                        continue;
587                    }
588                    return Err(e);
589                }
590            }
591        }
592        
593        Err(DMSCError::ServiceMesh("All retry attempts failed".to_string()))
594    }
595    
596    /// Checks if a request should be retried based on the error and retry policy
597    fn should_retry(&self, _error: &DMSCError, retry_policy: &DMSCRetryPolicy) -> bool {
598        // Check if error should be retried based on retry_on conditions
599        // Simple implementation for demonstration
600        retry_policy.retry_on.iter().any(|s| s == "5xx" || s == "all")
601    }
602
603    #[cfg(feature = "http_client")]
604    async fn make_http_request(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
605        let url = endpoint.parse::<reqwest::Url>()
606            .map_err(|e| DMSCError::ServiceMesh(format!("Invalid endpoint URL: {e}")))?;
607        
608        let mut request_builder = self.http_client
609            .post(url)
610            .header("Content-Type", "application/octet-stream");
611        
612        if let Some(_tracer) = &self.tracer {
613            let mut headers = HashMap::new();
614            DMSCContextCarrier::inject_current_into_headers(&mut headers);
615            for (key, value) in headers {
616                request_builder = request_builder.header(key, value);
617            }
618        }
619        
620        let response = request_builder
621            .body(request_data)
622            .send()
623            .await
624            .map_err(|e| DMSCError::ServiceMesh(format!("HTTP request failed: {e}")))?;
625        
626        if !response.status().is_success() {
627            return Err(DMSCError::ServiceMesh(format!(
628                "HTTP request failed with status: {}", 
629                response.status()
630            )));
631        }
632        
633        let response_data = response
634            .bytes()
635            .await
636            .map_err(|e| DMSCError::ServiceMesh(format!("Failed to read response body: {e}")))?
637            .to_vec();
638        
639        Ok(response_data)
640    }
641    
642    #[cfg(not(feature = "http_client"))]
643    async fn make_http_request(&self, _endpoint: &str, _request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
644        Err(DMSCError::ServiceMesh(format!("HTTP client is not enabled. Enable the 'http_client' feature to use HTTP requests.")))
645    }
646
    /// Hook for request transformations applied before routing.
    ///
    /// Currently a pass-through: the request bytes are returned unchanged.
    async fn apply_traffic_policies(&self, request_data: Vec<u8>) -> Vec<u8> {
        request_data
    }
650    
651    /// Applies traffic splitting to determine the destination service
652    /// based on configured traffic splits and weights
653    async fn apply_traffic_split(&self, service: &str) -> Option<String> {
654        let traffic_splits = self.traffic_splits.read().await;
655        
656        if let Some(traffic_split) = traffic_splits.get(service) {
657            // Calculate total weight for all subsets
658            let total_weight: u32 = traffic_split.subsets.values()
659                .map(|subset| subset.weight)
660                .sum();
661            
662            if total_weight == 0 {
663                // If total weight is 0, use default subset
664                Some(traffic_split.default_subset.clone())
665            } else {
666                // Select random destination based on weights
667                use rand::Rng;
668                let mut rng = rand::thread_rng();
669                let random_weight = rng.gen_range(0..total_weight);
670                
671                let mut current_weight = 0;
672                for subset in traffic_split.subsets.values() {
673                    current_weight += subset.weight;
674                    if random_weight < current_weight {
675                        return Some(subset.name.clone());
676                    }
677                }
678                
679                // Fallback to default subset
680                Some(traffic_split.default_subset.clone())
681            }
682        } else {
683            None
684        }
685    }
686
687    fn should_inject_fault(&self) -> Option<DMSCFaultInjection> {
688        use rand::Rng;
689        let mut rng = rand::thread_rng();
690        
691        if rng.gen_bool(0.01) {
692            Some(DMSCFaultInjection {
693                delay: Some(DMSCDelayFault {
694                    percentage: 0.5,
695                    fixed_delay: Duration::from_millis(100),
696                }),
697                abort: None,
698            })
699        } else {
700            None
701        }
702    }
703
704    async fn inject_fault(&self, fault: &DMSCFaultInjection) -> DMSCResult<()> {
705        if let Some(delay) = &fault.delay {
706            use rand::Rng;
707            let mut rng = rand::thread_rng();
708            
709            if rng.gen_bool(delay.percentage) {
710                tokio::time::sleep(delay.fixed_delay).await;
711            }
712        }
713
714        if let Some(abort) = &fault.abort {
715            use rand::Rng;
716            let mut rng = rand::thread_rng();
717            
718            if rng.gen_bool(abort.percentage) {
719                return Err(DMSCError::ServiceMesh(format!("Fault injection: HTTP {}", abort.http_status)));
720            }
721        }
722
723        Ok(())
724    }
725
726    /// Implements a sliding window rate limiter using the leaky bucket algorithm
727    async fn should_rate_limit(&self, endpoint: &str) -> DMSCResult<bool> {
728        let rate_limits = self.rate_limits.read().await;
729        
730        // Check if there's a rate limit configured for this endpoint
731        if let Some(config) = rate_limits.get(endpoint) {
732            // Use a thread-safe per-endpoint rate limiter with sliding window
733            use std::sync::atomic::{AtomicU64, Ordering};
734            use std::collections::HashMap;
735            use std::sync::Arc;
736            
737            // Store rate limiters in a thread-safe map
738            static RATE_LIMITERS: std::sync::Mutex<Option<HashMap<String, Arc<RateLimiter>>>> = 
739                std::sync::Mutex::new(None);
740            
741            // Rate limiter implementation using leaky bucket algorithm
742            struct RateLimiter {
743                capacity: u32,
744                rate: f64, // requests per second
745                tokens: AtomicU64, // current tokens available
746                last_update: AtomicU64, // last update time in milliseconds
747            }
748            
749            impl RateLimiter {
750                fn new(config: &DMSCRateLimitConfig) -> Self {
751                    let rate = config.requests_per_second as f64;
752                    Self {
753                        capacity: config.burst_size,
754                        rate,
755                        tokens: AtomicU64::new(config.burst_size as u64),
756                        last_update: AtomicU64::new(
757                            std::time::SystemTime::now()
758                                .duration_since(std::time::UNIX_EPOCH)
759                                .unwrap_or(std::time::Duration::from_secs(0))
760                                .as_millis() as u64
761                        ),
762                    }
763                }
764                
765                fn try_acquire(&self) -> bool {
766                    let now = std::time::SystemTime::now()
767                        .duration_since(std::time::UNIX_EPOCH)
768                        .unwrap_or(std::time::Duration::from_secs(0))
769                        .as_millis() as u64;
770                    let last = self.last_update.load(Ordering::Acquire);
771                    let elapsed = now - last;
772                    
773                    // Calculate tokens to add based on elapsed time (in milliseconds) and rate
774                    let tokens_to_add = (elapsed as f64 / 1000.0) * self.rate;
775                    let tokens_to_add = tokens_to_add as u64;
776                    
777                    let current = self.tokens.load(Ordering::Acquire);
778                    let new_tokens = std::cmp::min(current.saturating_add(tokens_to_add), self.capacity as u64);
779                    
780                    // Try to acquire one token
781                    if new_tokens > 0 {
782                        if self.tokens.compare_exchange(current, new_tokens - 1, 
783                                                       Ordering::AcqRel, Ordering::Acquire).is_ok() {
784                            // Update last update time if we successfully acquired a token
785                            self.last_update.store(now, Ordering::Release);
786                            return true;
787                        }
788                    }
789                    
790                    false
791                }
792            }
793            
794            let mut limiters = RATE_LIMITERS.lock()
795                .map_err(|e| DMSCError::ServiceMesh(format!("Failed to acquire rate limiter lock: {}", e)))?;
796            if limiters.is_none() {
797                *limiters = Some(HashMap::new());
798            }
799            
800            let limiters = limiters.as_mut()
801                .ok_or_else(|| DMSCError::InvalidState("Rate limiters not initialized".to_string()))?;
802            
803            // Get or create rate limiter for this endpoint
804            let limiter = limiters.entry(endpoint.to_string())
805                .or_insert_with(|| Arc::new(RateLimiter::new(config)));
806            
807            // Try to acquire a token
808            Ok(!limiter.try_acquire())
809        } else {
810            Ok(false) // No rate limit configured
811        }
812    }
813
814    pub async fn set_circuit_breaker_config(&self, service: &str, config: DMSCCircuitBreakerConfig) -> DMSCResult<()> {
815        if !self.enabled {
816            return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
817        }
818
819        let mut circuit_breakers = self.circuit_breakers.write().await;
820        circuit_breakers.insert(service.to_string(), config);
821
822        Ok(())
823    }
824
825    pub async fn set_rate_limit_config(&self, service: &str, config: DMSCRateLimitConfig) -> DMSCResult<()> {
826        if !self.enabled {
827            return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
828        }
829
830        let mut rate_limits = self.rate_limits.write().await;
831        rate_limits.insert(service.to_string(), config);
832
833        Ok(())
834    }
835
836    pub async fn get_circuit_breaker_config(&self, service: &str) -> DMSCResult<Option<DMSCCircuitBreakerConfig>> {
837        let circuit_breakers = self.circuit_breakers.read().await;
838        Ok(circuit_breakers.get(service).cloned())
839    }
840
841    pub async fn get_rate_limit_config(&self, service: &str) -> DMSCResult<Option<DMSCRateLimitConfig>> {
842        let rate_limits = self.rate_limits.read().await;
843        Ok(rate_limits.get(service).cloned())
844    }
845
846    pub async fn start_background_tasks(&self) -> DMSCResult<()> {
847        if !self.enabled {
848            return Ok(());
849        }
850
851        Ok(())
852    }
853
854    pub async fn stop_background_tasks(&self) -> DMSCResult<()> {
855        let mut tasks = self.background_tasks.write().await;
856        for task in tasks.drain(..) {
857            task.abort();
858        }
859        Ok(())
860    }
861
862    pub async fn health_check(&self) -> DMSCResult<bool> {
863        Ok(self.enabled)
864    }
865}
866
/// Runs `operation` to completion on a newly created Tokio runtime and
/// converts any failure into a Python `RuntimeError`.
///
/// This is the shared bridge used by the synchronous Python-facing methods
/// below to call the manager's async API.
///
/// # Parameters
///
/// - `failure_context`: message prefix used when `operation` resolves to `Err`;
///   the final message has the form `"{failure_context}: {error}"`.
/// - `operation`: the future to drive; it is only polled after the runtime has
///   been created successfully.
///
/// # Errors
///
/// Returns `PyRuntimeError` with `"Failed to create runtime: ..."` when the
/// runtime cannot be constructed, or with the contextualised error message when
/// `operation` fails.
#[cfg(feature = "pyo3")]
fn block_on_for_python<T, E, F>(failure_context: &str, operation: F) -> PyResult<T>
where
    F: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let runtime = tokio::runtime::Runtime::new().map_err(|e| {
        pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
    })?;

    runtime.block_on(operation).map_err(|e| {
        pyo3::exceptions::PyRuntimeError::new_err(format!("{failure_context}: {e}"))
    })
}

#[cfg(feature = "pyo3")]
/// Python bindings for DMSCTrafficManager
///
/// Each method is a blocking wrapper: it creates a Tokio runtime for the call,
/// drives the corresponding async method to completion, and surfaces errors to
/// Python as `RuntimeError`.
#[pyo3::prelude::pymethods]
impl DMSCTrafficManager {
    /// Python constructor; builds the manager via `Self::new(enabled)`.
    #[new]
    fn py_new(enabled: bool) -> PyResult<Self> {
        Ok(Self::new(enabled))
    }

    /// Add traffic route from Python
    ///
    /// Forwards `route` to the async `add_traffic_route` and blocks until it finishes.
    #[pyo3(name = "add_traffic_route")]
    fn add_traffic_route_impl(&self, route: DMSCTrafficRoute) -> PyResult<()> {
        block_on_for_python("Failed to add traffic route", self.add_traffic_route(route))
    }

    /// Get traffic routes from Python
    ///
    /// Forwards `service_name` to the async `get_traffic_routes` and returns its result.
    #[pyo3(name = "get_traffic_routes")]
    fn get_traffic_routes_impl(&self, service_name: String) -> PyResult<Vec<DMSCTrafficRoute>> {
        block_on_for_python(
            "Failed to get traffic routes",
            self.get_traffic_routes(&service_name),
        )
    }

    /// Remove traffic route from Python
    ///
    /// Forwards `source_service` and `route_name` to the async `remove_traffic_route`.
    #[pyo3(name = "remove_traffic_route")]
    fn remove_traffic_route_impl(&self, source_service: String, route_name: String) -> PyResult<()> {
        block_on_for_python(
            "Failed to remove traffic route",
            self.remove_traffic_route(&source_service, &route_name),
        )
    }

    /// Set circuit breaker config from Python
    ///
    /// Forwards `service` and `config` to the async `set_circuit_breaker_config`.
    #[pyo3(name = "set_circuit_breaker_config")]
    fn set_circuit_breaker_config_impl(&self, service: String, config: DMSCCircuitBreakerConfig) -> PyResult<()> {
        block_on_for_python(
            "Failed to set circuit breaker config",
            self.set_circuit_breaker_config(&service, config),
        )
    }

    /// Set rate limit config from Python
    ///
    /// Forwards `service` and `config` to the async `set_rate_limit_config`.
    #[pyo3(name = "set_rate_limit_config")]
    fn set_rate_limit_config_impl(&self, service: String, config: DMSCRateLimitConfig) -> PyResult<()> {
        block_on_for_python(
            "Failed to set rate limit config",
            self.set_rate_limit_config(&service, config),
        )
    }
}