1use serde::{Deserialize, Serialize};
107use std::collections::HashMap;
108use std::sync::Arc;
109use std::time::Duration;
110use tokio::sync::RwLock;
111use tokio::task::JoinHandle;
112#[cfg(feature = "http_client")]
113use reqwest;
114
115#[cfg(feature = "pyo3")]
116use pyo3::PyResult;
117
118use crate::core::{DMSCResult, DMSCError};
119use crate::observability::{DMSCTracer, DMSCSpanKind, DMSCSpanStatus};
120#[cfg(feature = "http_client")]
121use crate::observability::DMSCContextCarrier;
122
123#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct DMSCTrafficRoute {
126 pub name: String,
127 pub source_service: String,
128 pub destination_service: String,
129 pub match_criteria: DMSCMatchCriteria,
130 pub route_action: DMSCRouteAction,
131 pub retry_policy: Option<DMSCRetryPolicy>,
132 pub timeout: Option<Duration>,
133 pub fault_injection: Option<DMSCFaultInjection>,
134}
135
136#[cfg(feature = "pyo3")]
137#[pyo3::prelude::pymethods]
138impl DMSCTrafficRoute {
139 #[new]
140 fn py_new(name: String, source_service: String, destination_service: String) -> Self {
141 Self {
142 name,
143 source_service,
144 destination_service,
145 match_criteria: DMSCMatchCriteria {
146 path_prefix: None,
147 headers: HashMap::new(),
148 method: None,
149 query_parameters: HashMap::new(),
150 },
151 route_action: DMSCRouteAction::Route(vec![]),
152 retry_policy: None,
153 timeout: None,
154 fault_injection: None,
155 }
156 }
157
158 fn get_name(&self) -> String {
159 self.name.clone()
160 }
161
162 fn get_source_service(&self) -> String {
163 self.source_service.clone()
164 }
165
166 fn get_destination_service(&self) -> String {
167 self.destination_service.clone()
168 }
169}
170
171#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct DMSCMatchCriteria {
174 pub path_prefix: Option<String>,
175 pub headers: HashMap<String, String>,
176 pub method: Option<String>,
177 pub query_parameters: HashMap<String, String>,
178}
179
180#[cfg(feature = "pyo3")]
181#[pyo3::prelude::pymethods]
182impl DMSCMatchCriteria {
183 #[new]
184 fn py_new() -> Self {
185 Self {
186 path_prefix: None,
187 headers: HashMap::new(),
188 method: None,
189 query_parameters: HashMap::new(),
190 }
191 }
192
193 fn get_path_prefix(&self) -> Option<String> {
194 self.path_prefix.clone()
195 }
196
197 fn get_method(&self) -> Option<String> {
198 self.method.clone()
199 }
200}
201
202#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub enum DMSCRouteAction {
205 Route(Vec<DMSCWeightedDestination>),
206 Redirect(String),
207 DirectResponse(u16, String),
208}
209
210#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct DMSCWeightedDestination {
213 pub service: String,
214 pub weight: u32,
215 pub subset: Option<String>,
216}
217
218#[cfg(feature = "pyo3")]
219#[pyo3::prelude::pymethods]
220impl DMSCWeightedDestination {
221 #[new]
222 fn py_new(service: String, weight: u32) -> Self {
223 Self {
224 service,
225 weight,
226 subset: None,
227 }
228 }
229
230 fn get_service(&self) -> String {
231 self.service.clone()
232 }
233
234 fn get_weight(&self) -> u32 {
235 self.weight
236 }
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct DMSCRetryPolicy {
241 pub attempts: u32,
242 pub per_try_timeout: Duration,
243 pub retry_on: Vec<String>,
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct DMSCFaultInjection {
248 pub delay: Option<DMSCDelayFault>,
249 pub abort: Option<DMSCAbortFault>,
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct DMSCDelayFault {
254 pub percentage: f64,
255 pub fixed_delay: Duration,
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct DMSCAbortFault {
260 pub percentage: f64,
261 pub http_status: u16,
262}
263
264#[derive(Debug, Clone)]
265pub struct DMSCTrafficSplit {
266 pub service: String,
267 pub subsets: HashMap<String, DMSCSubset>,
268 pub default_subset: String,
269}
270
271#[derive(Debug, Clone)]
272pub struct DMSCSubset {
273 pub name: String,
274 pub labels: HashMap<String, String>,
275 pub weight: u32,
276}
277
278#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
279pub struct DMSCTrafficManager {
280 enabled: bool,
281 routes: Arc<RwLock<HashMap<String, Vec<DMSCTrafficRoute>>>>,
282 traffic_splits: Arc<RwLock<HashMap<String, DMSCTrafficSplit>>>,
283 circuit_breakers: Arc<RwLock<HashMap<String, DMSCCircuitBreakerConfig>>>,
284 rate_limits: Arc<RwLock<HashMap<String, DMSCRateLimitConfig>>>,
285 background_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
286 #[cfg(feature = "http_client")]
287 http_client: reqwest::Client,
288 tracer: Option<Arc<DMSCTracer>>,
289}
290
291#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct DMSCCircuitBreakerConfig {
294 pub consecutive_errors: u32,
295 pub interval: Duration,
296 pub base_ejection_time: Duration,
297 pub max_ejection_percent: f64,
298}
299
300#[cfg(feature = "pyo3")]
301#[pyo3::prelude::pymethods]
302impl DMSCCircuitBreakerConfig {
303 #[new]
304 fn py_new(consecutive_errors: u32, max_ejection_percent: f64) -> Self {
305 Self {
306 consecutive_errors,
307 interval: Duration::from_secs(10),
308 base_ejection_time: Duration::from_secs(30),
309 max_ejection_percent,
310 }
311 }
312
313 fn get_consecutive_errors(&self) -> u32 {
314 self.consecutive_errors
315 }
316
317 fn get_max_ejection_percent(&self) -> f64 {
318 self.max_ejection_percent
319 }
320}
321
322#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct DMSCRateLimitConfig {
325 pub requests_per_second: u32,
326 pub burst_size: u32,
327 pub window: Duration,
328}
329
330impl DMSCTrafficManager {
331 pub fn new(enabled: bool) -> Self {
332 Self {
333 enabled,
334 routes: Arc::new(RwLock::new(HashMap::new())),
335 traffic_splits: Arc::new(RwLock::new(HashMap::new())),
336 circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
337 rate_limits: Arc::new(RwLock::new(HashMap::new())),
338 background_tasks: Arc::new(RwLock::new(Vec::new())),
339 #[cfg(feature = "http_client")]
340 http_client: reqwest::Client::builder()
341 .timeout(Duration::from_secs(30))
342 .connect_timeout(Duration::from_secs(10))
343 .build()
344 .unwrap_or_else(|_| reqwest::Client::new()),
345 tracer: None,
346 }
347 }
348
349 pub fn with_tracer(mut self, tracer: Arc<DMSCTracer>) -> Self {
350 self.tracer = Some(tracer);
351 self
352 }
353
354 pub fn set_tracer(&mut self, tracer: Arc<DMSCTracer>) {
355 self.tracer = Some(tracer);
356 }
357
358 pub async fn add_traffic_route(&self, route: DMSCTrafficRoute) -> DMSCResult<()> {
359 if !self.enabled {
360 return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
361 }
362
363 let mut routes = self.routes.write().await;
364 routes.entry(route.source_service.clone())
365 .or_insert_with(Vec::new)
366 .push(route);
367
368 Ok(())
369 }
370
371 pub async fn remove_traffic_route(&self, source_service: &str, route_name: &str) -> DMSCResult<()> {
372 if !self.enabled {
373 return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
374 }
375
376 let mut routes = self.routes.write().await;
377 if let Some(service_routes) = routes.get_mut(source_service) {
378 service_routes.retain(|r| r.name != route_name);
379
380 if service_routes.is_empty() {
381 routes.remove(source_service);
382 }
383 }
384
385 Ok(())
386 }
387
388 pub async fn get_traffic_routes(&self, source_service: &str) -> DMSCResult<Vec<DMSCTrafficRoute>> {
389 if !self.enabled {
390 return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
391 }
392
393 let routes = self.routes.read().await;
394 let service_routes = routes.get(source_service)
395 .cloned()
396 .unwrap_or_default();
397
398 Ok(service_routes)
399 }
400
401 pub async fn create_traffic_split(&self, split: DMSCTrafficSplit) -> DMSCResult<()> {
402 if !self.enabled {
403 return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
404 }
405
406 let mut traffic_splits = self.traffic_splits.write().await;
407 traffic_splits.insert(split.service.clone(), split);
408
409 Ok(())
410 }
411
412 pub async fn get_traffic_split(&self, service: &str) -> DMSCResult<Option<DMSCTrafficSplit>> {
413 if !self.enabled {
414 return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
415 }
416
417 let traffic_splits = self.traffic_splits.read().await;
418 Ok(traffic_splits.get(service).cloned())
419 }
420
421 pub async fn route_request(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
422 let span_id = if let Some(tracer) = &self.tracer {
423 let span_id = tracer.start_span_from_context(
424 format!("route_request:{}", endpoint),
425 DMSCSpanKind::Client,
426 );
427 if let Some(ref sid) = span_id {
428 let _ = tracer.span_mut(sid, |span| {
429 span.set_attribute("endpoint".to_string(), endpoint.to_string());
430 span.set_attribute("request_size".to_string(), request_data.len().to_string());
431 });
432 }
433 span_id
434 } else {
435 None
436 };
437
438 let result = self.route_request_internal(endpoint, request_data).await;
439
440 if let (Some(tracer), Some(sid)) = (&self.tracer, span_id) {
441 let status = match &result {
442 Ok(_) => DMSCSpanStatus::Ok,
443 Err(e) => DMSCSpanStatus::Error(e.to_string()),
444 };
445 let _ = tracer.end_span(&sid, status);
446 }
447
448 result
449 }
450
451 async fn route_request_internal(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
452 if !self.enabled {
453 return Ok(request_data);
454 }
455
456 if let Some(fault_injection) = self.should_inject_fault() {
457 self.inject_fault(&fault_injection).await?;
458 }
459
460 if self.should_rate_limit(endpoint).await? {
461 return Err(DMSCError::ServiceMesh("Rate limit exceeded".to_string()));
462 }
463
464 let transformed_request = self.apply_traffic_policies(request_data).await;
465
466 if let Some(matching_route) = self.find_matching_route(endpoint).await {
467 return self.apply_route(&matching_route, endpoint, transformed_request).await;
468 }
469
470 self.make_http_request(endpoint, transformed_request).await
471 }
472
473 async fn find_matching_route(&self, endpoint: &str) -> Option<DMSCTrafficRoute> {
475 let routes = self.routes.read().await;
476
477 for (_source_service, service_routes) in &*routes {
479 for route in service_routes {
480 if self.is_route_match(route, endpoint) {
481 return Some(route.clone());
482 }
483 }
484 }
485
486 None
487 }
488
489 fn is_route_match(&self, _route: &DMSCTrafficRoute, _endpoint: &str) -> bool {
491 #[cfg(feature = "http_client")]
492 if let Ok(url) = _endpoint.parse::<reqwest::Url>() {
493 let host = url.host_str().unwrap_or("");
494 if _route.destination_service.contains(host) {
495 return true;
496 }
497 }
498 false
499 }
500
501 async fn apply_route(&self, route: &DMSCTrafficRoute, original_endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
503 match &route.route_action {
505 DMSCRouteAction::Route(destinations) => {
506 let selected_index = self.select_destination_index(destinations).await;
508 let mut selected_destination = destinations[selected_index].clone();
509
510 if let Some(split_destination) = self.apply_traffic_split(&selected_destination.service).await {
512 selected_destination.service = split_destination;
514 }
515
516 let new_endpoint = self.replace_endpoint(original_endpoint, &selected_destination).await;
518
519 if let Some(retry_policy) = &route.retry_policy {
521 self.retry_request(new_endpoint.as_str(), request_data, retry_policy).await
522 } else {
523 self.make_http_request(new_endpoint.as_str(), request_data).await
525 }
526 },
527 DMSCRouteAction::Redirect(redirect_uri) => {
528 Err(DMSCError::ServiceMesh(format!("Redirect to: {}", redirect_uri)))
530 },
531 DMSCRouteAction::DirectResponse(_status, body) => {
532 Ok(body.clone().into())
534 }
535 }
536 }
537
538 async fn select_destination_index(&self, destinations: &[DMSCWeightedDestination]) -> usize {
540 if destinations.len() == 1 {
541 return 0;
542 }
543
544 let total_weight: u32 = destinations.iter().map(|d| d.weight).sum();
546
547 use rand::Rng;
549 let mut rng = rand::thread_rng();
550 let mut current_weight = 0;
551 let random_weight = rng.gen_range(0..total_weight);
552
553 for (index, destination) in destinations.iter().enumerate() {
554 current_weight += destination.weight;
555 if random_weight < current_weight {
556 return index;
557 }
558 }
559
560 0
562 }
563
564 async fn replace_endpoint(&self, original_endpoint: &str, _destination: &DMSCWeightedDestination) -> String {
566 original_endpoint.to_string()
569 }
570
571 async fn retry_request(&self, endpoint: &str, request_data: Vec<u8>, retry_policy: &DMSCRetryPolicy) -> DMSCResult<Vec<u8>> {
573 let max_attempts = retry_policy.attempts;
574
575 for attempt in 1..=max_attempts {
576 let result = self.make_http_request(endpoint, request_data.clone()).await;
577
578 match result {
579 Ok(response) => return Ok(response),
580 Err(e) => {
581 if attempt < max_attempts && self.should_retry(&e, retry_policy) {
583 let delay = Duration::from_millis(100 * 2u64.pow(attempt - 1));
585 tokio::time::sleep(delay).await;
586 continue;
587 }
588 return Err(e);
589 }
590 }
591 }
592
593 Err(DMSCError::ServiceMesh("All retry attempts failed".to_string()))
594 }
595
596 fn should_retry(&self, _error: &DMSCError, retry_policy: &DMSCRetryPolicy) -> bool {
598 retry_policy.retry_on.iter().any(|s| s == "5xx" || s == "all")
601 }
602
603 #[cfg(feature = "http_client")]
604 async fn make_http_request(&self, endpoint: &str, request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
605 let url = endpoint.parse::<reqwest::Url>()
606 .map_err(|e| DMSCError::ServiceMesh(format!("Invalid endpoint URL: {e}")))?;
607
608 let mut request_builder = self.http_client
609 .post(url)
610 .header("Content-Type", "application/octet-stream");
611
612 if let Some(_tracer) = &self.tracer {
613 let mut headers = HashMap::new();
614 DMSCContextCarrier::inject_current_into_headers(&mut headers);
615 for (key, value) in headers {
616 request_builder = request_builder.header(key, value);
617 }
618 }
619
620 let response = request_builder
621 .body(request_data)
622 .send()
623 .await
624 .map_err(|e| DMSCError::ServiceMesh(format!("HTTP request failed: {e}")))?;
625
626 if !response.status().is_success() {
627 return Err(DMSCError::ServiceMesh(format!(
628 "HTTP request failed with status: {}",
629 response.status()
630 )));
631 }
632
633 let response_data = response
634 .bytes()
635 .await
636 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to read response body: {e}")))?
637 .to_vec();
638
639 Ok(response_data)
640 }
641
642 #[cfg(not(feature = "http_client"))]
643 async fn make_http_request(&self, _endpoint: &str, _request_data: Vec<u8>) -> DMSCResult<Vec<u8>> {
644 Err(DMSCError::ServiceMesh(format!("HTTP client is not enabled. Enable the 'http_client' feature to use HTTP requests.")))
645 }
646
647 async fn apply_traffic_policies(&self, request_data: Vec<u8>) -> Vec<u8> {
648 request_data
649 }
650
651 async fn apply_traffic_split(&self, service: &str) -> Option<String> {
654 let traffic_splits = self.traffic_splits.read().await;
655
656 if let Some(traffic_split) = traffic_splits.get(service) {
657 let total_weight: u32 = traffic_split.subsets.values()
659 .map(|subset| subset.weight)
660 .sum();
661
662 if total_weight == 0 {
663 Some(traffic_split.default_subset.clone())
665 } else {
666 use rand::Rng;
668 let mut rng = rand::thread_rng();
669 let random_weight = rng.gen_range(0..total_weight);
670
671 let mut current_weight = 0;
672 for subset in traffic_split.subsets.values() {
673 current_weight += subset.weight;
674 if random_weight < current_weight {
675 return Some(subset.name.clone());
676 }
677 }
678
679 Some(traffic_split.default_subset.clone())
681 }
682 } else {
683 None
684 }
685 }
686
687 fn should_inject_fault(&self) -> Option<DMSCFaultInjection> {
688 use rand::Rng;
689 let mut rng = rand::thread_rng();
690
691 if rng.gen_bool(0.01) {
692 Some(DMSCFaultInjection {
693 delay: Some(DMSCDelayFault {
694 percentage: 0.5,
695 fixed_delay: Duration::from_millis(100),
696 }),
697 abort: None,
698 })
699 } else {
700 None
701 }
702 }
703
704 async fn inject_fault(&self, fault: &DMSCFaultInjection) -> DMSCResult<()> {
705 if let Some(delay) = &fault.delay {
706 use rand::Rng;
707 let mut rng = rand::thread_rng();
708
709 if rng.gen_bool(delay.percentage) {
710 tokio::time::sleep(delay.fixed_delay).await;
711 }
712 }
713
714 if let Some(abort) = &fault.abort {
715 use rand::Rng;
716 let mut rng = rand::thread_rng();
717
718 if rng.gen_bool(abort.percentage) {
719 return Err(DMSCError::ServiceMesh(format!("Fault injection: HTTP {}", abort.http_status)));
720 }
721 }
722
723 Ok(())
724 }
725
726 async fn should_rate_limit(&self, endpoint: &str) -> DMSCResult<bool> {
728 let rate_limits = self.rate_limits.read().await;
729
730 if let Some(config) = rate_limits.get(endpoint) {
732 use std::sync::atomic::{AtomicU64, Ordering};
734 use std::collections::HashMap;
735 use std::sync::Arc;
736
737 static RATE_LIMITERS: std::sync::Mutex<Option<HashMap<String, Arc<RateLimiter>>>> =
739 std::sync::Mutex::new(None);
740
741 struct RateLimiter {
743 capacity: u32,
744 rate: f64, tokens: AtomicU64, last_update: AtomicU64, }
748
749 impl RateLimiter {
750 fn new(config: &DMSCRateLimitConfig) -> Self {
751 let rate = config.requests_per_second as f64;
752 Self {
753 capacity: config.burst_size,
754 rate,
755 tokens: AtomicU64::new(config.burst_size as u64),
756 last_update: AtomicU64::new(
757 std::time::SystemTime::now()
758 .duration_since(std::time::UNIX_EPOCH)
759 .unwrap_or(std::time::Duration::from_secs(0))
760 .as_millis() as u64
761 ),
762 }
763 }
764
765 fn try_acquire(&self) -> bool {
766 let now = std::time::SystemTime::now()
767 .duration_since(std::time::UNIX_EPOCH)
768 .unwrap_or(std::time::Duration::from_secs(0))
769 .as_millis() as u64;
770 let last = self.last_update.load(Ordering::Acquire);
771 let elapsed = now - last;
772
773 let tokens_to_add = (elapsed as f64 / 1000.0) * self.rate;
775 let tokens_to_add = tokens_to_add as u64;
776
777 let current = self.tokens.load(Ordering::Acquire);
778 let new_tokens = std::cmp::min(current.saturating_add(tokens_to_add), self.capacity as u64);
779
780 if new_tokens > 0 {
782 if self.tokens.compare_exchange(current, new_tokens - 1,
783 Ordering::AcqRel, Ordering::Acquire).is_ok() {
784 self.last_update.store(now, Ordering::Release);
786 return true;
787 }
788 }
789
790 false
791 }
792 }
793
794 let mut limiters = RATE_LIMITERS.lock()
795 .map_err(|e| DMSCError::ServiceMesh(format!("Failed to acquire rate limiter lock: {}", e)))?;
796 if limiters.is_none() {
797 *limiters = Some(HashMap::new());
798 }
799
800 let limiters = limiters.as_mut()
801 .ok_or_else(|| DMSCError::InvalidState("Rate limiters not initialized".to_string()))?;
802
803 let limiter = limiters.entry(endpoint.to_string())
805 .or_insert_with(|| Arc::new(RateLimiter::new(config)));
806
807 Ok(!limiter.try_acquire())
809 } else {
810 Ok(false) }
812 }
813
814 pub async fn set_circuit_breaker_config(&self, service: &str, config: DMSCCircuitBreakerConfig) -> DMSCResult<()> {
815 if !self.enabled {
816 return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
817 }
818
819 let mut circuit_breakers = self.circuit_breakers.write().await;
820 circuit_breakers.insert(service.to_string(), config);
821
822 Ok(())
823 }
824
825 pub async fn set_rate_limit_config(&self, service: &str, config: DMSCRateLimitConfig) -> DMSCResult<()> {
826 if !self.enabled {
827 return Err(DMSCError::ServiceMesh("Traffic management is disabled".to_string()));
828 }
829
830 let mut rate_limits = self.rate_limits.write().await;
831 rate_limits.insert(service.to_string(), config);
832
833 Ok(())
834 }
835
836 pub async fn get_circuit_breaker_config(&self, service: &str) -> DMSCResult<Option<DMSCCircuitBreakerConfig>> {
837 let circuit_breakers = self.circuit_breakers.read().await;
838 Ok(circuit_breakers.get(service).cloned())
839 }
840
841 pub async fn get_rate_limit_config(&self, service: &str) -> DMSCResult<Option<DMSCRateLimitConfig>> {
842 let rate_limits = self.rate_limits.read().await;
843 Ok(rate_limits.get(service).cloned())
844 }
845
846 pub async fn start_background_tasks(&self) -> DMSCResult<()> {
847 if !self.enabled {
848 return Ok(());
849 }
850
851 Ok(())
852 }
853
854 pub async fn stop_background_tasks(&self) -> DMSCResult<()> {
855 let mut tasks = self.background_tasks.write().await;
856 for task in tasks.drain(..) {
857 task.abort();
858 }
859 Ok(())
860 }
861
862 pub async fn health_check(&self) -> DMSCResult<bool> {
863 Ok(self.enabled)
864 }
865}
866
867#[cfg(feature = "pyo3")]
868#[pyo3::prelude::pymethods]
870impl DMSCTrafficManager {
871 #[new]
872 fn py_new(enabled: bool) -> PyResult<Self> {
873 Ok(Self::new(enabled))
874 }
875
876 #[pyo3(name = "add_traffic_route")]
878 fn add_traffic_route_impl(&self, route: DMSCTrafficRoute) -> PyResult<()> {
879 let rt = tokio::runtime::Runtime::new().map_err(|e| {
880 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
881 })?;
882
883 rt.block_on(async {
884 self.add_traffic_route(route)
885 .await
886 .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to add traffic route: {e}")))
887 })
888 }
889
890 #[pyo3(name = "get_traffic_routes")]
892 fn get_traffic_routes_impl(&self, service_name: String) -> PyResult<Vec<DMSCTrafficRoute>> {
893 let rt = tokio::runtime::Runtime::new().map_err(|e| {
894 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
895 })?;
896
897 rt.block_on(async {
898 self.get_traffic_routes(&service_name)
899 .await
900 .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to get traffic routes: {e}")))
901 })
902 }
903
904 #[pyo3(name = "remove_traffic_route")]
906 fn remove_traffic_route_impl(&self, source_service: String, route_name: String) -> PyResult<()> {
907 let rt = tokio::runtime::Runtime::new().map_err(|e| {
908 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
909 })?;
910
911 rt.block_on(async {
912 self.remove_traffic_route(&source_service, &route_name)
913 .await
914 .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to remove traffic route: {e}")))
915 })
916 }
917
918 #[pyo3(name = "set_circuit_breaker_config")]
920 fn set_circuit_breaker_config_impl(&self, service: String, config: DMSCCircuitBreakerConfig) -> PyResult<()> {
921 let rt = tokio::runtime::Runtime::new().map_err(|e| {
922 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
923 })?;
924
925 rt.block_on(async {
926 self.set_circuit_breaker_config(&service, config)
927 .await
928 .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to set circuit breaker config: {e}")))
929 })
930 }
931
932 #[pyo3(name = "set_rate_limit_config")]
934 fn set_rate_limit_config_impl(&self, service: String, config: DMSCRateLimitConfig) -> PyResult<()> {
935 let rt = tokio::runtime::Runtime::new().map_err(|e| {
936 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
937 })?;
938
939 rt.block_on(async {
940 self.set_rate_limit_config(&service, config)
941 .await
942 .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to set rate limit config: {e}")))
943 })
944 }
945}