dmsc/queue/backends/
memory_backend.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # In-Memory Queue Implementation
21//!
22//! This file implements an in-memory queue backend for the DMSC queue system. The in-memory queue
23//! provides a lightweight, fast queue implementation suitable for testing, development, and
24//! scenarios where durability is not a strict requirement. It also supports optional persistence
25//! to disk for basic durability.
26//!
27//! ## Key Components
28//!
29//! - **DMSCMemoryQueue**: Main in-memory queue implementation
30//! - **MemoryQueueState**: Internal state management for the queue
31//! - **MemoryQueueProducer**: Producer implementation for sending messages
32//! - **MemoryQueueConsumer**: Consumer implementation for receiving messages
33//!
34//! ## Design Principles
35//!
36//! 1. **Lightweight**: Minimal dependencies and overhead
37//! 2. **Fast Performance**: In-memory operations for low latency
38//! 3. **Optional Persistence**: Can be configured to persist messages to disk
39//! 4. **Consumer Groups**: Supports multiple consumer groups with message distribution
40//! 5. **Async-First**: All operations are asynchronous
41//! 6. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
42//! 7. **Durable Option**: Optional disk persistence for message durability
43//! 8. **Simple API**: Implements the standard DMSCQueue interfaces
44//! 9. **Non-blocking**: Uses tokio's spawn_blocking for file I/O operations
45//! 10. **Message Retry**: Supports message requeueing with retry count increment
46//!
47//! ## Usage
48//!
49//! ```rust
50//! use dmsc::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer};
51//! use dmsc::queue::backends::DMSCMemoryQueue;
52//! use dmsc::core::DMSCResult;
53//! use serde_json::json;
54//!
55//! async fn example() -> DMSCResult<()> {
56//!     // Create a basic in-memory queue
57//!     let queue = DMSCMemoryQueue::new("example_queue");
58//!     
59//!     // Or create a queue with disk persistence
60//!     // let queue = DMSCMemoryQueue::with_persistence("example_queue", "/tmp/queue_persistence");
61//!     
62//!     // Create a producer
63//!     let producer = queue.create_producer().await?;
64//!     
65//!     // Create a message
66//!     let payload = json!({ "key": "value" }).to_string().into_bytes();
67//!     let message = DMSCQueueMessage::new(payload);
68//!     
69//!     // Send the message
70//!     producer.send(message).await?;
71//!     
72//!     // Create a consumer
73//!     let consumer = queue.create_consumer("consumer_group_1").await?;
74//!     
75//!     // Receive a message
76//!     if let Some(message) = consumer.receive().await? {
77//!         // Process the message
78//!         let payload = String::from_utf8_lossy(&message.payload);
79//!         println!("Received message: {}", payload);
80//!         
81//!         // Acknowledge the message
82//!         consumer.ack(&message.id).await?;
83//!     }
84//!     
85//!     Ok(())
86//! }
87//! ```
88
89use crate::core::DMSCResult;
90use crate::queue::{DMSCQueue, DMSCQueueConsumer, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueStats};
91use async_trait::async_trait;
92use std::collections::{HashMap, VecDeque};
93use std::fs::{File, OpenOptions};
94use std::io::{Read, Write};
95use std::path::Path;
96use std::sync::Arc;
97use tokio::sync::{Mutex, RwLock};
98use tokio::task::spawn_blocking;
99
100/// Internal state management for the in-memory queue.
101///
102/// This struct holds the queue's messages and consumer-specific queues. It is protected by a
103/// RwLock to ensure thread-safe access.
104struct MemoryQueueState {
105    /// Main queue of messages waiting to be consumed
106    messages: VecDeque<DMSCQueueMessage>,
107    /// Map of consumer group names to their respective message queues
108    consumers: HashMap<String, VecDeque<DMSCQueueMessage>>,
109}
110
111impl MemoryQueueState {
112    /// Creates a new MemoryQueueState with empty queues.
113    ///
114    /// # Returns
115    ///
116    /// A new MemoryQueueState instance
117    fn new() -> Self {
118        Self {
119            messages: VecDeque::new(),
120            consumers: HashMap::new(),
121        }
122    }
123}
124
125/// In-memory queue implementation.
126///
127/// This struct implements the DMSCQueue trait for an in-memory queue backend. It supports optional
128/// disk persistence for message durability.
129pub struct DMSCMemoryQueue {
130    /// Name of the queue
131    name: String,
132    /// Internal queue state protected by a RwLock
133    state: Arc<RwLock<MemoryQueueState>>,
134    /// Optional path for disk persistence
135    persistence_path: Option<String>,
136}
137
138#[allow(dead_code)]
139impl DMSCMemoryQueue {
140    /// Creates a new in-memory queue without persistence.
141    ///
142    /// # Parameters
143    ///
144    /// - `name`: The name of the queue
145    ///
146    /// # Returns
147    ///
148    /// A new DMSCMemoryQueue instance
149    pub fn new(name: &str) -> Self {
150        Self {
151            name: name.to_string(),
152            state: Arc::new(RwLock::new(MemoryQueueState::new())),
153            persistence_path: None,
154        }
155    }
156
157    /// Creates a new in-memory queue with disk persistence.
158    ///
159    /// # Parameters
160    ///
161    /// - `name`: The name of the queue
162    /// - `persistence_path`: Path to the file where messages will be persisted
163    ///
164    /// # Returns
165    ///
166    /// A new DMSCMemoryQueue instance with persistence enabled
167    pub fn with_persistence(name: &str, persistence_path: &str) -> Self {
168        let queue = Self {
169            name: name.to_string(),
170            state: Arc::new(RwLock::new(MemoryQueueState::new())),
171            persistence_path: Some(persistence_path.to_string()),
172        };
173
174        // Load messages from disk if persistence is enabled
175        if let Err(e) = queue.load_messages() {
176            log::warn!("Failed to load persisted messages for queue '{name}': {e}");
177        }
178
179        queue
180    }
181
182    /// Loads messages from disk if persistence is enabled.
183    ///
184    /// # Returns
185    ///
186    /// A `DMSCResult<()>` indicating success or failure
187    fn load_messages(&self) -> DMSCResult<()> {
188        if let Some(path) = &self.persistence_path {
189            if Path::new(path).exists() {
190                let mut file = File::open(path)?;
191                let mut content = String::new();
192                file.read_to_string(&mut content)?;
193
194                if !content.is_empty() {
195                    let messages: VecDeque<DMSCQueueMessage> = serde_json::from_str(&content)?;
196                    let mut state = self.state.blocking_write();
197                    state.messages = messages;
198                }
199            }
200        }
201        Ok(())
202    }
203
204    /// Saves messages to disk if persistence is enabled.
205    ///
206    /// # Returns
207    ///
208    /// A `DMSCResult<()>` indicating success or failure
209    fn save_messages(&self) -> DMSCResult<()> {
210        if let Some(path) = &self.persistence_path {
211            let state = self.state.blocking_read();
212            let content = serde_json::to_string(&state.messages)?;
213
214            let mut file = OpenOptions::new()
215                .write(true)
216                .create(true)
217                .truncate(true)
218                .open(path)?;
219
220            file.write_all(content.as_bytes())?;
221        }
222        Ok(())
223    }
224}
225
226#[async_trait]
227impl DMSCQueue for DMSCMemoryQueue {
228    /// Creates a new producer for this queue.
229    ///
230    /// # Returns
231    ///
232    /// A `DMSCResult<Box<dyn DMSCQueueProducer>>` containing the producer
233    async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
234        Ok(Box::new(MemoryQueueProducer {
235            state: self.state.clone(),
236            persistence_path: self.persistence_path.clone(),
237        }))
238    }
239
240    /// Creates a new consumer for this queue with the given consumer group.
241    ///
242    /// # Parameters
243    ///
244    /// - `consumer_group`: The name of the consumer group
245    ///
246    /// # Returns
247    ///
248    /// A `DMSCResult<Box<dyn DMSCQueueConsumer>>` containing the consumer
249    async fn create_consumer(
250        &self,
251        consumer_group: &str,
252    ) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
253        Ok(Box::new(MemoryQueueConsumer {
254            state: self.state.clone(),
255            consumer_group: consumer_group.to_string(),
256            paused: Arc::new(Mutex::new(false)),
257            persistence_path: self.persistence_path.clone(),
258        }))
259    }
260
261    /// Gets statistics for this queue.
262    ///
263    /// # Returns
264    ///
265    /// A `DMSCResult<DMSCQueueStats>` containing the queue statistics
266    async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
267        let state = self.state.read().await;
268        Ok(DMSCQueueStats {
269            queue_name: self.name.clone(),
270            message_count: state.messages.len() as u64,
271            consumer_count: state.consumers.len() as u32,
272            producer_count: 1,
273            processed_messages: 0,
274            failed_messages: 0,
275            avg_processing_time_ms: 0.0,
276            total_bytes_sent: 0,
277            total_bytes_received: 0,
278            last_message_time: 0,
279        })
280    }
281
282    /// Purges all messages from this queue.
283    ///
284    /// # Returns
285    ///
286    /// A `DMSCResult<()>` indicating success or failure
287    async fn purge(&self) -> DMSCResult<()> {
288        let mut state = self.state.write().await;
289        state.messages.clear();
290        state.consumers.clear();
291
292        // Clear persistence file if enabled
293        if let Some(path) = &self.persistence_path {
294            let path_clone = path.clone();
295            spawn_blocking(move || {
296                if Path::new(&path_clone).exists() {
297                    if let Err(e) = std::fs::remove_file(&path_clone) {
298                        log::warn!("Failed to remove persistence file '{path_clone}': {e}");
299                    }
300                }
301            })
302            .await
303            .map_err(|e| {
304                log::error!("Failed to execute persistence file removal: {e}");
305                crate::core::DMSCError::Other(format!("Failed to clear persistence: {e}"))
306            })?;
307        }
308
309        Ok(())
310    }
311
312    /// Deletes this queue.
313    ///
314    /// # Returns
315    ///
316    /// A `DMSCResult<()>` indicating success or failure
317    async fn delete(&self) -> DMSCResult<()> {
318        self.purge().await
319    }
320}
321
322/// Producer implementation for the in-memory queue.
323///
324/// This struct implements the DMSCQueueProducer trait for sending messages to the in-memory queue.
325struct MemoryQueueProducer {
326    /// Shared queue state
327    state: Arc<RwLock<MemoryQueueState>>,
328    /// Optional path for disk persistence
329    persistence_path: Option<String>,
330}
331
332#[async_trait]
333impl DMSCQueueProducer for MemoryQueueProducer {
334    /// Sends a single message to the queue.
335    ///
336    /// # Parameters
337    ///
338    /// - `message`: The message to send
339    ///
340    /// # Returns
341    ///
342    /// A `DMSCResult<()>` indicating success or failure
343    async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
344        let mut state = self.state.write().await;
345        state.messages.push_back(message);
346
347        // Save to disk if persistence is enabled
348        if let Some(path) = &self.persistence_path {
349            let messages_clone = state.messages.clone();
350            let path_clone = path.clone();
351
352            let _ = spawn_blocking(move || {
353            let content = serde_json::to_string(&messages_clone)
354                .map_err(|e| {
355                    log::error!("Failed to serialize messages for persistence: {e}");
356                    crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
357                })?;
358            let mut file = OpenOptions::new()
359                .write(true)
360                .create(true)
361                .truncate(true)
362                .open(path_clone)
363                .map_err(|e| {
364                    log::error!("Failed to open persistence file: {e}");
365                    crate::core::DMSCError::Io(format!("File open failed: {e}"))
366                })?;
367            file.write_all(content.as_bytes())
368                .map_err(|e| {
369                    log::error!("Failed to write persistence file: {e}");
370                    crate::core::DMSCError::Io(format!("File write failed: {e}"))
371                })?;
372            Ok::<(), crate::core::DMSCError>(())
373        })
374        .await
375        .map_err(|e| {
376            log::error!("Failed to execute persistence task: {e}");
377            crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
378        });
379        }
380
381        Ok(())
382    }
383
384    /// Sends multiple messages to the queue in a batch.
385    ///
386    /// # Parameters
387    ///
388    /// - `messages`: A vector of messages to send
389    ///
390    /// # Returns
391    ///
392    /// A `DMSCResult<()>` indicating success or failure
393    async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
394        let mut state = self.state.write().await;
395        for message in messages {
396            state.messages.push_back(message);
397        }
398
399        // Save to disk if persistence is enabled
400        if let Some(path) = &self.persistence_path {
401            let messages_clone = state.messages.clone();
402            let path_clone = path.clone();
403
404            let _ = spawn_blocking(move || {
405            let content = serde_json::to_string(&messages_clone)
406                .map_err(|e| {
407                    log::error!("Failed to serialize messages for persistence: {e}");
408                    crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
409                })?;
410            let mut file = OpenOptions::new()
411                .write(true)
412                .create(true)
413                .truncate(true)
414                .open(path_clone)
415                .map_err(|e| {
416                    log::error!("Failed to open persistence file: {e}");
417                    crate::core::DMSCError::Io(format!("File open failed: {e}"))
418                })?;
419            file.write_all(content.as_bytes())
420                .map_err(|e| {
421                    log::error!("Failed to write persistence file: {e}");
422                    crate::core::DMSCError::Io(format!("File write failed: {e}"))
423                })?;
424            Ok::<(), crate::core::DMSCError>(())
425        })
426        .await
427        .map_err(|e| {
428            log::error!("Failed to execute persistence task: {e}");
429            crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
430        });
431        }
432
433        Ok(())
434    }
435}
436
437/// Consumer implementation for the in-memory queue.
438///
439/// This struct implements the DMSCQueueConsumer trait for receiving messages from the in-memory queue.
440struct MemoryQueueConsumer {
441    /// Shared queue state
442    state: Arc<RwLock<MemoryQueueState>>,
443    /// Name of the consumer group
444    consumer_group: String,
445    /// Flag indicating if the consumer is paused
446    paused: Arc<Mutex<bool>>,
447    /// Optional path for disk persistence
448    persistence_path: Option<String>,
449}
450
451#[async_trait]
452impl DMSCQueueConsumer for MemoryQueueConsumer {
453    /// Receives a message from the queue.
454    ///
455    /// # Returns
456    ///
457    /// A `DMSCResult<Option<DMSCQueueMessage>>` containing the message if available, or None if no message is available
458    async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
459        let paused = *self.paused.lock().await;
460        if paused {
461            return Ok(None);
462        }
463
464        let mut state = self.state.write().await;
465
466        // If consumer queue exists and has messages, return one
467        if let Some(consumer_queue) = state.consumers.get_mut(&self.consumer_group) {
468            if let Some(message) = consumer_queue.pop_front() {
469                return Ok(Some(message));
470            }
471        }
472
473        // If main queue has messages, move one to consumer queue
474        if let Some(message) = state.messages.pop_front() {
475            let mut consumer_queue = VecDeque::new();
476            consumer_queue.push_back(message.clone());
477            state
478                .consumers
479                .insert(self.consumer_group.clone(), consumer_queue);
480
481            // Save to disk if persistence is enabled (since main queue changed)
482            if let Some(path) = &self.persistence_path {
483                let messages_clone = state.messages.clone();
484                let path_clone = path.clone();
485
486                let _ = spawn_blocking(move || {
487                    let content = serde_json::to_string(&messages_clone)
488                        .map_err(|e| {
489                            log::error!("Failed to serialize messages for persistence: {e}");
490                            crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
491                        })?;
492                    let mut file = OpenOptions::new()
493                        .write(true)
494                        .create(true)
495                        .truncate(true)
496                        .open(path_clone)
497                        .map_err(|e| {
498                            log::error!("Failed to open persistence file: {e}");
499                            crate::core::DMSCError::Io(format!("File open failed: {e}"))
500                        })?;
501                    file.write_all(content.as_bytes())
502                        .map_err(|e| {
503                            log::error!("Failed to write persistence file: {e}");
504                            crate::core::DMSCError::Io(format!("File write failed: {e}"))
505                        })?;
506                    Ok::<(), crate::core::DMSCError>(())
507                })
508                .await
509                .map_err(|e| {
510                    log::error!("Failed to execute persistence task: {e}");
511                    crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
512                });
513            }
514
515            Ok(Some(message))
516        } else {
517            Ok(None)
518        }
519    }
520
521    /// Acknowledges a message, indicating it has been successfully processed.
522    ///
523    /// For in-memory queues, acknowledgment is implicit when the message is received.
524    ///
525    /// # Parameters
526    ///
527    /// - `_message_id`: The ID of the message to acknowledge (not used for in-memory queues)
528    ///
529    /// # Returns
530    ///
531    /// A `DMSCResult<()>` indicating success or failure
532    async fn ack(&self, _message_id: &str) -> DMSCResult<()> {
533        // In memory queue, acknowledgment is implicit when message is received
534        Ok(())
535    }
536
537    /// Negatively acknowledges a message, indicating it failed to process and should be retried.
538    ///
539    /// # Parameters
540    ///
541    /// - `message_id`: The ID of the message to negatively acknowledge
542    ///
543    /// # Returns
544    ///
545    /// A `DMSCResult<()>` indicating success or failure
546    async fn nack(&self, message_id: &str) -> DMSCResult<()> {
547        // Find the message in consumer queue and put it back in main queue
548        let mut state = self.state.write().await;
549
550        if let Some(consumer_queue) = state.consumers.get_mut(&self.consumer_group) {
551            // Find the message by ID
552            let mut message_to_requeue: Option<DMSCQueueMessage> = None;
553
554            // Iterate through consumer queue to find the message
555            let mut index = 0;
556            for (i, message) in consumer_queue.iter().enumerate() {
557                if message.id == message_id {
558                    message_to_requeue = Some(message.clone());
559                    index = i;
560                    break;
561                }
562            }
563
564            // If found, remove from consumer queue and put back in main queue
565            if let Some(mut message) = message_to_requeue {
566                consumer_queue.remove(index);
567                message.increment_retry();
568                state.messages.push_back(message);
569
570                // Save to disk if persistence is enabled
571                if let Some(path) = &self.persistence_path {
572                    let messages_clone = state.messages.clone();
573                    let path_clone = path.clone();
574
575                    let _ = spawn_blocking(move || {
576                        let content = serde_json::to_string(&messages_clone)
577                            .map_err(|e| {
578                                log::error!("Failed to serialize messages for persistence: {e}");
579                                crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
580                            })?;
581                        let mut file = OpenOptions::new()
582                            .write(true)
583                            .create(true)
584                            .truncate(true)
585                            .open(path_clone)
586                            .map_err(|e| {
587                                log::error!("Failed to open persistence file: {e}");
588                                crate::core::DMSCError::Io(format!("File open failed: {e}"))
589                            })?;
590                        file.write_all(content.as_bytes())
591                            .map_err(|e| {
592                                log::error!("Failed to write persistence file: {e}");
593                                crate::core::DMSCError::Io(format!("File write failed: {e}"))
594                            })?;
595                        Ok::<(), crate::core::DMSCError>(())
596                    })
597                    .await
598                    .map_err(|e| {
599                        log::error!("Failed to execute persistence task: {e}");
600                        crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
601                    });
602                }
603            }
604        }
605
606        Ok(())
607    }
608
609    /// Pauses message consumption.
610    ///
611    /// # Returns
612    ///
613    /// A `DMSCResult<()>` indicating success or failure
614    async fn pause(&self) -> DMSCResult<()> {
615        let mut paused = self.paused.lock().await;
616        *paused = true;
617        Ok(())
618    }
619
620    /// Resumes message consumption after pausing.
621    ///
622    /// # Returns
623    ///
624    /// A `DMSCResult<()>` indicating success or failure
625    async fn resume(&self) -> DMSCResult<()> {
626        let mut paused = self.paused.lock().await;
627        *paused = false;
628        Ok(())
629    }
630}