dmsc/queue/backends/
redis_backend.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Redis Queue Backend
21//! 
22//! This module provides a Redis implementation for the DMSC queue system. It allows
23//! sending and receiving messages using Redis lists as the underlying message broker.
24//! 
25//! ## Key Components
26//! 
27//! - **DMSCRedisQueue**: Main Redis queue implementation
28//! - **RedisQueueProducer**: Redis producer implementation
29//! - **RedisQueueConsumer**: Redis consumer implementation
30//! 
31//! ## Design Principles
32//! 
33//! 1. **Async Trait Implementation**: Implements the DMSCQueue, DMSCQueueProducer, and DMSCQueueConsumer traits
34//! 2. **Redis Integration**: Uses the redis crate for Redis connectivity
35//! 3. **Thread Safety**: Uses Arc for safe sharing of connections and consumers
36//! 4. **Future-based API**: Leverages async/await for non-blocking operations
37//! 5. **Blocking Operations**: Uses BLPOP for efficient blocking message consumption
38//! 6. **Error Handling**: Comprehensive error handling with DMSCResult
39//! 7. **List-based Queue**: Uses Redis lists for simple FIFO queue functionality
40//! 8. **Batch Support**: Provides batch sending functionality
41//! 9. **Implicit Acknowledgment**: Acknowledgment is implicit when messages are popped from the list
42//! 10. **Stats Support**: Provides queue length statistics using Redis LLEN command
43//! 
44//! ## Usage
45//! 
46//! ```rust
47//! use dmsc::prelude::*;
48//! 
49//! async fn example() -> DMSCResult<()> {
50//!     // Create a new Redis queue
51//!     let queue = DMSCRedisQueue::new("test-queue", "redis://localhost:6379").await?;
52//!     
53//!     // Create a producer
54//!     let producer = queue.create_producer().await?;
55//!     
56//!     // Create a message
57//!     let message = DMSCQueueMessage {
58//!         id: "12345".to_string(),
59//!         payload: b"Hello, Redis!".to_vec(),
60//!         headers: vec![("key1".to_string(), "value1".to_string())],
61//!         timestamp: chrono::Utc::now().timestamp_millis() as u64,
62//!         priority: 0,
63//!     };
64//!     
65//!     // Send the message
66//!     producer.send(message).await?;
67//!     
68//!     // Create a consumer
69//!     let consumer = queue.create_consumer("test-consumer-group").await?;
70//!     
71//!     // Receive messages
72//!     if let Some(received_message) = consumer.receive().await? {
73//!         println!("Received message: {:?}", received_message);
74//!         consumer.ack(&received_message.id).await?;
75//!     }
76//!     
77//!     Ok(())
78//! }
79//! ```
80
81use async_trait::async_trait;
82use redis::{AsyncCommands, Client};
83use std::sync::Arc;
84use std::time::Duration;
85use tokio::sync::Mutex;
86use crate::core::DMSCResult;
87use crate::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueueStats};
88
89/// Redis queue implementation for the DMSC queue system.
90///
91/// This struct provides a Redis implementation of the DMSCQueue trait, allowing
92/// sending and receiving messages using Redis lists as the underlying message broker.
93pub struct DMSCRedisQueue {
94    /// Queue name (Redis key)
95    name: String,
96    /// Redis client for connecting to Redis
97    client: Arc<Client>,
98}
99
100impl DMSCRedisQueue {
101    /// Creates a new Redis queue instance.
102    ///
103    /// # Parameters
104    ///
105    /// - `name`: The name of the queue (Redis key)
106    /// - `connection_string`: The Redis connection string
107    ///
108    /// # Returns
109    ///
110    /// A new DMSCRedisQueue instance wrapped in DMSCResult
111    pub async fn new(name: &str, connection_string: &str) -> DMSCResult<Self> {
112        let client = Client::open(connection_string)?;
113        Self::new_with_client(name, client)
114    }
115
116    /// Creates a new Redis queue instance with an existing client.
117    ///
118    /// # Parameters
119    ///
120    /// - `name`: The name of the queue (Redis key)
121    /// - `client`: The existing Redis client
122    ///
123    /// # Returns
124    ///
125    /// A new DMSCRedisQueue instance wrapped in DMSCResult
126    pub fn new_with_client(name: &str, client: Client) -> DMSCResult<Self> {
127        Ok(Self {
128            name: name.to_string(),
129            client: Arc::new(client),
130        })
131    }
132
133    /// Creates a new Redis queue instance with an existing connection.
134    ///
135    /// # Parameters
136    ///
137    /// - `name`: The name of the queue (Redis key)
138    /// - `connection_string`: The Redis connection string
139    ///
140    /// # Returns
141    ///
142    /// A new DMSCRedisQueue instance wrapped in DMSCResult
143    pub async fn new_with_connection(name: &str, connection_string: &str) -> DMSCResult<Self> {
144        let client = Client::open(connection_string)?;
145        Ok(Self {
146            name: name.to_string(),
147            client: Arc::new(client),
148        })
149    }
150}
151
152#[async_trait]
153impl DMSCQueue for DMSCRedisQueue {
154    /// Creates a new producer for the Redis queue.
155    ///
156    /// # Returns
157    ///
158    /// A new DMSCQueueProducer instance wrapped in DMSCResult
159    async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
160        let conn = self.client.get_async_connection().await?;
161        
162        Ok(Box::new(RedisQueueProducer {
163            connection: Arc::new(Mutex::new(conn)),
164            queue_name: self.name.clone(),
165        }))
166    }
167
168    /// Creates a new consumer for the Redis queue.
169    ///
170    /// # Parameters
171    ///
172    /// - `_consumer_group`: The consumer group name (ignored in this implementation)
173    ///
174    /// # Returns
175    ///
176    /// A new DMSCQueueConsumer instance wrapped in DMSCResult
177    async fn create_consumer(&self, _consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
178        let conn = self.client.get_async_connection().await?;
179        
180        Ok(Box::new(RedisQueueConsumer {
181            connection: Arc::new(Mutex::new(conn)),
182            queue_name: self.name.clone(),
183            paused: Arc::new(Mutex::new(false)),
184        }))
185    }
186
187    /// Gets statistics for the Redis queue.
188    ///
189    /// # Returns
190    ///
191    /// DMSCQueueStats containing queue statistics wrapped in DMSCResult
192    async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
193        let mut conn = self.client.get_async_connection().await?;
194        let len: i64 = conn.llen(&self.name).await?;
195        
196        Ok(DMSCQueueStats {
197            queue_name: self.name.clone(),
198            message_count: len as u64,
199            consumer_count: 0,
200            producer_count: 0,
201            processed_messages: 0,
202            failed_messages: 0,
203            avg_processing_time_ms: 0.0,
204            total_bytes_sent: 0,
205            total_bytes_received: 0,
206            last_message_time: 0,
207        })
208    }
209
210    /// Purges all messages from the Redis queue.
211    ///
212    /// # Returns
213    ///
214    /// DMSCResult indicating success or failure
215    async fn purge(&self) -> DMSCResult<()> {
216        let mut conn = self.client.get_async_connection().await?;
217        conn.del::<_, ()>(&self.name).await?;
218        Ok(())
219    }
220
221    /// Deletes the Redis queue.
222    ///
223    /// Note: This implementation simply calls purge since deleting a Redis key
224    /// is the same as purging all messages from the queue.
225    ///
226    /// # Returns
227    ///
228    /// DMSCResult indicating success or failure
229    async fn delete(&self) -> DMSCResult<()> {
230        self.purge().await
231    }
232}
233
234/// Redis queue producer implementation.
235///
236/// This struct provides a Redis implementation of the DMSCQueueProducer trait,
237/// allowing sending messages to Redis queues.
238struct RedisQueueProducer {
239    /// Redis async connection
240    connection: Arc<Mutex<redis::aio::Connection>>,
241    /// Queue name (Redis key) to send messages to
242    queue_name: String,
243}
244
245#[async_trait]
246impl DMSCQueueProducer for RedisQueueProducer {
247    /// Sends a single message to the Redis queue.
248    ///
249    /// # Parameters
250    ///
251    /// - `message`: The message to send
252    ///
253    /// # Returns
254    ///
255    /// DMSCResult indicating success or failure
256    async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
257        let mut conn = self.connection.lock().await;
258        let payload = serde_json::to_vec(&message)?;
259        
260        conn.rpush::<_, _, ()>(&self.queue_name, payload).await?;
261        Ok(())
262    }
263
264    /// Sends multiple messages to the Redis queue.
265    ///
266    /// # Parameters
267    ///
268    /// - `messages`: A vector of messages to send
269    ///
270    /// # Returns
271    ///
272    /// DMSCResult indicating success or failure
273    async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
274        let mut conn = self.connection.lock().await;
275        
276        for message in messages {
277            let payload = serde_json::to_vec(&message)?;
278            conn.rpush::<_, _, ()>(&self.queue_name, payload).await?;
279        }
280        Ok(())
281    }
282}
283
284/// Redis queue consumer implementation.
285///
286/// This struct provides a Redis implementation of the DMSCQueueConsumer trait,
287/// allowing receiving messages from Redis queues.
288struct RedisQueueConsumer {
289    /// Redis async connection
290    connection: Arc<Mutex<redis::aio::Connection>>,
291    /// Queue name (Redis key) to receive messages from
292    queue_name: String,
293    /// Flag indicating if the consumer is paused
294    paused: Arc<Mutex<bool>>,
295}
296
297#[async_trait]
298impl DMSCQueueConsumer for RedisQueueConsumer {
299    /// Receives a message from the Redis queue.
300    ///
301    /// # Returns
302    ///
303    /// An Option containing the received message, or None if the consumer is paused
304    /// or the BLPOP operation timed out
305    async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
306        let paused = *self.paused.lock().await;
307        if paused {
308            return Ok(None);
309        }
310
311        let mut conn = self.connection.lock().await;
312        
313        // Use BLPOP for blocking pop with timeout
314        let result: Option<(String, Vec<u8>)> = conn.blpop(&self.queue_name, 5.0).await?;
315        
316        if let Some((_, payload)) = result {
317            let message: DMSCQueueMessage = serde_json::from_slice(&payload)?;
318            Ok(Some(message))
319        } else {
320            Ok(None)
321        }
322    }
323
324    /// Acknowledges a message.
325    ///
326    /// Note: In Redis list-based queues, acknowledgment is implicit when messages
327    /// are popped from the list. This method is a no-op in this implementation.
328    ///
329    /// # Parameters
330    ///
331    /// - `_message_id`: The message ID to acknowledge (ignored in this implementation)
332    ///
333    /// # Returns
334    ///
335    /// DMSCResult indicating success or failure
336    async fn ack(&self, _message_id: &str) -> DMSCResult<()> {
337        // In Redis list-based queue, acknowledgment is implicit when message is popped
338        Ok(())
339    }
340
341    /// Negatively acknowledges a message.
342    ///
343    /// This implementation handles message retry by pushing the message back to the queue
344    /// with appropriate retry logic and delay mechanisms. It tracks retry counts and
345    /// implements exponential backoff for retry delays.
346    ///
347    /// # Parameters
348    ///
349    /// - `message_id`: The message ID to negatively acknowledge
350    ///
351    /// # Returns
352    ///
353    /// DMSCResult indicating success or failure
354    async fn nack(&self, message_id: &str) -> DMSCResult<()> {
355        log::info!("Message negatively acknowledged: {message_id}");
356
357        let mut conn = self.connection.lock().await;
358
359        let (original_data, retry_count): (Option<Vec<u8>>, u32) = conn.hgetall::<_, (Option<Vec<u8>>, u32)>(&format!("{}_meta", self.queue_name)).await
360            .map(|(data, count)| (data, count))
361            .unwrap_or((None, 0));
362
363        let max_retries = 3u32;
364        let base_delay_ms = 1000u64;
365
366        if retry_count >= max_retries {
367            log::warn!("Message {message_id} exceeded max retries ({max_retries}), moving to dead letter queue");
368            conn.rpush::<_, _, ()>(&format!("{}_dlq", self.queue_name), message_id.as_bytes()).await?;
369            conn.hdel::<_, &str, ()>(&format!("{}_meta", self.queue_name), "retry_count").await?;
370            return Ok(());
371        }
372
373        let new_retry_count = retry_count + 1;
374        let retry_delay_ms = base_delay_ms * (2u64.pow(new_retry_count - 1));
375
376        log::info!("Message {message_id} scheduled for retry {new_retry_count}/{max_retries} after {retry_delay_ms}ms delay");
377
378        conn.hset::<_, &str, u32, ()>(&format!("{}_meta", self.queue_name), "retry_count", new_retry_count).await?;
379
380        tokio::time::sleep(Duration::from_millis(retry_delay_ms)).await;
381
382        if let Some(data) = original_data {
383            conn.rpush::<_, _, ()>(&self.queue_name, &data).await?;
384        }
385
386        Ok(())
387    }
388
389    /// Pauses the consumer.
390    ///
391    /// # Returns
392    ///
393    /// DMSCResult indicating success or failure
394    async fn pause(&self) -> DMSCResult<()> {
395        let mut paused = self.paused.lock().await;
396        *paused = true;
397        Ok(())
398    }
399
400    /// Resumes the consumer.
401    ///
402    /// # Returns
403    ///
404    /// DMSCResult indicating success or failure
405    async fn resume(&self) -> DMSCResult<()> {
406        let mut paused = self.paused.lock().await;
407        *paused = false;
408        Ok(())
409    }
410}