// dmsc/cache/manager.rs
//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
//!
//! This file is part of DMSC.
//! The DMSC project belongs to the Dunimd Team.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! You may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//!     http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.

#![allow(non_snake_case)]

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::{RwLock, broadcast};
use crate::cache::core::{DMSCCache, DMSCCacheStats};

#[cfg(feature = "pyo3")]
use pyo3::prelude::*;


// # DMSC Cache Manager
//
// This file implements a cache manager that coordinates different cache backends with
// consistency support across multiple instances. It provides a unified interface for cache
// operations while ensuring cache consistency through event-driven architecture.
//
// (Written as plain `//` comments rather than `///` so this file-level overview
// does not attach to the first item below in rustdoc, and so the example is not
// compiled as a doctest.)
//
// ## Design Principles
// 1. **Consistency First**: Ensures cache consistency across multiple instances using broadcast events
// 2. **Unified Interface**: Provides a consistent API regardless of the underlying cache backend
// 3. **Event-Driven Architecture**: Uses broadcast channels for efficient cache invalidation
// 4. **Thread Safety**: Implements thread-safe operations using Arc and RwLock
// 5. **Flexibility**: Supports any backend implementing the DMSCCache trait
// 6. **Scalability**: Designed to handle high-throughput cache operations
//
// ## Usage Examples
// ```rust
// // Create a cache manager with a Redis backend
// let redis_backend = Arc::new(DMSCRedisBackend::new(config).await?);
// let mut cache_manager = DMSCCacheManager::new(redis_backend);
//
// // Start the consistency listener
// let listener_handle = cache_manager.start_consistency_listener().await;
//
// // Set a value in cache
// cache_manager.set("user:123", &User { id: 123, name: "John" }, Some(3600)).await?;
//
// // Get a value from cache
// let user: Option<User> = cache_manager.get("user:123").await?;
//
// // Delete a value and invalidate across all instances
// cache_manager.delete("user:123").await?;
//
// // Clear cache and broadcast to all instances
// cache_manager.clear().await?;
// ```
/// Cache event type for maintaining cache consistency across instances
///
/// This enum defines the events that are broadcasted to ensure all cache instances
/// remain consistent. Events travel over a `tokio::sync::broadcast` channel and are
/// applied to the backend by the task spawned in
/// `DMSCCacheManager::start_consistency_listener`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub enum DMSCCacheEvent {
    /// Invalidate a specific cache key
    ///
    /// The consistency listener handles this by deleting `key` from the backend.
    ///
    /// **Parameters:**
    /// - `key`: The cache key to invalidate
    Invalidate { key: String },

    /// Invalidate all cache keys matching a pattern
    ///
    /// The consistency listener handles this via the backend's `delete_by_pattern`.
    ///
    /// **Parameters:**
    /// - `pattern`: The pattern to match cache keys (supports wildcards depending on backend)
    InvalidatePattern { pattern: String },

    /// Clear all cache entries
    ///
    /// NOTE(review): this is a zero-field tuple variant (`Clear()`) rather than a
    /// unit variant; converting it would change the serde representation and every
    /// `Clear()` pattern in this file, so it is deliberately left as-is.
    Clear(),
}

/// Cache manager that coordinates different cache backends with consistency support
///
/// This struct provides a unified interface for cache operations while ensuring cache
/// consistency across multiple instances through event-driven architecture. It wraps
/// any backend implementing the DMSCCache trait and adds consistency guarantees.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCCacheManager {
    /// The underlying cache backend implementation
    backend: Arc<dyn DMSCCache + Send + Sync>,

    /// Broadcast sender for cache consistency events
    event_sender: broadcast::Sender<DMSCCacheEvent>,

    /// Broadcast receiver for cache consistency events; populated by `new` and
    /// consumed (taken) by `start_consistency_listener`, so it is `None` once
    /// the listener has been started
    event_receiver: Option<broadcast::Receiver<DMSCCacheEvent>>,

    /// Map of subscribers to cache events (for internal use)
    /// NOTE(review): nothing in this file ever inserts into this map — it looks
    /// reserved for future named-subscriber tracking; confirm before removing.
    _subscribers: Arc<RwLock<HashMap<String, broadcast::Receiver<DMSCCacheEvent>>>>,
}

