dmsc/cache/
manager.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20use serde::{Deserialize, Serialize};
21use std::sync::Arc;
22use std::collections::HashMap;
23use tokio::sync::{RwLock, broadcast};
24use crate::cache::core::{DMSCCache, DMSCCacheStats};
25
26#[cfg(feature = "pyo3")]
27use pyo3::prelude::*;
28
29
30/// # DMSC Cache Manager
31/// 
32/// This file implements a cache manager that coordinates different cache backends with 
33/// consistency support across multiple instances. It provides a unified interface for cache 
34/// operations while ensuring cache consistency through event-driven architecture.
35/// 
36/// ## Design Principles
37/// 1. **Consistency First**: Ensures cache consistency across multiple instances using broadcast events
38/// 2. **Unified Interface**: Provides a consistent API regardless of the underlying cache backend
39/// 3. **Event-Driven Architecture**: Uses broadcast channels for efficient cache invalidation
40/// 4. **Thread Safety**: Implements thread-safe operations using Arc and RwLock
41/// 5. **Flexibility**: Supports any backend implementing the DMSCCache trait
42/// 6. **Scalability**: Designed to handle high-throughput cache operations
43/// 
44/// ## Usage Examples
45/// ```rust
46/// // Create a cache manager with a Redis backend
47/// let redis_backend = Arc::new(DMSCRedisBackend::new(config).await?);
48/// let mut cache_manager = DMSCCacheManager::new(redis_backend);
49/// 
50/// // Start the consistency listener
51/// let listener_handle = cache_manager.start_consistency_listener().await;
52/// 
53/// // Set a value in cache
54/// cache_manager.set("user:123", &User { id: 123, name: "John" }, Some(3600)).await?;
55/// 
56/// // Get a value from cache
57/// let user: Option<User> = cache_manager.get("user:123").await?;
58/// 
59/// // Delete a value and invalidate across all instances
60/// cache_manager.delete("user:123").await?;
61/// 
62/// // Clear cache and broadcast to all instances
63/// cache_manager.clear().await?;
64/// ```
65/// Cache event type for maintaining cache consistency across instances
66/// 
67/// This enum defines the events that are broadcasted to ensure all cache instances
68/// remain consistent. Each event triggers a corresponding action on all cache instances.
69#[derive(Debug, Clone, Serialize, Deserialize)]
70#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
71pub enum DMSCCacheEvent {
72    /// Invalidate a specific cache key
73    /// 
74    /// **Parameters:**
75    /// - `key`: The cache key to invalidate
76    Invalidate { key: String },
77    
78    /// Invalidate all cache keys matching a pattern
79    /// 
80    /// **Parameters:**
81    /// - `pattern`: The pattern to match cache keys (supports wildcards depending on backend)
82    InvalidatePattern { pattern: String },
83    
84    /// Clear all cache entries
85    Clear(),
86}
87
88/// Cache manager that coordinates different cache backends with consistency support
89/// 
90/// This struct provides a unified interface for cache operations while ensuring cache
91/// consistency across multiple instances through event-driven architecture. It wraps
92/// any backend implementing the DMSCCache trait and adds consistency guarantees.
93#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
94pub struct DMSCCacheManager {
95    /// The underlying cache backend implementation
96    backend: Arc<dyn DMSCCache + Send + Sync>,
97    
98    /// Broadcast sender for cache consistency events
99    event_sender: broadcast::Sender<DMSCCacheEvent>,
100    
101    /// Broadcast receiver for cache consistency events (used internally)
102    event_receiver: Option<broadcast::Receiver<DMSCCacheEvent>>,
103    
104    /// Map of subscribers to cache events (for internal use)
105    _subscribers: Arc<RwLock<HashMap<String, broadcast::Receiver<DMSCCacheEvent>>>>,
106}
107
108impl DMSCCacheManager {
109    /// Create a new cache manager with the specified backend
110    /// 
111    /// **Parameters:**
112    /// - `backend`: The underlying cache backend implementation
113    /// 
114    /// **Returns:**
115    /// - A new instance of `DMSCCacheManager`
116    pub fn new(backend: Arc<dyn DMSCCache + Send + Sync>) -> Self {
117        let (sender, receiver) = broadcast::channel(100);
118        
119        Self {
120            backend,
121            event_sender: sender,
122            event_receiver: Some(receiver),
123            _subscribers: Arc::new(RwLock::new(HashMap::new())),
124        }
125    }
126    
127    /// Start the cache consistency event listener
128    /// 
129    /// This method starts a background task that listens for cache consistency events
130    /// and applies them to the underlying cache backend. This ensures that all cache
131    /// instances remain consistent across the system.
132    /// 
133    /// **Returns:**
134    /// - A `JoinHandle` for the background task
135    pub async fn start_consistency_listener(&mut self) -> tokio::task::JoinHandle<()> {
136        let backend = self.backend.clone();
137        let mut receiver = match self.event_receiver.take() {
138            Some(r) => r,
139            None => {
140                log::error!("[DMSC.Cache] Event receiver already started or not initialized");
141                return tokio::spawn(async {});
142            }
143        };
144        
145        log::info!("[DMSC.Cache] Starting cache consistency event listener");
146        
147        tokio::spawn(async move {
148            let mut event_count = 0;
149            while let Ok(event) = receiver.recv().await {
150                event_count += 1;
151                
152                match event {
153                    DMSCCacheEvent::Invalidate { key } => {
154                        log::info!("[DMSC.Cache] Processing invalidate event for key: {key}");
155                        if let Err(e) = backend.delete(&key).await {
156                            log::error!("[DMSC.Cache] Failed to invalidate cache key {key}: {e}");
157                        } else {
158                            log::info!("[DMSC.Cache] Successfully invalidated cache key: {key}");
159                        }
160                    },
161                    DMSCCacheEvent::InvalidatePattern { pattern } => {
162                        log::info!("[DMSC.Cache] Processing invalidate pattern event: {pattern}");
163                        match backend.delete_by_pattern(&pattern).await {
164                            Ok(count) => {
165                                log::info!("[DMSC.Cache] Successfully invalidated {} cache keys matching pattern: {pattern}", count);
166                            }
167                            Err(e) => {
168                                log::error!("[DMSC.Cache] Failed to invalidate cache pattern {pattern}: {e}");
169                            }
170                        }
171                    },
172                    DMSCCacheEvent::Clear() => {
173                        log::info!("[DMSC.Cache] Processing clear cache event");
174                        if let Err(e) = backend.clear().await {
175                            log::error!("[DMSC.Cache] Failed to clear cache: {e}");
176                        } else {
177                            log::info!("[DMSC.Cache] Successfully cleared cache");
178                        }
179                    },
180                }
181                
182                // Log event processing statistics periodically
183                if event_count % 100 == 0 {
184                    log::info!("[DMSC.Cache] Processed {event_count} cache consistency events");
185                }
186            }
187            
188            log::info!("[DMSC.Cache] Cache consistency event listener stopped after processing {event_count} events");
189        })
190    }
191    
192    /// Subscribe to cache consistency events
193    /// 
194    /// This method allows external components to subscribe to cache consistency events,
195    /// enabling them to react to cache changes in real-time.
196    /// 
197    /// **Returns:**
198    /// - A broadcast receiver for cache events
199    pub fn subscribe(&self) -> broadcast::Receiver<DMSCCacheEvent> {
200        self.event_sender.subscribe()
201    }
202    
203    /// Publish a cache consistency event
204    /// 
205    /// This method publishes a cache event to all subscribers, ensuring cache consistency
206    /// across all instances.
207    /// 
208    /// **Parameters:**
209    /// - `event`: The cache event to publish
210    pub fn publish_event(&self, event: DMSCCacheEvent) {
211        let event_type = match &event {
212            DMSCCacheEvent::Invalidate { key } => format!("Invalidate(key: {key})"),
213            DMSCCacheEvent::InvalidatePattern { pattern } => format!("InvalidatePattern(pattern: {pattern})"),
214            DMSCCacheEvent::Clear() => "Clear".to_string(),
215        };
216        
217        log::info!("[DMSC.Cache] Publishing cache event: {event_type}");
218        let _ = self.event_sender.send(event);
219    }
220    
221    /// Get a value from cache
222    /// 
223    /// This method retrieves a value from the cache using the specified key. If the key
224    /// exists and the value is valid, it is deserialized and returned. Otherwise, `None`
225    /// is returned.
226    /// 
227    /// **Parameters:**
228    /// - `key`: The cache key to retrieve
229    /// 
230    /// **Returns:**
231    /// - `Ok(Some(T))` if the key exists and the value is valid
232    /// - `Ok(None)` if the key does not exist
233    /// - `Err(DMSCError)` if an error occurs during retrieval or deserialization
234    pub async fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> crate::core::DMSCResult<Option<T>> {
235        log::debug!("[DMSC.Cache] Getting cache key: {key}");
236        
237        match self.backend.get(key).await? {
238            Some(cached_value) => {
239                log::debug!("[DMSC.Cache] Cache hit for key: {key}");
240                let value = serde_json::from_str(&cached_value)
241                    .map_err(|e| crate::core::DMSCError::Other(format!("Deserialization error: {e}")))?;
242                Ok(Some(value))
243            }
244            None => {
245                log::debug!("[DMSC.Cache] Cache miss for key: {key}");
246                Ok(None)
247            }
248        }
249    }
250    
251    /// Set a value in cache with optional TTL
252    /// 
253    /// This method stores a value in the cache with the specified key and optional TTL.
254    /// It also publishes an invalidate event to ensure cache consistency across all instances.
255    /// 
256    /// **Parameters:**
257    /// - `key`: The cache key to set
258    /// - `value`: The value to store (must implement Serialize)
259    /// - `ttl_seconds`: Optional time-to-live in seconds
260    /// 
261    /// **Returns:**
262    /// - `Ok(())` if the value was successfully stored
263    /// - `Err(DMSCError)` if an error occurs during serialization or storage
264    pub async fn set<T: serde::Serialize>(&self, key: &str, value: &T, ttl_seconds: Option<u64>) -> crate::core::DMSCResult<()> {
265        log::debug!("[DMSC.Cache] Setting cache key: {key} with TTL: {ttl_seconds:?}");
266        
267        let serialized = serde_json::to_string(value)
268            .map_err(|e| crate::core::DMSCError::Other(format!("Serialization error: {e}")))?;
269        
270        let result = self.backend.set(key, &serialized, ttl_seconds).await;
271        
272        match &result {
273            Ok(_) => log::debug!("[DMSC.Cache] Successfully set cache key: {key}"),
274            Err(e) => log::error!("[DMSC.Cache] Failed to set cache key {key}: {e}"),
275        }
276        
277        // Publish invalidate event to ensure consistency across instances
278        self.publish_event(DMSCCacheEvent::Invalidate { key: key.to_string() });
279        
280        result
281    }
282    
283    /// Delete a value from cache
284    /// 
285    /// This method deletes a value from the cache using the specified key. It also
286    /// publishes an invalidate event to ensure cache consistency across all instances.
287    /// 
288    /// **Parameters:**
289    /// - `key`: The cache key to delete
290    /// 
291    /// **Returns:**
292    /// - `Ok(true)` if the key was found and deleted
293    /// - `Ok(false)` if the key didn't exist
294    /// - `Err(DMSCError)` if an error occurs during deletion
295    pub async fn delete(&self, key: &str) -> crate::core::DMSCResult<bool> {
296        log::debug!("[DMSC.Cache] Deleting cache key: {key}");
297        
298        let result = self.backend.delete(key).await;
299        
300        match &result {
301            Ok(true) => log::debug!("[DMSC.Cache] Successfully deleted cache key: {key}"),
302            Ok(false) => log::debug!("[DMSC.Cache] Cache key not found for deletion: {key}"),
303            Err(e) => log::error!("[DMSC.Cache] Failed to delete cache key {key}: {e}"),
304        }
305        
306        // Publish invalidate event to ensure consistency across instances
307        self.publish_event(DMSCCacheEvent::Invalidate { key: key.to_string() });
308        
309        result
310    }
311    
312    /// Check if a key exists in cache
313    /// 
314    /// This method checks if the specified key exists in the cache.
315    /// 
316    /// **Parameters:**
317    /// - `key`: The cache key to check
318    /// 
319    /// **Returns:**
320    /// - `true` if the key exists, `false` otherwise
321    pub async fn exists(&self, key: &str) -> bool {
322        self.backend.exists(key).await
323    }
324    
325    /// Clear all cache entries
326    /// 
327    /// This method clears all entries from the cache. It also publishes a clear event
328    /// to ensure cache consistency across all instances.
329    /// 
330    /// **Returns:**
331    /// - `Ok(())` if the cache was successfully cleared
332    /// - `Err(DMSCError)` if an error occurs during clearing
333    pub async fn clear(&self) -> crate::core::DMSCResult<()> {
334        log::info!("[DMSC.Cache] Clearing all cache entries");
335        
336        let result = self.backend.clear().await;
337        
338        match &result {
339            Ok(_) => log::info!("[DMSC.Cache] Successfully cleared all cache entries"),
340            Err(e) => log::error!("[DMSC.Cache] Failed to clear cache: {e}"),
341        }
342        
343        // Publish clear event to ensure consistency across instances
344        self.publish_event(DMSCCacheEvent::Clear());
345        
346        result
347    }
348    
349    /// Invalidate cache entries matching a pattern
350    /// 
351    /// This method invalidates all cache entries matching the specified pattern. It
352    /// publishes an invalidate pattern event to ensure cache consistency across all instances.
353    /// 
354    /// **Parameters:**
355    /// - `pattern`: The pattern to match cache keys (supports wildcards depending on backend)
356    /// 
357    /// **Returns:**
358    /// - `Ok(())` if the invalidation event was successfully published
359    pub async fn invalidate_pattern(&self, pattern: &str) -> crate::core::DMSCResult<()> {
360        // Publish invalidate pattern event to ensure consistency across instances
361        self.publish_event(DMSCCacheEvent::InvalidatePattern { pattern: pattern.to_string() });
362        
363        Ok(())
364    }
365    
366    /// Get cache statistics
367    /// 
368    /// This method retrieves statistics about the cache, including hit rate, miss rate,
369    /// and the number of entries.
370    /// 
371    /// **Returns:**
372    /// - A `DMSCCacheStats` struct containing the cache statistics
373    pub async fn stats(&self) -> DMSCCacheStats {
374        let stats = self.backend.stats().await;
375        
376        // Log cache statistics for monitoring
377        log::info!("[DMSC.Cache] Cache Statistics: hits={}, misses={}, entries={}, hit_rate={:.2}%", 
378                 stats.hits, stats.misses, stats.entries, stats.avg_hit_rate * 100.0);
379        
380        // Monitor cache performance
381        if stats.hits + stats.misses > 0 {
382            let current_hit_rate = stats.hits as f64 / (stats.hits + stats.misses) as f64;
383            if current_hit_rate < 0.5 && stats.hits + stats.misses > 100 {
384                log::warn!("[DMSC.Cache] Warning: Low cache hit rate ({:.2}%) with {} total operations", 
385                         current_hit_rate * 100.0, stats.hits + stats.misses);
386            }
387        }
388        
389        stats
390    }
391    
392    /// Cleanup expired cache entries
393    /// 
394    /// This method removes all expired entries from the cache.
395    /// 
396    /// **Returns:**
397    /// - `Ok(usize)` with the number of expired entries cleaned up
398    /// - `Err(DMSCError)` if an error occurs during cleanup
399    pub async fn cleanup_expired(&self) -> crate::core::DMSCResult<usize> {
400        let cleaned = self.backend.cleanup_expired().await?;
401        
402        // Log cleanup results for monitoring
403        if cleaned > 0 {
404            log::info!("[DMSC.Cache] Cleanup completed: {cleaned} expired entries removed");
405        }
406        
407        Ok(cleaned)
408    }
409    
410    /// Get a value from cache or set it if it doesn't exist
411    /// 
412    /// This method retrieves a value from the cache using the specified key. If the key
413    /// exists and the value is valid, it is returned. Otherwise, the provided factory
414    /// function is called to generate the value, which is then stored in the cache and
415    /// returned.
416    /// 
417    /// **Parameters:**
418    /// - `key`: The cache key to retrieve or set
419    /// - `ttl_seconds`: Optional time-to-live in seconds for the new value
420    /// - `factory`: A function that generates the value if it doesn't exist in cache
421    /// 
422    /// **Returns:**
423    /// - `Ok(T)` with the retrieved or generated value
424    /// - `Err(DMSCError)` if an error occurs during retrieval, generation, or storage
425    pub async fn get_or_set<T, F>(&self, key: &str, ttl_seconds: Option<u64>, factory: F) -> crate::core::DMSCResult<T>
426    where
427        T: serde::Serialize + serde::de::DeserializeOwned + Clone,
428        F: FnOnce() -> crate::core::DMSCResult<T>,
429    {
430        log::debug!("[DMSC.Cache] get_or_set operation for key: {key} with TTL: {ttl_seconds:?}");
431        
432        // Try to get from cache first
433        if let Some(value) = self.get::<T>(key).await? {
434            log::debug!("[DMSC.Cache] get_or_set cache hit for key: {key}");
435            return Ok(value);
436        }
437        
438        log::debug!("[DMSC.Cache] get_or_set cache miss for key: {key}, generating value");
439        
440        // If not found, generate the value
441        let value = factory()?;
442        
443        // Store in cache
444        self.set(key, &value, ttl_seconds).await?;
445        
446        log::debug!("[DMSC.Cache] get_or_set successfully generated and cached value for key: {key}");
447        Ok(value)
448    }
449}
450
451#[cfg(feature = "pyo3")]
452#[pymethods]
453impl DMSCCacheManager {
454    #[new]
455    fn py_new() -> Self {
456        use crate::cache::backends::DMSCMemoryCache;
457        let backend = std::sync::Arc::new(DMSCMemoryCache::new());
458        Self::new(backend)
459    }
460    
461    #[pyo3(name = "get")]
462    fn get_impl(&self, key: String) -> pyo3::PyResult<Option<String>> {
463        let rt = tokio::runtime::Runtime::new().map_err(|e| {
464            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
465        })?;
466        
467        rt.block_on(async {
468            self.get::<String>(&key).await.map_err(|e| {
469                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
470            })
471        })
472    }
473    
474    #[pyo3(name = "set")]
475    fn set_impl(&self, key: String, value: String, ttl_seconds: Option<u64>) -> pyo3::PyResult<()> {
476        let rt = tokio::runtime::Handle::current();
477        
478        rt.block_on(async {
479            self.set(&key, &value, ttl_seconds).await.map_err(|e| {
480                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
481            })
482        })
483    }
484    
485    #[pyo3(name = "delete")]
486    fn delete_impl(&self, key: String) -> pyo3::PyResult<bool> {
487        let rt = tokio::runtime::Runtime::new().map_err(|e| {
488            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
489        })?;
490        
491        rt.block_on(async {
492            self.delete(&key).await.map_err(|e| {
493                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
494            })
495        })
496    }
497    
498    #[pyo3(name = "exists")]
499    fn exists_impl(&self, key: String) -> pyo3::PyResult<bool> {
500        let rt = tokio::runtime::Runtime::new().map_err(|e| {
501            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
502        })?;
503        
504        Ok(rt.block_on(async {
505            self.exists(&key).await
506        }))
507    }
508    
509    #[pyo3(name = "clear")]
510    fn clear_impl(&self) -> pyo3::PyResult<()> {
511        let rt = tokio::runtime::Runtime::new().map_err(|e| {
512            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
513        })?;
514        
515        rt.block_on(async {
516            self.clear().await.map_err(|e| {
517                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
518            })
519        })
520    }
521    
522    #[pyo3(name = "stats")]
523    fn stats_impl(&self) -> pyo3::PyResult<DMSCCacheStats> {
524        let rt = tokio::runtime::Runtime::new().map_err(|e| {
525            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
526        })?;
527        
528        Ok(rt.block_on(async {
529            self.stats().await
530        }))
531    }
532    
533    #[pyo3(name = "cleanup_expired")]
534    fn cleanup_expired_impl(&self) -> pyo3::PyResult<usize> {
535        let rt = tokio::runtime::Runtime::new().map_err(|e| {
536            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
537        })?;
538        
539        rt.block_on(async {
540            self.cleanup_expired().await.map_err(|e| {
541                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
542            })
543        })
544    }
545}