// dmsc/queue/backends/redis_backend.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
// NOTE(review): this allow looks unnecessary — the DMSC-prefixed identifiers here are
// type names (UpperCamelCase), which `non_snake_case` does not flag; confirm before removing.
#![allow(non_snake_case)]
19
20//! # Redis Queue Backend
21//!
22//! This module provides a Redis implementation for the DMSC queue system. It allows
23//! sending and receiving messages using Redis lists as the underlying message broker.
24//!
25//! ## Key Components
26//!
27//! - **DMSCRedisQueue**: Main Redis queue implementation
28//! - **RedisQueueProducer**: Redis producer implementation
29//! - **RedisQueueConsumer**: Redis consumer implementation
30//!
31//! ## Design Principles
32//!
33//! 1. **Async Trait Implementation**: Implements the DMSCQueue, DMSCQueueProducer, and DMSCQueueConsumer traits
34//! 2. **Redis Integration**: Uses the redis crate for Redis connectivity
35//! 3. **Thread Safety**: Uses Arc for safe sharing of connections and consumers
36//! 4. **Future-based API**: Leverages async/await for non-blocking operations
37//! 5. **Blocking Operations**: Uses BLPOP for efficient blocking message consumption
38//! 6. **Error Handling**: Comprehensive error handling with DMSCResult
39//! 7. **List-based Queue**: Uses Redis lists for simple FIFO queue functionality
40//! 8. **Batch Support**: Provides batch sending functionality
41//! 9. **Implicit Acknowledgment**: Acknowledgment is implicit when messages are popped from the list
42//! 10. **Stats Support**: Provides queue length statistics using Redis LLEN command
43//!
44//! ## Usage
45//!
46//! ```rust
47//! use dmsc::prelude::*;
48//!
49//! async fn example() -> DMSCResult<()> {
50//! // Create a new Redis queue
51//! let queue = DMSCRedisQueue::new("test-queue", "redis://localhost:6379").await?;
52//!
53//! // Create a producer
54//! let producer = queue.create_producer().await?;
55//!
56//! // Create a message
57//! let message = DMSCQueueMessage {
58//! id: "12345".to_string(),
59//! payload: b"Hello, Redis!".to_vec(),
60//! headers: vec![("key1".to_string(), "value1".to_string())],
61//! timestamp: chrono::Utc::now().timestamp_millis() as u64,
62//! priority: 0,
63//! };
64//!
65//! // Send the message
66//! producer.send(message).await?;
67//!
68//! // Create a consumer
69//! let consumer = queue.create_consumer("test-consumer-group").await?;
70//!
71//! // Receive messages
72//! if let Some(received_message) = consumer.receive().await? {
73//! println!("Received message: {:?}", received_message);
74//! consumer.ack(&received_message.id).await?;
75//! }
76//!
77//! Ok(())
78//! }
79//! ```
80
81use async_trait::async_trait;
82use redis::{AsyncCommands, Client};
83use std::sync::Arc;
84use std::time::Duration;
85use tokio::sync::Mutex;
86use crate::core::DMSCResult;
87use crate::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueueStats};
88
/// Redis queue implementation for the DMSC queue system.
///
/// This struct provides a Redis implementation of the DMSCQueue trait, allowing
/// sending and receiving messages using Redis lists as the underlying message broker.
///
/// The queue maps onto a single Redis list whose key is `name`. The client is
/// shared via `Arc` so each producer/consumer created from this queue can open
/// its own async connection from the same client.
pub struct DMSCRedisQueue {
    /// Queue name, used verbatim as the Redis list key.
    name: String,
    /// Shared Redis client; connections are opened from it on demand.
    client: Arc<Client>,
}
99
100impl DMSCRedisQueue {
101 /// Creates a new Redis queue instance.
102 ///
103 /// # Parameters
104 ///
105 /// - `name`: The name of the queue (Redis key)
106 /// - `connection_string`: The Redis connection string
107 ///
108 /// # Returns
109 ///
110 /// A new DMSCRedisQueue instance wrapped in DMSCResult
111 pub async fn new(name: &str, connection_string: &str) -> DMSCResult<Self> {
112 let client = Client::open(connection_string)?;
113 Self::new_with_client(name, client)
114 }
115
116 /// Creates a new Redis queue instance with an existing client.
117 ///
118 /// # Parameters
119 ///
120 /// - `name`: The name of the queue (Redis key)
121 /// - `client`: The existing Redis client
122 ///
123 /// # Returns
124 ///
125 /// A new DMSCRedisQueue instance wrapped in DMSCResult
126 pub fn new_with_client(name: &str, client: Client) -> DMSCResult<Self> {
127 Ok(Self {
128 name: name.to_string(),
129 client: Arc::new(client),
130 })
131 }
132
133 /// Creates a new Redis queue instance with an existing connection.
134 ///
135 /// # Parameters
136 ///
137 /// - `name`: The name of the queue (Redis key)
138 /// - `connection_string`: The Redis connection string
139 ///
140 /// # Returns
141 ///
142 /// A new DMSCRedisQueue instance wrapped in DMSCResult
143 pub async fn new_with_connection(name: &str, connection_string: &str) -> DMSCResult<Self> {
144 let client = Client::open(connection_string)?;
145 Ok(Self {
146 name: name.to_string(),
147 client: Arc::new(client),
148 })
149 }
150}
151
152#[async_trait]
153impl DMSCQueue for DMSCRedisQueue {
154 /// Creates a new producer for the Redis queue.
155 ///
156 /// # Returns
157 ///
158 /// A new DMSCQueueProducer instance wrapped in DMSCResult
159 async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
160 let conn = self.client.get_async_connection().await?;
161
162 Ok(Box::new(RedisQueueProducer {
163 connection: Arc::new(Mutex::new(conn)),
164 queue_name: self.name.clone(),
165 }))
166 }
167
168 /// Creates a new consumer for the Redis queue.
169 ///
170 /// # Parameters
171 ///
172 /// - `_consumer_group`: The consumer group name (ignored in this implementation)
173 ///
174 /// # Returns
175 ///
176 /// A new DMSCQueueConsumer instance wrapped in DMSCResult
177 async fn create_consumer(&self, _consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
178 let conn = self.client.get_async_connection().await?;
179
180 Ok(Box::new(RedisQueueConsumer {
181 connection: Arc::new(Mutex::new(conn)),
182 queue_name: self.name.clone(),
183 paused: Arc::new(Mutex::new(false)),
184 }))
185 }
186
187 /// Gets statistics for the Redis queue.
188 ///
189 /// # Returns
190 ///
191 /// DMSCQueueStats containing queue statistics wrapped in DMSCResult
192 async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
193 let mut conn = self.client.get_async_connection().await?;
194 let len: i64 = conn.llen(&self.name).await?;
195
196 Ok(DMSCQueueStats {
197 queue_name: self.name.clone(),
198 message_count: len as u64,
199 consumer_count: 0,
200 producer_count: 0,
201 processed_messages: 0,
202 failed_messages: 0,
203 avg_processing_time_ms: 0.0,
204 total_bytes_sent: 0,
205 total_bytes_received: 0,
206 last_message_time: 0,
207 })
208 }
209
210 /// Purges all messages from the Redis queue.
211 ///
212 /// # Returns
213 ///
214 /// DMSCResult indicating success or failure
215 async fn purge(&self) -> DMSCResult<()> {
216 let mut conn = self.client.get_async_connection().await?;
217 conn.del::<_, ()>(&self.name).await?;
218 Ok(())
219 }
220
221 /// Deletes the Redis queue.
222 ///
223 /// Note: This implementation simply calls purge since deleting a Redis key
224 /// is the same as purging all messages from the queue.
225 ///
226 /// # Returns
227 ///
228 /// DMSCResult indicating success or failure
229 async fn delete(&self) -> DMSCResult<()> {
230 self.purge().await
231 }
232}
233
/// Redis queue producer implementation.
///
/// This struct provides a Redis implementation of the DMSCQueueProducer trait,
/// allowing sending messages to Redis queues. Messages are JSON-serialized and
/// RPUSHed onto the queue's list.
struct RedisQueueProducer {
    /// Dedicated async connection; the Mutex serializes concurrent sends on it.
    connection: Arc<Mutex<redis::aio::Connection>>,
    /// Queue name (Redis key) to send messages to
    queue_name: String,
}
244
245#[async_trait]
246impl DMSCQueueProducer for RedisQueueProducer {
247 /// Sends a single message to the Redis queue.
248 ///
249 /// # Parameters
250 ///
251 /// - `message`: The message to send
252 ///
253 /// # Returns
254 ///
255 /// DMSCResult indicating success or failure
256 async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
257 let mut conn = self.connection.lock().await;
258 let payload = serde_json::to_vec(&message)?;
259
260 conn.rpush::<_, _, ()>(&self.queue_name, payload).await?;
261 Ok(())
262 }
263
264 /// Sends multiple messages to the Redis queue.
265 ///
266 /// # Parameters
267 ///
268 /// - `messages`: A vector of messages to send
269 ///
270 /// # Returns
271 ///
272 /// DMSCResult indicating success or failure
273 async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
274 let mut conn = self.connection.lock().await;
275
276 for message in messages {
277 let payload = serde_json::to_vec(&message)?;
278 conn.rpush::<_, _, ()>(&self.queue_name, payload).await?;
279 }
280 Ok(())
281 }
282}
283
/// Redis queue consumer implementation.
///
/// This struct provides a Redis implementation of the DMSCQueueConsumer trait,
/// allowing receiving messages from Redis queues via blocking BLPOP.
struct RedisQueueConsumer {
    /// Dedicated async connection; the Mutex serializes concurrent operations on it.
    connection: Arc<Mutex<redis::aio::Connection>>,
    /// Queue name (Redis key) to receive messages from
    queue_name: String,
    /// Flag indicating if the consumer is paused; checked at the top of `receive`.
    paused: Arc<Mutex<bool>>,
}
296
297#[async_trait]
298impl DMSCQueueConsumer for RedisQueueConsumer {
299 /// Receives a message from the Redis queue.
300 ///
301 /// # Returns
302 ///
303 /// An Option containing the received message, or None if the consumer is paused
304 /// or the BLPOP operation timed out
305 async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
306 let paused = *self.paused.lock().await;
307 if paused {
308 return Ok(None);
309 }
310
311 let mut conn = self.connection.lock().await;
312
313 // Use BLPOP for blocking pop with timeout
314 let result: Option<(String, Vec<u8>)> = conn.blpop(&self.queue_name, 5.0).await?;
315
316 if let Some((_, payload)) = result {
317 let message: DMSCQueueMessage = serde_json::from_slice(&payload)?;
318 Ok(Some(message))
319 } else {
320 Ok(None)
321 }
322 }
323
324 /// Acknowledges a message.
325 ///
326 /// Note: In Redis list-based queues, acknowledgment is implicit when messages
327 /// are popped from the list. This method is a no-op in this implementation.
328 ///
329 /// # Parameters
330 ///
331 /// - `_message_id`: The message ID to acknowledge (ignored in this implementation)
332 ///
333 /// # Returns
334 ///
335 /// DMSCResult indicating success or failure
336 async fn ack(&self, _message_id: &str) -> DMSCResult<()> {
337 // In Redis list-based queue, acknowledgment is implicit when message is popped
338 Ok(())
339 }
340
341 /// Negatively acknowledges a message.
342 ///
343 /// This implementation handles message retry by pushing the message back to the queue
344 /// with appropriate retry logic and delay mechanisms. It tracks retry counts and
345 /// implements exponential backoff for retry delays.
346 ///
347 /// # Parameters
348 ///
349 /// - `message_id`: The message ID to negatively acknowledge
350 ///
351 /// # Returns
352 ///
353 /// DMSCResult indicating success or failure
354 async fn nack(&self, message_id: &str) -> DMSCResult<()> {
355 log::info!("Message negatively acknowledged: {message_id}");
356
357 let mut conn = self.connection.lock().await;
358
359 let (original_data, retry_count): (Option<Vec<u8>>, u32) = conn.hgetall::<_, (Option<Vec<u8>>, u32)>(&format!("{}_meta", self.queue_name)).await
360 .map(|(data, count)| (data, count))
361 .unwrap_or((None, 0));
362
363 let max_retries = 3u32;
364 let base_delay_ms = 1000u64;
365
366 if retry_count >= max_retries {
367 log::warn!("Message {message_id} exceeded max retries ({max_retries}), moving to dead letter queue");
368 conn.rpush::<_, _, ()>(&format!("{}_dlq", self.queue_name), message_id.as_bytes()).await?;
369 conn.hdel::<_, &str, ()>(&format!("{}_meta", self.queue_name), "retry_count").await?;
370 return Ok(());
371 }
372
373 let new_retry_count = retry_count + 1;
374 let retry_delay_ms = base_delay_ms * (2u64.pow(new_retry_count - 1));
375
376 log::info!("Message {message_id} scheduled for retry {new_retry_count}/{max_retries} after {retry_delay_ms}ms delay");
377
378 conn.hset::<_, &str, u32, ()>(&format!("{}_meta", self.queue_name), "retry_count", new_retry_count).await?;
379
380 tokio::time::sleep(Duration::from_millis(retry_delay_ms)).await;
381
382 if let Some(data) = original_data {
383 conn.rpush::<_, _, ()>(&self.queue_name, &data).await?;
384 }
385
386 Ok(())
387 }
388
389 /// Pauses the consumer.
390 ///
391 /// # Returns
392 ///
393 /// DMSCResult indicating success or failure
394 async fn pause(&self) -> DMSCResult<()> {
395 let mut paused = self.paused.lock().await;
396 *paused = true;
397 Ok(())
398 }
399
400 /// Resumes the consumer.
401 ///
402 /// # Returns
403 ///
404 /// DMSCResult indicating success or failure
405 async fn resume(&self) -> DMSCResult<()> {
406 let mut paused = self.paused.lock().await;
407 *paused = false;
408 Ok(())
409 }
410}