Skip to main content

Module redis_backend

Module redis_backend 

Source
Expand description

Copyright © 2025-2026 Wenze Wei. All Rights Reserved.

This file is part of Ri. The Ri project belongs to the Dunimd Team.

Licensed under the Apache License, Version 2.0 (the “License”); You may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

§Redis Queue Backend

This module provides a Redis implementation for the Ri queue system. It allows sending and receiving messages using Redis lists as the underlying message broker.

§Key Components

  • RiRedisQueue: Main Redis queue implementation
  • RedisQueueProducer: Redis producer implementation
  • RedisQueueConsumer: Redis consumer implementation

§Design Principles

  1. Async Trait Implementation: Implements the RiQueue, RiQueueProducer, and RiQueueConsumer traits
  2. Redis Integration: Uses the redis crate for Redis connectivity
  3. Thread Safety: Uses Arc for safe sharing of connections and consumers
  4. Future-based API: Leverages async/await for non-blocking operations
  5. Blocking Operations: Uses BLPOP for efficient blocking message consumption
  6. Error Handling: Comprehensive error handling with RiResult
  7. List-based Queue: Uses Redis lists for simple FIFO queue functionality
  8. Batch Support: Provides batch sending functionality
  9. Implicit Acknowledgment: Acknowledgment is implicit when messages are popped from the list
  10. Stats Support: Provides queue length statistics using Redis LLEN command

§Usage

use ri::prelude::*;
 
async fn example() -> RiResult<()> {
    // Create a new Redis queue
    let queue = RiRedisQueue::new("test-queue", "redis://localhost:6379").await?;
     
    // Create a producer
    let producer = queue.create_producer().await?;
     
    // Create a message
    let message = RiQueueMessage {
        id: "12345".to_string(),
        payload: b"Hello, Redis!".to_vec(),
        headers: vec![("key1".to_string(), "value1".to_string())],
        timestamp: chrono::Utc::now().timestamp_millis() as u64,
        priority: 0,
    };
     
    // Send the message
    producer.send(message).await?;
     
    // Create a consumer
    let consumer = queue.create_consumer("test-consumer-group").await?;
     
    // Receive messages
    if let Some(received_message) = consumer.receive().await? {
        println!("Received message: {:?}", received_message);
        consumer.ack(&received_message.id).await?;
    }
     
    Ok(())
}

Structs§

RiRedisQueue
Redis queue implementation for the Ri queue system.