1use async_trait::async_trait;
98use serde::{Deserialize, Serialize};
99use std::collections::HashMap;
100use std::sync::Arc;
101use std::time::{Duration, SystemTime};
102use tokio::sync::RwLock;
103
104#[cfg(feature = "pyo3")]
105use pyo3::PyResult;
106
107use crate::core::{DMSCModule, DMSCResult, DMSCError};
108use crate::gateway::{DMSCCircuitBreaker, DMSCCircuitBreakerConfig, DMSCLoadBalancer, DMSCLoadBalancerStrategy};
109use crate::gateway::load_balancer::DMSCBackendServer;
110use crate::observability::{DMSCTracer, DMSCSpanKind, DMSCSpanStatus};
111
112pub mod service_discovery;
113pub mod health_check;
114pub mod traffic_management;
115
116pub use service_discovery::{DMSCServiceDiscovery, DMSCServiceInstance, DMSCServiceStatus};
117pub use health_check::{DMSCHealthChecker, DMSCHealthCheckResult, DMSCHealthSummary, DMSCHealthStatus};
118pub use traffic_management::{DMSCTrafficRoute, DMSCMatchCriteria, DMSCRouteAction, DMSCWeightedDestination, DMSCTrafficManager};
119
120#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct DMSCServiceMeshConfig {
127 pub enable_service_discovery: bool,
129 pub enable_health_check: bool,
131 pub enable_traffic_management: bool,
133 pub health_check_interval: Duration,
135 pub circuit_breaker_config: DMSCCircuitBreakerConfig,
137 pub load_balancer_strategy: DMSCLoadBalancerStrategy,
139 pub max_retry_attempts: u32,
141 pub retry_timeout: Duration,
143}
144
145impl Default for DMSCServiceMeshConfig {
146 fn default() -> Self {
158 Self {
159 enable_service_discovery: true,
160 enable_health_check: true,
161 enable_traffic_management: true,
162 health_check_interval: Duration::from_secs(30),
163 circuit_breaker_config: DMSCCircuitBreakerConfig::default(),
164 load_balancer_strategy: DMSCLoadBalancerStrategy::RoundRobin,
165 max_retry_attempts: 3,
166 retry_timeout: Duration::from_secs(5),
167 }
168 }
169}
170
171#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
176#[derive(Debug, Clone)]
177pub struct DMSCServiceEndpoint {
178 pub service_name: String,
180 pub endpoint: String,
182 pub weight: u32,
184 pub metadata: HashMap<String, String>,
186 pub health_status: DMSCServiceHealthStatus,
188 pub last_health_check: SystemTime,
190}
191
/// Health state recorded for a [`DMSCServiceEndpoint`].
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, PartialEq)]
pub enum DMSCServiceHealthStatus {
    /// The endpoint was last reported healthy; only such endpoints are
    /// returned by service discovery.
    Healthy,
    /// The endpoint was last reported unhealthy.
    Unhealthy,
    /// No health report has been recorded yet (the state of a freshly
    /// registered endpoint).
    Unknown,
}
205
/// Python-facing constructor and read-only properties for `DMSCServiceEndpoint`.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCServiceEndpoint {
    /// Creates an endpoint with empty metadata, `Unknown` health and the
    /// current time as its last health-check timestamp.
    #[new]
    fn py_new(
        service_name: String,
        endpoint: String,
        weight: u32,
    ) -> Self {
        let created_at = SystemTime::now();
        let metadata = HashMap::new();

        Self {
            service_name,
            endpoint,
            weight,
            metadata,
            health_status: DMSCServiceHealthStatus::Unknown,
            last_health_check: created_at,
        }
    }

    /// Name of the owning service.
    #[getter]
    fn service_name(&self) -> &str {
        self.service_name.as_str()
    }

    /// Address of this endpoint.
    #[getter]
    fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }

    /// Load-balancing weight.
    #[getter]
    fn weight(&self) -> u32 {
        self.weight
    }

    /// Copy of the last recorded health state.
    #[getter]
    fn health_status(&self) -> DMSCServiceHealthStatus {
        Clone::clone(&self.health_status)
    }
}
245
246#[derive(Debug, Clone)]
250struct ServiceDiscoveryCacheEntry {
251 endpoints: Vec<DMSCServiceEndpoint>,
253 expiration: SystemTime,
255}
256
/// Point-in-time counters describing the contents of the service registry.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCServiceMeshStats {
    /// Number of distinct service names in the registry.
    pub total_services: usize,
    /// Number of endpoints across all services, regardless of health.
    pub total_endpoints: usize,
    /// Number of endpoints whose status is `Healthy`.
    pub healthy_endpoints: usize,
    /// Number of endpoints whose status is `Unhealthy`
    /// (endpoints in the `Unknown` state are counted in neither bucket).
    pub unhealthy_endpoints: usize,
}
270
/// Python-facing constructor and read-only properties for `DMSCServiceMeshStats`.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCServiceMeshStats {
    /// Creates a statistics object with every counter at zero.
    #[new]
    fn py_new() -> Self {
        let zero = 0usize;
        Self {
            total_services: zero,
            total_endpoints: zero,
            healthy_endpoints: zero,
            unhealthy_endpoints: zero,
        }
    }

    /// Number of registered services.
    #[getter]
    fn total_services(&self) -> usize {
        let Self { total_services, .. } = self;
        *total_services
    }

    /// Number of registered endpoints.
    #[getter]
    fn total_endpoints(&self) -> usize {
        let Self { total_endpoints, .. } = self;
        *total_endpoints
    }

    /// Number of endpoints marked healthy.
    #[getter]
    fn healthy_endpoints(&self) -> usize {
        let Self { healthy_endpoints, .. } = self;
        *healthy_endpoints
    }

    /// Number of endpoints marked unhealthy.
    #[getter]
    fn unhealthy_endpoints(&self) -> usize {
        let Self { unhealthy_endpoints, .. } = self;
        *unhealthy_endpoints
    }
}
304
305#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
310pub struct DMSCServiceMesh {
311 config: DMSCServiceMeshConfig,
312 service_discovery: Arc<DMSCServiceDiscovery>,
313 health_checker: Arc<DMSCHealthChecker>,
314 traffic_manager: Arc<DMSCTrafficManager>,
315 circuit_breaker: Arc<DMSCCircuitBreaker>,
316 load_balancer: Arc<DMSCLoadBalancer>,
317 services: Arc<RwLock<HashMap<String, Vec<DMSCServiceEndpoint>>>>,
318 discovery_cache: Arc<RwLock<HashMap<String, ServiceDiscoveryCacheEntry>>>,
319 cache_expiration: Duration,
320 tracer: Option<Arc<DMSCTracer>>,
321}
322
323impl DMSCServiceMesh {
324 pub fn new(config: DMSCServiceMeshConfig) -> DMSCResult<Self> {
325 let service_discovery = Arc::new(DMSCServiceDiscovery::new(config.enable_service_discovery));
326 let health_checker = Arc::new(DMSCHealthChecker::new(config.health_check_interval));
327 let traffic_manager = Arc::new(DMSCTrafficManager::new(config.enable_traffic_management));
328 let circuit_breaker = Arc::new(DMSCCircuitBreaker::new(config.circuit_breaker_config.clone()));
329 let load_balancer = Arc::new(DMSCLoadBalancer::new(config.load_balancer_strategy.clone()));
330
331 Ok(Self {
332 config,
333 service_discovery,
334 health_checker,
335 traffic_manager,
336 circuit_breaker,
337 load_balancer,
338 services: Arc::new(RwLock::new(HashMap::new())),
339 discovery_cache: Arc::new(RwLock::new(HashMap::new())),
340 cache_expiration: Duration::from_secs(30),
341 tracer: None,
342 })
343 }
344
345 pub fn with_tracer(mut self, tracer: Arc<DMSCTracer>) -> Self {
346 self.tracer = Some(tracer.clone());
347 let mut traffic_manager = DMSCTrafficManager::new(self.config.enable_traffic_management);
348 traffic_manager.set_tracer(tracer);
349 self.traffic_manager = Arc::new(traffic_manager);
350 self
351 }
352
353 pub fn set_tracer(&mut self, tracer: Arc<DMSCTracer>) {
354 self.tracer = Some(tracer.clone());
355 let mut traffic_manager = DMSCTrafficManager::new(self.config.enable_traffic_management);
356 traffic_manager.set_tracer(tracer);
357 self.traffic_manager = Arc::new(traffic_manager);
358 }
359
360 pub async fn register_service(&self, service_name: &str, endpoint: &str, weight: u32, metadata: Option<HashMap<String, String>>) -> DMSCResult<()> {
373 if service_name.is_empty() {
374 return Err(DMSCError::ServiceMesh("Service name cannot be empty".to_string()));
375 }
376 if endpoint.is_empty() {
377 return Err(DMSCError::ServiceMesh("Endpoint cannot be empty".to_string()));
378 }
379 if weight == 0 {
380 return Err(DMSCError::ServiceMesh("Weight must be greater than zero".to_string()));
381 }
382
383 let service_endpoint = DMSCServiceEndpoint {
384 service_name: service_name.to_string(),
385 endpoint: endpoint.to_string(),
386 weight,
387 metadata: metadata.unwrap_or_default(),
388 health_status: DMSCServiceHealthStatus::Unknown,
389 last_health_check: SystemTime::now(),
390 };
391
392 let mut services = self.services.write().await;
393 services.entry(service_name.to_string())
394 .or_insert_with(Vec::new)
395 .push(service_endpoint);
396
397 if self.config.enable_health_check {
398 self.health_checker.start_health_check(service_name, endpoint).await?;
399 }
400
401 Ok(())
402 }
403
404 pub async fn register_versioned_service(&self, service_name: &str, version: &str, endpoint: &str, weight: u32, metadata: Option<HashMap<String, String>>) -> DMSCResult<()> {
406 let mut enriched_metadata = metadata.unwrap_or_default();
407 enriched_metadata.insert("version".to_string(), version.to_string());
408
409 self.register_service(service_name, endpoint, weight, Some(enriched_metadata)).await
410 }
411
412 pub async fn unregister_service(&self, service_name: &str, endpoint: &str) -> DMSCResult<()> {
414 let mut services = self.services.write().await;
415
416 if let Some(endpoints) = services.get_mut(service_name) {
417 endpoints.retain(|ep| ep.endpoint != endpoint);
418
419 if endpoints.is_empty() {
420 services.remove(service_name);
421 }
422
423 if self.config.enable_health_check {
424 self.health_checker.stop_health_check(service_name, endpoint).await?;
425 }
426 }
427
428 Ok(())
429 }
430
431 pub async fn get_all_endpoints(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceEndpoint>> {
433 let services = self.services.read().await;
434
435 services.get(service_name)
436 .cloned()
437 .ok_or_else(|| DMSCError::ServiceMesh(format!("Service '{service_name}' not found")))
438 }
439
440 pub async fn get_stats(&self) -> DMSCServiceMeshStats {
442 let services = self.services.read().await;
443 let healthy_count = services.values()
444 .flat_map(|endpoints| endpoints.iter())
445 .filter(|ep| ep.health_status == DMSCServiceHealthStatus::Healthy)
446 .count();
447
448 DMSCServiceMeshStats {
449 total_services: services.len(),
450 total_endpoints: services.values().map(|v| v.len()).sum(),
451 healthy_endpoints: healthy_count,
452 unhealthy_endpoints: services.values()
453 .flat_map(|endpoints| endpoints.iter())
454 .filter(|ep| ep.health_status == DMSCServiceHealthStatus::Unhealthy)
455 .count(),
456 }
457 }
458
459 pub async fn discover_service(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceEndpoint>> {
469 if !self.config.enable_service_discovery {
470 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
471 }
472
473 {
475 let cache = self.discovery_cache.read().await;
476 if let Some(entry) = cache.get(service_name) {
477 if entry.expiration > SystemTime::now() {
478 return Ok(entry.endpoints.clone());
480 }
481 }
482 }
483
484 let services = self.services.read().await;
486 let endpoints = services.get(service_name)
487 .ok_or_else(|| DMSCError::ServiceMesh(format!("Service '{service_name}' not found")))?
488 .clone();
489
490 let healthy_endpoints: Vec<DMSCServiceEndpoint> = endpoints
491 .into_iter()
492 .filter(|ep| ep.health_status == DMSCServiceHealthStatus::Healthy)
493 .collect();
494
495 if healthy_endpoints.is_empty() {
496 return Err(DMSCError::ServiceMesh(format!("No healthy endpoints for service '{service_name}'")));
497 }
498
499 let expiration = SystemTime::now() + self.cache_expiration;
501 let cache_entry = ServiceDiscoveryCacheEntry {
502 endpoints: healthy_endpoints.clone(),
503 expiration,
504 };
505
506 let mut cache = self.discovery_cache.write().await;
507 cache.insert(service_name.to_string(), cache_entry);
508
509 Ok(healthy_endpoints)
510 }
511
512 pub async fn call_service(&self, service_name: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
530 let span_id = if let Some(tracer) = &self.tracer {
531 let span_id = tracer.start_span_from_context(
532 format!("call_service:{}", service_name),
533 DMSCSpanKind::Client,
534 );
535 if let Some(ref sid) = span_id {
536 let _ = tracer.span_mut(sid, |span| {
537 span.set_attribute("service_name".to_string(), service_name.to_string());
538 span.set_attribute("request_size".to_string(), request_data.len().to_string());
539 });
540 }
541 span_id
542 } else {
543 None
544 };
545
546 let result = self.call_service_internal(service_name, request_data).await;
547
548 if let (Some(tracer), Some(sid)) = (&self.tracer, span_id) {
549 let status = match &result {
550 Ok(_) => DMSCSpanStatus::Ok,
551 Err(e) => DMSCSpanStatus::Error(e.to_string()),
552 };
553 let _ = tracer.end_span(&sid, status);
554 }
555
556 result
557 }
558
559 async fn call_service_internal(&self, service_name: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
560 let endpoints = self.discover_service(service_name).await?;
561
562 let mut existing_servers = self.load_balancer.get_healthy_servers().await;
563 existing_servers.retain(|s| !s.id.starts_with(&format!("{service_name}-")));
564
565 for ep in &endpoints {
566 if ep.health_status == DMSCServiceHealthStatus::Healthy {
567 let server = DMSCBackendServer {
568 id: format!("{}-{}", service_name, ep.endpoint),
569 url: ep.endpoint.clone(),
570 weight: ep.weight,
571 max_connections: 100,
572 health_check_path: "/health".to_string(),
573 is_healthy: true,
574 };
575 self.load_balancer.add_server(server).await;
576 }
577 }
578
579 let selected_server = match self.load_balancer.select_server(None).await {
580 Ok(server) => server,
581 Err(_) => return Err(DMSCError::ServiceMesh("No available backend server".to_string())),
582 };
583
584 if !self.circuit_breaker.allow_request() {
585 return Err(DMSCError::ServiceMesh("Circuit breaker is open".to_string()));
586 }
587
588 let result = self.execute_service_call(&selected_server.url, request_data.clone()).await;
589
590 match &result {
591 Ok(_) => {
592 self.circuit_breaker.record_success();
593 }
594 Err(_) => {
595 self.circuit_breaker.record_failure();
596 }
597 }
598
599 result
600 }
601
602 async fn execute_service_call(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
615 let mut last_error = None;
616
617 for attempt in 0..self.config.max_retry_attempts {
618 match self.traffic_manager.route_request(endpoint, request_data.clone()).await {
619 Ok(response) => return Ok(response),
620 Err(_e) => {
621 let sanitized_error = DMSCError::ServiceMesh(format!("Retry attempt {} failed", attempt + 1));
622 last_error = Some(sanitized_error);
623 if attempt < self.config.max_retry_attempts - 1 {
624 tokio::time::sleep(Duration::from_millis(100 * (attempt + 1) as u64)).await;
625 }
626 }
627 }
628 }
629
630 Err(last_error.unwrap_or_else(|| DMSCError::ServiceMesh("All retry attempts failed".to_string())))
631 }
632
633 pub async fn update_service_health(&self, service_name: &str, endpoint: &str, is_healthy: bool) -> DMSCResult<()> {
645 let mut services = self.services.write().await;
646 if let Some(endpoints) = services.get_mut(service_name) {
647 if let Some(service_ep) = endpoints.iter_mut().find(|ep| ep.endpoint == endpoint) {
648 service_ep.health_status = if is_healthy {
649 DMSCServiceHealthStatus::Healthy
650 } else {
651 DMSCServiceHealthStatus::Unhealthy
652 };
653 service_ep.last_health_check = SystemTime::now();
654 }
655 }
656 Ok(())
657 }
658
659 pub fn get_circuit_breaker(&self) -> &DMSCCircuitBreaker {
665 &self.circuit_breaker
666 }
667
668 pub fn get_load_balancer(&self) -> &DMSCLoadBalancer {
674 &self.load_balancer
675 }
676
677 pub fn get_health_checker(&self) -> &DMSCHealthChecker {
683 &self.health_checker
684 }
685
686 pub fn get_traffic_manager(&self) -> &DMSCTrafficManager {
692 &self.traffic_manager
693 }
694
695 pub fn get_service_discovery(&self) -> &DMSCServiceDiscovery {
701 &self.service_discovery
702 }
703}
704
/// Creates the Tokio runtime on which one blocking Python call is driven,
/// mapping a creation failure to a Python `RuntimeError`.
#[cfg(feature = "pyo3")]
fn new_py_runtime() -> PyResult<tokio::runtime::Runtime> {
    tokio::runtime::Runtime::new()
        .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {e}")))
}

/// Python-facing, blocking wrappers around the async mesh API.
///
/// Each wrapper builds its own Tokio runtime, blocks on the corresponding
/// async method and converts failures into Python `RuntimeError`s.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCServiceMesh {
    /// Python constructor; raises `RuntimeError` if the mesh cannot be built.
    #[new]
    fn py_new(config: DMSCServiceMeshConfig) -> PyResult<Self> {
        Self::new(config)
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create service mesh: {e}")))
    }

    /// Blocking `register_service`; the endpoint is registered without metadata.
    #[pyo3(name = "register_service")]
    fn register_service_impl(&self, service_name: String, endpoint: String, weight: u32) -> PyResult<()> {
        let runtime = new_py_runtime()?;

        runtime
            .block_on(self.register_service(&service_name, &endpoint, weight, None))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to register service: {e}")))
    }

    /// Blocking `discover_service`.
    #[pyo3(name = "discover_service")]
    fn discover_service_impl(&self, service_name: String) -> PyResult<Vec<DMSCServiceEndpoint>> {
        let runtime = new_py_runtime()?;

        runtime
            .block_on(self.discover_service(&service_name))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to discover service: {e}")))
    }

    /// Blocking `update_service_health`.
    #[pyo3(name = "update_service_health")]
    fn update_service_health_impl(&self, service_name: String, endpoint: String, is_healthy: bool) -> PyResult<()> {
        let runtime = new_py_runtime()?;

        runtime
            .block_on(self.update_service_health(&service_name, &endpoint, is_healthy))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to update health: {e}")))
    }

    /// Returns a copy of the configuration the mesh was built with.
    fn get_config(&self) -> DMSCServiceMeshConfig {
        Clone::clone(&self.config)
    }
}
764
765#[async_trait]
766impl DMSCModule for DMSCServiceMesh {
767 fn name(&self) -> &str {
773 "DMSC.ServiceMesh"
774 }
775
776 fn is_critical(&self) -> bool {
785 true
786 }
787
788 async fn start(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> DMSCResult<()> {
801 if self.config.enable_health_check {
802 self.health_checker.start_background_tasks().await?;
803 }
804
805 if self.config.enable_service_discovery {
806 self.service_discovery.start_background_tasks().await?;
807 }
808
809 if self.config.enable_traffic_management {
810 self.traffic_manager.start_background_tasks().await?;
811 }
812
813 Ok(())
814 }
815
816 async fn shutdown(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> DMSCResult<()> {
829 if self.config.enable_health_check {
830 self.health_checker.stop_background_tasks().await?;
831 }
832
833 if self.config.enable_service_discovery {
834 self.service_discovery.stop_background_tasks().await?;
835 }
836
837 if self.config.enable_traffic_management {
838 self.traffic_manager.stop_background_tasks().await?;
839 }
840
841 Ok(())
842 }
843}