Module rabbitmq_backend

Module rabbitmq_backend 

Source
Expand description

Copyright © 2025-2026 Wenze Wei. All Rights Reserved.

This file is part of DMSC. The DMSC project belongs to the Dunimd Team.

Licensed under the Apache License, Version 2.0 (the “License”); You may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

§RabbitMQ Queue Backend

This module provides a RabbitMQ implementation for the DMSC queue system. It allows sending and receiving messages using RabbitMQ as the underlying message broker.

§Key Components

  • DMSCRabbitMQQueue: Main RabbitMQ queue implementation
  • RabbitMQProducer: RabbitMQ producer implementation
  • RabbitMQConsumer: RabbitMQ consumer implementation

§Design Principles

  1. Async Trait Implementation: Implements the DMSCQueue, DMSCQueueProducer, and DMSCQueueConsumer traits
  2. RabbitMQ Integration: Uses the lapin crate for RabbitMQ connectivity
  3. Thread Safety: Uses Arc for safe sharing of connections, channels, and consumers
  4. Future-based API: Leverages async/await for non-blocking operations
  5. Durable Queues: Configured with durable queues for message persistence
  6. Error Handling: Comprehensive error handling with DMSCResult
  7. Stream-based Consumer: Uses StreamExt for efficient message processing
  8. Batch Support: Provides batch sending functionality

§Usage

use dmsc::prelude::*;
 
async fn example() -> DMSCResult<()> {
    // Create a new RabbitMQ queue
    let queue = DMSCRabbitMQQueue::new("test-queue", "amqp://guest:guest@localhost:5672/%2f").await?;
     
    // Create a producer
    let producer = queue.create_producer().await?;
     
    // Create a message
    let message = DMSCQueueMessage {
        id: "12345".to_string(),
        payload: b"Hello, RabbitMQ!".to_vec(),
        headers: vec![("key1".to_string(), "value1".to_string())],
        timestamp: chrono::Utc::now().timestamp_millis() as u64,
        priority: 0,
    };
     
    // Send the message
    producer.send(message).await?;
     
    // Create a consumer
    let consumer = queue.create_consumer("test-consumer-group").await?;
     
    // Receive messages
    if let Some(received_message) = consumer.receive().await? {
        println!("Received message: {:?}", received_message);
        consumer.ack(&received_message.id).await?;
    }
     
    Ok(())
}

Structs§

DMSCRabbitMQQueue
RabbitMQ queue implementation for the DMSC queue system.