dmsc/queue/
manager.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Queue Manager Implementation
21//! 
22//! This file defines the queue management components for the DMSC queue system, including the
23//! queue module and queue manager. These components provide the infrastructure for creating,
24//! managing, and shutting down queues across different backend implementations.
25//! 
26//! ## Key Components
27//! 
28//! - **DMSCQueueModule**: Main queue module implementing the `AsyncServiceModule` trait
29//! - **DMSCQueueManager**: Central queue management component responsible for queue lifecycle
30//! 
31//! ## Design Principles
32//! 
33//! 1. **Async-First**: All queue management operations are asynchronous
34//! 2. **Backend Agnostic**: Supports multiple queue backends through a unified interface
35//! 3. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
36//! 4. **Singleton Pattern**: Each queue is created once and shared across the application
37//! 5. **Lazy Initialization**: Queues are created on demand when requested
38//! 6. **Clean Shutdown**: Properly cleans up resources during shutdown
39//! 7. **Non-critical**: Queue failures should not break the entire application
40//! 8. **Service Module Integration**: Implements async service module traits for seamless integration
41//! 
42//! ## Usage
43//! 
//! ```rust
//! use dmsc::prelude::*;
//! use dmsc::queue::{DMSCQueueConfig, DMSCQueueBackendType};
//!
//! async fn example() -> DMSCResult<()> {
//!     // Create queue configuration
//!     let queue_config = DMSCQueueConfig {
//!         enabled: true,
//!         backend_type: DMSCQueueBackendType::Memory,
//!         connection_string: "memory://localhost".to_string(),
//!         max_connections: 10,
//!         message_max_size: 1024 * 1024,
//!         consumer_timeout_ms: 30000,
//!         producer_timeout_ms: 30000,
//!         retry_policy: Default::default(),
//!         dead_letter_config: None,
//!     };
//!
//!     // Create queue module
//!     let queue_module = DMSCQueueModule::new(queue_config).await?;
//!
//!     // Get queue manager
//!     let queue_manager = queue_module.queue_manager();
//!
//!     // Create or get a queue
//!     let queue = queue_manager.create_queue("example_queue").await?;
//!
//!     // List all queues
//!     let queues = queue_manager.list_queues().await;
//!     println!("Available queues: {:?}", queues);
//!
//!     Ok(())
//! }
//! ```
76
77use std::sync::Arc;
78use std::collections::HashMap;
79use tokio::sync::RwLock;
80use async_trait::async_trait;
81use crate::core::{DMSCResult, AsyncServiceModule, DMSCServiceContext};
82use crate::queue::{DMSCQueue, DMSCQueueConfig, DMSCQueueBackendType};
83
84#[cfg(feature = "pyo3")]
85use pyo3::PyResult;
86
87/// Connection pool for external queue backends
88struct QueueConnectionPool {
89    backend_type: DMSCQueueBackendType,
90    connections: Arc<RwLock<Vec<Arc<dyn std::any::Any + Send + Sync>>>>,
91    max_connections: usize,
92    #[allow(dead_code)]
93    connection_string: String,
94}
95
96impl QueueConnectionPool {
97    fn new(backend_type: DMSCQueueBackendType, connection_string: String, max_connections: usize) -> Self {
98        Self {
99            backend_type,
100            connections: Arc::new(RwLock::new(Vec::new())),
101            max_connections,
102            connection_string,
103        }
104    }
105
106    #[allow(dead_code)]
107    async fn get_connection(&self) -> DMSCResult<Arc<dyn std::any::Any + Send + Sync>> {
108        let mut connections = self.connections.write().await;
109        
110        if let Some(conn) = connections.pop() {
111            return Ok(conn);
112        }
113
114        // Create new connection if pool is empty and under limit
115        if connections.len() < self.max_connections {
116            let conn = self.create_connection().await?;
117            return Ok(conn);
118        }
119
120        Err(crate::core::DMSCError::Other("Connection pool exhausted".to_string()))
121    }
122
123    async fn return_connection(&self, connection: Arc<dyn std::any::Any + Send + Sync>) {
124        let mut connections = self.connections.write().await;
125        if connections.len() < self.max_connections {
126            connections.push(connection);
127        }
128    }
129
130    async fn create_connection(&self) -> DMSCResult<Arc<dyn std::any::Any + Send + Sync>> {
131        match self.backend_type {
132            #[cfg(feature = "rabbitmq")]
133            DMSCQueueBackendType::RabbitMQ => {
134                use lapin::{Connection, ConnectionProperties};
135                let conn = Connection::connect(&self.connection_string, ConnectionProperties::default()).await?;
136                Ok(Arc::new(conn))
137            }
138            #[cfg(not(feature = "rabbitmq"))]
139            DMSCQueueBackendType::RabbitMQ => {
140                Err(crate::core::DMSCError::Config(
141                    "RabbitMQ support is disabled. Enable the 'rabbitmq' feature to use RabbitMQ backend.".to_string(),
142                ))
143            }
144            #[cfg(feature = "redis")]
145            DMSCQueueBackendType::Redis => {
146                use redis::Client;
147                let client = Client::open(self.connection_string.as_str())?;
148                let conn = client.get_multiplexed_async_connection().await?;
149                Ok(Arc::new(conn))
150            }
151            #[cfg(not(feature = "redis"))]
152            DMSCQueueBackendType::Redis => {
153                Err(crate::core::DMSCError::Config(
154                    "Redis support is disabled. Enable the 'redis' feature to use Redis backend.".to_string(),
155                ))
156            }
157            #[cfg(all(feature = "kafka", not(windows)))]
158            DMSCQueueBackendType::Kafka => {
159                // For Kafka, we don't need persistent connections like RabbitMQ/Redis
160                // The Kafka client manages its own connections internally
161                Ok(Arc::new(()))
162            }
163            #[cfg(any(not(feature = "kafka"), windows))]
164            DMSCQueueBackendType::Kafka => {
165                Err(crate::core::DMSCError::Config(
166                    "Kafka support is disabled. Enable the 'kafka' feature to use Kafka backend.".to_string(),
167                ))
168            }
169            DMSCQueueBackendType::Memory => {
170                // Memory queue doesn't require any external connections
171                Ok(Arc::new(()))
172            }
173        }
174    }
175
176    async fn close_all(&self) {
177        let mut connections = self.connections.write().await;
178        connections.clear();
179    }
180}
181
182/// Main queue module implementing the async service module trait.
183/// 
184/// This module provides the main entry point for the queue system, integrating with the
185/// DMSC service module system for lifecycle management.
186#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
187pub struct DMSCQueueModule {
188    /// The queue manager instance
189    queue_manager: Arc<DMSCQueueManager>,
190}
191
192impl DMSCQueueModule {
193    /// Creates a new queue module with the given configuration.
194    /// 
195    /// # Parameters
196    /// 
197    /// - `config`: The queue configuration to use
198    /// 
199    /// # Returns
200    /// 
201    /// A `DMSCResult<Self>` containing the new queue module instance
202    pub async fn new(config: DMSCQueueConfig) -> DMSCResult<Self> {
203        let queue_manager = Arc::new(DMSCQueueManager::new(config).await?);
204        Ok(Self { queue_manager })
205    }
206
207    /// Creates a new queue module with the given configuration (synchronous version).
208    /// 
209    /// This is a synchronous wrapper for use in the builder pattern.
210    /// 
211    /// # Parameters
212    /// 
213    /// - `config`: The queue configuration to use
214    /// 
215    /// # Returns
216    /// 
217    /// A `DMSCResult<Self>` containing the new queue module instance
218    pub fn with_config(_config: DMSCQueueConfig) -> DMSCResult<Self> {
219        // Create a simple memory-based queue manager for synchronous initialization
220        let queue_manager = Arc::new(DMSCQueueManager::default());
221        Ok(Self { queue_manager })
222    }
223
224    /// Returns a reference to the queue manager.
225    /// 
226    /// # Returns
227    /// 
228    /// An Arc<DMSCQueueManager> providing thread-safe access to the queue manager
229    pub fn queue_manager(&self) -> Arc<DMSCQueueManager> {
230        self.queue_manager.clone()
231    }
232}
233
#[cfg(feature = "pyo3")]
/// Python bindings for DMSCQueueModule
#[pyo3::prelude::pymethods]
impl DMSCQueueModule {
    /// Python constructor: builds a queue module backed by the in-memory backend.
    ///
    /// A dedicated Tokio runtime is created to drive the async constructor.
    /// `tokio::runtime::Handle::current()` is deliberately not used: it panics
    /// when no runtime has been entered, which is the normal situation when
    /// this constructor is invoked from Python.
    ///
    /// # Errors
    ///
    /// Raises `RuntimeError` when the runtime cannot be created or when the
    /// module fails to initialize.
    #[new]
    fn py_new() -> PyResult<Self> {
        let config = DMSCQueueConfig {
            enabled: true,
            backend_type: DMSCQueueBackendType::Memory,
            connection_string: "memory://localhost".to_string(),
            max_connections: 10,
            message_max_size: 1024 * 1024,
            consumer_timeout_ms: 30000,
            producer_timeout_ms: 30000,
            retry_policy: crate::queue::config::DMSCRetryPolicy::default(),
            dead_letter_config: None,
        };

        let runtime = tokio::runtime::Runtime::new().map_err(|e| {
            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
        })?;

        runtime.block_on(Self::new(config)).map_err(|e| {
            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create queue module: {e}"))
        })
    }
}
266
267#[async_trait]
268impl AsyncServiceModule for DMSCQueueModule {
269    /// Initializes the queue module.
270    /// 
271    /// This method delegates to the queue manager's initialization method.
272    /// 
273    /// # Parameters
274    /// 
275    /// - `_ctx`: The service context (not used in this implementation)
276    /// 
277    /// # Returns
278    /// 
279    /// A `DMSCResult<()>` indicating success or failure
280    async fn init(&mut self, _ctx: &mut DMSCServiceContext) -> DMSCResult<()> {
281        self.queue_manager.init().await
282    }
283
284    /// Performs cleanup after the application has shut down.
285    /// 
286    /// This method delegates to the queue manager's shutdown method.
287    /// 
288    /// # Parameters
289    /// 
290    /// - `_ctx`: The service context (not used in this implementation)
291    /// 
292    /// # Returns
293    /// 
294    /// A `DMSCResult<()>` indicating success or failure
295    async fn after_shutdown(&mut self, _ctx: &mut DMSCServiceContext) -> DMSCResult<()> {
296        self.queue_manager.shutdown().await
297    }
298
299    /// Returns the name of the queue module.
300    /// 
301    /// # Returns
302    /// 
303    /// The module name as a string
304    fn name(&self) -> &str {
305        "dms-queue"
306    }
307
308    /// Indicates whether the queue module is critical.
309    /// 
310    /// The queue module is non-critical, meaning that if it fails to initialize or operate,
311    /// it should not break the entire application.
312    /// 
313    /// # Returns
314    /// 
315    /// `false` since queue is non-critical
316    fn is_critical(&self) -> bool {
317        false
318    }
319}
320
321impl crate::core::ServiceModule for DMSCQueueModule {
322    fn name(&self) -> &str {
323        "DMSC.Queue"
324    }
325
326    fn is_critical(&self) -> bool {
327        false
328    }
329
330    fn priority(&self) -> i32 {
331        15
332    }
333
334    fn dependencies(&self) -> Vec<&str> {
335        vec![]
336    }
337
338    fn init(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> crate::core::DMSCResult<()> {
339        Ok(())
340    }
341
342    fn start(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> crate::core::DMSCResult<()> {
343        Ok(())
344    }
345
346    fn shutdown(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> crate::core::DMSCResult<()> {
347        Ok(())
348    }
349}
350
351/// Central queue management component.
352/// 
353/// This struct is responsible for the lifecycle of queues, including creating, retrieving,
354/// listing, and deleting queues. It manages queues across different backend implementations.
355#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
356pub struct DMSCQueueManager {
357    /// Queue configuration
358    config: DMSCQueueConfig,
359    /// Map of queue names to queue instances, protected by a RwLock for thread-safe access
360    queues: Arc<RwLock<HashMap<String, Arc<dyn DMSCQueue>>>>,
361    /// Connection pool for external backends
362    connection_pool: Option<Arc<QueueConnectionPool>>,
363}
364
365impl DMSCQueueManager {
366    /// Creates a new queue manager with the given configuration.
367    /// 
368    /// # Parameters
369    /// 
370    /// - `config`: The queue configuration to use
371    /// 
372    /// # Returns
373    /// 
374    /// A `DMSCResult<Self>` containing the new queue manager instance
375    pub async fn new(config: DMSCQueueConfig) -> DMSCResult<Self> {
376        let backend_type = config.backend_type.clone();
377        let connection_string = config.connection_string.clone();
378        
379        let connection_pool = match backend_type {
380            DMSCQueueBackendType::RabbitMQ | DMSCQueueBackendType::Redis => {
381                Some(Arc::new(QueueConnectionPool::new(
382                    backend_type,
383                    connection_string,
384                    10, // max_connections
385                )))
386            }
387            #[cfg(all(feature = "kafka", not(windows)))]
388            DMSCQueueBackendType::Kafka => {
389                Some(Arc::new(QueueConnectionPool::new(
390                    backend_type,
391                    connection_string,
392                    10, // max_connections
393                )))
394            }
395            #[cfg(any(not(feature = "kafka"), windows))]
396            DMSCQueueBackendType::Kafka => {
397                None
398            }
399            _ => None,
400        };
401
402        Ok(Self {
403            config: DMSCQueueConfig {
404                enabled: config.enabled,
405                backend_type: config.backend_type.clone(),
406                connection_string: config.connection_string.clone(),
407                max_connections: config.max_connections,
408                message_max_size: config.message_max_size,
409                consumer_timeout_ms: config.consumer_timeout_ms,
410                producer_timeout_ms: config.producer_timeout_ms,
411                retry_policy: config.retry_policy.clone(),
412                dead_letter_config: config.dead_letter_config.clone(),
413            },
414            queues: Arc::new(RwLock::new(HashMap::new())),
415            connection_pool,
416        })
417    }
418}
419
420impl Default for DMSCQueueManager {
421    fn default() -> Self {
422        Self {
423            config: DMSCQueueConfig {
424                enabled: true,
425                backend_type: DMSCQueueBackendType::Memory,
426                connection_string: "memory://localhost".to_string(),
427                max_connections: 10,
428                message_max_size: 1024 * 1024,
429                consumer_timeout_ms: 30000,
430                producer_timeout_ms: 30000,
431                retry_policy: crate::queue::config::DMSCRetryPolicy::default(),
432                dead_letter_config: None,
433            },
434            queues: Arc::new(RwLock::new(HashMap::new())),
435            connection_pool: None,
436        }
437    }
438}
439
440impl DMSCQueueManager {
441    /// Initializes the queue manager.
442    /// 
443    /// This method performs backend-specific initialization, such as setting up connection pools
444    /// for external queue systems.
445    /// 
446    /// # Returns
447    /// 
448    /// A `DMSCResult<()>` indicating success or failure
449    pub async fn init(&self) -> DMSCResult<()> {
450        // Initialize connection pool for external backends
451        if let Some(ref pool) = self.connection_pool {
452            // Pre-create a few connections for better performance
453            for _ in 0..3 {
454                if let Ok(conn) = pool.create_connection().await {
455                    let _ = pool.return_connection(conn).await;
456                }
457            }
458        }
459        Ok(())
460    }
461
462    /// Creates a new queue or returns an existing one with the same name.
463    /// 
464    /// This method implements lazy initialization, creating the queue only when requested.
465    /// 
466    /// # Parameters
467    /// 
468    /// - `name`: The name of the queue to create or retrieve
469    /// 
470    /// # Returns
471    /// 
472    /// A `DMSCResult<Arc<dyn DMSCQueue>>` containing the queue instance
473    pub async fn create_queue(&self, name: &str) -> DMSCResult<Arc<dyn DMSCQueue>> {
474        let mut queues = self.queues.write().await;
475        
476        if let Some(queue) = queues.get(name) {
477            return Ok(queue.clone());
478        }
479
480        let queue = self.create_backend_queue(name).await?;
481        queues.insert(name.to_string(), queue.clone());
482        
483        Ok(queue)
484    }
485
486    /// Creates a queue with the appropriate backend based on configuration.
487    /// 
488    /// This is an internal method that creates the actual queue instance based on the
489    /// configured backend type.
490    /// 
491    /// # Parameters
492    /// 
493    /// - `name`: The name of the queue to create
494    /// 
495    /// # Returns
496    /// 
497    /// A `DMSCResult<Arc<dyn DMSCQueue>>` containing the created queue instance
498    async fn create_backend_queue(&self, name: &str) -> DMSCResult<Arc<dyn DMSCQueue>> {
499        match self.config.backend_type {
500            DMSCQueueBackendType::Memory => {
501                Ok(Arc::new(crate::queue::backends::DMSCMemoryQueue::new(name)))
502            }
503            #[cfg(feature = "rabbitmq")]
504            DMSCQueueBackendType::RabbitMQ => {
505                if let Some(ref pool) = self.connection_pool {
506                    let conn = pool.get_connection().await?;
507                    // Extract the actual lapin connection from the pooled connection
508                    if let Some(_lapin_conn) = conn.downcast_ref::<lapin::Connection>() {
509                        // Since lapin::Connection doesn't implement Clone, we need to create a new connection
510                        // This is a workaround - in a real system, you'd want to use connection pooling properly
511                        let queue = crate::queue::backends::DMSCRabbitMQQueue::new(
512                            name,
513                            &self.config.connection_string,
514                        )
515                        .await?;
516                        // Return connection to pool
517                        let _ = pool.return_connection(conn).await;
518                        return Ok(Arc::new(queue));
519                    }
520                }
521                Ok(Arc::new(
522                    crate::queue::backends::DMSCRabbitMQQueue::new(
523                        name,
524                        &self.config.connection_string,
525                    )
526                    .await?,
527                ))
528            }
529            #[cfg(not(feature = "rabbitmq"))]
530            DMSCQueueBackendType::RabbitMQ => {
531                Err(crate::core::DMSCError::Config(
532                    "RabbitMQ support is disabled. Enable the 'rabbitmq' feature to use RabbitMQ backend.".to_string(),
533                ))
534            }
535            #[cfg(all(feature = "kafka", not(windows)))]
536            DMSCQueueBackendType::Kafka => {
537                Ok(Arc::new(
538                    crate::queue::backends::DMSCKafkaQueue::new(
539                        name,
540                        &self.config.connection_string,
541                    )
542                    .await?,
543                ))
544            }
545            #[cfg(any(not(feature = "kafka"), windows))]
546            DMSCQueueBackendType::Kafka => {
547                Err(crate::core::DMSCError::Config(
548                    "Kafka support is disabled. Enable the 'kafka' feature to use Kafka backend.".to_string(),
549                ))
550            }
551            #[cfg(feature = "redis")]
552            DMSCQueueBackendType::Redis => {
553                Ok(Arc::new(
554                    crate::queue::backends::DMSCRedisQueue::new(
555                        name,
556                        &self.config.connection_string,
557                    )
558                    .await?,
559                ))
560            }
561            #[cfg(not(feature = "redis"))]
562            DMSCQueueBackendType::Redis => {
563                Err(crate::core::DMSCError::Config(
564                    "Redis support is disabled. Enable the 'redis' feature to use Redis backend.".to_string(),
565                ))
566            }
567        }
568    }
569
570    /// Retrieves an existing queue by name.
571    /// 
572    /// # Parameters
573    /// 
574    /// - `name`: The name of the queue to retrieve
575    /// 
576    /// # Returns
577    /// 
578    /// An `Option<Arc<dyn DMSCQueue>>` containing the queue instance if it exists, or None otherwise
579    pub async fn get_queue(&self, name: &str) -> Option<Arc<dyn DMSCQueue>> {
580        let queues = self.queues.read().await;
581        queues.get(name).cloned()
582    }
583
584    /// Lists all currently created queues.
585    /// 
586    /// # Returns
587    /// 
588    /// A `Vec<String>` containing the names of all created queues
589    pub async fn list_queues(&self) -> Vec<String> {
590        let queues = self.queues.read().await;
591        queues.keys().cloned().collect()
592    }
593
594    /// Deletes a queue by name.
595    /// 
596    /// This method removes the queue from the manager and calls the queue's delete method
597    /// to clean up any backend-specific resources.
598    /// 
599    /// # Parameters
600    /// 
601    /// - `name`: The name of the queue to delete
602    /// 
603    /// # Returns
604    /// 
605    /// A `DMSCResult<()>` indicating success or failure
606    pub async fn delete_queue(&self, name: &str) -> DMSCResult<()> {
607        let mut queues = self.queues.write().await;
608        if let Some(queue) = queues.remove(name) {
609            queue.delete().await?;
610        }
611        Ok(())
612    }
613
614    /// Shuts down the queue manager and cleans up resources.
615    /// 
616    /// This method purges all queues and cleans up any backend-specific resources.
617    /// 
618    /// # Returns
619    /// 
620    /// A `DMSCResult<()>` indicating success or failure
621    pub async fn shutdown(&self) -> DMSCResult<()> {
622        let mut queues = self.queues.write().await;
623        for (_, queue) in queues.drain() {
624            // Cleanup each queue
625            let _ = queue.purge().await;
626        }
627        
628        // Close connection pool
629        if let Some(ref pool) = self.connection_pool {
630            pool.close_all().await;
631        }
632        
633        Ok(())
634    }
635}
636
/// Builds the Tokio runtime used by the Python bindings to drive async calls,
/// mapping a construction failure to a Python `RuntimeError`.
#[cfg(feature = "pyo3")]
fn dmsc_queue_manager_py_runtime() -> PyResult<tokio::runtime::Runtime> {
    tokio::runtime::Runtime::new().map_err(|e| {
        pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
    })
}

/// Python bindings for `DMSCQueueManager`.
///
/// Each method blocks the calling thread on a freshly created Tokio runtime
/// until the corresponding async operation has completed.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DMSCQueueManager {
    /// Creates (or looks up) the queue `name` and returns the name on success.
    #[pyo3(name = "create_queue")]
    fn create_queue_impl(&self, name: String) -> PyResult<String> {
        let runtime = dmsc_queue_manager_py_runtime()?;

        match runtime.block_on(self.create_queue(&name)) {
            Ok(_) => Ok(name),
            Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(format!(
                "Failed to create queue: {e}"
            ))),
        }
    }

    /// Reports whether the queue `name` exists: `Some(())` if it does,
    /// `None` otherwise. The queue object itself is not exposed.
    #[pyo3(name = "get_queue")]
    fn get_queue_impl(&self, name: String) -> PyResult<Option<()>> {
        let runtime = dmsc_queue_manager_py_runtime()?;

        let found = runtime.block_on(self.get_queue(&name));
        Ok(found.map(|_| ()))
    }

    /// Returns the names of all queues created so far.
    #[pyo3(name = "list_queues")]
    fn list_queues_impl(&self) -> PyResult<Vec<String>> {
        let runtime = dmsc_queue_manager_py_runtime()?;

        let names = runtime.block_on(self.list_queues());
        Ok(names)
    }

    /// Deletes the queue `name`; raises `RuntimeError` when deletion fails.
    #[pyo3(name = "delete_queue")]
    fn delete_queue_impl(&self, name: String) -> PyResult<()> {
        let runtime = dmsc_queue_manager_py_runtime()?;

        match runtime.block_on(self.delete_queue(&name)) {
            Ok(()) => Ok(()),
            Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(format!(
                "Failed to delete queue: {e}"
            ))),
        }
    }
}