1use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::{Duration, SystemTime};
22use tokio::sync::RwLock;
23use tokio::task::JoinHandle;
24
25#[cfg(feature = "etcd")]
26use etcd_client::{Client, PutOptions};
27#[cfg(feature = "etcd")]
28use tokio::sync::Mutex;
29
30use crate::core::{DMSCResult, DMSCError};
31
32#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct DMSCServiceInstance {
35 pub id: String,
36 pub service_name: String,
37 pub host: String,
38 pub port: u16,
39 pub metadata: HashMap<String, String>,
40 pub registered_at: SystemTime,
41 pub last_heartbeat: SystemTime,
42 pub status: DMSCServiceStatus,
43}
44
45#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
47pub enum DMSCServiceStatus {
48 Starting,
49 Running,
50 Stopping,
51 Stopped,
52 Unhealthy,
53}
54
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCServiceInstance {
    /// Python constructor: `DMSCServiceInstance(id, service_name, host, port)`.
    ///
    /// Metadata starts empty, both timestamps are taken from `SystemTime::now()`, and the
    /// status starts as `Starting`.
    #[new]
    fn py_new(id: String, service_name: String, host: String, port: u16) -> Self {
        let registered_at = SystemTime::now();
        let last_heartbeat = SystemTime::now();
        Self {
            id,
            service_name,
            host,
            port,
            metadata: HashMap::new(),
            registered_at,
            last_heartbeat,
            status: DMSCServiceStatus::Starting,
        }
    }

    /// Instance identifier.
    #[getter]
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Name of the service this instance belongs to.
    #[getter]
    fn service_name(&self) -> &str {
        self.service_name.as_str()
    }

    /// Host the instance is reachable at.
    #[getter]
    fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Port the instance is reachable at.
    #[getter]
    fn port(&self) -> u16 {
        self.port
    }

    /// Current lifecycle status (returned as a copy).
    #[getter]
    fn status(&self) -> DMSCServiceStatus {
        DMSCServiceStatus::clone(&self.status)
    }
}
102
103#[derive(Clone)]
104pub struct DMSCServiceRegistry {
105 services: Arc<RwLock<HashMap<String, Vec<DMSCServiceInstance>>>>,
106 instance_index: Arc<RwLock<HashMap<String, DMSCServiceInstance>>>,
107 #[cfg(feature = "etcd")]
108 etcd_client: Option<Arc<Mutex<Client>>>,
109 _etcd_prefix: String,
110}
111
112impl std::fmt::Debug for DMSCServiceRegistry {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 let mut debug_struct = f.debug_struct("DMSCServiceRegistry");
115 debug_struct.field("services", &self.services)
116 .field("instance_index", &self.instance_index);
117
118 #[cfg(feature = "etcd")]
119 debug_struct.field("etcd_client", &self.etcd_client.is_some());
120
121 debug_struct.field("_etcd_prefix", &self._etcd_prefix)
122 .finish()
123 }
124}
125
126impl Default for DMSCServiceRegistry {
127 fn default() -> Self {
128 Self::new(None, "/dms/services".to_string())
129 }
130}
131
132impl DMSCServiceRegistry {
133 #[cfg(feature = "etcd")]
134 pub fn new(etcd_client: Option<Client>, etcd_prefix: String) -> Self {
135 Self {
136 services: Arc::new(RwLock::new(HashMap::new())),
137 instance_index: Arc::new(RwLock::new(HashMap::new())),
138 etcd_client: etcd_client.map(|c| Arc::new(Mutex::new(c))),
139 _etcd_prefix: etcd_prefix,
140 }
141 }
142
143 #[cfg(not(feature = "etcd"))]
144 pub fn new(_etcd_client: Option<()>, etcd_prefix: String) -> Self {
145 Self {
146 services: Arc::new(RwLock::new(HashMap::new())),
147 instance_index: Arc::new(RwLock::new(HashMap::new())),
148 _etcd_prefix: etcd_prefix,
149 }
150 }
151
152 pub async fn register_service(&self, instance: DMSCServiceInstance) -> DMSCResult<()> {
153 let mut services = self.services.write().await;
155 let mut instance_index = self.instance_index.write().await;
156
157 services.entry(instance.service_name.clone())
158 .or_insert_with(Vec::new)
159 .push(instance.clone());
160
161 instance_index.insert(instance.id.clone(), instance.clone());
162
163 #[cfg(feature = "etcd")]
165 if let Some(client) = &self.etcd_client {
166 let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance.id);
167 let value = serde_json::to_string(&instance)?;
168
169 let mut client_guard = client.lock().await;
171 client_guard.put(key.as_bytes(), value.as_bytes(), Some(PutOptions::new().with_lease(300)))
172 .await
173 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to register service in etcd: {}", e)))?;
174 drop(client_guard);
175 }
176
177 Ok(())
178 }
179
180 pub async fn deregister_service(&self, instance_id: &str) -> DMSCResult<()> {
181 let mut instance_index = self.instance_index.write().await;
182
183 if let Some(instance) = instance_index.remove(instance_id) {
184 let mut services = self.services.write().await;
186 if let Some(instances) = services.get_mut(&instance.service_name) {
187 instances.retain(|inst| inst.id != instance_id);
188
189 if instances.is_empty() {
190 services.remove(&instance.service_name);
191 }
192 }
193
194 #[cfg(feature = "etcd")]
196 if let Some(client) = &self.etcd_client {
197 let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance_id);
198 let mut client_guard = client.lock().await;
199 client_guard.delete(key.as_bytes(), None)
200 .await
201 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to deregister service in etcd: {}", e)))?;
202 drop(client_guard);
203 }
204 }
205
206 Ok(())
207 }
208
209 pub async fn get_service_instances(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
210 let services = self.services.read().await;
211 let instances = services.get(service_name)
212 .cloned()
213 .unwrap_or_default();
214
215 Ok(instances)
216 }
217
218 pub async fn get_all_services(&self) -> DMSCResult<Vec<String>> {
219 let services = self.services.read().await;
220 let service_names: Vec<String> = services.keys().cloned().collect();
221 Ok(service_names)
222 }
223
224 pub async fn update_heartbeat(&self, instance_id: &str) -> DMSCResult<()> {
225 let mut instance_index = self.instance_index.write().await;
226
227 if let Some(instance) = instance_index.get_mut(instance_id) {
228 instance.last_heartbeat = SystemTime::now();
229
230 #[cfg(feature = "etcd")]
232 if let Some(client) = &self.etcd_client {
233 let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance_id);
234 let value = serde_json::to_string(instance)?;
235 let mut client_guard = client.lock().await;
236 client_guard.put(key.as_bytes(), value.as_bytes(), None)
237 .await
238 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to update service heartbeat in etcd: {}", e)))?;
239 drop(client_guard);
240 }
241 }
242
243 Ok(())
244 }
245
246 pub async fn update_instance_status(&self, instance_id: &str, status: DMSCServiceStatus) -> DMSCResult<()> {
247 let mut instance_index = self.instance_index.write().await;
248
249 if let Some(instance) = instance_index.get_mut(instance_id) {
250 instance.status = status;
251 instance.last_heartbeat = SystemTime::now();
252
253 #[cfg(feature = "etcd")]
255 if let Some(client) = &self.etcd_client {
256 let key = format!("{}/{}/{}", self._etcd_prefix, instance.service_name, instance_id);
257 let value = serde_json::to_string(instance)?;
258 let mut client_guard = client.lock().await;
259 client_guard.put(key.as_bytes(), value.as_bytes(), None)
260 .await
261 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to update service status in etcd: {}", e)))?;
262 drop(client_guard);
263 }
264 }
265
266 Ok(())
267 }
268
269 pub async fn get_healthy_instances(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
270 let instances = self.get_service_instances(service_name).await?;
271 let healthy_instances: Vec<DMSCServiceInstance> = instances
272 .into_iter()
273 .filter(|inst| inst.status == DMSCServiceStatus::Running)
274 .collect();
275
276 Ok(healthy_instances)
277 }
278
279 pub async fn cleanup_expired_instances(&self, expiration_duration: Duration) -> DMSCResult<()> {
280 let now = SystemTime::now();
281 let mut expired_instances = Vec::new();
282
283 {
284 let instance_index = self.instance_index.read().await;
285 for (id, instance) in instance_index.iter() {
286 if let Ok(elapsed) = now.duration_since(instance.last_heartbeat) {
287 if elapsed > expiration_duration {
288 expired_instances.push(id.clone());
289 }
290 }
291 }
292 }
293
294 for instance_id in expired_instances {
295 self.deregister_service(&instance_id).await?;
296 }
297
298 Ok(())
299 }
300
301 #[cfg(feature = "etcd")]
303 pub async fn sync_from_etcd(&self) -> DMSCResult<()> {
304 if let Some(client) = &self.etcd_client {
305 let prefix = format!("{}/", self._etcd_prefix);
307 let mut client_guard = client.lock().await;
308 let response = client_guard.get(prefix.as_bytes(), Some(etcd_client::GetOptions::new().with_prefix()))
309 .await
310 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to sync from etcd: {}", e)))?;
311 drop(client_guard);
312
313 let mut services = self.services.write().await;
315 let mut instance_index = self.instance_index.write().await;
316 services.clear();
317 instance_index.clear();
318
319 for kv in response.kvs() {
321 let instance: DMSCServiceInstance = serde_json::from_slice(kv.value())?;
322
323 services.entry(instance.service_name.clone())
324 .or_insert_with(Vec::new)
325 .push(instance.clone());
326
327 instance_index.insert(instance.id.clone(), instance);
328 }
329 }
330
331 Ok(())
332 }
333
334 #[cfg(feature = "etcd")]
336 pub async fn start_etcd_watcher(&self) -> DMSCResult<JoinHandle<()>> {
337 if let Some(client) = &self.etcd_client {
338 let client = client.clone();
339 let prefix = self._etcd_prefix.clone();
340 let registry = self.clone();
341
342 let handle = tokio::spawn(async move {
343 let prefix = format!("{}/", prefix);
344
345 loop {
346 let mut client_guard = client.lock().await;
347 let (_watcher, mut stream) = client_guard.watch(prefix.as_bytes(), Some(etcd_client::WatchOptions::new().with_prefix()))
348 .await
349 .expect("Failed to start etcd watcher");
350 drop(client_guard); while let Ok(Some(watch_response)) = stream.message().await {
353 for event in watch_response.events() {
354 match event.event_type() {
355 etcd_client::EventType::Put => {
356 if let Some(kv) = event.kv() {
358 if let Ok(instance) = serde_json::from_slice(kv.value()) {
359 let _ = registry.register_service(instance).await;
360 }
361 }
362 },
363 etcd_client::EventType::Delete => {
364 },
367 }
368 }
369 }
370 }
371 });
372
373 Ok(handle)
374 } else {
375 Err(DMSCError::ServiceMesh("No etcd client available".to_string()))
376 }
377 }
378}
379
380#[derive(Debug, Clone, Serialize, Deserialize)]
381pub struct DMSCEtcdConfig {
382 pub endpoints: Vec<String>,
383 pub username: Option<String>,
384 pub password: Option<String>,
385 pub prefix: String,
386}
387
388impl Default for DMSCEtcdConfig {
389 fn default() -> Self {
390 Self {
391 endpoints: vec!["http://localhost:2379".to_string()],
392 username: None,
393 password: None,
394 prefix: "/dms/services".to_string(),
395 }
396 }
397}
398
399#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
400#[derive(Debug, Clone)]
401pub struct DMSCServiceDiscovery {
402 enabled: bool,
403 registry: Arc<DMSCServiceRegistry>,
404 background_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
405 cleanup_interval: Duration,
406 _etcd_config: Option<DMSCEtcdConfig>,
407}
408
409impl DMSCServiceDiscovery {
410 pub fn new(enabled: bool) -> Self {
411 Self {
412 enabled,
413 registry: Arc::new(DMSCServiceRegistry::new(None, "/dms/services".to_string())),
414 background_tasks: Arc::new(RwLock::new(Vec::new())),
415 cleanup_interval: Duration::from_secs(60),
416 _etcd_config: None,
417 }
418 }
419
420 #[cfg(feature = "etcd")]
421 pub async fn new_with_etcd(enabled: bool, etcd_config: DMSCEtcdConfig) -> DMSCResult<Self> {
422 let client = Client::connect(etcd_config.endpoints.clone(), None)
424 .await
425 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to connect to etcd: {}", e)))?;
426
427 let registry = Arc::new(DMSCServiceRegistry::new(Some(client), etcd_config.prefix.clone()));
428
429 let discovery = Self {
430 enabled,
431 registry,
432 background_tasks: Arc::new(RwLock::new(Vec::new())),
433 cleanup_interval: Duration::from_secs(60),
434 _etcd_config: Some(etcd_config),
435 };
436
437 discovery.registry.sync_from_etcd().await?;
439
440 Ok(discovery)
441 }
442
443 pub async fn register_service(
444 &self,
445 service_name: &str,
446 host: &str,
447 port: u16,
448 metadata: HashMap<String, String>,
449 ) -> DMSCResult<String> {
450 if !self.enabled {
451 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
452 }
453
454 let instance_id = format!("{service_name}:{host}:{port}");
455 let instance = DMSCServiceInstance {
456 id: instance_id.clone(),
457 service_name: service_name.to_string(),
458 host: host.to_string(),
459 port,
460 metadata,
461 registered_at: SystemTime::now(),
462 last_heartbeat: SystemTime::now(),
463 status: DMSCServiceStatus::Starting,
464 };
465
466 self.registry.register_service(instance).await?;
467 Ok(instance_id)
468 }
469
470 pub async fn deregister_service(&self, instance_id: &str) -> DMSCResult<()> {
471 if !self.enabled {
472 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
473 }
474
475 self.registry.deregister_service(instance_id).await
476 }
477
478 pub async fn discover_service(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
479 if !self.enabled {
480 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
481 }
482
483 self.registry.get_healthy_instances(service_name).await
484 }
485
486 pub async fn update_heartbeat(&self, instance_id: &str) -> DMSCResult<()> {
487 if !self.enabled {
488 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
489 }
490
491 self.registry.update_heartbeat(instance_id).await
492 }
493
494 pub async fn set_service_status(&self, instance_id: &str, status: DMSCServiceStatus) -> DMSCResult<()> {
495 if !self.enabled {
496 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
497 }
498
499 self.registry.update_instance_status(instance_id, status).await
500 }
501
502 pub async fn get_service_instances(&self, service_name: &str) -> DMSCResult<Vec<DMSCServiceInstance>> {
503 if !self.enabled {
504 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
505 }
506
507 self.registry.get_service_instances(service_name).await
508 }
509
510 pub async fn get_all_services(&self) -> DMSCResult<Vec<String>> {
511 if !self.enabled {
512 return Err(DMSCError::ServiceMesh("Service discovery is disabled".to_string()));
513 }
514
515 self.registry.get_all_services().await
516 }
517
518 pub async fn start_background_tasks(&self) -> DMSCResult<()> {
519 if !self.enabled {
520 return Ok(());
521 }
522
523 let registry_clone = Arc::clone(&self.registry);
524
525 let cleanup_interval = self.cleanup_interval;
526
527 let cleanup_task = tokio::spawn(async move {
529 let mut interval = tokio::time::interval(cleanup_interval);
530 loop {
531 interval.tick().await;
532 if let Err(e) = registry_clone.cleanup_expired_instances(Duration::from_secs(300)).await {
533 log::warn!("Failed to cleanup expired instances: {e}");
534 }
535 }
536 });
537
538 let mut tasks = self.background_tasks.write().await;
539 tasks.push(cleanup_task);
540
541 #[cfg(feature = "etcd")]
543 if self._etcd_config.is_some() {
544 let watcher_task = self.registry.start_etcd_watcher().await?;
545 tasks.push(watcher_task);
546
547 let registry_clone = Arc::clone(&self.registry);
549 let sync_task = tokio::spawn(async move {
550 let mut interval = tokio::time::interval(Duration::from_secs(30));
551 loop {
552 interval.tick().await;
553 if let Err(e) = registry_clone.sync_from_etcd().await {
554 log::warn!("Failed to sync from etcd: {e}");
555 }
556 }
557 });
558 tasks.push(sync_task);
559 }
560
561 Ok(())
562 }
563
564 pub async fn stop_background_tasks(&self) -> DMSCResult<()> {
565 let mut tasks = self.background_tasks.write().await;
566 for task in tasks.drain(..) {
567 task.abort();
568 }
569 Ok(())
570 }
571
572 pub async fn health_check(&self) -> DMSCResult<bool> {
573 Ok(self.enabled)
574 }
575}