dmsc/service_mesh/
service_discovery.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::{Duration, SystemTime};
22use tokio::sync::RwLock;
23use tokio::task::JoinHandle;
24
25#[cfg(feature = "etcd")]
26use etcd_client::{Client, PutOptions};
27#[cfg(feature = "etcd")]
28use tokio::sync::Mutex;
29
30use crate::core::{DMSCResult, DMSCError};
31
32#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct DMSCServiceInstance {
35    pub id: String,
36    pub service_name: String,
37    pub host: String,
38    pub port: u16,
39    pub metadata: HashMap<String, String>,
40    pub registered_at: SystemTime,
41    pub last_heartbeat: SystemTime,
42    pub status: DMSCServiceStatus,
43}
44
45#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
47pub enum DMSCServiceStatus {
48    Starting,
49    Running,
50    Stopping,
51    Stopped,
52    Unhealthy,
53}
54
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCServiceInstance {
    /// Python constructor.
    ///
    /// Produces an instance with empty metadata, status `Starting`, and both
    /// timestamps taken from the current wall clock.
    #[new]
    fn py_new(id: String, service_name: String, host: String, port: u16) -> Self {
        let registered_at = SystemTime::now();
        let last_heartbeat = SystemTime::now();
        Self {
            id,
            service_name,
            host,
            port,
            metadata: HashMap::new(),
            registered_at,
            last_heartbeat,
            status: DMSCServiceStatus::Starting,
        }
    }

    /// Instance identifier.
    #[getter]
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Name of the service this instance belongs to.
    #[getter]
    fn service_name(&self) -> &str {
        self.service_name.as_str()
    }

    /// Host the instance listens on.
    #[getter]
    fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Port the instance listens on.
    #[getter]
    fn port(&self) -> u16 {
        self.port
    }

    /// Current lifecycle status (returned by value).
    #[getter]
    fn status(&self) -> DMSCServiceStatus {
        DMSCServiceStatus::clone(&self.status)
    }
}
102
103#[derive(Clone)]
104pub struct DMSCServiceRegistry {
105    services: Arc<RwLock<HashMap<String, Vec<DMSCServiceInstance>>>>,
106    instance_index: Arc<RwLock<HashMap<String, DMSCServiceInstance>>>,
107    #[cfg(feature = "etcd")]
108    etcd_client: Option<Arc<Mutex<Client>>>,
109    _etcd_prefix: String,
110}
111
112impl std::fmt::Debug for DMSCServiceRegistry {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        let mut debug_struct = f.debug_struct("DMSCServiceRegistry");
115        debug_struct.field("services", &self.services)
116            .field("instance_index", &self.instance_index);
117        
118        #[cfg(feature = "etcd")]
119        debug_struct.field("etcd_client", &self.etcd_client.is_some());
120        
121        debug_struct.field("_etcd_prefix", &self._etcd_prefix)
122            .finish()
123    }
124}
125
126impl Default for DMSCServiceRegistry {
127    fn default() -> Self {
128        Self::new(None, "/dms/services".to_string())
129    }
130}
131
132impl DMSCServiceRegistry {
133    #[cfg(feature = "etcd")]
134    pub fn new(etcd_client: Option<Client>, etcd_prefix: String) -> Self {
135        Self {
136            services: Arc::new(RwLock::new(HashMap::new())),
137            instance_index: Arc::new(RwLock::new(HashMap::new())),
138            etcd_client: etcd_client.map(|c| Arc::new(Mutex::new(c))),
139            _etcd_prefix: etcd_prefix,
140        }
141    }
142    
143    #[cfg(not(feature = "etcd"))]
144    pub fn new(_etcd_client: Option<()>, etcd_prefix: String) -> Self {
145        Self {
146            services: Arc::new(RwLock::new(HashMap::new())),
147            instance_index: Arc::new(RwLock::new(HashMap::new())),
148            _etcd_prefix: etcd_prefix,
149        }
150    }
151
152    pub async fn register_service(&self, instance: DMSCServiceInstance) -> DMSCResult<()> {
153        // Update in-memory registry
154        let mut services = self.services.write().await;
155        let mut instance_index = self.instance_index.write().await;
156
157        services.entry(instance.service_name.clone())
158            .or_insert_with(Vec::new)
159            .push(instance.clone());
160
161        instance_index.insert(instance.id.clone(), instance.clone());
162        
163        // Persist to etcd if client is available
164        #[cfg(feature = "etcd")]
165        if let Some(client) = &self.etcd_client {
166            let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance.id);
167            let value = serde_json::to_string(&instance)?;
168            
169            // Set with TTL of 5 minutes (300 seconds)
170            let mut client_guard = client.lock().await;
171            client_guard.put(key.as_bytes(), value.as_bytes(), Some(PutOptions::new().with_lease(300)))
172                .await
173                .map_err(|e| DMSCError::ServiceMesh(format!("Failed to register service in etcd: {}", e)))?;
174            drop(client_guard);
175        }
176
177        Ok(())
178    }
179
180    pub async fn deregister_service(&self, instance_id: &str) -> DMSCResult<()> {
181        let mut instance_index = self.instance_index.write().await;
182        
183        if let Some(instance) = instance_index.remove(instance_id) {
184            // Update in-memory registry
185            let mut services = self.services.write().await;
186            if let Some(instances) = services.get_mut(&instance.service_name) {
187                instances.retain(|inst| inst.id != instance_id);
188                
189                if instances.is_empty() {
190                    services.remove(&instance.service_name);
191                }
192            }
193            
194            // Remove from etcd if client is available
195            #[cfg(feature = "etcd")]
196            if let Some(client) = &self.etcd_client {
197                let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance_id);
198                let mut client_guard = client.lock().await;
199                client_guard.delete(key.as_bytes(), None)
200                    .await
201                    .map_err(|e| DMSCError::ServiceMesh(format!("Failed to deregister service in etcd: {}", e)))?;
202                drop(client_guard);
203            }
204        }
205
206        Ok(())
207    }
208
209    pub async fn get_service_instances(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
210        let services = self.services.read().await;
211        let instances = services.get(service_name)
212            .cloned()
213            .unwrap_or_default();
214
215        Ok(instances)
216    }
217
218    pub async fn get_all_services(&self) -> DMSCResult<Vec<String>> {
219        let services = self.services.read().await;
220        let service_names: Vec<String> = services.keys().cloned().collect();
221        Ok(service_names)
222    }
223
224    pub async fn update_heartbeat(&self, instance_id: &str) -> DMSCResult<()> {
225        let mut instance_index = self.instance_index.write().await;
226        
227        if let Some(instance) = instance_index.get_mut(instance_id) {
228            instance.last_heartbeat = SystemTime::now();
229            
230            // Update in etcd if client is available
231            #[cfg(feature = "etcd")]
232            if let Some(client) = &self.etcd_client {
233                let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance_id);
234                let value = serde_json::to_string(instance)?;
235                let mut client_guard = client.lock().await;
236                client_guard.put(key.as_bytes(), value.as_bytes(), None)
237                    .await
238                    .map_err(|e| DMSCError::ServiceMesh(format!("Failed to update service heartbeat in etcd: {}", e)))?;
239                drop(client_guard);
240            }
241        }
242
243        Ok(())
244    }
245    
246    pub async fn update_instance_status(&self, instance_id: &str, status: DMSCServiceStatus) -> DMSCResult<()> {
247        let mut instance_index = self.instance_index.write().await;
248        
249        if let Some(instance) = instance_index.get_mut(instance_id) {
250            instance.status = status;
251            instance.last_heartbeat = SystemTime::now();
252            
253            // Update in etcd if client is available
254            #[cfg(feature = "etcd")]
255            if let Some(client) = &self.etcd_client {
256                let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance_id);
257                let value = serde_json::to_string(instance)?;
258                let mut client_guard = client.lock().await;
259                client_guard.put(key.as_bytes(), value.as_bytes(), None)
260                    .await
261                    .map_err(|e| DMSCError::ServiceMesh(format!("Failed to update service status in etcd: {}", e)))?;
262                drop(client_guard);
263            }
264        }
265
266        Ok(())
267    }
268
269    pub async fn get_healthy_instances(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
270        let instances = self.get_service_instances(service_name).await?;
271        let healthy_instances: Vec<DMSCServiceInstance> = instances
272            .into_iter()
273            .filter(|inst| inst.status == DMSCServiceStatus::Running)
274            .collect();
275
276        Ok(healthy_instances)
277    }
278
279    pub async fn cleanup_expired_instances(&self, expiration_duration: Duration) -> DMSCResult<()> {
280        let now = SystemTime::now();
281        let mut expired_instances = Vec::new();
282
283        {
284            let instance_index = self.instance_index.read().await;
285            for (id, instance) in instance_index.iter() {
286                if let Ok(elapsed) = now.duration_since(instance.last_heartbeat) {
287                    if elapsed > expiration_duration {
288                        expired_instances.push(id.clone());
289                    }
290                }
291            }
292        }
293
294        for instance_id in expired_instances {
295            self.deregister_service(&instance_id).await?;
296        }
297
298        Ok(())
299    }
300    
301    /// Sync registry from etcd
302    #[cfg(feature = "etcd")]
303    pub async fn sync_from_etcd(&self) -> DMSCResult<()> {
304        if let Some(client) = &self.etcd_client {
305            // List all services from etcd
306            let prefix = format!("{}/", self._etcd_prefix);
307            let mut client_guard = client.lock().await;
308            let response = client_guard.get(prefix.as_bytes(), Some(etcd_client::GetOptions::new().with_prefix()))
309                .await
310                .map_err(|e| DMSCError::ServiceMesh(format!("Failed to sync from etcd: {}", e)))?;
311            drop(client_guard);
312            
313            // Clear current in-memory registry
314            let mut services = self.services.write().await;
315            let mut instance_index = self.instance_index.write().await;
316            services.clear();
317            instance_index.clear();
318            
319            // Reconstruct from etcd data
320            for kv in response.kvs() {
321                let instance: DMSCServiceInstance = serde_json::from_slice(kv.value())?;
322                
323                services.entry(instance.service_name.clone())
324                    .or_insert_with(Vec::new)
325                    .push(instance.clone());
326                
327                instance_index.insert(instance.id.clone(), instance);
328            }
329        }
330        
331        Ok(())
332    }
333    
334    /// Start etcd watcher to sync changes in real-time
335    #[cfg(feature = "etcd")]
336    pub async fn start_etcd_watcher(&self) -> DMSCResult<JoinHandle<()>> {
337        if let Some(client) = &self.etcd_client {
338            let client = client.clone();
339            let prefix = self._etcd_prefix.clone();
340            let registry = self.clone();
341            
342            let handle = tokio::spawn(async move {
343                let prefix = format!("{}/", prefix);
344                
345                loop {
346                    let mut client_guard = client.lock().await;
347                    let (_watcher, mut stream) = client_guard.watch(prefix.as_bytes(), Some(etcd_client::WatchOptions::new().with_prefix()))
348                        .await
349                        .expect("Failed to start etcd watcher");
350                    drop(client_guard); // Release the lock before awaiting
351                    
352                    while let Ok(Some(watch_response)) = stream.message().await {
353                        for event in watch_response.events() {
354                            match event.event_type() {
355                                etcd_client::EventType::Put => {
356                                    // Update or add instance
357                                    if let Some(kv) = event.kv() {
358                                        if let Ok(instance) = serde_json::from_slice(kv.value()) {
359                                            let _ = registry.register_service(instance).await;
360                                        }
361                                    }
362                                },
363                                etcd_client::EventType::Delete => {
364                                    // Delete instance - we'd need to parse the key to get instance ID
365                                    // This is simplified for now
366                                },
367                            }
368                        }
369                    }
370                }
371            });
372            
373            Ok(handle)
374        } else {
375            Err(DMSCError::ServiceMesh("No etcd client available".to_string()))
376        }
377    }
378}
379
380#[derive(Debug, Clone, Serialize, Deserialize)]
381pub struct DMSCEtcdConfig {
382    pub endpoints: Vec<String>,
383    pub username: Option<String>,
384    pub password: Option<String>,
385    pub prefix: String,
386}
387
388impl Default for DMSCEtcdConfig {
389    fn default() -> Self {
390        Self {
391            endpoints: vec!["http://localhost:2379".to_string()],
392            username: None,
393            password: None,
394            prefix: "/dms/services".to_string(),
395        }
396    }
397}
398
399#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
400#[derive(Debug, Clone)]
401pub struct DMSCServiceDiscovery {
402    enabled: bool,
403    registry: Arc<DMSCServiceRegistry>,
404    background_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
405    cleanup_interval: Duration,
406    _etcd_config: Option<DMSCEtcdConfig>,
407}
408
409impl DMSCServiceDiscovery {
410    pub fn new(enabled: bool) -> Self {
411        Self {
412            enabled,
413            registry: Arc::new(DMSCServiceRegistry::new(None, "/dms/services".to_string())),
414            background_tasks: Arc::new(RwLock::new(Vec::new())),
415            cleanup_interval: Duration::from_secs(60),
416            _etcd_config: None,
417        }
418    }
419    
420    #[cfg(feature = "etcd")]
421    pub async fn new_with_etcd(enabled: bool, etcd_config: DMSCEtcdConfig) -> DMSCResult<Self> {
422        // Create etcd client
423        let client = Client::connect(etcd_config.endpoints.clone(), None)
424            .await
425            .map_err(|e| DMSCError::ServiceMesh(format!("Failed to connect to etcd: {}", e)))?;
426        
427        let registry = Arc::new(DMSCServiceRegistry::new(Some(client), etcd_config.prefix.clone()));
428        
429        let discovery = Self {
430            enabled,
431            registry,
432            background_tasks: Arc::new(RwLock::new(Vec::new())),
433            cleanup_interval: Duration::from_secs(60),
434            _etcd_config: Some(etcd_config),
435        };
436        
437        // Sync from etcd on startup
438        discovery.registry.sync_from_etcd().await?;
439        
440        Ok(discovery)
441    }
442
443    pub async fn register_service(
444        &self,
445        service_name: &str,
446        host: &str,
447        port: u16,
448        metadata: HashMap<String, String>,
449    ) -> DMSCResult<String> {
450        if !self.enabled {
451            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
452        }
453
454        let instance_id = format!("{service_name}:{host}:{port}");
455        let instance = DMSCServiceInstance {
456            id: instance_id.clone(),
457            service_name: service_name.to_string(),
458            host: host.to_string(),
459            port,
460            metadata,
461            registered_at: SystemTime::now(),
462            last_heartbeat: SystemTime::now(),
463            status: DMSCServiceStatus::Starting,
464        };
465
466        self.registry.register_service(instance).await?;
467        Ok(instance_id)
468    }
469
470    pub async fn deregister_service(&self, instance_id: &str) -> DMSCResult<()> {
471        if !self.enabled {
472            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
473        }
474
475        self.registry.deregister_service(instance_id).await
476    }
477
478    pub async fn discover_service(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
479        if !self.enabled {
480            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
481        }
482
483        self.registry.get_healthy_instances(service_name).await
484    }
485
486    pub async fn update_heartbeat(&self, instance_id: &str) -> DMSCResult<()> {
487        if !self.enabled {
488            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
489        }
490
491        self.registry.update_heartbeat(instance_id).await
492    }
493    
494    pub async fn set_service_status(&self, instance_id: &str, status: DMSCServiceStatus) -> DMSCResult<()> {
495        if !self.enabled {
496            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
497        }
498
499        self.registry.update_instance_status(instance_id, status).await
500    }
501
502    pub async fn get_service_instances(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
503        if !self.enabled {
504            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
505        }
506
507        self.registry.get_service_instances(service_name).await
508    }
509
510    pub async fn get_all_services(&self) -> DMSCResult<Vec<String>> {
511        if !self.enabled {
512            return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
513        }
514
515        self.registry.get_all_services().await
516    }
517
518    pub async fn start_background_tasks(&self) -> DMSCResult<()> {
519        if !self.enabled {
520            return Ok(());
521        }
522
523        let registry_clone = Arc::clone(&self.registry);
524
525        let cleanup_interval = self.cleanup_interval;
526
527        // Start cleanup task
528        let cleanup_task = tokio::spawn(async move {
529            let mut interval = tokio::time::interval(cleanup_interval);
530            loop {
531                interval.tick().await;
532                if let Err(e) = registry_clone.cleanup_expired_instances(Duration::from_secs(300)).await {
533                    log::warn!("Failed to cleanup expired instances: {e}");
534                }
535            }
536        });
537
538        let mut tasks = self.background_tasks.write().await;
539        tasks.push(cleanup_task);
540        
541        // Start etcd watcher if etcd is configured
542        #[cfg(feature = "etcd")]
543        if self._etcd_config.is_some() {
544            let watcher_task = self.registry.start_etcd_watcher().await?;
545            tasks.push(watcher_task);
546            
547            // Start periodic sync from etcd (every 30 seconds)
548            let registry_clone = Arc::clone(&self.registry);
549            let sync_task = tokio::spawn(async move {
550                let mut interval = tokio::time::interval(Duration::from_secs(30));
551                loop {
552                    interval.tick().await;
553                    if let Err(e) = registry_clone.sync_from_etcd().await {
554                        log::warn!("Failed to sync from etcd: {e}");
555                    }
556                }
557            });
558            tasks.push(sync_task);
559        }
560
561        Ok(())
562    }
563
564    pub async fn stop_background_tasks(&self) -> DMSCResult<()> {
565        let mut tasks = self.background_tasks.write().await;
566        for task in tasks.drain(..) {
567            task.abort();
568        }
569        Ok(())
570    }
571
572    pub async fn health_check(&self) -> DMSCResult<bool> {
573        Ok(self.enabled)
574    }
575}