dmsc/queue/manager.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Queue Manager Implementation
21//!
22//! This file defines the queue management components for the DMSC queue system, including the
23//! queue module and queue manager. These components provide the infrastructure for creating,
24//! managing, and shutting down queues across different backend implementations.
25//!
26//! ## Key Components
27//!
28//! - **DMSCQueueModule**: Main queue module implementing the `AsyncServiceModule` trait
29//! - **DMSCQueueManager**: Central queue management component responsible for queue lifecycle
30//!
31//! ## Design Principles
32//!
33//! 1. **Async-First**: All queue management operations are asynchronous
34//! 2. **Backend Agnostic**: Supports multiple queue backends through a unified interface
35//! 3. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
36//! 4. **Singleton Pattern**: Each queue is created once and shared across the application
37//! 5. **Lazy Initialization**: Queues are created on demand when requested
38//! 6. **Clean Shutdown**: Properly cleans up resources during shutdown
39//! 7. **Non-critical**: Queue failures should not break the entire application
40//! 8. **Service Module Integration**: Implements async service module traits for seamless integration
41//!
42//! ## Usage
43//!
44//! ```rust
45//! use dmsc::prelude::*;
46//! use dmsc::queue::{DMSCQueueConfig, DMSCQueueBackendType};
47//!
48//! async fn example() -> DMSCResult<()> {
49//! // Create queue configuration
50//! let queue_config = DMSCQueueConfig {
51//! enabled: true,
52//! backend_type: DMSCQueueBackendType::Memory,
53//! default_queue_name: "default".to_string(),
54//! max_retry_count: 3,
55//! retry_delay_ms: 1000,
56//! queue_url: "".to_string(),
57//! connection_string: "".to_string(),
58//! };
59//!
60//! // Create queue module
61//! let queue_module = DMSCQueueModule::new(queue_config).await?;
62 //!
63 //! // Get queue manager
64 //! let queue_manager = queue_module.queue_manager();
65 //!
66 //! // Create or get a queue
67 //! let queue = queue_manager.create_queue("example_queue").await?;
68 //!
69 //! // List all queues
70 //! let queues = queue_manager.list_queues().await;
71//! println!("Available queues: {:?}", queues);
72//!
73//! Ok(())
74//! }
75//! ```
76
77use std::sync::Arc;
78use std::collections::HashMap;
79use tokio::sync::RwLock;
80use async_trait::async_trait;
81use crate::core::{DMSCResult, AsyncServiceModule, DMSCServiceContext};
82use crate::queue::{DMSCQueue, DMSCQueueConfig, DMSCQueueBackendType};
83
84#[cfg(feature = "pyo3")]
85use pyo3::PyResult;
86
87/// Connection pool for external queue backends
88struct QueueConnectionPool {
89 backend_type: DMSCQueueBackendType,
90 connections: Arc<RwLock<Vec<Arc<dyn std::any::Any + Send + Sync>>>>,
91 max_connections: usize,
92 #[allow(dead_code)]
93 connection_string: String,
94}
95
96impl QueueConnectionPool {
97 fn new(backend_type: DMSCQueueBackendType, connection_string: String, max_connections: usize) -> Self {
98 Self {
99 backend_type,
100 connections: Arc::new(RwLock::new(Vec::new())),
101 max_connections,
102 connection_string,
103 }
104 }
105
106 #[allow(dead_code)]
107 async fn get_connection(&self) -> DMSCResult<Arc<dyn std::any::Any + Send + Sync>> {
108 let mut connections = self.connections.write().await;
109
110 if let Some(conn) = connections.pop() {
111 return Ok(conn);
112 }
113
114 // Create new connection if pool is empty and under limit
115 if connections.len() < self.max_connections {
116 let conn = self.create_connection().await?;
117 return Ok(conn);
118 }
119
120 Err(crate::core::DMSCError::Other("Connection pool exhausted".to_string()))
121 }
122
123 async fn return_connection(&self, connection: Arc<dyn std::any::Any + Send + Sync>) {
124 let mut connections = self.connections.write().await;
125 if connections.len() < self.max_connections {
126 connections.push(connection);
127 }
128 }
129
130 async fn create_connection(&self) -> DMSCResult<Arc<dyn std::any::Any + Send + Sync>> {
131 match self.backend_type {
132 #[cfg(feature = "rabbitmq")]
133 DMSCQueueBackendType::RabbitMQ => {
134 use lapin::{Connection, ConnectionProperties};
135 let conn = Connection::connect(&self.connection_string, ConnectionProperties::default()).await?;
136 Ok(Arc::new(conn))
137 }
138 #[cfg(not(feature = "rabbitmq"))]
139 DMSCQueueBackendType::RabbitMQ => {
140 Err(crate::core::DMSCError::Config(
141 "RabbitMQ support is disabled. Enable the 'rabbitmq' feature to use RabbitMQ backend.".to_string(),
142 ))
143 }
144 #[cfg(feature = "redis")]
145 DMSCQueueBackendType::Redis => {
146 use redis::Client;
147 let client = Client::open(self.connection_string.as_str())?;
148 let conn = client.get_multiplexed_async_connection().await?;
149 Ok(Arc::new(conn))
150 }
151 #[cfg(not(feature = "redis"))]
152 DMSCQueueBackendType::Redis => {
153 Err(crate::core::DMSCError::Config(
154 "Redis support is disabled. Enable the 'redis' feature to use Redis backend.".to_string(),
155 ))
156 }
157 #[cfg(all(feature = "kafka", not(windows)))]
158 DMSCQueueBackendType::Kafka => {
159 // For Kafka, we don't need persistent connections like RabbitMQ/Redis
160 // The Kafka client manages its own connections internally
161 Ok(Arc::new(()))
162 }
163 #[cfg(any(not(feature = "kafka"), windows))]
164 DMSCQueueBackendType::Kafka => {
165 Err(crate::core::DMSCError::Config(
166 "Kafka support is disabled. Enable the 'kafka' feature to use Kafka backend.".to_string(),
167 ))
168 }
169 DMSCQueueBackendType::Memory => {
170 // Memory queue doesn't require any external connections
171 Ok(Arc::new(()))
172 }
173 }
174 }
175
176 async fn close_all(&self) {
177 let mut connections = self.connections.write().await;
178 connections.clear();
179 }
180}
181
182/// Main queue module implementing the async service module trait.
183///
184/// This module provides the main entry point for the queue system, integrating with the
185/// DMSC service module system for lifecycle management.
186#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
187pub struct DMSCQueueModule {
188 /// The queue manager instance
189 queue_manager: Arc<DMSCQueueManager>,
190}
191
192impl DMSCQueueModule {
193 /// Creates a new queue module with the given configuration.
194 ///
195 /// # Parameters
196 ///
197 /// - `config`: The queue configuration to use
198 ///
199 /// # Returns
200 ///
201 /// A `DMSCResult<Self>` containing the new queue module instance
202 pub async fn new(config: DMSCQueueConfig) -> DMSCResult<Self> {
203 let queue_manager = Arc::new(DMSCQueueManager::new(config).await?);
204 Ok(Self { queue_manager })
205 }
206
207 /// Creates a new queue module with the given configuration (synchronous version).
208 ///
209 /// This is a synchronous wrapper for use in the builder pattern.
210 ///
211 /// # Parameters
212 ///
213 /// - `config`: The queue configuration to use
214 ///
215 /// # Returns
216 ///
217 /// A `DMSCResult<Self>` containing the new queue module instance
218 pub fn with_config(_config: DMSCQueueConfig) -> DMSCResult<Self> {
219 // Create a simple memory-based queue manager for synchronous initialization
220 let queue_manager = Arc::new(DMSCQueueManager::default());
221 Ok(Self { queue_manager })
222 }
223
224 /// Returns a reference to the queue manager.
225 ///
226 /// # Returns
227 ///
228 /// An Arc<DMSCQueueManager> providing thread-safe access to the queue manager
229 pub fn queue_manager(&self) -> Arc<DMSCQueueManager> {
230 self.queue_manager.clone()
231 }
232}
233
234#[cfg(feature = "pyo3")]
235/// Python bindings for DMSCQueueModule
236#[pyo3::prelude::pymethods]
237impl DMSCQueueModule {
238 #[new]
239 fn py_new() -> PyResult<Self> {
240 use crate::queue::DMSCQueueConfig;
241 use crate::queue::DMSCQueueBackendType;
242
243 let config = DMSCQueueConfig {
244 enabled: true,
245 backend_type: DMSCQueueBackendType::Memory,
246 connection_string: "memory://localhost".to_string(),
247 max_connections: 10,
248 message_max_size: 1024 * 1024,
249 consumer_timeout_ms: 30000,
250 producer_timeout_ms: 30000,
251 retry_policy: crate::queue::config::DMSCRetryPolicy::default(),
252 dead_letter_config: None,
253 };
254
255 let runtime = tokio::runtime::Handle::current();
256 let result = runtime.block_on(async {
257 Self::new(config).await
258 });
259
260 match result {
261 Ok(module) => Ok(module),
262 Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create queue module: {e}"))),
263 }
264 }
265}
266
267#[async_trait]
268impl AsyncServiceModule for DMSCQueueModule {
269 /// Initializes the queue module.
270 ///
271 /// This method delegates to the queue manager's initialization method.
272 ///
273 /// # Parameters
274 ///
275 /// - `_ctx`: The service context (not used in this implementation)
276 ///
277 /// # Returns
278 ///
279 /// A `DMSCResult<()>` indicating success or failure
280 async fn init(&mut self, _ctx: &mut DMSCServiceContext) -> DMSCResult<()> {
281 self.queue_manager.init().await
282 }
283
284 /// Performs cleanup after the application has shut down.
285 ///
286 /// This method delegates to the queue manager's shutdown method.
287 ///
288 /// # Parameters
289 ///
290 /// - `_ctx`: The service context (not used in this implementation)
291 ///
292 /// # Returns
293 ///
294 /// A `DMSCResult<()>` indicating success or failure
295 async fn after_shutdown(&mut self, _ctx: &mut DMSCServiceContext) -> DMSCResult<()> {
296 self.queue_manager.shutdown().await
297 }
298
299 /// Returns the name of the queue module.
300 ///
301 /// # Returns
302 ///
303 /// The module name as a string
304 fn name(&self) -> &str {
305 "dms-queue"
306 }
307
308 /// Indicates whether the queue module is critical.
309 ///
310 /// The queue module is non-critical, meaning that if it fails to initialize or operate,
311 /// it should not break the entire application.
312 ///
313 /// # Returns
314 ///
315 /// `false` since queue is non-critical
316 fn is_critical(&self) -> bool {
317 false
318 }
319}
320
321impl crate::core::ServiceModule for DMSCQueueModule {
322 fn name(&self) -> &str {
323 "DMSC.Queue"
324 }
325
326 fn is_critical(&self) -> bool {
327 false
328 }
329
330 fn priority(&self) -> i32 {
331 15
332 }
333
334 fn dependencies(&self) -> Vec<&str> {
335 vec![]
336 }
337
338 fn init(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> crate::core::DMSCResult<()> {
339 Ok(())
340 }
341
342 fn start(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> crate::core::DMSCResult<()> {
343 Ok(())
344 }
345
346 fn shutdown(&mut self, _ctx: &mut crate::core::DMSCServiceContext) -> crate::core::DMSCResult<()> {
347 Ok(())
348 }
349}
350
351/// Central queue management component.
352///
353/// This struct is responsible for the lifecycle of queues, including creating, retrieving,
354/// listing, and deleting queues. It manages queues across different backend implementations.
355#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
356pub struct DMSCQueueManager {
357 /// Queue configuration
358 config: DMSCQueueConfig,
359 /// Map of queue names to queue instances, protected by a RwLock for thread-safe access
360 queues: Arc<RwLock<HashMap<String, Arc<dyn DMSCQueue>>>>,
361 /// Connection pool for external backends
362 connection_pool: Option<Arc<QueueConnectionPool>>,
363}
364
365impl DMSCQueueManager {
366 /// Creates a new queue manager with the given configuration.
367 ///
368 /// # Parameters
369 ///
370 /// - `config`: The queue configuration to use
371 ///
372 /// # Returns
373 ///
374 /// A `DMSCResult<Self>` containing the new queue manager instance
375 pub async fn new(config: DMSCQueueConfig) -> DMSCResult<Self> {
376 let backend_type = config.backend_type.clone();
377 let connection_string = config.connection_string.clone();
378
379 let connection_pool = match backend_type {
380 DMSCQueueBackendType::RabbitMQ | DMSCQueueBackendType::Redis => {
381 Some(Arc::new(QueueConnectionPool::new(
382 backend_type,
383 connection_string,
384 10, // max_connections
385 )))
386 }
387 #[cfg(all(feature = "kafka", not(windows)))]
388 DMSCQueueBackendType::Kafka => {
389 Some(Arc::new(QueueConnectionPool::new(
390 backend_type,
391 connection_string,
392 10, // max_connections
393 )))
394 }
395 #[cfg(any(not(feature = "kafka"), windows))]
396 DMSCQueueBackendType::Kafka => {
397 None
398 }
399 _ => None,
400 };
401
402 Ok(Self {
403 config: DMSCQueueConfig {
404 enabled: config.enabled,
405 backend_type: config.backend_type.clone(),
406 connection_string: config.connection_string.clone(),
407 max_connections: config.max_connections,
408 message_max_size: config.message_max_size,
409 consumer_timeout_ms: config.consumer_timeout_ms,
410 producer_timeout_ms: config.producer_timeout_ms,
411 retry_policy: config.retry_policy.clone(),
412 dead_letter_config: config.dead_letter_config.clone(),
413 },
414 queues: Arc::new(RwLock::new(HashMap::new())),
415 connection_pool,
416 })
417 }
418}
419
420impl Default for DMSCQueueManager {
421 fn default() -> Self {
422 Self {
423 config: DMSCQueueConfig {
424 enabled: true,
425 backend_type: DMSCQueueBackendType::Memory,
426 connection_string: "memory://localhost".to_string(),
427 max_connections: 10,
428 message_max_size: 1024 * 1024,
429 consumer_timeout_ms: 30000,
430 producer_timeout_ms: 30000,
431 retry_policy: crate::queue::config::DMSCRetryPolicy::default(),
432 dead_letter_config: None,
433 },
434 queues: Arc::new(RwLock::new(HashMap::new())),
435 connection_pool: None,
436 }
437 }
438}
439
440impl DMSCQueueManager {
441 /// Initializes the queue manager.
442 ///
443 /// This method performs backend-specific initialization, such as setting up connection pools
444 /// for external queue systems.
445 ///
446 /// # Returns
447 ///
448 /// A `DMSCResult<()>` indicating success or failure
449 pub async fn init(&self) -> DMSCResult<()> {
450 // Initialize connection pool for external backends
451 if let Some(ref pool) = self.connection_pool {
452 // Pre-create a few connections for better performance
453 for _ in 0..3 {
454 if let Ok(conn) = pool.create_connection().await {
455 let _ = pool.return_connection(conn).await;
456 }
457 }
458 }
459 Ok(())
460 }
461
462 /// Creates a new queue or returns an existing one with the same name.
463 ///
464 /// This method implements lazy initialization, creating the queue only when requested.
465 ///
466 /// # Parameters
467 ///
468 /// - `name`: The name of the queue to create or retrieve
469 ///
470 /// # Returns
471 ///
472 /// A `DMSCResult<Arc<dyn DMSCQueue>>` containing the queue instance
473 pub async fn create_queue(&self, name: &str) -> DMSCResult<Arc<dyn DMSCQueue>> {
474 let mut queues = self.queues.write().await;
475
476 if let Some(queue) = queues.get(name) {
477 return Ok(queue.clone());
478 }
479
480 let queue = self.create_backend_queue(name).await?;
481 queues.insert(name.to_string(), queue.clone());
482
483 Ok(queue)
484 }
485
486 /// Creates a queue with the appropriate backend based on configuration.
487 ///
488 /// This is an internal method that creates the actual queue instance based on the
489 /// configured backend type.
490 ///
491 /// # Parameters
492 ///
493 /// - `name`: The name of the queue to create
494 ///
495 /// # Returns
496 ///
497 /// A `DMSCResult<Arc<dyn DMSCQueue>>` containing the created queue instance
498 async fn create_backend_queue(&self, name: &str) -> DMSCResult<Arc<dyn DMSCQueue>> {
499 match self.config.backend_type {
500 DMSCQueueBackendType::Memory => {
501 Ok(Arc::new(crate::queue::backends::DMSCMemoryQueue::new(name)))
502 }
503 #[cfg(feature = "rabbitmq")]
504 DMSCQueueBackendType::RabbitMQ => {
505 if let Some(ref pool) = self.connection_pool {
506 let conn = pool.get_connection().await?;
507 // Extract the actual lapin connection from the pooled connection
508 if let Some(_lapin_conn) = conn.downcast_ref::<lapin::Connection>() {
509 // Since lapin::Connection doesn't implement Clone, we need to create a new connection
510 // This is a workaround - in a real system, you'd want to use connection pooling properly
511 let queue = crate::queue::backends::DMSCRabbitMQQueue::new(
512 name,
513 &self.config.connection_string,
514 )
515 .await?;
516 // Return connection to pool
517 let _ = pool.return_connection(conn).await;
518 return Ok(Arc::new(queue));
519 }
520 }
521 Ok(Arc::new(
522 crate::queue::backends::DMSCRabbitMQQueue::new(
523 name,
524 &self.config.connection_string,
525 )
526 .await?,
527 ))
528 }
529 #[cfg(not(feature = "rabbitmq"))]
530 DMSCQueueBackendType::RabbitMQ => {
531 Err(crate::core::DMSCError::Config(
532 "RabbitMQ support is disabled. Enable the 'rabbitmq' feature to use RabbitMQ backend.".to_string(),
533 ))
534 }
535 #[cfg(all(feature = "kafka", not(windows)))]
536 DMSCQueueBackendType::Kafka => {
537 Ok(Arc::new(
538 crate::queue::backends::DMSCKafkaQueue::new(
539 name,
540 &self.config.connection_string,
541 )
542 .await?,
543 ))
544 }
545 #[cfg(any(not(feature = "kafka"), windows))]
546 DMSCQueueBackendType::Kafka => {
547 Err(crate::core::DMSCError::Config(
548 "Kafka support is disabled. Enable the 'kafka' feature to use Kafka backend.".to_string(),
549 ))
550 }
551 #[cfg(feature = "redis")]
552 DMSCQueueBackendType::Redis => {
553 Ok(Arc::new(
554 crate::queue::backends::DMSCRedisQueue::new(
555 name,
556 &self.config.connection_string,
557 )
558 .await?,
559 ))
560 }
561 #[cfg(not(feature = "redis"))]
562 DMSCQueueBackendType::Redis => {
563 Err(crate::core::DMSCError::Config(
564 "Redis support is disabled. Enable the 'redis' feature to use Redis backend.".to_string(),
565 ))
566 }
567 }
568 }
569
570 /// Retrieves an existing queue by name.
571 ///
572 /// # Parameters
573 ///
574 /// - `name`: The name of the queue to retrieve
575 ///
576 /// # Returns
577 ///
578 /// An `Option<Arc<dyn DMSCQueue>>` containing the queue instance if it exists, or None otherwise
579 pub async fn get_queue(&self, name: &str) -> Option<Arc<dyn DMSCQueue>> {
580 let queues = self.queues.read().await;
581 queues.get(name).cloned()
582 }
583
584 /// Lists all currently created queues.
585 ///
586 /// # Returns
587 ///
588 /// A `Vec<String>` containing the names of all created queues
589 pub async fn list_queues(&self) -> Vec<String> {
590 let queues = self.queues.read().await;
591 queues.keys().cloned().collect()
592 }
593
594 /// Deletes a queue by name.
595 ///
596 /// This method removes the queue from the manager and calls the queue's delete method
597 /// to clean up any backend-specific resources.
598 ///
599 /// # Parameters
600 ///
601 /// - `name`: The name of the queue to delete
602 ///
603 /// # Returns
604 ///
605 /// A `DMSCResult<()>` indicating success or failure
606 pub async fn delete_queue(&self, name: &str) -> DMSCResult<()> {
607 let mut queues = self.queues.write().await;
608 if let Some(queue) = queues.remove(name) {
609 queue.delete().await?;
610 }
611 Ok(())
612 }
613
614 /// Shuts down the queue manager and cleans up resources.
615 ///
616 /// This method purges all queues and cleans up any backend-specific resources.
617 ///
618 /// # Returns
619 ///
620 /// A `DMSCResult<()>` indicating success or failure
621 pub async fn shutdown(&self) -> DMSCResult<()> {
622 let mut queues = self.queues.write().await;
623 for (_, queue) in queues.drain() {
624 // Cleanup each queue
625 let _ = queue.purge().await;
626 }
627
628 // Close connection pool
629 if let Some(ref pool) = self.connection_pool {
630 pool.close_all().await;
631 }
632
633 Ok(())
634 }
635}
636
637#[cfg(feature = "pyo3")]
638#[pyo3::prelude::pymethods]
639impl DMSCQueueManager {
640 #[pyo3(name = "create_queue")]
641 fn create_queue_impl(&self, name: String) -> PyResult<String> {
642 let rt = tokio::runtime::Runtime::new().map_err(|e| {
643 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
644 })?;
645
646 rt.block_on(async {
647 self.create_queue(&name)
648 .await
649 .map(|_| name)
650 .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create queue: {e}")))
651 })
652 }
653
654 #[pyo3(name = "get_queue")]
655 fn get_queue_impl(&self, name: String) -> PyResult<Option<()>> {
656 let rt = tokio::runtime::Runtime::new().map_err(|e| {
657 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
658 })?;
659
660 Ok(rt.block_on(async {
661 self.get_queue(&name).await.map(|_| ())
662 }))
663 }
664
665 #[pyo3(name = "list_queues")]
666 fn list_queues_impl(&self) -> PyResult<Vec<String>> {
667 let rt = tokio::runtime::Runtime::new().map_err(|e| {
668 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
669 })?;
670
671 Ok(rt.block_on(async {
672 self.list_queues().await
673 }))
674 }
675
676 #[pyo3(name = "delete_queue")]
677 fn delete_queue_impl(&self, name: String) -> PyResult<()> {
678 let rt = tokio::runtime::Runtime::new().map_err(|e| {
679 pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to create runtime: {}", e))
680 })?;
681
682 rt.block_on(async {
683 self.delete_queue(&name)
684 .await
685 .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to delete queue: {e}")))
686 })
687 }
688}