impl DMSCCacheManager {
    /// Create a new cache manager with the specified backend
    ///
    /// Wires up the broadcast channel used for consistency events; the receiver
    /// half is stashed until `start_consistency_listener` claims it.
    ///
    /// **Parameters:**
    /// - `backend`: The underlying cache backend implementation
    ///
    /// **Returns:**
    /// - A new instance of `DMSCCacheManager`
    pub fn new(backend: Arc<dyn DMSCCache + Send + Sync>) -> Self {
        // Capacity 100 bounds how many unprocessed events can queue before
        // slow subscribers start lagging.
        let (tx, rx) = broadcast::channel(100);

        Self {
            backend,
            event_sender: tx,
            event_receiver: Some(rx),
            _subscribers: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Start the cache consistency event listener
    ///
    /// This method starts a background task that listens for cache consistency events
    /// and applies them to the underlying cache backend. This ensures that all cache
    /// instances remain consistent across the system.
    ///
    /// May only be started once per manager: the internal receiver is `take`n, so a
    /// second call logs an error and returns a handle to an empty task.
    ///
    /// **Returns:**
    /// - A `JoinHandle` for the background task
    pub async fn start_consistency_listener(&mut self) -> tokio::task::JoinHandle<()> {
        // Clone the Arc so the spawned task owns its own handle to the backend.
        let backend = self.backend.clone();
        let mut receiver = match self.event_receiver.take() {
            Some(r) => r,
            None => {
                // Receiver already consumed by a previous call; return a no-op
                // task so the caller still gets a valid JoinHandle.
                log::error!("[DMSC.Cache] Event receiver already started or not initialized");
                return tokio::spawn(async {});
            }
        };

        log::info!("[DMSC.Cache] Starting cache consistency event listener");

        tokio::spawn(async move {
            let mut event_count = 0;
            // NOTE(review): `recv()` returns Err both when the channel closes AND
            // when this receiver lags behind the ring buffer (Lagged); either way
            // the `while let Ok` loop exits and the listener stops — confirm that
            // stopping on lag is intended.
            while let Ok(event) = receiver.recv().await {
                event_count += 1;

                match event {
                    DMSCCacheEvent::Invalidate { key } => {
                        // Invalidation == delete the key from the backend.
                        log::info!("[DMSC.Cache] Processing invalidate event for key: {key}");
                        if let Err(e) = backend.delete(&key).await {
                            log::error!("[DMSC.Cache] Failed to invalidate cache key {key}: {e}");
                        } else {
                            log::info!("[DMSC.Cache] Successfully invalidated cache key: {key}");
                        }
                    },
                    DMSCCacheEvent::InvalidatePattern { pattern } => {
                        log::info!("[DMSC.Cache] Processing invalidate pattern event: {pattern}");
                        match backend.delete_by_pattern(&pattern).await {
                            Ok(count) => {
                                log::info!("[DMSC.Cache] Successfully invalidated {} cache keys matching pattern: {pattern}", count);
                            }
                            Err(e) => {
                                log::error!("[DMSC.Cache] Failed to invalidate cache pattern {pattern}: {e}");
                            }
                        }
                    },
                    DMSCCacheEvent::Clear() => {
                        log::info!("[DMSC.Cache] Processing clear cache event");
                        if let Err(e) = backend.clear().await {
                            log::error!("[DMSC.Cache] Failed to clear cache: {e}");
                        } else {
                            log::info!("[DMSC.Cache] Successfully cleared cache");
                        }
                    },
                }

                // Log event processing statistics periodically (every 100 events).
                if event_count % 100 == 0 {
                    log::info!("[DMSC.Cache] Processed {event_count} cache consistency events");
                }
            }

            log::info!("[DMSC.Cache] Cache consistency event listener stopped after processing {event_count} events");
        })
    }

192 /// Subscribe to cache consistency events
193 ///
194 /// This method allows external components to subscribe to cache consistency events,
195 /// enabling them to react to cache changes in real-time.
196 ///
197 /// **Returns:**
198 /// - A broadcast receiver for cache events
199 pub fn subscribe(&self) -> broadcast::Receiver<DMSCCacheEvent> {
200 self.event_sender.subscribe()
201 }
202
203 /// Publish a cache consistency event
204 ///
205 /// This method publishes a cache event to all subscribers, ensuring cache consistency
206 /// across all instances.
207 ///
208 /// **Parameters:**
209 /// - `event`: The cache event to publish
210 pub fn publish_event(&self, event: DMSCCacheEvent) {
211 let event_type = match &event {
212 DMSCCacheEvent::Invalidate { key } => format!("Invalidate(key: {key})"),
213 DMSCCacheEvent::InvalidatePattern { pattern } => format!("InvalidatePattern(pattern: {pattern})"),
214 DMSCCacheEvent::Clear() => "Clear".to_string(),
215 };
216
217 log::info!("[DMSC.Cache] Publishing cache event: {event_type}");
218 let _ = self.event_sender.send(event);
219 }
220
221 /// Get a value from cache
222 ///
223 /// This method retrieves a value from the cache using the specified key. If the key
224 /// exists and the value is valid, it is deserialized and returned. Otherwise, `None`
225 /// is returned.
226 ///
227 /// **Parameters:**
228 /// - `key`: The cache key to retrieve
229 ///
230 /// **Returns:**
231 /// - `Ok(Some(T))` if the key exists and the value is valid
232 /// - `Ok(None)` if the key does not exist
233 /// - `Err(DMSCError)` if an error occurs during retrieval or deserialization
234 pub async fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> crate::core::DMSCResult<Option<T>> {
235 log::debug!("[DMSC.Cache] Getting cache key: {key}");
236
237 match self.backend.get(key).await? {
238 Some(cached_value) => {
239 log::debug!("[DMSC.Cache] Cache hit for key: {key}");
240 let value = serde_json::from_str(&cached_value)
241 .map_err(|e| crate::core::DMSCError::Other(format!("Deserialization error: {e}")))?;
242 Ok(Some(value))
243 }
244 None => {
245 log::debug!("[DMSC.Cache] Cache miss for key: {key}");
246 Ok(None)
247 }
248 }
249 }
250
251 /// Set a value in cache with optional TTL
252 ///
253 /// This method stores a value in the cache with the specified key and optional TTL.
254 /// It also publishes an invalidate event to ensure cache consistency across all instances.
255 ///
256 /// **Parameters:**
257 /// - `key`: The cache key to set
258 /// - `value`: The value to store (must implement Serialize)
259 /// - `ttl_seconds`: Optional time-to-live in seconds
260 ///
261 /// **Returns:**
262 /// - `Ok(())` if the value was successfully stored
263 /// - `Err(DMSCError)` if an error occurs during serialization or storage
264 pub async fn set<T: serde::Serialize>(&self, key: &str, value: &T, ttl_seconds: Option<u64>) -> crate::core::DMSCResult<()> {
265 log::debug!("[DMSC.Cache] Setting cache key: {key} with TTL: {ttl_seconds:?}");
266
267 let serialized = serde_json::to_string(value)
268 .map_err(|e| crate::core::DMSCError::Other(format!("Serialization error: {e}")))?;
269
270 let result = self.backend.set(key, &serialized, ttl_seconds).await;
271
272 match &result {
273 Ok(_) => log::debug!("[DMSC.Cache] Successfully set cache key: {key}"),
274 Err(e) => log::error!("[DMSC.Cache] Failed to set cache key {key}: {e}"),
275 }
276
277 // Publish invalidate event to ensure consistency across instances
278 self.publish_event(DMSCCacheEvent::Invalidate { key: key.to_string() });
279
280 result
281 }
282
283 /// Delete a value from cache
284 ///
285 /// This method deletes a value from the cache using the specified key. It also
286 /// publishes an invalidate event to ensure cache consistency across all instances.
287 ///
288 /// **Parameters:**
289 /// - `key`: The cache key to delete
290 ///
291 /// **Returns:**
292 /// - `Ok(true)` if the key was found and deleted
293 /// - `Ok(false)` if the key didn't exist
294 /// - `Err(DMSCError)` if an error occurs during deletion
295 pub async fn delete(&self, key: &str) -> crate::core::DMSCResult<bool> {
296 log::debug!("[DMSC.Cache] Deleting cache key: {key}");
297
298 let result = self.backend.delete(key).await;
299
300 match &result {
301 Ok(true) => log::debug!("[DMSC.Cache] Successfully deleted cache key: {key}"),
302 Ok(false) => log::debug!("[DMSC.Cache] Cache key not found for deletion: {key}"),
303 Err(e) => log::error!("[DMSC.Cache] Failed to delete cache key {key}: {e}"),
304 }
305
306 // Publish invalidate event to ensure consistency across instances
307 self.publish_event(DMSCCacheEvent::Invalidate { key: key.to_string() });
308
309 result
310 }
311
312 /// Check if a key exists in cache
313 ///
314 /// This method checks if the specified key exists in the cache.
315 ///
316 /// **Parameters:**
317 /// - `key`: The cache key to check
318 ///
319 /// **Returns:**
320 /// - `true` if the key exists, `false` otherwise
321 pub async fn exists(&self, key: &str) -> bool {
322 self.backend.exists(key).await
323 }
324
325 /// Clear all cache entries
326 ///
327 /// This method clears all entries from the cache. It also publishes a clear event
328 /// to ensure cache consistency across all instances.
329 ///
330 /// **Returns:**
331 /// - `Ok(())` if the cache was successfully cleared
332 /// - `Err(DMSCError)` if an error occurs during clearing
333 pub async fn clear(&self) -> crate::core::DMSCResult<()> {
334 log::info!("[DMSC.Cache] Clearing all cache entries");
335
336 let result = self.backend.clear().await;
337
338 match &result {
339 Ok(_) => log::info!("[DMSC.Cache] Successfully cleared all cache entries"),
340 Err(e) => log::error!("[DMSC.Cache] Failed to clear cache: {e}"),
341 }
342
343 // Publish clear event to ensure consistency across instances
344 self.publish_event(DMSCCacheEvent::Clear());
345
346 result
347 }
348
349 /// Invalidate cache entries matching a pattern
350 ///
351 /// This method invalidates all cache entries matching the specified pattern. It
352 /// publishes an invalidate pattern event to ensure cache consistency across all instances.
353 ///
354 /// **Parameters:**
355 /// - `pattern`: The pattern to match cache keys (supports wildcards depending on backend)
356 ///
357 /// **Returns:**
358 /// - `Ok(())` if the invalidation event was successfully published
359 pub async fn invalidate_pattern(&self, pattern: &str) -> crate::core::DMSCResult<()> {
360 // Publish invalidate pattern event to ensure consistency across instances
361 self.publish_event(DMSCCacheEvent::InvalidatePattern { pattern: pattern.to_string() });
362
363 Ok(())
364 }
365
366 /// Get cache statistics
367 ///
368 /// This method retrieves statistics about the cache, including hit rate, miss rate,
369 /// and the number of entries.
370 ///
371 /// **Returns:**
372 /// - A `DMSCCacheStats` struct containing the cache statistics
373 pub async fn stats(&self) -> DMSCCacheStats {
374 let stats = self.backend.stats().await;
375
376 // Log cache statistics for monitoring
377 log::info!("[DMSC.Cache] Cache Statistics: hits={}, misses={}, entries={}, hit_rate={:.2}%",
378 stats.hits, stats.misses, stats.entries, stats.avg_hit_rate * 100.0);
379
380 // Monitor cache performance
381 if stats.hits + stats.misses > 0 {
382 let current_hit_rate = stats.hits as f64 / (stats.hits + stats.misses) as f64;
383 if current_hit_rate < 0.5 && stats.hits + stats.misses > 100 {
384 log::warn!("[DMSC.Cache] Warning: Low cache hit rate ({:.2}%) with {} total operations",
385 current_hit_rate * 100.0, stats.hits + stats.misses);
386 }
387 }
388
389 stats
390 }
391
392 /// Cleanup expired cache entries
393 ///
394 /// This method removes all expired entries from the cache.
395 ///
396 /// **Returns:**
397 /// - `Ok(usize)` with the number of expired entries cleaned up
398 /// - `Err(DMSCError)` if an error occurs during cleanup
399 pub async fn cleanup_expired(&self) -> crate::core::DMSCResult<usize> {
400 let cleaned = self.backend.cleanup_expired().await?;
401
402 // Log cleanup results for monitoring
403 if cleaned > 0 {
404 log::info!("[DMSC.Cache] Cleanup completed: {cleaned} expired entries removed");
405 }
406
407 Ok(cleaned)
408 }
409
410 /// Get a value from cache or set it if it doesn't exist
411 ///
412 /// This method retrieves a value from the cache using the specified key. If the key
413 /// exists and the value is valid, it is returned. Otherwise, the provided factory
414 /// function is called to generate the value, which is then stored in the cache and
415 /// returned.
416 ///
417 /// **Parameters:**
418 /// - `key`: The cache key to retrieve or set
419 /// - `ttl_seconds`: Optional time-to-live in seconds for the new value
420 /// - `factory`: A function that generates the value if it doesn't exist in cache
421 ///
422 /// **Returns:**
423 /// - `Ok(T)` with the retrieved or generated value
424 /// - `Err(DMSCError)` if an error occurs during retrieval, generation, or storage
425 pub async fn get_or_set<T, F>(&self, key: &str, ttl_seconds: Option<u64>, factory: F) -> crate::core::DMSCResult<T>
426 where
427 T: serde::Serialize + serde::de::DeserializeOwned + Clone,
428 F: FnOnce() -> crate::core::DMSCResult<T>,
429 {
430 log::debug!("[DMSC.Cache] get_or_set operation for key: {key} with TTL: {ttl_seconds:?}");
431
432 // Try to get from cache first
433 if let Some(value) = self.get::<T>(key).await? {
434 log::debug!("[DMSC.Cache] get_or_set cache hit for key: {key}");
435 return Ok(value);
436 }
437
438 log::debug!("[DMSC.Cache] get_or_set cache miss for key: {key}, generating value");
439
440 // If not found, generate the value
441 let value = factory()?;
442
443 // Store in cache
444 self.set(key, &value, ttl_seconds).await?;
445
446 log::debug!("[DMSC.Cache] get_or_set successfully generated and cached value for key: {key}");
447 Ok(value)
448 }
449}
450
/// Build a fresh, single-use Tokio runtime for bridging Python's synchronous
/// calls onto the async cache API.
///
/// NOTE(review): creating a runtime per call is simple and safe when invoked
/// from a plain Python thread, but `Runtime::new()`/`block_on` would panic if
/// these bindings were ever called from inside an existing Tokio runtime.
#[cfg(feature = "pyo3")]
fn dmsc_py_runtime() -> pyo3::PyResult<tokio::runtime::Runtime> {
    tokio::runtime::Runtime::new().map_err(|e| {
        pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
    })
}

#[cfg(feature = "pyo3")]
#[pymethods]
impl DMSCCacheManager {
    /// Python constructor: backs the manager with an in-process memory cache.
    #[new]
    fn py_new() -> Self {
        use crate::cache::backends::DMSCMemoryCache;
        let backend = std::sync::Arc::new(DMSCMemoryCache::new());
        Self::new(backend)
    }

    /// Retrieve the string value stored under `key`; `None` on a cache miss.
    #[pyo3(name = "get")]
    fn get_impl(&self, key: String) -> pyo3::PyResult<Option<String>> {
        let rt = dmsc_py_runtime()?;
        rt.block_on(async {
            self.get::<String>(&key).await.map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
            })
        })
    }

