dmsc/service_mesh/
mod.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18//! # Service Mesh Module
19//! 
20//! This module provides a comprehensive service mesh implementation for DMSC, offering service discovery,
21//! health checking, traffic management, load balancing, and circuit breaking capabilities for distributed systems.
22//! 
23//! ## Key Components
24//! 
25//! - **DMSCServiceMesh**: Main service mesh struct implementing the DMSCModule trait
26//! - **DMSCServiceMeshConfig**: Configuration for service mesh behavior
27//! - **DMSCServiceEndpoint**: Service endpoint representation
28//! - **DMSCServiceHealthStatus**: Enum defining service health statuses
29//! - **DMSCServiceDiscovery**: Service discovery component
30//! - **DMSCServiceInstance**: Service instance representation
31//! - **DMSCServiceStatus**: Service status enum
32//! - **DMSCHealthChecker**: Health checking component
33//! - **DMSCHealthCheckResult**: Health check result structure
34//! - **DMSCHealthSummary**: Health summary structure
35//! - **DMSCHealthStatus**: Health status enum
36//! - **DMSCTrafficManager**: Traffic management component
37//! - **DMSCTrafficRoute**: Traffic route definition
38//! - **DMSCMatchCriteria**: Match criteria for traffic routing
39//! - **DMSCRouteAction**: Route action for traffic routing
40//! - **DMSCCircuitBreaker**: Circuit breaker for preventing cascading failures
41//! - **DMSCLoadBalancer**: Load balancer for distributing traffic across services
42//! - **DMSCLoadBalancerStrategy**: Load balancing strategies
43//! 
44//! ## Design Principles
45//! 
46//! 1. **Service Discovery**: Automatic discovery of services and their endpoints
47//! 2. **Health Monitoring**: Continuous health checks for service endpoints
48//! 3. **Traffic Management**: Intelligent routing and load balancing of traffic
49//! 4. **Resilience**: Circuit breaking and retry mechanisms for service resilience
50//! 5. **Configurable**: Highly configurable service mesh behavior
51//! 6. **Async-First**: All service mesh operations are asynchronous
52//! 7. **Modular Design**: Separate components for service discovery, health checking, and traffic management
53//! 8. **Service Module Integration**: Implements DMSCModule trait for seamless integration into DMSC
54//! 9. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
55//! 10. **Critical Component**: Marked as critical for the system's operation
56//! 
57//! ## Usage
58//! 
59//! ```rust
60//! use dmsc::prelude::*;
61//! use dmsc::service_mesh::{DMSCServiceMesh, DMSCServiceMeshConfig};
62//! 
63//! async fn example() -> DMSCResult<()> {
64//!     // Create service mesh configuration
65//!     let mesh_config = DMSCServiceMeshConfig::default();
66//!     
67//!     // Create service mesh instance
68//!     let service_mesh = DMSCServiceMesh::new(mesh_config)?;
69//!     
70//!     // Register services
//!     service_mesh.register_service("user-service", "http://user-service:8080", 100, None).await?;
//!     service_mesh.register_service("order-service", "http://order-service:8080", 100, None).await?;
//!     service_mesh.register_service("payment-service", "http://payment-service:8080", 100, None).await?;
74//!     
75//!     // Discover services
76//!     let user_service_endpoints = service_mesh.discover_service("user-service").await?;
77//!     println!("User service endpoints: {:?}", user_service_endpoints);
78//!     
79//!     // Call a service
80//!     let request_data = r#"{ "user_id": "123" }"#.as_bytes().to_vec();
81//!     let response = service_mesh.call_service("user-service", request_data).await?;
82//!     println!("Service response: {}", String::from_utf8_lossy(&response));
83//!     
84//!     // Get service mesh components for advanced configuration
85//!     let health_checker = service_mesh.get_health_checker();
86//!     let traffic_manager = service_mesh.get_traffic_manager();
87//!     let circuit_breaker = service_mesh.get_circuit_breaker();
88//!     let load_balancer = service_mesh.get_load_balancer();
89//!     
90//!     // Example: Configure traffic manager
91//!     // traffic_manager.add_route(route).await?;
92//!     
93//!     Ok(())
94//! }
95//! ```
96
97use async_trait::async_trait;
98use serde::{Deserialize, Serialize};
99use std::collections::HashMap;
100use std::sync::Arc;
101use std::time::{Duration, SystemTime};
102use tokio::sync::RwLock;
103
104#[cfg(feature = "pyo3")]
105use pyo3::PyResult;
106
107use crate::core::{DMSCModule, DMSCResult, DMSCError};
108use crate::gateway::{DMSCCircuitBreaker, DMSCCircuitBreakerConfig, DMSCLoadBalancer, DMSCLoadBalancerStrategy};
109use crate::gateway::load_balancer::DMSCBackendServer;
110use crate::observability::{DMSCTracer, DMSCSpanKind, DMSCSpanStatus};
111
112pub mod service_discovery;
113pub mod health_check;
114pub mod traffic_management;
115
116pub use service_discovery::{DMSCServiceDiscovery, DMSCServiceInstance, DMSCServiceStatus};
117pub use health_check::{DMSCHealthChecker, DMSCHealthCheckResult, DMSCHealthSummary, DMSCHealthStatus};    
118pub use traffic_management::{DMSCTrafficRoute, DMSCMatchCriteria, DMSCRouteAction, DMSCWeightedDestination, DMSCTrafficManager};
119
120/// Configuration for the service mesh.
121/// 
122/// This struct defines the configuration options for the service mesh, including service discovery,
123/// health checking, traffic management, circuit breaking, and load balancing settings.
124#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct DMSCServiceMeshConfig {
127    /// Whether to enable service discovery
128    pub enable_service_discovery: bool,
129    /// Whether to enable health checking
130    pub enable_health_check: bool,
131    /// Whether to enable traffic management
132    pub enable_traffic_management: bool,
133    /// Interval between health checks
134    pub health_check_interval: Duration,
135    /// Configuration for circuit breakers
136    pub circuit_breaker_config: DMSCCircuitBreakerConfig,
137    /// Load balancing strategy to use
138    pub load_balancer_strategy: DMSCLoadBalancerStrategy,
139    /// Maximum number of retry attempts for failed requests
140    pub max_retry_attempts: u32,
141    /// Timeout for retry attempts
142    pub retry_timeout: Duration,
143}
144
145impl Default for DMSCServiceMeshConfig {
146    /// Returns the default configuration for the service mesh.
147    /// 
148    /// Default values:
149    /// - enable_service_discovery: true
150    /// - enable_health_check: true
151    /// - enable_traffic_management: true
152    /// - health_check_interval: 30 seconds
153    /// - circuit_breaker_config: Default circuit breaker config
154    /// - load_balancer_strategy: RoundRobin
155    /// - max_retry_attempts: 3
156    /// - retry_timeout: 5 seconds
157    fn default() -> Self {
158        Self {
159            enable_service_discovery: true,
160            enable_health_check: true,
161            enable_traffic_management: true,
162            health_check_interval: Duration::from_secs(30),
163            circuit_breaker_config: DMSCCircuitBreakerConfig::default(),
164            load_balancer_strategy: DMSCLoadBalancerStrategy::RoundRobin,
165            max_retry_attempts: 3,
166            retry_timeout: Duration::from_secs(5),
167        }
168    }
169}
170
171/// Service endpoint representation.
172/// 
173/// This struct represents a service endpoint with its name, URL, weight, metadata, health status,
174/// and last health check time.
175#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
176#[derive(Debug, Clone)]
177pub struct DMSCServiceEndpoint {
178    /// Name of the service
179    pub service_name: String,
180    /// Endpoint URL
181    pub endpoint: String,
182    /// Weight for load balancing
183    pub weight: u32,
184    /// Metadata associated with the endpoint
185    pub metadata: HashMap<String, String>,
186    /// Health status of the endpoint
187    pub health_status: DMSCServiceHealthStatus,
188    /// Time of the last health check
189    pub last_health_check: SystemTime,
190}
191
/// Health state recorded for a service endpoint.
///
/// Freshly registered endpoints are `Unknown`. Health updates switch them to `Healthy` or
/// `Unhealthy`.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Clone, PartialEq, Debug)]
pub enum DMSCServiceHealthStatus {
    /// The endpoint passed its last check and may receive traffic.
    Healthy,
    /// The endpoint failed its last check and should not receive traffic.
    Unhealthy,
    /// No health information has been recorded yet.
    Unknown,
}
205
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCServiceEndpoint {
    /// Python constructor.
    ///
    /// The new endpoint has no metadata and `Unknown` health. Its last-check timestamp is set
    /// to the moment of construction.
    #[new]
    fn py_new(service_name: String, endpoint: String, weight: u32) -> Self {
        let created_at = SystemTime::now();
        let metadata = HashMap::default();

        Self {
            service_name,
            endpoint,
            weight,
            metadata,
            health_status: DMSCServiceHealthStatus::Unknown,
            last_health_check: created_at,
        }
    }

    /// Name of the owning service.
    #[getter]
    fn service_name(&self) -> &str {
        self.service_name.as_str()
    }

    /// Endpoint address.
    #[getter]
    fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }

    /// Load-balancing weight.
    #[getter]
    fn weight(&self) -> u32 {
        self.weight
    }

    /// A copy of the current health state.
    #[getter]
    fn health_status(&self) -> DMSCServiceHealthStatus {
        DMSCServiceHealthStatus::clone(&self.health_status)
    }
}
245
246/// Service discovery cache entry
247/// 
248/// This struct represents a cached entry for service discovery results.
249#[derive(Debug, Clone)]
250struct ServiceDiscoveryCacheEntry {
251    /// Discovered service endpoints
252    endpoints: Vec<DMSCServiceEndpoint>,
253    /// Cache entry expiration time
254    expiration: SystemTime,
255}
256
/// Point-in-time statistics about the endpoints registered with a service mesh.
///
/// `total_endpoints` counts every endpoint. Endpoints whose health is still unknown appear in
/// that total but in neither the healthy nor the unhealthy count.
///
/// Derives `Default` (all counters zero) and `PartialEq`/`Eq` so snapshots can be created and
/// compared without hand-written boilerplate.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCServiceMeshStats {
    /// Total number of registered services
    pub total_services: usize,
    /// Total number of registered endpoints
    pub total_endpoints: usize,
    /// Number of healthy endpoints
    pub healthy_endpoints: usize,
    /// Number of unhealthy endpoints
    pub unhealthy_endpoints: usize,
}
270
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCServiceMeshStats {
    /// Python constructor producing a snapshot in which every counter is zero.
    #[new]
    fn py_new() -> Self {
        let zero: usize = 0;
        Self {
            total_services: zero,
            total_endpoints: zero,
            healthy_endpoints: zero,
            unhealthy_endpoints: zero,
        }
    }

    /// Number of services in the snapshot.
    #[getter]
    fn total_services(&self) -> usize {
        let count = self.total_services;
        count
    }

    /// Number of endpoints across all services.
    #[getter]
    fn total_endpoints(&self) -> usize {
        let count = self.total_endpoints;
        count
    }

    /// Number of endpoints recorded as healthy.
    #[getter]
    fn healthy_endpoints(&self) -> usize {
        let count = self.healthy_endpoints;
        count
    }

    /// Number of endpoints recorded as unhealthy.
    #[getter]
    fn unhealthy_endpoints(&self) -> usize {
        let count = self.unhealthy_endpoints;
        count
    }
}
304
305/// Main service mesh struct implementing the DMSCModule trait.
306/// 
307/// This struct provides comprehensive service mesh functionality, including service discovery,
308/// health checking, traffic management, load balancing, and circuit breaking.
309#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
310pub struct DMSCServiceMesh {
311    config: DMSCServiceMeshConfig,
312    service_discovery: Arc<DMSCServiceDiscovery>,
313    health_checker: Arc<DMSCHealthChecker>,
314    traffic_manager: Arc<DMSCTrafficManager>,
315    circuit_breaker: Arc<DMSCCircuitBreaker>,
316    load_balancer: Arc<DMSCLoadBalancer>,
317    services: Arc<RwLock<HashMap<String, Vec<DMSCServiceEndpoint>>>>,
318    discovery_cache: Arc<RwLock<HashMap<String, ServiceDiscoveryCacheEntry>>>,
319    cache_expiration: Duration,
320    tracer: Option<Arc<DMSCTracer>>,
321}
322
323impl DMSCServiceMesh {
324    pub fn new(config: DMSCServiceMeshConfig) -> DMSCResult<Self> {
325        let service_discovery = Arc::new(DMSCServiceDiscovery::new(config.enable_service_discovery));
326        let health_checker = Arc::new(DMSCHealthChecker::new(config.health_check_interval));
327        let traffic_manager = Arc::new(DMSCTrafficManager::new(config.enable_traffic_management));
328        let circuit_breaker = Arc::new(DMSCCircuitBreaker::new(config.circuit_breaker_config.clone()));
329        let load_balancer = Arc::new(DMSCLoadBalancer::new(config.load_balancer_strategy.clone()));
330        
331        Ok(Self {
332            config,
333            service_discovery,
334            health_checker,
335            traffic_manager,
336            circuit_breaker,
337            load_balancer,
338            services: Arc::new(RwLock::new(HashMap::new())),
339            discovery_cache: Arc::new(RwLock::new(HashMap::new())),
340            cache_expiration: Duration::from_secs(30),
341            tracer: None,
342        })
343    }
344    
345    pub fn with_tracer(mut self, tracer: Arc<DMSCTracer>) -> Self {
346        self.tracer = Some(tracer.clone());
347        let mut traffic_manager = DMSCTrafficManager::new(self.config.enable_traffic_management);
348        traffic_manager.set_tracer(tracer);
349        self.traffic_manager = Arc::new(traffic_manager);
350        self
351    }
352    
353    pub fn set_tracer(&mut self, tracer: Arc<DMSCTracer>) {
354        self.tracer = Some(tracer.clone());
355        let mut traffic_manager = DMSCTrafficManager::new(self.config.enable_traffic_management);
356        traffic_manager.set_tracer(tracer);
357        self.traffic_manager = Arc::new(traffic_manager);
358    }
359
360    /// Registers a service endpoint with the service mesh.
361    /// 
362    /// # Parameters
363    /// 
364    /// - `service_name`: The name of the service
365    /// - `endpoint`: The endpoint URL of the service
366    /// - `weight`: The weight of the endpoint for load balancing
367    /// - `metadata`: Optional metadata associated with the service
368    /// 
369    /// # Returns
370    /// 
371    /// A `DMSCResult<()>` indicating success or failure
372    pub async fn register_service(&self, service_name: &str, endpoint: &str, weight: u32, metadata: Option<HashMap<String, String>>) -> DMSCResult<()> {
373        if service_name.is_empty() {
374            return Err(DMSCError::ServiceMesh("Service name cannot be empty".to_string()));
375        }
376        if endpoint.is_empty() {
377            return Err(DMSCError::ServiceMesh("Endpoint cannot be empty".to_string()));
378        }
379        if weight == 0 {
380            return Err(DMSCError::ServiceMesh("Weight must be greater than zero".to_string()));
381        }
382
383        let service_endpoint = DMSCServiceEndpoint {
384            service_name: service_name.to_string(),
385            endpoint: endpoint.to_string(),
386            weight,
387            metadata: metadata.unwrap_or_default(),
388            health_status: DMSCServiceHealthStatus::Unknown,
389            last_health_check: SystemTime::now(),
390        };
391
392        let mut services = self.services.write().await;
393        services.entry(service_name.to_string())
394            .or_insert_with(Vec::new)
395            .push(service_endpoint);
396
397        if self.config.enable_health_check {
398            self.health_checker.start_health_check(service_name, endpoint).await?;
399        }
400
401        Ok(())
402    }
403    
404    /// Registers a service with full metadata including version information.
405    pub async fn register_versioned_service(&self, service_name: &str, version: &str, endpoint: &str, weight: u32, metadata: Option<HashMap<String, String>>) -> DMSCResult<()> {
406        let mut enriched_metadata = metadata.unwrap_or_default();
407        enriched_metadata.insert("version".to_string(), version.to_string());
408        
409        self.register_service(service_name, endpoint, weight, Some(enriched_metadata)).await
410    }
411    
412    /// Unregisters a service endpoint from the service mesh.
413    pub async fn unregister_service(&self, service_name: &str, endpoint: &str) -> DMSCResult<()> {
414        let mut services = self.services.write().await;
415        
416        if let Some(endpoints) = services.get_mut(service_name) {
417            endpoints.retain(|ep| ep.endpoint != endpoint);
418            
419            if endpoints.is_empty() {
420                services.remove(service_name);
421            }
422            
423            if self.config.enable_health_check {
424                self.health_checker.stop_health_check(service_name, endpoint).await?;
425            }
426        }
427        
428        Ok(())
429    }
430    
431    /// Gets all registered endpoints for a service regardless of health status.
432    pub async fn get_all_endpoints(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceEndpoint>> {
433        let services = self.services.read().await;
434        
435        services.get(service_name)
436            .cloned()
437            .ok_or_else(|| DMSCError::ServiceMesh(format!("Service '{service_name}' not found")))
438    }
439    
440    /// Gets service mesh statistics.
441    pub async fn get_stats(&self) -> DMSCServiceMeshStats {
442        let services = self.services.read().await;
443        let healthy_count = services.values()
444            .flat_map(|endpoints| endpoints.iter())
445            .filter(|ep| ep.health_status == DMSCServiceHealthStatus::Healthy)
446            .count();
447        
448        DMSCServiceMeshStats {
449            total_services: services.len(),
450            total_endpoints: services.values().map(|v| v.len()).sum(),
451            healthy_endpoints: healthy_count,
452            unhealthy_endpoints: services.values()
453                .flat_map(|endpoints| endpoints.iter())
454                .filter(|ep| ep.health_status == DMSCServiceHealthStatus::Unhealthy)
455                .count(),
456        }
457    }
458
459    /// Discovers healthy endpoints for a service.
460    /// 
461    /// # Parameters
462    /// 
463    /// - `service_name`: The name of the service to discover
464    /// 
465    /// # Returns
466    /// 
467    /// A `DMSCResult<Vec<DMSCServiceEndpoint>>` containing the healthy endpoints for the service
468    pub async fn discover_service(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceEndpoint>> {
469        if !self.config.enable_service_discovery {
470            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
471        }
472
473        // Check cache first
474        {
475            let cache = self.discovery_cache.read().await;
476            if let Some(entry) = cache.get(service_name) {
477                if entry.expiration > SystemTime::now() {
478                    // Cache is still valid, return cached endpoints
479                    return Ok(entry.endpoints.clone());
480                }
481            }
482        }
483
484        // Cache miss or expired, perform regular service discovery
485        let services = self.services.read().await;
486        let endpoints = services.get(service_name)
487            .ok_or_else(|| DMSCError::ServiceMesh(format!("Service '{service_name}' not found")))?
488            .clone();
489
490        let healthy_endpoints: Vec<DMSCServiceEndpoint> = endpoints
491            .into_iter()
492            .filter(|ep| ep.health_status == DMSCServiceHealthStatus::Healthy)
493            .collect();
494
495        if healthy_endpoints.is_empty() {
496            return Err(DMSCError::ServiceMesh(format!("No healthy endpoints for service '{service_name}'")));
497        }
498
499        // Cache the discovered endpoints
500        let expiration = SystemTime::now() + self.cache_expiration;
501        let cache_entry = ServiceDiscoveryCacheEntry {
502            endpoints: healthy_endpoints.clone(),
503            expiration,
504        };
505        
506        let mut cache = self.discovery_cache.write().await;
507        cache.insert(service_name.to_string(), cache_entry);
508
509        Ok(healthy_endpoints)
510    }
511
512    /// Calls a service with the given request data.
513    /// 
514    /// This method performs the following steps:
515    /// 1. Discovers healthy endpoints for the service
516    /// 2. Selects a server using the load balancer
517    /// 3. Checks the circuit breaker status
518    /// 4. Executes the service call with retry logic
519    /// 5. Records success/failure with the circuit breaker
520    /// 
521    /// # Parameters
522    /// 
523    /// - `service_name`: The name of the service to call
524    /// - `request_data`: The request data to send to the service
525    /// 
526    /// # Returns
527    /// 
528    /// A `DMSCResult<Vec<u8>>` containing the response from the service
529    pub async fn call_service(&self, service_name: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
530        let span_id = if let Some(tracer) = &self.tracer {
531            let span_id = tracer.start_span_from_context(
532                format!("call_service:{}", service_name),
533                DMSCSpanKind::Client,
534            );
535            if let Some(ref sid) = span_id {
536                let _ = tracer.span_mut(sid, |span| {
537                    span.set_attribute("service_name".to_string(), service_name.to_string());
538                    span.set_attribute("request_size".to_string(), request_data.len().to_string());
539                });
540            }
541            span_id
542        } else {
543            None
544        };
545
546        let result = self.call_service_internal(service_name, request_data).await;
547
548        if let (Some(tracer), Some(sid)) = (&self.tracer, span_id) {
549            let status = match &result {
550                Ok(_) => DMSCSpanStatus::Ok,
551                Err(e) => DMSCSpanStatus::Error(e.to_string()),
552            };
553            let _ = tracer.end_span(&sid, status);
554        }
555
556        result
557    }
558    
559    async fn call_service_internal(&self, service_name: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
560        let endpoints = self.discover_service(service_name).await?;
561        
562        let mut existing_servers = self.load_balancer.get_healthy_servers().await;
563        existing_servers.retain(|s| !s.id.starts_with(&format!("{service_name}-")));
564        
565        for ep in &endpoints {
566            if ep.health_status == DMSCServiceHealthStatus::Healthy {
567                let server = DMSCBackendServer {
568                    id: format!("{}-{}", service_name, ep.endpoint),
569                    url: ep.endpoint.clone(),
570                    weight: ep.weight,
571                    max_connections: 100,
572                    health_check_path: "/health".to_string(),
573                    is_healthy: true,
574                };
575                self.load_balancer.add_server(server).await;
576            }
577        }
578
579        let selected_server = match self.load_balancer.select_server(None).await {
580            Ok(server) => server,
581            Err(_) => return Err(DMSCError::ServiceMesh("No available backend server".to_string())),
582        };
583
584        if !self.circuit_breaker.allow_request() {
585            return Err(DMSCError::ServiceMesh("Circuit breaker is open".to_string()));
586        }
587
588        let result = self.execute_service_call(&selected_server.url, request_data.clone()).await;
589
590        match &result {
591            Ok(_) => {
592                self.circuit_breaker.record_success();
593            }
594            Err(_) => {
595                self.circuit_breaker.record_failure();
596            }
597        }
598
599        result
600    }
601
602    /// Executes a service call with retry logic.
603    /// 
604    /// This method attempts to call a service endpoint with exponential backoff retry logic.
605    /// 
606    /// # Parameters
607    /// 
608    /// - `endpoint`: The endpoint URL to call
609    /// - `request_data`: The request data to send
610    /// 
611    /// # Returns
612    /// 
613    /// A `DMSCResult<Vec<u8>>` containing the response from the service
614    async fn execute_service_call(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
615        let mut last_error = None;
616        
617        for attempt in 0..self.config.max_retry_attempts {
618            match self.traffic_manager.route_request(endpoint, request_data.clone()).await {
619                Ok(response) => return Ok(response),
620                Err(_e) => {
621                    let sanitized_error = DMSCError::ServiceMesh(format!("Retry attempt {} failed", attempt + 1));
622                    last_error = Some(sanitized_error);
623                    if attempt < self.config.max_retry_attempts - 1 {
624                        tokio::time::sleep(Duration::from_millis(100 * (attempt + 1) as u64)).await;
625                    }
626                }
627            }
628        }
629
630        Err(last_error.unwrap_or_else(|| DMSCError::ServiceMesh("All retry attempts failed".to_string())))
631    }
632
633    /// Updates the health status of a service endpoint.
634    /// 
635    /// # Parameters
636    /// 
637    /// - `service_name`: The name of the service
638    /// - `endpoint`: The endpoint URL
639    /// - `is_healthy`: Whether the endpoint is healthy
640    /// 
641    /// # Returns
642    /// 
643    /// A `DMSCResult<()>` indicating success or failure
644    pub async fn update_service_health(&self, service_name: &str, endpoint: &str, is_healthy: bool) -> DMSCResult<()> {
645        let mut services = self.services.write().await;
646        if let Some(endpoints) = services.get_mut(service_name) {
647            if let Some(service_ep) = endpoints.iter_mut().find(|ep| ep.endpoint == endpoint) {
648                service_ep.health_status = if is_healthy {
649                    DMSCServiceHealthStatus::Healthy
650                } else {
651                    DMSCServiceHealthStatus::Unhealthy
652                };
653                service_ep.last_health_check = SystemTime::now();
654            }
655        }
656        Ok(())
657    }
658
659    /// Returns a reference to the circuit breaker.
660    /// 
661    /// # Returns
662    /// 
663    /// A reference to the `DMSCCircuitBreaker` instance
664    pub fn get_circuit_breaker(&self) -> &DMSCCircuitBreaker {
665        &self.circuit_breaker
666    }
667
668    /// Returns a reference to the load balancer.
669    /// 
670    /// # Returns
671    /// 
672    /// A reference to the `DMSCLoadBalancer` instance
673    pub fn get_load_balancer(&self) -> &DMSCLoadBalancer {
674        &self.load_balancer
675    }
676
677    /// Returns a reference to the health checker.
678    /// 
679    /// # Returns
680    /// 
681    /// A reference to the `DMSCHealthChecker` instance
682    pub fn get_health_checker(&self) -> &DMSCHealthChecker {
683        &self.health_checker
684    }
685
686    /// Returns a reference to the traffic manager.
687    /// 
688    /// # Returns
689    /// 
690    /// A reference to the `DMSCTrafficManager` instance
691    pub fn get_traffic_manager(&self) -> &DMSCTrafficManager {
692        &self.traffic_manager
693    }
694
695    /// Returns a reference to the service discovery component.
696    /// 
697    /// # Returns
698    /// 
699    /// A reference to the `DMSCServiceDiscovery` instance
700    pub fn get_service_discovery(&self) -> &DMSCServiceDiscovery {
701        &self.service_discovery
702    }
703}
704
/// Returns the process-wide Tokio runtime that the Python bindings use to drive async calls.
///
/// The runtime is created lazily on first use and reused afterwards. This avoids building a
/// new thread pool on every Python call. It also keeps tasks spawned during a call alive after
/// the call returns, whereas a per-call runtime cancels them when it is dropped. (Presumably
/// the health checker spawns such tasks; confirm against `DMSCHealthChecker`.)
#[cfg(feature = "pyo3")]
fn py_runtime() -> PyResult<&'static tokio::runtime::Runtime> {
    static RUNTIME: std::sync::OnceLock<tokio::runtime::Runtime> = std::sync::OnceLock::new();

    if let Some(runtime) = RUNTIME.get() {
        return Ok(runtime);
    }

    let runtime = tokio::runtime::Runtime::new().map_err(|e| {
        pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
    })?;
    // Two threads may race to initialise the cell. The loser's runtime is handed back by
    // `set` and dropped here (outside any async context), and both threads use the winner's.
    let _ = RUNTIME.set(runtime);
    Ok(RUNTIME
        .get()
        .expect("runtime cell is initialised by the `set` call above"))
}

#[cfg(feature = "pyo3")]
/// Python bindings for DMSCServiceMesh
#[pyo3::prelude::pymethods]
impl DMSCServiceMesh {
    /// Python constructor; wraps [`DMSCServiceMesh::new`].
    #[new]
    fn py_new(config: DMSCServiceMeshConfig) -> PyResult<Self> {
        Self::new(config).map_err(|e| {
            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create service mesh: {e}"))
        })
    }

    /// Register a service endpoint (without metadata) from Python.
    #[pyo3(name = "register_service")]
    fn register_service_impl(&self, service_name: String, endpoint: String, weight: u32) -> PyResult<()> {
        py_runtime()?
            .block_on(self.register_service(&service_name, &endpoint, weight, None))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to register service: {e}")))
    }

    /// Discover the healthy endpoints of a service from Python.
    #[pyo3(name = "discover_service")]
    fn discover_service_impl(&self, service_name: String) -> PyResult<Vec<DMSCServiceEndpoint>> {
        py_runtime()?
            .block_on(self.discover_service(&service_name))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to discover service: {e}")))
    }

    /// Update an endpoint's health status from Python.
    #[pyo3(name = "update_service_health")]
    fn update_service_health_impl(&self, service_name: String, endpoint: String, is_healthy: bool) -> PyResult<()> {
        py_runtime()?
            .block_on(self.update_service_health(&service_name, &endpoint, is_healthy))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to update health: {e}")))
    }

    /// Get a copy of the service mesh configuration.
    fn get_config(&self) -> DMSCServiceMeshConfig {
        self.config.clone()
    }
}
764
765#[async_trait]
766impl DMSCModule for DMSCServiceMesh {
767    /// Returns the name of the service mesh module.
768    /// 
769    /// # Returns
770    /// 
771    /// The module name as a string
772    fn name(&self) -> &str {
773        "DMSC.ServiceMesh"
774    }
775
776    /// Indicates whether the service mesh module is critical.
777    /// 
778    /// The service mesh is marked as critical because it's essential for the operation
779    /// of distributed services in the system.
780    /// 
781    /// # Returns
782    /// 
783    /// `true` since service mesh is critical
784    fn is_critical(&self) -> bool {
785        true
786    }
787
788    /// Starts the service mesh module.
789    /// 
790    /// This method starts background tasks for service discovery, health checking,
791    /// and traffic management if they are enabled.
792    /// 
793    /// # Parameters
794    /// 
795    /// - `_ctx`: Service context (not used in this implementation)
796    /// 
797    /// # Returns
798    /// 
799    /// A `DMSCResult<()>` indicating success or failure
800    async fn start(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> DMSCResult<()> {
801        if self.config.enable_health_check {
802            self.health_checker.start_background_tasks().await?;
803        }
804        
805        if self.config.enable_service_discovery {
806            self.service_discovery.start_background_tasks().await?;
807        }
808        
809        if self.config.enable_traffic_management {
810            self.traffic_manager.start_background_tasks().await?;
811        }
812
813        Ok(())
814    }
815
816    /// Shuts down the service mesh module.
817    /// 
818    /// This method stops background tasks for service discovery, health checking,
819    /// and traffic management if they are enabled.
820    /// 
821    /// # Parameters
822    /// 
823    /// - `_ctx`: Service context (not used in this implementation)
824    /// 
825    /// # Returns
826    /// 
827    /// A `DMSCResult<()>` indicating success or failure
828    async fn shutdown(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> DMSCResult<()> {
829        if self.config.enable_health_check {
830            self.health_checker.stop_background_tasks().await?;
831        }
832        
833        if self.config.enable_service_discovery {
834            self.service_discovery.stop_background_tasks().await?;
835        }
836        
837        if self.config.enable_traffic_management {
838            self.traffic_manager.stop_background_tasks().await?;
839        }
840
841        Ok(())
842    }
843}