1use async_trait::async_trait;
89use serde::{Deserialize, Serialize};
90use std::collections::HashMap;
91use std::sync::Arc;
92use std::time::{Duration, SystemTime};
93use tokio::sync::RwLock;
94use tokio::task::JoinHandle;
95
96#[cfg(feature = "pyo3")]
97use pyo3::PyResult;
98#[cfg(feature = "service_mesh")]
99use hyper;
100
101use crate::core::{DMSCResult, DMSCError};
102use crate::observability::{DMSCTracer, DMSCSpanKind, DMSCSpanStatus};
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct DMSCHealthCheckConfig {
110 pub endpoint: String,
112 pub method: String,
114 pub timeout: Duration,
116 pub expected_status_code: u16,
118 pub expected_response_body: Option<String>,
120 pub headers: HashMap<String, String>,
122}
123
124impl Default for DMSCHealthCheckConfig {
125 fn default() -> Self {
131 Self {
132 endpoint: "/health".to_string(),
133 method: "GET".to_string(),
134 timeout: Duration::from_secs(5),
135 expected_status_code: 200,
136 expected_response_body: None,
137 headers: HashMap::new(),
138 }
139 }
140}
141
/// Outcome of one health probe against one endpoint.
///
/// Exposed to Python as a class when the `pyo3` feature is enabled.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone)]
pub struct DMSCHealthCheckResult {
    /// Name of the service the probe belongs to.
    pub service_name: String,
    /// Endpoint string that was probed.
    pub endpoint: String,
    /// Whether the probe considered the target healthy.
    pub is_healthy: bool,
    /// Status code observed by the probe, when the probe type produces one.
    pub status_code: Option<u16>,
    /// How long the probe took.
    pub response_time: Duration,
    /// Human-readable failure description, absent for healthy results.
    pub error_message: Option<String>,
    /// Wall-clock time at which the result was created.
    pub timestamp: SystemTime,
}
164
/// Python-facing accessors for [`DMSCHealthCheckResult`].
///
/// Each accessor hands back an owned copy of the corresponding field so the
/// Python side never borrows from the Rust value.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCHealthCheckResult {
    /// Returns the service name stored in the result.
    fn get_service_name(&self) -> String {
        self.service_name.to_owned()
    }

    /// Returns the endpoint that was probed.
    fn get_endpoint(&self) -> String {
        self.endpoint.to_owned()
    }

    /// Returns whether the probe reported the target as healthy.
    fn get_is_healthy(&self) -> bool {
        self.is_healthy
    }

    /// Returns the observed status code, if the probe produced one.
    fn get_status_code(&self) -> Option<u16> {
        self.status_code
    }

    /// Returns the probe duration in whole milliseconds.
    fn get_response_time_ms(&self) -> u64 {
        let millis = self.response_time.as_millis();
        millis as u64
    }

    /// Returns the failure description, if any.
    fn get_error_message(&self) -> Option<String> {
        self.error_message.as_ref().cloned()
    }
}
192
193#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
197#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
198pub enum DMSCHealthCheckType {
199 Http,
201 Tcp,
203 Grpc,
205 Custom,
207}
208
209#[async_trait]
214pub trait DMSCHealthCheckProvider: Send + Sync {
215 async fn check_health(&self, endpoint: &str, config: &DMSCHealthCheckConfig) -> DMSCResult<DMSCHealthCheckResult>;
226}
227
228pub struct DMSCHttpHealthCheckProvider;
232
233#[async_trait]
234impl DMSCHealthCheckProvider for DMSCHttpHealthCheckProvider {
235 #[cfg(feature = "service_mesh")]
246 async fn check_health(&self, endpoint: &str, _config: &DMSCHealthCheckConfig) -> DMSCResult<DMSCHealthCheckResult> {
247 let start_time = SystemTime::now();
248
249 let client = hyper::Client::new();
250
251 let uri: hyper::Uri = endpoint.parse()
252 .map_err(|e| DMSCError::ServiceMesh(format!("Invalid URI: {e}")))?;
253
254 let req = hyper::Request::builder()
255 .method(_config.method.as_str())
256 .uri(uri)
257 .body(hyper::body::Body::empty())
258 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to build request: {e}")))?;
259
260 match client.request(req).await {
261 Ok(response) => {
262 let status_code = response.status().as_u16();
263 let is_healthy = status_code == _config.expected_status_code;
264 let response_time = SystemTime::now().duration_since(start_time)
265 .unwrap_or(Duration::from_secs(0));
266
267 let error_message = if !is_healthy {
268 Some(format!("Expected status code {}, got {}", _config.expected_status_code, status_code))
269 } else {
270 None
271 };
272
273 Ok(DMSCHealthCheckResult {
274 service_name: "unknown".to_string(),
275 endpoint: endpoint.to_string(),
276 is_healthy,
277 status_code: Some(status_code),
278 response_time,
279 error_message,
280 timestamp: SystemTime::now(),
281 })
282 }
283 Err(e) => {
284 let response_time = SystemTime::now().duration_since(start_time)
285 .unwrap_or(Duration::from_secs(0));
286
287 Ok(DMSCHealthCheckResult {
288 service_name: "unknown".to_string(),
289 endpoint: endpoint.to_string(),
290 is_healthy: false,
291 status_code: None,
292 response_time,
293 error_message: Some(e.to_string()),
294 timestamp: SystemTime::now(),
295 })
296 }
297 }
298 }
299
300 #[cfg(not(feature = "service_mesh"))]
301 async fn check_health(&self, endpoint: &str, _config: &DMSCHealthCheckConfig) -> DMSCResult<DMSCHealthCheckResult> {
302 Ok(DMSCHealthCheckResult {
304 service_name: "unknown".to_string(),
305 endpoint: endpoint.to_string(),
306 is_healthy: true,
307 status_code: Some(_config.expected_status_code),
308 response_time: Duration::from_secs(0),
309 error_message: None,
310 timestamp: SystemTime::now(),
311 })
312 }
313}
314
315pub struct DMSCTcpHealthCheckProvider;
319
320#[async_trait]
321impl DMSCHealthCheckProvider for DMSCTcpHealthCheckProvider {
322 async fn check_health(&self, endpoint: &str, _config: &DMSCHealthCheckConfig) -> DMSCResult<DMSCHealthCheckResult> {
333 let start_time = SystemTime::now();
334
335 match tokio::net::TcpStream::connect(endpoint).await {
336 Ok(_) => {
337 let response_time = SystemTime::now().duration_since(start_time)
338 .unwrap_or(Duration::from_secs(0));
339
340 Ok(DMSCHealthCheckResult {
341 service_name: "unknown".to_string(),
342 endpoint: endpoint.to_string(),
343 is_healthy: true,
344 status_code: None,
345 response_time,
346 error_message: None,
347 timestamp: SystemTime::now(),
348 })
349 }
350 Err(e) => {
351 let response_time = SystemTime::now().duration_since(start_time)
352 .unwrap_or(Duration::from_secs(0));
353
354 Ok(DMSCHealthCheckResult {
355 service_name: "unknown".to_string(),
356 endpoint: endpoint.to_string(),
357 is_healthy: false,
358 status_code: None,
359 response_time,
360 error_message: Some(e.to_string()),
361 timestamp: SystemTime::now(),
362 })
363 }
364 }
365 }
366}
367
368pub struct DMSCGrpcHealthCheckProvider;
372
373#[async_trait]
374impl DMSCHealthCheckProvider for DMSCGrpcHealthCheckProvider {
375 async fn check_health(&self, endpoint: &str, _config: &DMSCHealthCheckConfig) -> DMSCResult<DMSCHealthCheckResult> {
386 let start_time = SystemTime::now();
387
388 match tokio::net::TcpStream::connect(endpoint).await {
391 Ok(_) => {
392 let response_time = SystemTime::now().duration_since(start_time)
393 .unwrap_or(Duration::from_secs(0));
394
395 Ok(DMSCHealthCheckResult {
396 service_name: "unknown".to_string(),
397 endpoint: endpoint.to_string(),
398 is_healthy: true,
399 status_code: None,
400 response_time,
401 error_message: None,
402 timestamp: SystemTime::now(),
403 })
404 }
405 Err(e) => {
406 let response_time = SystemTime::now().duration_since(start_time)
407 .unwrap_or(Duration::from_secs(0));
408
409 Ok(DMSCHealthCheckResult {
410 service_name: "unknown".to_string(),
411 endpoint: endpoint.to_string(),
412 is_healthy: false,
413 status_code: None,
414 response_time,
415 error_message: Some(e.to_string()),
416 timestamp: SystemTime::now(),
417 })
418 }
419 }
420 }
421}
422
423#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
428pub struct DMSCHealthChecker {
429 check_interval: Duration,
430 providers: Arc<RwLock<HashMap<DMSCHealthCheckType, Box<dyn DMSCHealthCheckProvider>>>>,
431 check_results: Arc<RwLock<HashMap<String, Vec<DMSCHealthCheckResult>>>>,
432 background_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
433 tracer: Option<Arc<DMSCTracer>>,
434}
435
436impl DMSCHealthChecker {
437 pub fn new(check_interval: Duration) -> Self {
438 let mut providers: HashMap<DMSCHealthCheckType, Box<dyn DMSCHealthCheckProvider>> = HashMap::new();
439 providers.insert(DMSCHealthCheckType::Http, Box::new(DMSCHttpHealthCheckProvider));
440 providers.insert(DMSCHealthCheckType::Tcp, Box::new(DMSCTcpHealthCheckProvider));
441 providers.insert(DMSCHealthCheckType::Grpc, Box::new(DMSCGrpcHealthCheckProvider));
442
443 Self {
444 check_interval,
445 providers: Arc::new(RwLock::new(providers)),
446 check_results: Arc::new(RwLock::new(HashMap::new())),
447 background_tasks: Arc::new(RwLock::new(Vec::new())),
448 tracer: None,
449 }
450 }
451
452 pub fn with_tracer(mut self, tracer: Arc<DMSCTracer>) -> Self {
453 self.tracer = Some(tracer);
454 self
455 }
456
457 pub fn set_tracer(&mut self, tracer: Arc<DMSCTracer>) {
458 self.tracer = Some(tracer);
459 }
460
461
462
463 pub async fn register_health_check(
478 &self,
479 service_name: &str,
480 endpoint: &str,
481 check_type: DMSCHealthCheckType,
482 config: DMSCHealthCheckConfig,
483 ) -> DMSCResult<()> {
484 let span_id = if let Some(tracer) = &self.tracer {
485 let span_id = tracer.start_span_from_context(
486 format!("health_check:{}", service_name),
487 DMSCSpanKind::Internal,
488 );
489 if let Some(ref sid) = span_id {
490 let _ = tracer.span_mut(sid, |span| {
491 span.set_attribute("service_name".to_string(), service_name.to_string());
492 span.set_attribute("endpoint".to_string(), endpoint.to_string());
493 span.set_attribute("check_type".to_string(), format!("{:?}", check_type));
494 });
495 }
496 span_id
497 } else {
498 None
499 };
500
501 let result = self.register_health_check_internal(service_name, endpoint, check_type, config).await;
502
503 if let (Some(tracer), Some(sid)) = (&self.tracer, span_id) {
504 let status = match &result {
505 Ok(_) => DMSCSpanStatus::Ok,
506 Err(e) => DMSCSpanStatus::Error(e.to_string()),
507 };
508 let _ = tracer.end_span(&sid, status);
509 }
510
511 result
512 }
513
514 async fn register_health_check_internal(
515 &self,
516 service_name: &str,
517 endpoint: &str,
518 check_type: DMSCHealthCheckType,
519 config: DMSCHealthCheckConfig,
520 ) -> DMSCResult<()> {
521 let providers = self.providers.read().await;
522 let provider = providers.get(&check_type)
523 .ok_or_else(|| DMSCError::ServiceMesh(format!("Health check provider for {check_type:?} not found")))?;
524
525 let result = provider.check_health(endpoint, &config).await?;
526
527 let mut check_results = self.check_results.write().await;
528 let service_results = check_results.entry(service_name.to_string())
529 .or_insert_with(Vec::new);
530 service_results.push(result);
531
532 Ok(())
533 }
534
535 pub async fn start_health_check(&self, service_name: &str, endpoint: &str) -> DMSCResult<()> {
548 let mut tasks = self.background_tasks.write().await;
549
550 let service_name_clone = service_name.to_string();
551 let endpoint_clone = endpoint.to_string();
552 let check_interval = self.check_interval;
553 let providers = Arc::clone(&self.providers);
554 let check_results = Arc::clone(&self.check_results);
555
556 let check_type = if endpoint.starts_with("grpc://") || endpoint.starts_with("grpcs://") {
558 DMSCHealthCheckType::Grpc
559 } else if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
560 DMSCHealthCheckType::Http
561 } else {
562 DMSCHealthCheckType::Tcp
564 };
565
566 let task = tokio::spawn(async move {
567 let mut interval = tokio::time::interval(check_interval);
568 let config = DMSCHealthCheckConfig::default();
569
570 loop {
571 interval.tick().await;
572
573 let providers_guard = providers.read().await;
574 if let Some(provider) = providers_guard.get(&check_type) {
575 match provider.check_health(&endpoint_clone, &config).await {
576 Ok(result) => {
577 let mut results = check_results.write().await;
578 let service_results = results.entry(service_name_clone.clone())
579 .or_insert_with(Vec::new);
580
581 service_results.push(result);
583
584 if service_results.len() > 100 {
586 service_results.drain(0..service_results.len() - 100);
587 }
588 }
589 Err(e) => {
590 log::warn!("Health check failed for {endpoint_clone}: {e}");
591 }
592 }
593 }
594 }
595 });
596
597 tasks.push(task);
598 Ok(())
599 }
600
601 pub async fn stop_health_check(&self, service_name: &str, _endpoint: &str) -> DMSCResult<()> {
615 let mut results = self.check_results.write().await;
616 results.remove(service_name);
617 Ok(())
618 }
619
620 pub async fn start_health_check_with_type(
635 &self,
636 service_name: &str,
637 endpoint: &str,
638 check_type: DMSCHealthCheckType
639 ) -> DMSCResult<()> {
640 let mut tasks = self.background_tasks.write().await;
641
642 let service_name_clone = service_name.to_string();
643 let endpoint_clone = endpoint.to_string();
644 let check_interval = self.check_interval;
645 let providers = Arc::clone(&self.providers);
646 let check_results = Arc::clone(&self.check_results);
647 let check_type_clone = check_type;
648
649 let task = tokio::spawn(async move {
650 let mut interval = tokio::time::interval(check_interval);
651 let config = DMSCHealthCheckConfig::default();
652
653 loop {
654 interval.tick().await;
655
656 let providers_guard = providers.read().await;
657 if let Some(provider) = providers_guard.get(&check_type_clone) {
658 match provider.check_health(&endpoint_clone, &config).await {
659 Ok(result) => {
660 let mut results = check_results.write().await;
661 let service_results = results.entry(service_name_clone.clone())
662 .or_insert_with(Vec::new);
663
664 service_results.push(result);
666
667 if service_results.len() > 100 {
669 service_results.drain(0..service_results.len() - 100);
670 }
671 }
672 Err(e) => {
673 log::warn!("Health check failed for {endpoint_clone}: {e}");
674 }
675 }
676 }
677 }
678 });
679
680 tasks.push(task);
681 Ok(())
682 }
683
684 pub async fn get_health_status(&self, service_name: &str) -> DMSCResult<Vec<DMSCHealthCheckResult>> {
694 let check_results = self.check_results.read().await;
695 let results = check_results.get(service_name)
696 .cloned()
697 .unwrap_or_default();
698
699 Ok(results)
700 }
701
702 pub async fn get_latest_health_status(&self, service_name: &str) -> DMSCResult<Option<DMSCHealthCheckResult>> {
712 let check_results = self.check_results.read().await;
713 let latest_result = check_results.get(service_name)
714 .and_then(|results| results.last().cloned());
715
716 Ok(latest_result)
717 }
718
719 pub async fn get_health_status_within(&self, service_name: &str, time_window: Duration) -> DMSCResult<Vec<DMSCHealthCheckResult>> {
730 let check_results = self.check_results.read().await;
731 let now = SystemTime::now();
732
733 let results = check_results.get(service_name)
734 .map(|results| {
735 results.iter()
736 .filter(|r| {
737 if let Ok(elapsed) = now.duration_since(r.timestamp) {
738 elapsed <= time_window
739 } else {
740 false
741 }
742 })
743 .cloned()
744 .collect()
745 })
746 .unwrap_or_default();
747
748 Ok(results)
749 }
750
751 pub async fn get_service_health_summary(&self, service_name: &str) -> DMSCResult<DMSCHealthSummary> {
764 let results = self.get_health_status(service_name).await?;
765
766 if results.is_empty() {
767 return Ok(DMSCHealthSummary {
768 service_name: service_name.to_string(),
769 total_checks: 0,
770 healthy_checks: 0,
771 unhealthy_checks: 0,
772 success_rate: 0.0,
773 average_response_time: Duration::from_secs(0),
774 last_check_time: None,
775 overall_status: DMSCHealthStatus::Unknown,
776 });
777 }
778
779 let total_checks = results.len();
780 let healthy_checks = results.iter().filter(|r| r.is_healthy).count();
781 let unhealthy_checks = total_checks - healthy_checks;
782 let success_rate = (healthy_checks as f64) / (total_checks as f64) * 100.0;
783
784 let total_response_time: Duration = results.iter()
785 .map(|r| r.response_time)
786 .sum();
787 let average_response_time = total_response_time / total_checks as u32;
788
789 let last_check_time = results.last().map(|r| r.timestamp);
790
791 let overall_status = if success_rate >= 80.0 {
792 DMSCHealthStatus::Healthy
793 } else if success_rate >= 50.0 {
794 DMSCHealthStatus::Degraded
795 } else {
796 DMSCHealthStatus::Unhealthy
797 };
798
799 Ok(DMSCHealthSummary {
800 service_name: service_name.to_string(),
801 total_checks,
802 healthy_checks,
803 unhealthy_checks,
804 success_rate,
805 average_response_time,
806 last_check_time,
807 overall_status,
808 })
809 }
810
811 pub async fn start_background_tasks(&self) -> DMSCResult<()> {
820 let check_results = Arc::clone(&self.check_results);
822 let cleanup_interval = self.check_interval * 10; let cleanup_task = tokio::spawn(async move {
825 let mut interval = tokio::time::interval(cleanup_interval);
826
827 loop {
828 interval.tick().await;
829
830 let mut results = check_results.write().await;
831 let now = SystemTime::now();
832 let max_age = Duration::from_secs(3600); for service_results in results.values_mut() {
836 service_results.retain(|result| {
837 now.duration_since(result.timestamp)
838 .map(|age| age < max_age)
839 .unwrap_or(false)
840 });
841 }
842
843 results.retain(|_, results| !results.is_empty());
845 }
846 });
847
848 let mut tasks = self.background_tasks.write().await;
850 tasks.push(cleanup_task);
851
852 log::info!("Background health check tasks started successfully");
853 Ok(())
854 }
855
856 pub async fn stop_background_tasks(&self) -> DMSCResult<()> {
864 let mut tasks = self.background_tasks.write().await;
865 for task in tasks.drain(..) {
866 task.abort();
867 }
868 Ok(())
869 }
870
871 pub async fn health_check(&self) -> DMSCResult<bool> {
877 Ok(true)
878 }
879}
880
/// Returns the process-wide tokio runtime used by the Python bindings.
///
/// A single long-lived runtime is required because periodic checks are
/// spawned onto the runtime that drives `start_health_check`; a runtime that
/// is dropped at the end of the call would cancel those tasks immediately.
#[cfg(feature = "pyo3")]
fn dmsc_health_checker_runtime() -> PyResult<&'static tokio::runtime::Runtime> {
    static RUNTIME: std::sync::OnceLock<tokio::runtime::Runtime> = std::sync::OnceLock::new();

    if let Some(runtime) = RUNTIME.get() {
        return Ok(runtime);
    }

    let runtime = tokio::runtime::Runtime::new().map_err(|e| {
        pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
    })?;

    // If another thread won the race, its runtime is kept and ours is dropped.
    Ok(RUNTIME.get_or_init(move || runtime))
}

