dmsc/queue/backends/
rabbitmq_backend.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19#![cfg(feature = "rabbitmq")]
20
21//! # RabbitMQ Queue Backend
22//! 
23//! This module provides a RabbitMQ implementation for the DMSC queue system. It allows
24//! sending and receiving messages using RabbitMQ as the underlying message broker.
25//! 
26//! ## Key Components
27//! 
28//! - **DMSCRabbitMQQueue**: Main RabbitMQ queue implementation
29//! - **RabbitMQProducer**: RabbitMQ producer implementation
30//! - **RabbitMQConsumer**: RabbitMQ consumer implementation
31//! 
32//! ## Design Principles
33//! 
34//! 1. **Async Trait Implementation**: Implements the DMSCQueue, DMSCQueueProducer, and DMSCQueueConsumer traits
35//! 2. **RabbitMQ Integration**: Uses the lapin crate for RabbitMQ connectivity
36//! 3. **Thread Safety**: Uses Arc for safe sharing of connections, channels, and consumers
37//! 4. **Future-based API**: Leverages async/await for non-blocking operations
38//! 5. **Durable Queues**: Configured with durable queues for message persistence
39//! 6. **Error Handling**: Comprehensive error handling with DMSCResult
40//! 7. **Stream-based Consumer**: Uses StreamExt for efficient message processing
41//! 8. **Batch Support**: Provides batch sending functionality
42//! 
43//! ## Usage
44//! 
45//! ```rust
46//! use dmsc::prelude::*;
47//! 
48//! async fn example() -> DMSCResult<()> {
49//!     // Create a new RabbitMQ queue
50//!     let queue = DMSCRabbitMQQueue::new("test-queue", "amqp://guest:guest@localhost:5672/%2f").await?;
51//!     
52//!     // Create a producer
53//!     let producer = queue.create_producer().await?;
54//!     
55//!     // Create a message
56//!     let message = DMSCQueueMessage {
57//!         id: "12345".to_string(),
58//!         payload: b"Hello, RabbitMQ!".to_vec(),
59//!         headers: vec![("key1".to_string(), "value1".to_string())],
60//!         timestamp: chrono::Utc::now().timestamp_millis() as u64,
61//!         priority: 0,
62//!     };
63//!     
64//!     // Send the message
65//!     producer.send(message).await?;
66//!     
67//!     // Create a consumer
68//!     let consumer = queue.create_consumer("test-consumer-group").await?;
69//!     
70//!     // Receive messages
71//!     if let Some(received_message) = consumer.receive().await? {
72//!         println!("Received message: {:?}", received_message);
73//!         consumer.ack(&received_message.id).await?;
74//!     }
75//!     
76//!     Ok(())
77//! }
78//! ```
79
80use async_trait::async_trait;
81use lapin::{Connection, ConnectionProperties, Channel, Queue, Consumer};
82use lapin::options::{QueueDeclareOptions, BasicConsumeOptions, BasicPublishOptions, BasicAckOptions};
83use lapin::types::FieldTable;
84use std::collections::HashMap;
85use std::sync::atomic::AtomicU64;
86use std::sync::Arc;
87use tokio::sync::Mutex;
88use futures::StreamExt;
89use serde::{Deserialize, Serialize};
90#[cfg(feature = "http_client")]
91use reqwest;
92#[cfg(feature = "http_client")]
93use urlencoding;
94use crate::core::DMSCResult;
95use crate::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueueStats};
96
97/// RabbitMQ queue implementation for the DMSC queue system.
98///
99/// This struct provides a RabbitMQ implementation of the DMSCQueue trait, allowing
100/// sending and receiving messages using RabbitMQ as the underlying message broker.
101pub struct DMSCRabbitMQQueue {
102    /// Queue name
103    name: String,
104    /// RabbitMQ connection
105    #[allow(dead_code)]
106    connection: Arc<Connection>,
107    /// RabbitMQ channel
108    channel: Arc<Channel>,
109    /// RabbitMQ queue
110    #[allow(dead_code)]
111    queue: Arc<Queue>,
112    /// RabbitMQ management API URL
113    #[allow(dead_code)]
114    management_url: Option<String>,
115    /// Management API username
116    #[allow(dead_code)]
117    management_username: Option<String>,
118    /// RabbitMQ management API password
119    #[allow(dead_code)]
120    management_password: Option<String>,
121}
122
123#[derive(Debug, Serialize, Deserialize)]
124#[allow(dead_code)]
125struct RabbitMQQueueInfo {
126    name: String,
127    messages: u64,
128    consumers: u64,
129    message_stats: Option<RabbitMQMessageStats>,
130}
131
132#[derive(Debug, Serialize, Deserialize)]
133#[allow(dead_code)]
134struct RabbitMQMessageStats {
135    publish: Option<u64>,
136    deliver_no_ack: Option<u64>,
137    get_no_ack: Option<u64>,
138    redeliver: Option<u64>,
139    deliver: Option<u64>,
140    get: Option<u64>,
141}
142
143impl DMSCRabbitMQQueue {
144    /// Creates a new RabbitMQ queue instance.
145    ///
146    /// # Parameters
147    ///
148    /// - `name`: The name of the queue
149    /// - `connection_string`: The RabbitMQ connection string
150    ///
151    /// # Returns
152    ///
153    /// A new DMSCRabbitMQQueue instance wrapped in DMSCResult
154    pub async fn new(name: &str, connection_string: &str) -> DMSCResult<Self> {
155        let connection = Connection::connect(connection_string, ConnectionProperties::default()).await?;
156        Self::new_with_connection(name, connection).await
157    }
158
159    /// Creates a new RabbitMQ queue instance with an existing connection.
160    ///
161    /// # Parameters
162    ///
163    /// - `name`: The name of the queue
164    /// - `connection`: The existing RabbitMQ connection
165    ///
166    /// # Returns
167    ///
168    /// A new DMSCRabbitMQQueue instance wrapped in DMSCResult
169    pub async fn new_with_connection(name: &str, connection: lapin::Connection) -> DMSCResult<Self> {
170        let channel = connection.create_channel().await?;
171        
172        let queue = channel
173            .queue_declare(
174                name,
175                QueueDeclareOptions {
176                    durable: true,
177                    ..Default::default()
178                },
179                FieldTable::default(),
180            )
181            .await?;
182
183        Ok(Self {
184            name: name.to_string(),
185            connection: Arc::new(connection),
186            channel: Arc::new(channel),
187            queue: Arc::new(queue),
188            management_url: None,
189            management_username: None,
190            management_password: None,
191        })
192    }
193
194    /// Creates a new RabbitMQ queue instance with management API support.
195    ///
196    /// # Parameters
197    ///
198    /// - `name`: The name of the queue
199    /// - `connection`: The existing RabbitMQ connection
200    /// - `management_url`: RabbitMQ management API URL (e.g., "http://localhost:15672")
201    /// - `management_username`: Management API username
202    /// - `management_password`: Management API password
203    ///
204    /// # Returns
205    ///
206    /// A new DMSCRabbitMQQueue instance wrapped in DMSCResult
207    pub async fn new_with_management(
208        name: &str,
209        connection: lapin::Connection,
210        management_url: &str,
211        management_username: &str,
212        management_password: &str,
213    ) -> DMSCResult<Self> {
214        let channel = connection.create_channel().await?;
215        
216        let queue = channel
217            .queue_declare(
218                name,
219                QueueDeclareOptions {
220                    durable: true,
221                    ..Default::default()
222                },
223                FieldTable::default(),
224            )
225            .await?;
226
227        Ok(Self {
228            name: name.to_string(),
229            connection: Arc::new(connection),
230            channel: Arc::new(channel),
231            queue: Arc::new(queue),
232            management_url: Some(management_url.to_string()),
233            management_username: Some(management_username.to_string()),
234            management_password: Some(management_password.to_string()),
235        })
236    }
237
238    /// Fetches detailed statistics from RabbitMQ management API.
239    ///
240    /// This method attempts to connect to RabbitMQ's management API to retrieve
241    /// comprehensive queue statistics including message counts, consumer counts,
242    /// and message processing rates.
243    ///
244    /// # Returns
245    ///
246    /// Detailed DMSCQueueStats wrapped in DMSCResult
247    #[cfg(feature = "http_client")]
248    async fn fetch_rabbitmq_stats(&self) -> DMSCResult<DMSCQueueStats> {
249        let (Some(management_url), Some(username), Some(password)) = (
250            &self.management_url,
251            &self.management_username,
252            &self.management_password,
253        ) else {
254            return Err(crate::core::DMSCError::Other(
255                "Management API not configured. Use new_with_management() to enable.".to_string(),
256            ));
257        };
258
259        let client = reqwest::Client::new();
260        let url = format!(
261            "{}/api/queues/%2f/{}",
262            management_url.trim_end_matches('/'),
263            urlencoding::encode(&self.name)
264        );
265
266        let response = client
267            .get(&url)
268            .basic_auth(username, Some(password))
269            .send()
270            .await
271            .map_err(|e| crate::core::DMSCError::Other(format!("Failed to connect to RabbitMQ Management API: {}", e)))?;
272
273        if !response.status().is_success() {
274            return Err(crate::core::DMSCError::Other(format!(
275                "RabbitMQ Management API returned error: {}",
276                response.status()
277            )));
278        }
279
280        let queue_info: RabbitMQQueueInfo = response
281            .json()
282            .await
283            .map_err(|e| crate::core::DMSCError::Other(format!("Failed to parse RabbitMQ response: {}", e)))?;
284
285        let processed_messages = queue_info
286            .message_stats
287            .as_ref()
288            .and_then(|s| s.publish.or(s.deliver).or(s.get))
289            .unwrap_or(0);
290
291        let failed_messages = queue_info
292            .message_stats
293            .as_ref()
294            .and_then(|s| s.redeliver)
295            .unwrap_or(0);
296
297        Ok(DMSCQueueStats {
298            queue_name: queue_info.name,
299            message_count: queue_info.messages,
300            consumer_count: queue_info.consumers as u32,
301            producer_count: 0,
302            processed_messages,
303            failed_messages,
304            avg_processing_time_ms: 0.0,
305            total_bytes_sent: 0,
306            total_bytes_received: 0,
307            last_message_time: 0,
308        })
309    }
310    
311    async fn get_basic_stats(&self) -> DMSCResult<DMSCQueueStats> {
312        let queue_info = self.channel
313            .queue_declare(
314                &self.name,
315                lapin::options::QueueDeclareOptions {
316                    passive: true,
317                    ..Default::default()
318                },
319                lapin::types::FieldTable::default(),
320            )
321            .await?;
322        
323        Ok(DMSCQueueStats {
324            queue_name: self.name.clone(),
325            message_count: queue_info.message_count() as u64,
326            consumer_count: queue_info.consumer_count() as u32,
327            producer_count: 0,
328            processed_messages: 0,
329            failed_messages: 0,
330            avg_processing_time_ms: 0.0,
331            total_bytes_sent: 0,
332            total_bytes_received: 0,
333            last_message_time: 0,
334        })
335    }
336
337}
338
339#[async_trait]
340impl DMSCQueue for DMSCRabbitMQQueue {
341    /// Creates a new producer for the RabbitMQ queue.
342    ///
343    /// # Returns
344    ///
345    /// A new DMSCQueueProducer instance wrapped in DMSCResult
346    async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
347        Ok(Box::new(RabbitMQProducer {
348            channel: self.channel.clone(),
349            queue_name: self.name.clone(),
350        }))
351    }
352
353    /// Creates a new consumer for the RabbitMQ queue.
354    ///
355    /// # Parameters
356    ///
357    /// - `consumer_group`: The consumer group name
358    ///
359    /// # Returns
360    ///
361    /// A new DMSCQueueConsumer instance wrapped in DMSCResult
362    async fn create_consumer(&self, consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
363        let consumer = self.channel
364            .basic_consume(
365                &self.name,
366                consumer_group,
367                BasicConsumeOptions::default(),
368                FieldTable::default(),
369            )
370            .await?;
371
372        Ok(Box::new(RabbitMQConsumer::new(self.channel.clone(), consumer)))
373    }
374
375    /// Gets statistics for the RabbitMQ queue.
376    ///
377    /// This implementation integrates with RabbitMQ management API to provide detailed
378    /// queue statistics including message counts, consumer counts, and processing metrics.
379    ///
380    /// # Returns
381    ///
382    /// DMSCQueueStats containing detailed queue statistics wrapped in DMSCResult
383    async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
384        #[cfg(feature = "http_client")]
385        match self.fetch_rabbitmq_stats().await {
386            Ok(detailed_stats) => Ok(detailed_stats),
387            Err(_) => {
388                self.get_basic_stats().await
389            }
390        }
391        #[cfg(not(feature = "http_client"))]
392        self.get_basic_stats().await
393    }
394
395    /// Purges all messages from the RabbitMQ queue.
396    ///
397    /// # Returns
398    ///
399    /// DMSCResult indicating success or failure
400
401    /// Purges all messages from the RabbitMQ queue.
402    ///
403    /// # Returns
404    ///
405    /// DMSCResult indicating success or failure
406    async fn purge(&self) -> DMSCResult<()> {
407        self.channel.queue_purge(&self.name, Default::default()).await?;
408        Ok(())
409    }
410
411    /// Deletes the RabbitMQ queue.
412    ///
413    /// # Returns
414    ///
415    /// DMSCResult indicating success or failure
416    async fn delete(&self) -> DMSCResult<()> {
417        self.channel.queue_delete(&self.name, Default::default()).await?;
418        Ok(())
419    }
420}
421
422/// RabbitMQ producer implementation.
423///
424/// This struct provides a RabbitMQ implementation of the DMSCQueueProducer trait,
425/// allowing sending messages to RabbitMQ queues.
426struct RabbitMQProducer {
427    /// RabbitMQ channel
428    channel: Arc<Channel>,
429    /// Queue name to send messages to
430    queue_name: String,
431}
432
433#[async_trait]
434impl DMSCQueueProducer for RabbitMQProducer {
435    /// Sends a single message to the RabbitMQ queue.
436    ///
437    /// # Parameters
438    ///
439    /// - `message`: The message to send
440    ///
441    /// # Returns
442    ///
443    /// DMSCResult indicating success or failure
444    async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
445        let payload = serde_json::to_vec(&message)?;
446        
447        self.channel
448            .basic_publish(
449                "",
450                &self.queue_name,
451                BasicPublishOptions::default(),
452                &payload,
453                Default::default(),
454            )
455            .await?;
456        
457        Ok(())
458    }
459
460    /// Sends multiple messages to the RabbitMQ queue.
461    ///
462    /// # Parameters
463    ///
464    /// - `messages`: A vector of messages to send
465    ///
466    /// # Returns
467    ///
468    /// DMSCResult indicating success or failure
469    async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
470        for message in messages {
471            self.send(message).await?;
472        }
473        Ok(())
474    }
475}
476
477/// RabbitMQ consumer implementation.
478#[allow(dead_code)]
479struct RabbitMQConsumer {
480    /// RabbitMQ channel for sending acks/nacks
481    channel: Arc<Channel>,
482    /// RabbitMQ consumer
483    consumer: Arc<Mutex<Consumer>>,
484    /// Flag indicating if the consumer is paused
485    paused: Arc<Mutex<bool>>,
486    /// Message tracking: delivery_tag -> message_id
487    delivery_tags: Arc<Mutex<HashMap<u64, String>>>,
488    /// Next delivery tag counter
489    next_delivery_tag: Arc<AtomicU64>,
490}
491
492impl RabbitMQConsumer {
493    fn new(channel: Arc<Channel>, consumer: Consumer) -> Self {
494        Self {
495            channel,
496            consumer: Arc::new(Mutex::new(consumer)),
497            paused: Arc::new(Mutex::new(false)),
498            delivery_tags: Arc::new(Mutex::new(HashMap::new())),
499            next_delivery_tag: Arc::new(AtomicU64::new(1)),
500        }
501    }
502}
503
504#[async_trait]
505impl DMSCQueueConsumer for RabbitMQConsumer {
506    /// Receives a message from the RabbitMQ queue.
507    ///
508    /// # Returns
509    ///
510    /// An Option containing the received message, or None if the consumer is paused
511    async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
512        let paused = *self.paused.lock().await;
513        if paused {
514            return Ok(None);
515        }
516
517        let mut consumer = self.consumer.lock().await;
518        
519        match consumer.next().await {
520            Some(delivery_result) => {
521                let delivery = delivery_result.map_err(|e| crate::core::DMSCError::Other(format!("Consumer error: {e}")))?;
522                
523                let _message_id = {
524                    let delivery_tag = delivery.delivery_tag;
525                    let message_id = format!("msg_{}", uuid::Uuid::new_v4());
526                    
527                    let mut tags = self.delivery_tags.lock().await;
528                    tags.insert(delivery_tag, message_id.clone());
529                    
530                    message_id
531                };
532                
533                let message: DMSCQueueMessage = serde_json::from_slice(&delivery.data)?;
534                
535                Ok(Some(message))
536            },
537            None => Ok(None)
538        }
539    }
540
541    /// Acknowledges a message.
542    ///
543    /// This implementation tracks delivery tags and uses basic_ack to acknowledge messages.
544    ///
545    /// # Parameters
546    ///
547    /// - `message_id`: The message ID to acknowledge
548    ///
549    /// # Returns
550    ///
551    /// DMSCResult indicating success or failure
552    async fn ack(&self, message_id: &str) -> DMSCResult<()> {
553        log::debug!("Acknowledging message: {}", message_id);
554        
555        let delivery_tag = {
556            let tags = self.delivery_tags.lock().await;
557            tags.iter()
558                .find(|(_, id)| *id == message_id)
559                .map(|(tag, _)| *tag)
560        };
561
562        if let Some(tag) = delivery_tag {
563            let channel = self.channel.clone();
564            channel.basic_ack(tag, BasicAckOptions { multiple: false }).await
565                .map_err(|e| crate::core::DMSCError::Other(format!("Failed to ack message: {e}")))?;
566            
567            let mut tags = self.delivery_tags.lock().await;
568            tags.remove(&tag);
569            
570            log::debug!("Message {} acknowledged successfully", message_id);
571        } else {
572            log::warn!("Message ID not found for acknowledgment: {}", message_id);
573        }
574        
575        Ok(())
576    }
577
578    /// Negatively acknowledges a message.
579    ///
580    /// This implementation tracks delivery tags and uses basic_nack to negatively acknowledge messages.
581    ///
582    /// # Parameters
583    ///
584    /// - `message_id`: The message ID to negatively acknowledge
585    ///
586    /// # Returns
587    ///
588    /// DMSCResult indicating success or failure
589    async fn nack(&self, message_id: &str) -> DMSCResult<()> {
590        log::debug!("Negatively acknowledging message: {}", message_id);
591        
592        let delivery_tag = {
593            let tags = self.delivery_tags.lock().await;
594            tags.iter()
595                .find(|(_, id)| *id == message_id)
596                .map(|(tag, _)| *tag)
597        };
598
599        if let Some(tag) = delivery_tag {
600            let channel = self.channel.clone();
601            use lapin::options::BasicNackOptions;
602            channel.basic_nack(tag, BasicNackOptions { multiple: false, requeue: true }).await
603                .map_err(|e| crate::core::DMSCError::Other(format!("Failed to nack message: {e}")))?;
604            
605            let mut tags = self.delivery_tags.lock().await;
606            tags.remove(&tag);
607            
608            log::debug!("Message {} negatively acknowledged (will be requeued)", message_id);
609        } else {
610            log::warn!("Message ID not found for negative acknowledgment: {}", message_id);
611        }
612        
613        Ok(())
614    }
615
616    /// Pauses the consumer.
617    ///
618    /// # Returns
619    ///
620    /// DMSCResult indicating success or failure
621    async fn pause(&self) -> DMSCResult<()> {
622        let mut paused = self.paused.lock().await;
623        *paused = true;
624        Ok(())
625    }
626
627    /// Resumes the consumer.
628    ///
629    /// # Returns
630    ///
631    /// DMSCResult indicating success or failure
632    async fn resume(&self) -> DMSCResult<()> {
633        let mut paused = self.paused.lock().await;
634        *paused = false;
635        Ok(())
636    }
637}