dmsc/queue/backends/rabbitmq_backend.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19#![cfg(feature = "rabbitmq")]
20
21//! # RabbitMQ Queue Backend
22//!
23//! This module provides a RabbitMQ implementation for the DMSC queue system. It allows
24//! sending and receiving messages using RabbitMQ as the underlying message broker.
25//!
26//! ## Key Components
27//!
28//! - **DMSCRabbitMQQueue**: Main RabbitMQ queue implementation
29//! - **RabbitMQProducer**: RabbitMQ producer implementation
30//! - **RabbitMQConsumer**: RabbitMQ consumer implementation
31//!
32//! ## Design Principles
33//!
34//! 1. **Async Trait Implementation**: Implements the DMSCQueue, DMSCQueueProducer, and DMSCQueueConsumer traits
35//! 2. **RabbitMQ Integration**: Uses the lapin crate for RabbitMQ connectivity
36//! 3. **Thread Safety**: Uses Arc for safe sharing of connections, channels, and consumers
37//! 4. **Future-based API**: Leverages async/await for non-blocking operations
38//! 5. **Durable Queues**: Configured with durable queues for message persistence
39//! 6. **Error Handling**: Comprehensive error handling with DMSCResult
40//! 7. **Stream-based Consumer**: Uses StreamExt for efficient message processing
41//! 8. **Batch Support**: Provides batch sending functionality
42//!
43//! ## Usage
44//!
45//! ```rust
46//! use dmsc::prelude::*;
47//!
48//! async fn example() -> DMSCResult<()> {
49//! // Create a new RabbitMQ queue
50//! let queue = DMSCRabbitMQQueue::new("test-queue", "amqp://guest:guest@localhost:5672/%2f").await?;
51//!
52//! // Create a producer
53//! let producer = queue.create_producer().await?;
54//!
55//! // Create a message
56//! let message = DMSCQueueMessage {
57//! id: "12345".to_string(),
58//! payload: b"Hello, RabbitMQ!".to_vec(),
59//! headers: vec![("key1".to_string(), "value1".to_string())],
60//! timestamp: chrono::Utc::now().timestamp_millis() as u64,
61//! priority: 0,
62//! };
63//!
64//! // Send the message
65//! producer.send(message).await?;
66//!
67//! // Create a consumer
68//! let consumer = queue.create_consumer("test-consumer-group").await?;
69//!
70//! // Receive messages
71//! if let Some(received_message) = consumer.receive().await? {
72//! println!("Received message: {:?}", received_message);
73//! consumer.ack(&received_message.id).await?;
74//! }
75//!
76//! Ok(())
77//! }
78//! ```
79
80use async_trait::async_trait;
81use lapin::{Connection, ConnectionProperties, Channel, Queue, Consumer};
82use lapin::options::{QueueDeclareOptions, BasicConsumeOptions, BasicPublishOptions, BasicAckOptions};
83use lapin::types::FieldTable;
84use std::collections::HashMap;
85use std::sync::atomic::AtomicU64;
86use std::sync::Arc;
87use tokio::sync::Mutex;
88use futures::StreamExt;
89use serde::{Deserialize, Serialize};
90#[cfg(feature = "http_client")]
91use reqwest;
92#[cfg(feature = "http_client")]
93use urlencoding;
94use crate::core::DMSCResult;
95use crate::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueueStats};
96
97/// RabbitMQ queue implementation for the DMSC queue system.
98///
99/// This struct provides a RabbitMQ implementation of the DMSCQueue trait, allowing
100/// sending and receiving messages using RabbitMQ as the underlying message broker.
101pub struct DMSCRabbitMQQueue {
102 /// Queue name
103 name: String,
104 /// RabbitMQ connection
105 #[allow(dead_code)]
106 connection: Arc<Connection>,
107 /// RabbitMQ channel
108 channel: Arc<Channel>,
109 /// RabbitMQ queue
110 #[allow(dead_code)]
111 queue: Arc<Queue>,
112 /// RabbitMQ management API URL
113 #[allow(dead_code)]
114 management_url: Option<String>,
115 /// Management API username
116 #[allow(dead_code)]
117 management_username: Option<String>,
118 /// RabbitMQ management API password
119 #[allow(dead_code)]
120 management_password: Option<String>,
121}
122
123#[derive(Debug, Serialize, Deserialize)]
124#[allow(dead_code)]
125struct RabbitMQQueueInfo {
126 name: String,
127 messages: u64,
128 consumers: u64,
129 message_stats: Option<RabbitMQMessageStats>,
130}
131
132#[derive(Debug, Serialize, Deserialize)]
133#[allow(dead_code)]
134struct RabbitMQMessageStats {
135 publish: Option<u64>,
136 deliver_no_ack: Option<u64>,
137 get_no_ack: Option<u64>,
138 redeliver: Option<u64>,
139 deliver: Option<u64>,
140 get: Option<u64>,
141}
142
143impl DMSCRabbitMQQueue {
144 /// Creates a new RabbitMQ queue instance.
145 ///
146 /// # Parameters
147 ///
148 /// - `name`: The name of the queue
149 /// - `connection_string`: The RabbitMQ connection string
150 ///
151 /// # Returns
152 ///
153 /// A new DMSCRabbitMQQueue instance wrapped in DMSCResult
154 pub async fn new(name: &str, connection_string: &str) -> DMSCResult<Self> {
155 let connection = Connection::connect(connection_string, ConnectionProperties::default()).await?;
156 Self::new_with_connection(name, connection).await
157 }
158
159 /// Creates a new RabbitMQ queue instance with an existing connection.
160 ///
161 /// # Parameters
162 ///
163 /// - `name`: The name of the queue
164 /// - `connection`: The existing RabbitMQ connection
165 ///
166 /// # Returns
167 ///
168 /// A new DMSCRabbitMQQueue instance wrapped in DMSCResult
169 pub async fn new_with_connection(name: &str, connection: lapin::Connection) -> DMSCResult<Self> {
170 let channel = connection.create_channel().await?;
171
172 let queue = channel
173 .queue_declare(
174 name,
175 QueueDeclareOptions {
176 durable: true,
177 ..Default::default()
178 },
179 FieldTable::default(),
180 )
181 .await?;
182
183 Ok(Self {
184 name: name.to_string(),
185 connection: Arc::new(connection),
186 channel: Arc::new(channel),
187 queue: Arc::new(queue),
188 management_url: None,
189 management_username: None,
190 management_password: None,
191 })
192 }
193
194 /// Creates a new RabbitMQ queue instance with management API support.
195 ///
196 /// # Parameters
197 ///
198 /// - `name`: The name of the queue
199 /// - `connection`: The existing RabbitMQ connection
200 /// - `management_url`: RabbitMQ management API URL (e.g., "http://localhost:15672")
201 /// - `management_username`: Management API username
202 /// - `management_password`: Management API password
203 ///
204 /// # Returns
205 ///
206 /// A new DMSCRabbitMQQueue instance wrapped in DMSCResult
207 pub async fn new_with_management(
208 name: &str,
209 connection: lapin::Connection,
210 management_url: &str,
211 management_username: &str,
212 management_password: &str,
213 ) -> DMSCResult<Self> {
214 let channel = connection.create_channel().await?;
215
216 let queue = channel
217 .queue_declare(
218 name,
219 QueueDeclareOptions {
220 durable: true,
221 ..Default::default()
222 },
223 FieldTable::default(),
224 )
225 .await?;
226
227 Ok(Self {
228 name: name.to_string(),
229 connection: Arc::new(connection),
230 channel: Arc::new(channel),
231 queue: Arc::new(queue),
232 management_url: Some(management_url.to_string()),
233 management_username: Some(management_username.to_string()),
234 management_password: Some(management_password.to_string()),
235 })
236 }
237
238 /// Fetches detailed statistics from RabbitMQ management API.
239 ///
240 /// This method attempts to connect to RabbitMQ's management API to retrieve
241 /// comprehensive queue statistics including message counts, consumer counts,
242 /// and message processing rates.
243 ///
244 /// # Returns
245 ///
246 /// Detailed DMSCQueueStats wrapped in DMSCResult
247 #[cfg(feature = "http_client")]
248 async fn fetch_rabbitmq_stats(&self) -> DMSCResult<DMSCQueueStats> {
249 let (Some(management_url), Some(username), Some(password)) = (
250 &self.management_url,
251 &self.management_username,
252 &self.management_password,
253 ) else {
254 return Err(crate::core::DMSCError::Other(
255 "Management API not configured. Use new_with_management() to enable.".to_string(),
256 ));
257 };
258
259 let client = reqwest::Client::new();
260 let url = format!(
261 "{}/api/queues/%2f/{}",
262 management_url.trim_end_matches('/'),
263 urlencoding::encode(&self.name)
264 );
265
266 let response = client
267 .get(&url)
268 .basic_auth(username, Some(password))
269 .send()
270 .await
271 .map_err(|e| crate::core::DMSCError::Other(format!("Failed to connect to RabbitMQ Management API: {}", e)))?;
272
273 if !response.status().is_success() {
274 return Err(crate::core::DMSCError::Other(format!(
275 "RabbitMQ Management API returned error: {}",
276 response.status()
277 )));
278 }
279
280 let queue_info: RabbitMQQueueInfo = response
281 .json()
282 .await
283 .map_err(|e| crate::core::DMSCError::Other(format!("Failed to parse RabbitMQ response: {}", e)))?;
284
285 let processed_messages = queue_info
286 .message_stats
287 .as_ref()
288 .and_then(|s| s.publish.or(s.deliver).or(s.get))
289 .unwrap_or(0);
290
291 let failed_messages = queue_info
292 .message_stats
293 .as_ref()
294 .and_then(|s| s.redeliver)
295 .unwrap_or(0);
296
297 Ok(DMSCQueueStats {
298 queue_name: queue_info.name,
299 message_count: queue_info.messages,
300 consumer_count: queue_info.consumers as u32,
301 producer_count: 0,
302 processed_messages,
303 failed_messages,
304 avg_processing_time_ms: 0.0,
305 total_bytes_sent: 0,
306 total_bytes_received: 0,
307 last_message_time: 0,
308 })
309 }
310
311 async fn get_basic_stats(&self) -> DMSCResult<DMSCQueueStats> {
312 let queue_info = self.channel
313 .queue_declare(
314 &self.name,
315 lapin::options::QueueDeclareOptions {
316 passive: true,
317 ..Default::default()
318 },
319 lapin::types::FieldTable::default(),
320 )
321 .await?;
322
323 Ok(DMSCQueueStats {
324 queue_name: self.name.clone(),
325 message_count: queue_info.message_count() as u64,
326 consumer_count: queue_info.consumer_count() as u32,
327 producer_count: 0,
328 processed_messages: 0,
329 failed_messages: 0,
330 avg_processing_time_ms: 0.0,
331 total_bytes_sent: 0,
332 total_bytes_received: 0,
333 last_message_time: 0,
334 })
335 }
336
337}
338
339#[async_trait]
340impl DMSCQueue for DMSCRabbitMQQueue {
341 /// Creates a new producer for the RabbitMQ queue.
342 ///
343 /// # Returns
344 ///
345 /// A new DMSCQueueProducer instance wrapped in DMSCResult
346 async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
347 Ok(Box::new(RabbitMQProducer {
348 channel: self.channel.clone(),
349 queue_name: self.name.clone(),
350 }))
351 }
352
353 /// Creates a new consumer for the RabbitMQ queue.
354 ///
355 /// # Parameters
356 ///
357 /// - `consumer_group`: The consumer group name
358 ///
359 /// # Returns
360 ///
361 /// A new DMSCQueueConsumer instance wrapped in DMSCResult
362 async fn create_consumer(&self, consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
363 let consumer = self.channel
364 .basic_consume(
365 &self.name,
366 consumer_group,
367 BasicConsumeOptions::default(),
368 FieldTable::default(),
369 )
370 .await?;
371
372 Ok(Box::new(RabbitMQConsumer::new(self.channel.clone(), consumer)))
373 }
374
375 /// Gets statistics for the RabbitMQ queue.
376 ///
377 /// This implementation integrates with RabbitMQ management API to provide detailed
378 /// queue statistics including message counts, consumer counts, and processing metrics.
379 ///
380 /// # Returns
381 ///
382 /// DMSCQueueStats containing detailed queue statistics wrapped in DMSCResult
383 async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
384 #[cfg(feature = "http_client")]
385 match self.fetch_rabbitmq_stats().await {
386 Ok(detailed_stats) => Ok(detailed_stats),
387 Err(_) => {
388 self.get_basic_stats().await
389 }
390 }
391 #[cfg(not(feature = "http_client"))]
392 self.get_basic_stats().await
393 }
394
395 /// Purges all messages from the RabbitMQ queue.
396 ///
397 /// # Returns
398 ///
399 /// DMSCResult indicating success or failure
400
401 /// Purges all messages from the RabbitMQ queue.
402 ///
403 /// # Returns
404 ///
405 /// DMSCResult indicating success or failure
406 async fn purge(&self) -> DMSCResult<()> {
407 self.channel.queue_purge(&self.name, Default::default()).await?;
408 Ok(())
409 }
410
411 /// Deletes the RabbitMQ queue.
412 ///
413 /// # Returns
414 ///
415 /// DMSCResult indicating success or failure
416 async fn delete(&self) -> DMSCResult<()> {
417 self.channel.queue_delete(&self.name, Default::default()).await?;
418 Ok(())
419 }
420}
421
422/// RabbitMQ producer implementation.
423///
424/// This struct provides a RabbitMQ implementation of the DMSCQueueProducer trait,
425/// allowing sending messages to RabbitMQ queues.
426struct RabbitMQProducer {
427 /// RabbitMQ channel
428 channel: Arc<Channel>,
429 /// Queue name to send messages to
430 queue_name: String,
431}
432
433#[async_trait]
434impl DMSCQueueProducer for RabbitMQProducer {
435 /// Sends a single message to the RabbitMQ queue.
436 ///
437 /// # Parameters
438 ///
439 /// - `message`: The message to send
440 ///
441 /// # Returns
442 ///
443 /// DMSCResult indicating success or failure
444 async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
445 let payload = serde_json::to_vec(&message)?;
446
447 self.channel
448 .basic_publish(
449 "",
450 &self.queue_name,
451 BasicPublishOptions::default(),
452 &payload,
453 Default::default(),
454 )
455 .await?;
456
457 Ok(())
458 }
459
460 /// Sends multiple messages to the RabbitMQ queue.
461 ///
462 /// # Parameters
463 ///
464 /// - `messages`: A vector of messages to send
465 ///
466 /// # Returns
467 ///
468 /// DMSCResult indicating success or failure
469 async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
470 for message in messages {
471 self.send(message).await?;
472 }
473 Ok(())
474 }
475}
476
477/// RabbitMQ consumer implementation.
478#[allow(dead_code)]
479struct RabbitMQConsumer {
480 /// RabbitMQ channel for sending acks/nacks
481 channel: Arc<Channel>,
482 /// RabbitMQ consumer
483 consumer: Arc<Mutex<Consumer>>,
484 /// Flag indicating if the consumer is paused
485 paused: Arc<Mutex<bool>>,
486 /// Message tracking: delivery_tag -> message_id
487 delivery_tags: Arc<Mutex<HashMap<u64, String>>>,
488 /// Next delivery tag counter
489 next_delivery_tag: Arc<AtomicU64>,
490}
491
492impl RabbitMQConsumer {
493 fn new(channel: Arc<Channel>, consumer: Consumer) -> Self {
494 Self {
495 channel,
496 consumer: Arc::new(Mutex::new(consumer)),
497 paused: Arc::new(Mutex::new(false)),
498 delivery_tags: Arc::new(Mutex::new(HashMap::new())),
499 next_delivery_tag: Arc::new(AtomicU64::new(1)),
500 }
501 }
502}
503
504#[async_trait]
505impl DMSCQueueConsumer for RabbitMQConsumer {
506 /// Receives a message from the RabbitMQ queue.
507 ///
508 /// # Returns
509 ///
510 /// An Option containing the received message, or None if the consumer is paused
511 async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
512 let paused = *self.paused.lock().await;
513 if paused {
514 return Ok(None);
515 }
516
517 let mut consumer = self.consumer.lock().await;
518
519 match consumer.next().await {
520 Some(delivery_result) => {
521 let delivery = delivery_result.map_err(|e| crate::core::DMSCError::Other(format!("Consumer error: {e}")))?;
522
523 let _message_id = {
524 let delivery_tag = delivery.delivery_tag;
525 let message_id = format!("msg_{}", uuid::Uuid::new_v4());
526
527 let mut tags = self.delivery_tags.lock().await;
528 tags.insert(delivery_tag, message_id.clone());
529
530 message_id
531 };
532
533 let message: DMSCQueueMessage = serde_json::from_slice(&delivery.data)?;
534
535 Ok(Some(message))
536 },
537 None => Ok(None)
538 }
539 }
540
541 /// Acknowledges a message.
542 ///
543 /// This implementation tracks delivery tags and uses basic_ack to acknowledge messages.
544 ///
545 /// # Parameters
546 ///
547 /// - `message_id`: The message ID to acknowledge
548 ///
549 /// # Returns
550 ///
551 /// DMSCResult indicating success or failure
552 async fn ack(&self, message_id: &str) -> DMSCResult<()> {
553 log::debug!("Acknowledging message: {}", message_id);
554
555 let delivery_tag = {
556 let tags = self.delivery_tags.lock().await;
557 tags.iter()
558 .find(|(_, id)| *id == message_id)
559 .map(|(tag, _)| *tag)
560 };
561
562 if let Some(tag) = delivery_tag {
563 let channel = self.channel.clone();
564 channel.basic_ack(tag, BasicAckOptions { multiple: false }).await
565 .map_err(|e| crate::core::DMSCError::Other(format!("Failed to ack message: {e}")))?;
566
567 let mut tags = self.delivery_tags.lock().await;
568 tags.remove(&tag);
569
570 log::debug!("Message {} acknowledged successfully", message_id);
571 } else {
572 log::warn!("Message ID not found for acknowledgment: {}", message_id);
573 }
574
575 Ok(())
576 }
577
578 /// Negatively acknowledges a message.
579 ///
580 /// This implementation tracks delivery tags and uses basic_nack to negatively acknowledge messages.
581 ///
582 /// # Parameters
583 ///
584 /// - `message_id`: The message ID to negatively acknowledge
585 ///
586 /// # Returns
587 ///
588 /// DMSCResult indicating success or failure
589 async fn nack(&self, message_id: &str) -> DMSCResult<()> {
590 log::debug!("Negatively acknowledging message: {}", message_id);
591
592 let delivery_tag = {
593 let tags = self.delivery_tags.lock().await;
594 tags.iter()
595 .find(|(_, id)| *id == message_id)
596 .map(|(tag, _)| *tag)
597 };
598
599 if let Some(tag) = delivery_tag {
600 let channel = self.channel.clone();
601 use lapin::options::BasicNackOptions;
602 channel.basic_nack(tag, BasicNackOptions { multiple: false, requeue: true }).await
603 .map_err(|e| crate::core::DMSCError::Other(format!("Failed to nack message: {e}")))?;
604
605 let mut tags = self.delivery_tags.lock().await;
606 tags.remove(&tag);
607
608 log::debug!("Message {} negatively acknowledged (will be requeued)", message_id);
609 } else {
610 log::warn!("Message ID not found for negative acknowledgment: {}", message_id);
611 }
612
613 Ok(())
614 }
615
616 /// Pauses the consumer.
617 ///
618 /// # Returns
619 ///
620 /// DMSCResult indicating success or failure
621 async fn pause(&self) -> DMSCResult<()> {
622 let mut paused = self.paused.lock().await;
623 *paused = true;
624 Ok(())
625 }
626
627 /// Resumes the consumer.
628 ///
629 /// # Returns
630 ///
631 /// DMSCResult indicating success or failure
632 async fn resume(&self) -> DMSCResult<()> {
633 let mut paused = self.paused.lock().await;
634 *paused = false;
635 Ok(())
636 }
637}