/// Blocking Python wrappers around the async [`DMSCHealthChecker`] API.
///
/// Every wrapper blocks the calling thread until the underlying future
/// completes and maps failures to Python `RuntimeError`.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCHealthChecker {
    /// Creates a checker whose check interval is `check_interval` seconds.
    #[new]
    fn py_new(check_interval: u64) -> PyResult<Self> {
        Ok(Self::new(Duration::from_secs(check_interval)))
    }

    /// Returns the aggregated health summary of `service_name`.
    #[pyo3(name = "get_service_health_summary")]
    fn get_service_health_summary_impl(&self, service_name: String) -> PyResult<DMSCHealthSummary> {
        let rt = dmsc_health_checker_runtime()?;

        rt.block_on(self.get_service_health_summary(&service_name))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to get health summary: {e}")))
    }

    /// Starts a periodic check of `endpoint` for `service_name`.
    ///
    /// The check keeps running on the shared runtime after this call returns.
    #[pyo3(name = "start_health_check")]
    fn start_health_check_impl(&self, service_name: String, endpoint: String) -> PyResult<()> {
        let rt = dmsc_health_checker_runtime()?;

        rt.block_on(self.start_health_check(&service_name, &endpoint))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to start health check: {e}")))
    }

    /// Stops the health check of `service_name` / `endpoint`.
    #[pyo3(name = "stop_health_check")]
    fn stop_health_check_impl(&self, service_name: String, endpoint: String) -> PyResult<()> {
        let rt = dmsc_health_checker_runtime()?;

        rt.block_on(self.stop_health_check(&service_name, &endpoint))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to stop health check: {e}")))
    }

    /// Returns every stored result of `service_name`.
    #[pyo3(name = "get_health_status")]
    fn get_health_status_impl(&self, service_name: String) -> PyResult<Vec<DMSCHealthCheckResult>> {
        let rt = dmsc_health_checker_runtime()?;

        rt.block_on(self.get_health_status(&service_name))
            .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to get health status: {e}")))
    }
}
946
/// Coarse health classification of a service, derived from its recent
/// check results.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone)]
pub enum DMSCHealthStatus {
    /// The service is considered healthy.
    Healthy,
    /// The service works but a noticeable share of checks fail.
    Degraded,
    /// Most checks fail.
    Unhealthy,
    /// No check results are available to judge the service.
    Unknown,
}
962
963#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
968#[derive(Debug, Clone)]
969pub struct DMSCHealthSummary {
970 pub service_name: String,
972 pub total_checks: usize,
974 pub healthy_checks: usize,
976 pub unhealthy_checks: usize,
978 pub success_rate: f64,
980 pub average_response_time: Duration,
982 pub last_check_time: Option<SystemTime>,
984 pub overall_status: DMSCHealthStatus,
986}
987
/// Python-facing accessors for [`DMSCHealthSummary`].
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCHealthSummary {
    /// Returns the name of the summarised service.
    fn get_service_name(&self) -> String {
        self.service_name.to_owned()
    }

    /// Returns how many results were aggregated.
    fn get_total_checks(&self) -> usize {
        self.total_checks
    }

    /// Returns how many aggregated results were healthy.
    fn get_healthy_checks(&self) -> usize {
        self.healthy_checks
    }

    /// Returns how many aggregated results were unhealthy.
    fn get_unhealthy_checks(&self) -> usize {
        self.unhealthy_checks
    }

    /// Returns the success rate as a percentage.
    fn get_success_rate(&self) -> f64 {
        self.success_rate
    }

    /// Returns the mean response time in whole milliseconds.
    fn get_average_response_time_ms(&self) -> u64 {
        let millis = self.average_response_time.as_millis();
        millis as u64
    }

    /// Returns the overall status as its variant name.
    fn get_overall_status(&self) -> String {
        let label = match self.overall_status {
            DMSCHealthStatus::Healthy => "Healthy",
            DMSCHealthStatus::Degraded => "Degraded",
            DMSCHealthStatus::Unhealthy => "Unhealthy",
            DMSCHealthStatus::Unknown => "Unknown",
        };
        label.to_string()
    }
}