dmsc/gateway/circuit_breaker.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Circuit Breaker Module
21//!
22//! This module provides robust circuit breaker implementations for fault tolerance in distributed systems.
23//! Circuit breakers prevent cascading failures by temporarily stopping requests to failing services.
24//!
25//! ## Key Components
26//!
27//! - **DMSCCircuitBreakerState**: Enum representing the three states of a circuit breaker (Closed, Open, HalfOpen)
28//! - **DMSCCircuitBreakerConfig**: Configuration for circuit breaker behavior
29//! - **DMSCCircuitBreaker**: Basic circuit breaker implementation
30//! - **DMSCAdvancedCircuitBreaker**: Advanced circuit breaker with error-type specific thresholds
31//! - **DMSCCircuitBreakerMetrics**: Metrics for monitoring circuit breaker performance
32//!
33//! ## Design Principles
34//!
35//! 1. **Fault Isolation**: Prevent cascading failures by stopping requests to failing services
36//! 2. **Automatic Recovery**: Automatically test and recover when services become healthy again
37//! 3. **Configurable Behavior**: Allow fine-tuning of failure thresholds, timeouts, and recovery parameters
38//! 4. **Metrics Collection**: Track and report circuit breaker performance for monitoring
39//! 5. **Thread Safety**: Ensure safe operation in multi-threaded environments
40//! 6. **Error Type Specificity**: Advanced implementation supports different thresholds for different error types
41//! 7. **Async Compatibility**: Designed for use with async/await patterns
42//!
43//! ## Usage
44//!
//! ```rust
//! use dmsc::prelude::*;
//!
//! async fn example() -> DMSCResult<()> {
//!     // Create a circuit breaker with default configuration
//!     let cb_config = DMSCCircuitBreakerConfig::default();
//!     let cb = DMSCCircuitBreaker::new(cb_config);
//!
//!     // Execute a risky operation with circuit breaker protection.
//!     // `execute` takes a future, so pass an `async` block (not a closure).
//!     let result = cb.execute(async {
//!         // This could be a network request, database operation, etc.
//!         Ok("Success!")
//!     }).await;
//!
//!     // Get circuit breaker state and metrics (both accessors are synchronous)
//!     let state = cb.get_state();
//!     let metrics = cb.get_stats();
//!
//!     println!("Circuit breaker state: {:?}", state);
//!     println!("Circuit breaker metrics: {:?}", metrics);
//!
//!     Ok(())
//! }
//! ```
69
70use crate::core::DMSCResult;
71use std::collections::HashMap;
72use std::sync::Arc;
73use std::sync::atomic::{AtomicUsize, Ordering};
74use std::time::{Duration, Instant};
75use tokio::sync::RwLock;
76
77/// Represents the three states of a circuit breaker.
78///
79/// The circuit breaker transitions between these states based on the success and failure
80/// patterns of the protected operations.
81#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
82#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
83pub enum DMSCCircuitBreakerState {
84 /// **Closed State**: Normal operation. Requests are allowed to pass through.
85 /// The circuit breaker monitors for failures.
86 Closed,
87
88 /// **Open State**: The circuit breaker has detected too many failures. Requests
89 /// are rejected immediately to prevent cascading failures.
90 Open,
91
92 /// **HalfOpen State**: The circuit breaker is testing if the service has recovered.
93 /// A limited number of requests are allowed through to test the service's health.
94 HalfOpen,
95}
96
97/// Configuration for circuit breaker behavior.
98///
99/// This struct defines the thresholds and timeouts that control how the circuit breaker
100/// transitions between states.
101#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
102#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
103pub struct DMSCCircuitBreakerConfig {
104 /// Number of consecutive failures required to open the circuit breaker from Closed state.
105 pub failure_threshold: u32,
106
107 /// Number of consecutive successes required to close the circuit breaker from HalfOpen state.
108 pub success_threshold: u32,
109
110 /// Time in seconds to wait in Open state before transitioning to HalfOpen state.
111 pub timeout_seconds: u64,
112
113 /// Time window in seconds for counting failures. This defines how long failures are remembered.
114 pub monitoring_period_seconds: u64,
115}
116
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCCircuitBreakerConfig {
    /// Python constructor: returns a configuration filled with the default values.
    #[new]
    fn py_new() -> Self {
        <Self as Default>::default()
    }

    /// Builds a configuration from four explicit values; no validation is applied.
    #[staticmethod]
    fn py_new_with_values(failure_threshold: u32, success_threshold: u32, timeout_seconds: u64, monitoring_period_seconds: u64) -> Self {
        DMSCCircuitBreakerConfig {
            failure_threshold,
            success_threshold,
            timeout_seconds,
            monitoring_period_seconds,
        }
    }

    // --- failure_threshold ------------------------------------------------

    /// Returns the consecutive-failure threshold.
    fn get_failure_threshold(&self) -> u32 {
        self.failure_threshold
    }

    /// Replaces the consecutive-failure threshold.
    fn set_failure_threshold(&mut self, value: u32) {
        self.failure_threshold = value;
    }

    // --- success_threshold ------------------------------------------------

    /// Returns the consecutive-success threshold.
    fn get_success_threshold(&self) -> u32 {
        self.success_threshold
    }

    /// Replaces the consecutive-success threshold.
    fn set_success_threshold(&mut self, value: u32) {
        self.success_threshold = value;
    }

    // --- timeout_seconds --------------------------------------------------

    /// Returns the Open-state timeout in seconds.
    fn get_timeout_seconds(&self) -> u64 {
        self.timeout_seconds
    }

    /// Replaces the Open-state timeout in seconds.
    fn set_timeout_seconds(&mut self, value: u64) {
        self.timeout_seconds = value;
    }

    // --- monitoring_period_seconds ----------------------------------------

    /// Returns the monitoring period in seconds.
    fn get_monitoring_period_seconds(&self) -> u64 {
        self.monitoring_period_seconds
    }

    /// Replaces the monitoring period in seconds.
    fn set_monitoring_period_seconds(&mut self, value: u64) {
        self.monitoring_period_seconds = value;
    }
}
167
168impl Default for DMSCCircuitBreakerConfig {
169 /// Creates a default configuration for the circuit breaker.
170 ///
171 /// Default values:
172 /// - failure_threshold: 5 consecutive failures to open
173 /// - success_threshold: 3 consecutive successes to close
174 /// - timeout_seconds: 60 seconds before trying recovery
175 /// - monitoring_period_seconds: 30 seconds failure window
176 fn default() -> Self {
177 Self {
178 failure_threshold: 5,
179 success_threshold: 3,
180 timeout_seconds: 60,
181 monitoring_period_seconds: 30,
182 }
183 }
184}
185
186/// Internal circuit breaker statistics and state management.
187///
188/// This struct tracks all the metrics and state transitions for a circuit breaker instance.
189/// It is designed to be thread-safe for use in multi-threaded environments.
190#[derive(Debug)]
191struct CircuitBreakerStats {
192 /// Current state of the circuit breaker (Closed, Open, HalfOpen)
193 state: RwLock<DMSCCircuitBreakerState>,
194
195 /// Total count of failures since the circuit breaker was created
196 failure_count: AtomicUsize,
197
198 /// Total count of successes since the circuit breaker was created
199 success_count: AtomicUsize,
200
201 /// Timestamp of the last failure, if any
202 last_failure_time: RwLock<Option<Instant>>,
203
204 /// Timestamp of the last state change
205 last_state_change: RwLock<Instant>,
206
207 /// Number of consecutive failures in the current sequence
208 consecutive_failures: AtomicUsize,
209
210 /// Number of consecutive successes in the current sequence
211 consecutive_successes: AtomicUsize,
212}
213
214impl CircuitBreakerStats {
215 /// Creates a new circuit breaker statistics instance with default values.
216 ///
217 /// Initial state is Closed, with all counters set to zero.
218 #[allow(dead_code)]
219 fn new() -> Self {
220 Self {
221 state: RwLock::new(DMSCCircuitBreakerState::Closed),
222 failure_count: AtomicUsize::new(0),
223 success_count: AtomicUsize::new(0),
224 last_failure_time: RwLock::new(None),
225 last_state_change: RwLock::new(Instant::now()),
226 consecutive_failures: AtomicUsize::new(0),
227 consecutive_successes: AtomicUsize::new(0),
228 }
229 }
230
231 /// Records a successful operation and updates the circuit breaker state if necessary.
232 ///
233 /// - Increments total success count
234 /// - Resets consecutive failure count
235 /// - Increments consecutive success count
236 /// - Transitions from HalfOpen to Closed if success threshold is met
237 async fn record_success(&self, config: &DMSCCircuitBreakerConfig) {
238 self.success_count.fetch_add(1, Ordering::Relaxed);
239 self.consecutive_failures.store(0, Ordering::Relaxed);
240 self.consecutive_successes.fetch_add(1, Ordering::Relaxed);
241
242 let state = self.state.read().await;
243 match *state {
244 DMSCCircuitBreakerState::HalfOpen => {
245 let successes = self.consecutive_successes.load(Ordering::Relaxed);
246 if successes >= config.success_threshold as usize {
247 drop(state);
248 let mut state_write = self.state.write().await;
249 *state_write = DMSCCircuitBreakerState::Closed;
250 self.consecutive_successes.store(0, Ordering::Relaxed);
251 self.failure_count.store(0, Ordering::Relaxed);
252 *self.last_state_change.write().await = Instant::now();
253 }
254 }
255 DMSCCircuitBreakerState::Open => {
256 }
257 DMSCCircuitBreakerState::Closed => {
258 self.consecutive_failures.store(0, Ordering::Relaxed);
259 }
260 }
261 }
262
263 /// Records a failed operation and updates the circuit breaker state if necessary.
264 ///
265 /// - Increments total failure count
266 /// - Resets consecutive success count
267 /// - Increments consecutive failure count
268 /// - Updates last failure time
269 /// - Transitions from Closed to Open if failure threshold is met
270 /// - Transitions from HalfOpen to Open on any failure
271 async fn record_failure(&self, config: &DMSCCircuitBreakerConfig) {
272 self.failure_count.fetch_add(1, Ordering::Relaxed);
273 self.consecutive_successes.store(0, Ordering::Relaxed);
274 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
275 *self.last_failure_time.write().await = Some(Instant::now());
276
277 let state = self.state.read().await;
278 match *state {
279 DMSCCircuitBreakerState::Closed => {
280 let failures = self.consecutive_failures.load(Ordering::Relaxed);
281 if failures >= config.failure_threshold as usize {
282 drop(state);
283 let mut state_write = self.state.write().await;
284 *state_write = DMSCCircuitBreakerState::Open;
285 self.consecutive_failures.store(0, Ordering::Relaxed);
286 *self.last_state_change.write().await = Instant::now();
287 }
288 }
289 DMSCCircuitBreakerState::HalfOpen => {
290 // Any failure in HalfOpen state immediately opens the circuit
291 drop(state);
292 let mut state_write = self.state.write().await;
293 *state_write = DMSCCircuitBreakerState::Open;
294 *self.last_state_change.write().await = Instant::now();
295 }
296 DMSCCircuitBreakerState::Open => {
297 // Already open - no state change needed
298 }
299 }
300 }
301
302 /// Determines if the circuit breaker should attempt to reset from Open to HalfOpen state.
303 ///
304 /// Checks if the timeout period has elapsed since the last state change to Open.
305 ///
306 /// # Parameters
307 ///
308 /// - `config`: The circuit breaker configuration containing the timeout setting
309 ///
310 /// # Returns
311 ///
312 /// `true` if the timeout has elapsed and a reset should be attempted, `false` otherwise
313 async fn should_attempt_reset(&self, config: &DMSCCircuitBreakerConfig) -> bool {
314 let state = self.state.read().await;
315 if let DMSCCircuitBreakerState::Open = *state {
316 let last_change = *self.last_state_change.read().await;
317 Instant::now().duration_since(last_change) >= Duration::from_secs(config.timeout_seconds)
318 } else {
319 false
320 }
321 }
322
323 /// Transitions the circuit breaker from Open to HalfOpen state.
324 ///
325 /// This method is called when the timeout period has elapsed and the circuit breaker
326 /// should test if the service has recovered.
327 async fn transition_to_half_open(&self) {
328 let mut state = self.state.write().await;
329 *state = DMSCCircuitBreakerState::HalfOpen;
330 *self.last_state_change.write().await = Instant::now();
331 }
332
333 /// Gets the current state of the circuit breaker.
334 ///
335 /// # Returns
336 ///
337 /// The current `DMSCCircuitBreakerState` (Closed, Open, or HalfOpen)
338 async fn get_state(&self) -> DMSCCircuitBreakerState {
339 self.state.read().await.clone()
340 }
341
342 /// Gets the current metrics for the circuit breaker.
343 ///
344 /// This method provides comprehensive circuit breaker statistics including success/failure counts,
345 /// consecutive streaks, and current state information for monitoring and alerting purposes.
346 ///
347 /// # Returns
348 ///
349 /// A `DMSCCircuitBreakerMetrics` struct containing the current statistics
350 #[allow(dead_code)]
351 fn get_stats(&self) -> DMSCCircuitBreakerMetrics {
352 let state_str = match self.state.blocking_read().clone() {
353 DMSCCircuitBreakerState::Closed => "Closed",
354 DMSCCircuitBreakerState::Open => "Open",
355 DMSCCircuitBreakerState::HalfOpen => "HalfOpen",
356 };
357
358 DMSCCircuitBreakerMetrics {
359 state: state_str.to_string(),
360 failure_count: self.failure_count.load(Ordering::Relaxed),
361 success_count: self.success_count.load(Ordering::Relaxed),
362 consecutive_failures: self.consecutive_failures.load(Ordering::Relaxed),
363 consecutive_successes: self.consecutive_successes.load(Ordering::Relaxed),
364 }
365 }
366}
367
368/// Metrics for monitoring circuit breaker performance.
369///
370/// This struct contains statistics about the circuit breaker's performance, including
371/// success and failure counts, consecutive success/failure streaks, and current state.
372#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
373#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
374pub struct DMSCCircuitBreakerMetrics {
375 /// Current state of the circuit breaker as a string
376 pub state: String,
377
378 /// Total number of failures since the circuit breaker was created
379 pub failure_count: usize,
380
381 /// Total number of successes since the circuit breaker was created
382 pub success_count: usize,
383
384 /// Number of consecutive failures in the current sequence
385 pub consecutive_failures: usize,
386
387 /// Number of consecutive successes in the current sequence
388 pub consecutive_successes: usize,
389}
390
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCCircuitBreakerMetrics {
    /// Python constructor: assembles a metrics snapshot from explicit values.
    #[new]
    fn py_new(state: String, failure_count: usize, success_count: usize, consecutive_failures: usize, consecutive_successes: usize) -> Self {
        DMSCCircuitBreakerMetrics {
            state,
            failure_count,
            success_count,
            consecutive_failures,
            consecutive_successes,
        }
    }

    /// Returns the state name stored in this snapshot.
    fn get_state(&self) -> &str {
        self.state.as_str()
    }

    /// Returns the recorded failure count.
    fn get_failure_count(&self) -> usize {
        self.failure_count
    }

    /// Returns the recorded success count.
    fn get_success_count(&self) -> usize {
        self.success_count
    }

    /// Returns the length of the failure streak at snapshot time.
    fn get_consecutive_failures(&self) -> usize {
        self.consecutive_failures
    }

    /// Returns the length of the success streak at snapshot time.
    fn get_consecutive_successes(&self) -> usize {
        self.consecutive_successes
    }
}
425
426/// Basic circuit breaker implementation.
427///
428/// This struct provides a thread-safe circuit breaker that protects against cascading failures
429/// by monitoring the success and failure patterns of operations and transitioning between states
430/// (Closed, Open, HalfOpen) based on configurable thresholds.
431#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
432pub struct DMSCCircuitBreaker {
433 /// Configuration for the circuit breaker behavior
434 config: DMSCCircuitBreakerConfig,
435
436 /// Internal statistics and state management
437 stats: Arc<CircuitBreakerStats>,
438}
439
440impl DMSCCircuitBreaker {
441 /// Creates a new circuit breaker with the specified configuration.
442 ///
443 /// # Parameters
444 ///
445 /// - `config`: The configuration for the circuit breaker behavior
446 ///
447 /// # Returns
448 ///
449 /// A new `DMSCCircuitBreaker` instance
450 pub fn new(config: DMSCCircuitBreakerConfig) -> Self {
451 Self {
452 config,
453 stats: Arc::new(CircuitBreakerStats::new()),
454 }
455 }
456
457 /// Determines if a request should be allowed to proceed based on the current circuit breaker state.
458 ///
459 /// - **Closed**: Always allows requests
460 /// - **Open**: Rejects requests unless timeout has elapsed, then transitions to HalfOpen
461 /// - **HalfOpen**: Allows limited requests to test service health
462 ///
463 /// # Returns
464 ///
465 /// `true` if the request should be allowed, `false` otherwise
466 pub fn allow_request(&self) -> bool {
467 let state = futures::executor::block_on(async {
468 let state = self.stats.state.read().await;
469 state.clone()
470 });
471
472 match state {
473 DMSCCircuitBreakerState::Closed => true,
474 DMSCCircuitBreakerState::Open => {
475 let last_change = futures::executor::block_on(async {
476 let guard = self.stats.last_state_change.read().await;
477 *guard
478 });
479 if last_change.elapsed() >= Duration::from_secs(self.config.timeout_seconds) {
480 futures::executor::block_on(async {
481 let mut state = self.stats.state.write().await;
482 *state = DMSCCircuitBreakerState::HalfOpen;
483 *self.stats.last_state_change.write().await = Instant::now();
484 });
485 true
486 } else {
487 false
488 }
489 }
490 DMSCCircuitBreakerState::HalfOpen => true,
491 }
492 }
493
494 /// Records a successful operation and updates the circuit breaker state if necessary.
495 pub fn record_success(&self) {
496 futures::executor::block_on(async {
497 self.stats.record_success(&self.config).await;
498 });
499 }
500
501 /// Records a failed operation and updates the circuit breaker state if necessary.
502 pub fn record_failure(&self) {
503 futures::executor::block_on(async {
504 self.stats.record_failure(&self.config).await;
505 });
506 }
507
508 /// Executes an operation with circuit breaker protection.
509 ///
510 /// This method wraps an async operation and automatically handles success/failure recording
511 /// and state transitions based on the operation's result.
512 ///
513 /// # Type Parameters
514 ///
515 /// - `F`: The async future type representing the operation
516 /// - `R`: The result type of the operation
517 ///
518 /// # Parameters
519 ///
520 /// - `operation`: The async operation to execute with circuit breaker protection
521 ///
522 /// # Returns
523 ///
524 /// The result of the operation, or an error if the circuit breaker is open
525 pub async fn execute<F, R>(&self, operation: F) -> DMSCResult<R>
526 where
527 F: std::future::Future<Output = DMSCResult<R>>,
528 {
529 if !self.allow_request() {
530 return Err(crate::core::DMSCError::ServiceMesh("Circuit breaker is open".to_string()));
531 }
532
533 match operation.await {
534 Ok(result) => {
535 self.record_success();
536 Ok(result)
537 }
538 Err(error) => {
539 self.record_failure();
540 Err(error)
541 }
542 }
543 }
544
545 /// Gets the current state of the circuit breaker.
546 ///
547 /// # Returns
548 ///
549 /// The current `DMSCCircuitBreakerState` (Closed, Open, or HalfOpen)
550 pub fn get_state(&self) -> DMSCCircuitBreakerState {
551 futures::executor::block_on(async {
552 self.stats.get_state().await
553 })
554 }
555
556 /// Gets the current metrics for the circuit breaker.
557 ///
558 /// # Returns
559 ///
560 /// A `DMSCCircuitBreakerMetrics` struct containing the current statistics
561 pub fn get_stats(&self) -> DMSCCircuitBreakerMetrics {
562 let state_str = match futures::executor::block_on(async {
563 let state = self.stats.state.read().await;
564 state.clone()
565 }) {
566 DMSCCircuitBreakerState::Closed => "Closed",
567 DMSCCircuitBreakerState::Open => "Open",
568 DMSCCircuitBreakerState::HalfOpen => "HalfOpen",
569 };
570
571 DMSCCircuitBreakerMetrics {
572 state: state_str.to_string(),
573 failure_count: self.stats.failure_count.load(Ordering::Relaxed),
574 success_count: self.stats.success_count.load(Ordering::Relaxed),
575 consecutive_failures: self.stats.consecutive_failures.load(Ordering::Relaxed),
576 consecutive_successes: self.stats.consecutive_successes.load(Ordering::Relaxed),
577 }
578 }
579
580 /// Gets the configuration for the circuit breaker.
581 ///
582 /// # Returns
583 ///
584 /// A reference to the `DMSCCircuitBreakerConfig` used by this circuit breaker
585 pub fn get_config(&self) -> DMSCCircuitBreakerConfig {
586 self.config.clone()
587 }
588
589 /// Resets the circuit breaker to its initial state (Closed).
590 ///
591 /// This method resets all counters and transitions the circuit breaker to Closed state.
592 pub fn reset(&self) {
593 futures::executor::block_on(async move {
594 let mut state = self.stats.state.write().await;
595 *state = DMSCCircuitBreakerState::Closed;
596 self.stats.failure_count.store(0, Ordering::Relaxed);
597 self.stats.success_count.store(0, Ordering::Relaxed);
598 self.stats.consecutive_failures.store(0, Ordering::Relaxed);
599 self.stats.consecutive_successes.store(0, Ordering::Relaxed);
600 *self.stats.last_state_change.write().await = Instant::now();
601 });
602 }
603
604 /// Forces the circuit breaker to transition to Open state.
605 ///
606 /// This method immediately opens the circuit breaker, rejecting all requests until the timeout elapses.
607 pub fn force_open(&self) {
608 futures::executor::block_on(async move {
609 let mut state = self.stats.state.write().await;
610 *state = DMSCCircuitBreakerState::Open;
611 *self.stats.last_state_change.write().await = Instant::now();
612 });
613 }
614
615 /// Forces the circuit breaker to transition to Closed state.
616 ///
617 /// This method immediately closes the circuit breaker, allowing all requests to proceed.
618 pub fn force_close(&self) {
619 futures::executor::block_on(async move {
620 let mut state = self.stats.state.write().await;
621 *state = DMSCCircuitBreakerState::Closed;
622 *self.stats.last_state_change.write().await = Instant::now();
623 });
624 }
625
626 pub fn failure_rate(&self) -> f64 {
627 let failures = self.stats.failure_count.load(Ordering::Relaxed);
628 let successes = self.stats.success_count.load(Ordering::Relaxed);
629 let total = failures + successes;
630 if total == 0 {
631 0.0
632 } else {
633 failures as f64 / total as f64
634 }
635 }
636
637 pub fn success_rate(&self) -> f64 {
638 let failures = self.stats.failure_count.load(Ordering::Relaxed);
639 let successes = self.stats.success_count.load(Ordering::Relaxed);
640 let total = failures + successes;
641 if total == 0 {
642 1.0
643 } else {
644 successes as f64 / total as f64
645 }
646 }
647
648 pub fn total_requests(&self) -> usize {
649 self.stats.failure_count.load(Ordering::Relaxed) + self.stats.success_count.load(Ordering::Relaxed)
650 }
651
652 pub fn is_open(&self) -> bool {
653 let state = futures::executor::block_on(self.stats.state.read());
654 matches!(*state, DMSCCircuitBreakerState::Open)
655 }
656
657 pub fn is_closed(&self) -> bool {
658 let state = futures::executor::block_on(self.stats.state.read());
659 matches!(*state, DMSCCircuitBreakerState::Closed)
660 }
661
662 pub fn is_half_open(&self) -> bool {
663 let state = futures::executor::block_on(self.stats.state.read());
664 matches!(*state, DMSCCircuitBreakerState::HalfOpen)
665 }
666}
667
668/// Advanced circuit breaker with separate failure thresholds for different error types.
669///
670/// This struct extends the basic circuit breaker functionality by maintaining separate statistics
671/// for different error types, allowing for more granular control over circuit breaker behavior.
672pub struct DMSCAdvancedCircuitBreaker {
673 /// Configuration for the circuit breaker behavior
674 config: DMSCCircuitBreakerConfig,
675
676 /// Error-type specific statistics and state management
677 stats_by_error: RwLock<HashMap<String, Arc<CircuitBreakerStats>>>,
678
679 /// Default statistics for unclassified errors
680 default_stats: Arc<CircuitBreakerStats>,
681}
682
683impl DMSCAdvancedCircuitBreaker {
684 /// Creates a new advanced circuit breaker with the specified configuration.
685 ///
686 /// # Parameters
687 ///
688 /// - `config`: The configuration for the circuit breaker behavior
689 ///
690 /// # Returns
691 ///
692 /// A new `DMSCAdvancedCircuitBreaker` instance
693 pub fn new(config: DMSCCircuitBreakerConfig) -> Self {
694 Self {
695 config,
696 stats_by_error: RwLock::new(HashMap::new()),
697 default_stats: Arc::new(CircuitBreakerStats::new()),
698 }
699 }
700
701 /// Gets the statistics instance for a specific error type, creating it if necessary.
702 ///
703 /// # Parameters
704 ///
705 /// - `error_type`: The error type identifier, or `None` for default statistics
706 ///
707 /// # Returns
708 ///
709 /// An `Arc<CircuitBreakerStats>` instance for the specified error type
710 async fn get_stats_for_error(&self, error_type: Option<&str>) -> Arc<CircuitBreakerStats> {
711 match error_type {
712 Some(error_type) => {
713 let mut stats_map = self.stats_by_error.write().await;
714 stats_map.entry(error_type.to_string())
715 .or_insert_with(|| Arc::new(CircuitBreakerStats::new()))
716 .clone()
717 }
718 None => self.default_stats.clone(),
719 }
720 }
721
722 /// Records a successful operation for a specific error type and updates the circuit breaker state if necessary.
723 ///
724 /// # Parameters
725 ///
726 /// - `error_type`: The error type identifier, or `None` for default statistics
727 pub async fn record_success_with_type(&self, error_type: Option<&str>) {
728 let stats = self.get_stats_for_error(error_type).await;
729 stats.record_success(&self.config).await;
730 }
731
732 /// Records a failed operation for a specific error type and updates the circuit breaker state if necessary.
733 ///
734 /// # Parameters
735 ///
736 /// - `error_type`: The error type identifier, or `None` for default statistics
737 pub async fn record_failure_with_type(&self, error_type: Option<&str>) {
738 let stats = self.get_stats_for_error(error_type).await;
739 stats.record_failure(&self.config).await;
740 }
741
742 /// Determines if a request should be allowed to proceed for a specific error type.
743 ///
744 /// # Parameters
745 ///
746 /// - `error_type`: The error type identifier, or `None` for default statistics
747 ///
748 /// # Returns
749 ///
750 /// `true` if the request should be allowed, `false` otherwise
751 pub async fn allow_request_for_type(&self, error_type: Option<&str>) -> bool {
752 let stats = self.get_stats_for_error(error_type).await;
753 let state = stats.get_state().await;
754
755 match state {
756 DMSCCircuitBreakerState::Closed => true,
757 DMSCCircuitBreakerState::Open => {
758 if stats.should_attempt_reset(&self.config).await {
759 stats.transition_to_half_open().await;
760 true
761 } else {
762 false
763 }
764 }
765 DMSCCircuitBreakerState::HalfOpen => true,
766 }
767 }
768}