    /// Store `value` under `key`, optionally bounded by `ttl_seconds`.
    ///
    /// Bug fix: this binding previously used `tokio::runtime::Handle::current()`,
    /// which panics when no Tokio runtime is running — the normal situation when
    /// called from Python code. It now builds a runtime exactly like the other
    /// bindings, so `set` no longer aborts the process.
    #[pyo3(name = "set")]
    fn set_impl(&self, key: String, value: String, ttl_seconds: Option<u64>) -> pyo3::PyResult<()> {
        let rt = dmsc_py_runtime()?;
        rt.block_on(async {
            self.set(&key, &value, ttl_seconds).await.map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
            })
        })
    }

    /// Delete `key`; returns `True` if the key existed.
    #[pyo3(name = "delete")]
    fn delete_impl(&self, key: String) -> pyo3::PyResult<bool> {
        let rt = dmsc_py_runtime()?;
        rt.block_on(async {
            self.delete(&key).await.map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
            })
        })
    }

    /// Check whether `key` currently exists in the cache.
    #[pyo3(name = "exists")]
    fn exists_impl(&self, key: String) -> pyo3::PyResult<bool> {
        let rt = dmsc_py_runtime()?;
        Ok(rt.block_on(async { self.exists(&key).await }))
    }

    /// Remove every entry from the cache.
    #[pyo3(name = "clear")]
    fn clear_impl(&self) -> pyo3::PyResult<()> {
        let rt = dmsc_py_runtime()?;
        rt.block_on(async {
            self.clear().await.map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
            })
        })
    }

    /// Fetch backend statistics (hits, misses, entries, hit rate).
    #[pyo3(name = "stats")]
    fn stats_impl(&self) -> pyo3::PyResult<DMSCCacheStats> {
        let rt = dmsc_py_runtime()?;
        Ok(rt.block_on(async { self.stats().await }))
    }

    /// Remove expired entries; returns how many were evicted.
    #[pyo3(name = "cleanup_expired")]
    fn cleanup_expired_impl(&self) -> pyo3::PyResult<usize> {
        let rt = dmsc_py_runtime()?;
        rt.block_on(async {
            self.cleanup_expired().await.map_err(|e| {
                pyo3::exceptions::PyRuntimeError::new_err(format!("Cache error: {}", e))
            })
        })
    }
}