dmsc/queue/backends/memory_backend.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # In-Memory Queue Implementation
21//!
22//! This file implements an in-memory queue backend for the DMSC queue system. The in-memory queue
23//! provides a lightweight, fast queue implementation suitable for testing, development, and
24//! scenarios where durability is not a strict requirement. It also supports optional persistence
25//! to disk for basic durability.
26//!
27//! ## Key Components
28//!
29//! - **DMSCMemoryQueue**: Main in-memory queue implementation
30//! - **MemoryQueueState**: Internal state management for the queue
31//! - **MemoryQueueProducer**: Producer implementation for sending messages
32//! - **MemoryQueueConsumer**: Consumer implementation for receiving messages
33//!
34//! ## Design Principles
35//!
36//! 1. **Lightweight**: Minimal dependencies and overhead
37//! 2. **Fast Performance**: In-memory operations for low latency
38//! 3. **Optional Persistence**: Can be configured to persist messages to disk
39//! 4. **Consumer Groups**: Supports multiple consumer groups with message distribution
40//! 5. **Async-First**: All operations are asynchronous
41//! 6. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
//! 7. **Durable Option**: When persistence is enabled, the pending (main-queue) messages are written to disk each time that queue changes
43//! 8. **Simple API**: Implements the standard DMSCQueue interfaces
44//! 9. **Non-blocking**: Uses tokio's spawn_blocking for file I/O operations
45//! 10. **Message Retry**: Supports message requeueing with retry count increment
46//!
47//! ## Usage
48//!
49//! ```rust
50//! use dmsc::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer};
51//! use dmsc::queue::backends::DMSCMemoryQueue;
52//! use dmsc::core::DMSCResult;
53//! use serde_json::json;
54//!
55//! async fn example() -> DMSCResult<()> {
56//! // Create a basic in-memory queue
57//! let queue = DMSCMemoryQueue::new("example_queue");
58//!
59//! // Or create a queue with disk persistence
60//! // let queue = DMSCMemoryQueue::with_persistence("example_queue", "/tmp/queue_persistence");
61//!
62//! // Create a producer
63//! let producer = queue.create_producer().await?;
64//!
65//! // Create a message
66//! let payload = json!({ "key": "value" }).to_string().into_bytes();
67//! let message = DMSCQueueMessage::new(payload);
68//!
69//! // Send the message
70//! producer.send(message).await?;
71//!
72//! // Create a consumer
73//! let consumer = queue.create_consumer("consumer_group_1").await?;
74//!
75//! // Receive a message
76//! if let Some(message) = consumer.receive().await? {
77//! // Process the message
78//! let payload = String::from_utf8_lossy(&message.payload);
79//! println!("Received message: {}", payload);
80//!
81//! // Acknowledge the message
82//! consumer.ack(&message.id).await?;
83//! }
84//!
85//! Ok(())
86//! }
87//! ```
88
89use crate::core::DMSCResult;
90use crate::queue::{DMSCQueue, DMSCQueueConsumer, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueStats};
91use async_trait::async_trait;
92use std::collections::{HashMap, VecDeque};
93use std::fs::{File, OpenOptions};
94use std::io::{Read, Write};
95use std::path::Path;
96use std::sync::Arc;
97use tokio::sync::{Mutex, RwLock};
98use tokio::task::spawn_blocking;
99
100/// Internal state management for the in-memory queue.
101///
102/// This struct holds the queue's messages and consumer-specific queues. It is protected by a
103/// RwLock to ensure thread-safe access.
104struct MemoryQueueState {
105 /// Main queue of messages waiting to be consumed
106 messages: VecDeque<DMSCQueueMessage>,
107 /// Map of consumer group names to their respective message queues
108 consumers: HashMap<String, VecDeque<DMSCQueueMessage>>,
109}
110
111impl MemoryQueueState {
112 /// Creates a new MemoryQueueState with empty queues.
113 ///
114 /// # Returns
115 ///
116 /// A new MemoryQueueState instance
117 fn new() -> Self {
118 Self {
119 messages: VecDeque::new(),
120 consumers: HashMap::new(),
121 }
122 }
123}
124
125/// In-memory queue implementation.
126///
127/// This struct implements the DMSCQueue trait for an in-memory queue backend. It supports optional
128/// disk persistence for message durability.
129pub struct DMSCMemoryQueue {
130 /// Name of the queue
131 name: String,
132 /// Internal queue state protected by a RwLock
133 state: Arc<RwLock<MemoryQueueState>>,
134 /// Optional path for disk persistence
135 persistence_path: Option<String>,
136}
137
138#[allow(dead_code)]
139impl DMSCMemoryQueue {
140 /// Creates a new in-memory queue without persistence.
141 ///
142 /// # Parameters
143 ///
144 /// - `name`: The name of the queue
145 ///
146 /// # Returns
147 ///
148 /// A new DMSCMemoryQueue instance
149 pub fn new(name: &str) -> Self {
150 Self {
151 name: name.to_string(),
152 state: Arc::new(RwLock::new(MemoryQueueState::new())),
153 persistence_path: None,
154 }
155 }
156
157 /// Creates a new in-memory queue with disk persistence.
158 ///
159 /// # Parameters
160 ///
161 /// - `name`: The name of the queue
162 /// - `persistence_path`: Path to the file where messages will be persisted
163 ///
164 /// # Returns
165 ///
166 /// A new DMSCMemoryQueue instance with persistence enabled
167 pub fn with_persistence(name: &str, persistence_path: &str) -> Self {
168 let queue = Self {
169 name: name.to_string(),
170 state: Arc::new(RwLock::new(MemoryQueueState::new())),
171 persistence_path: Some(persistence_path.to_string()),
172 };
173
174 // Load messages from disk if persistence is enabled
175 if let Err(e) = queue.load_messages() {
176 log::warn!("Failed to load persisted messages for queue '{name}': {e}");
177 }
178
179 queue
180 }
181
182 /// Loads messages from disk if persistence is enabled.
183 ///
184 /// # Returns
185 ///
186 /// A `DMSCResult<()>` indicating success or failure
187 fn load_messages(&self) -> DMSCResult<()> {
188 if let Some(path) = &self.persistence_path {
189 if Path::new(path).exists() {
190 let mut file = File::open(path)?;
191 let mut content = String::new();
192 file.read_to_string(&mut content)?;
193
194 if !content.is_empty() {
195 let messages: VecDeque<DMSCQueueMessage> = serde_json::from_str(&content)?;
196 let mut state = self.state.blocking_write();
197 state.messages = messages;
198 }
199 }
200 }
201 Ok(())
202 }
203
204 /// Saves messages to disk if persistence is enabled.
205 ///
206 /// # Returns
207 ///
208 /// A `DMSCResult<()>` indicating success or failure
209 fn save_messages(&self) -> DMSCResult<()> {
210 if let Some(path) = &self.persistence_path {
211 let state = self.state.blocking_read();
212 let content = serde_json::to_string(&state.messages)?;
213
214 let mut file = OpenOptions::new()
215 .write(true)
216 .create(true)
217 .truncate(true)
218 .open(path)?;
219
220 file.write_all(content.as_bytes())?;
221 }
222 Ok(())
223 }
224}
225
226#[async_trait]
227impl DMSCQueue for DMSCMemoryQueue {
228 /// Creates a new producer for this queue.
229 ///
230 /// # Returns
231 ///
232 /// A `DMSCResult<Box<dyn DMSCQueueProducer>>` containing the producer
233 async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
234 Ok(Box::new(MemoryQueueProducer {
235 state: self.state.clone(),
236 persistence_path: self.persistence_path.clone(),
237 }))
238 }
239
240 /// Creates a new consumer for this queue with the given consumer group.
241 ///
242 /// # Parameters
243 ///
244 /// - `consumer_group`: The name of the consumer group
245 ///
246 /// # Returns
247 ///
248 /// A `DMSCResult<Box<dyn DMSCQueueConsumer>>` containing the consumer
249 async fn create_consumer(
250 &self,
251 consumer_group: &str,
252 ) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
253 Ok(Box::new(MemoryQueueConsumer {
254 state: self.state.clone(),
255 consumer_group: consumer_group.to_string(),
256 paused: Arc::new(Mutex::new(false)),
257 persistence_path: self.persistence_path.clone(),
258 }))
259 }
260
261 /// Gets statistics for this queue.
262 ///
263 /// # Returns
264 ///
265 /// A `DMSCResult<DMSCQueueStats>` containing the queue statistics
266 async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
267 let state = self.state.read().await;
268 Ok(DMSCQueueStats {
269 queue_name: self.name.clone(),
270 message_count: state.messages.len() as u64,
271 consumer_count: state.consumers.len() as u32,
272 producer_count: 1,
273 processed_messages: 0,
274 failed_messages: 0,
275 avg_processing_time_ms: 0.0,
276 total_bytes_sent: 0,
277 total_bytes_received: 0,
278 last_message_time: 0,
279 })
280 }
281
282 /// Purges all messages from this queue.
283 ///
284 /// # Returns
285 ///
286 /// A `DMSCResult<()>` indicating success or failure
287 async fn purge(&self) -> DMSCResult<()> {
288 let mut state = self.state.write().await;
289 state.messages.clear();
290 state.consumers.clear();
291
292 // Clear persistence file if enabled
293 if let Some(path) = &self.persistence_path {
294 let path_clone = path.clone();
295 spawn_blocking(move || {
296 if Path::new(&path_clone).exists() {
297 if let Err(e) = std::fs::remove_file(&path_clone) {
298 log::warn!("Failed to remove persistence file '{path_clone}': {e}");
299 }
300 }
301 })
302 .await
303 .map_err(|e| {
304 log::error!("Failed to execute persistence file removal: {e}");
305 crate::core::DMSCError::Other(format!("Failed to clear persistence: {e}"))
306 })?;
307 }
308
309 Ok(())
310 }
311
312 /// Deletes this queue.
313 ///
314 /// # Returns
315 ///
316 /// A `DMSCResult<()>` indicating success or failure
317 async fn delete(&self) -> DMSCResult<()> {
318 self.purge().await
319 }
320}
321
322/// Producer implementation for the in-memory queue.
323///
324/// This struct implements the DMSCQueueProducer trait for sending messages to the in-memory queue.
325struct MemoryQueueProducer {
326 /// Shared queue state
327 state: Arc<RwLock<MemoryQueueState>>,
328 /// Optional path for disk persistence
329 persistence_path: Option<String>,
330}
331
332#[async_trait]
333impl DMSCQueueProducer for MemoryQueueProducer {
334 /// Sends a single message to the queue.
335 ///
336 /// # Parameters
337 ///
338 /// - `message`: The message to send
339 ///
340 /// # Returns
341 ///
342 /// A `DMSCResult<()>` indicating success or failure
343 async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
344 let mut state = self.state.write().await;
345 state.messages.push_back(message);
346
347 // Save to disk if persistence is enabled
348 if let Some(path) = &self.persistence_path {
349 let messages_clone = state.messages.clone();
350 let path_clone = path.clone();
351
352 let _ = spawn_blocking(move || {
353 let content = serde_json::to_string(&messages_clone)
354 .map_err(|e| {
355 log::error!("Failed to serialize messages for persistence: {e}");
356 crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
357 })?;
358 let mut file = OpenOptions::new()
359 .write(true)
360 .create(true)
361 .truncate(true)
362 .open(path_clone)
363 .map_err(|e| {
364 log::error!("Failed to open persistence file: {e}");
365 crate::core::DMSCError::Io(format!("File open failed: {e}"))
366 })?;
367 file.write_all(content.as_bytes())
368 .map_err(|e| {
369 log::error!("Failed to write persistence file: {e}");
370 crate::core::DMSCError::Io(format!("File write failed: {e}"))
371 })?;
372 Ok::<(), crate::core::DMSCError>(())
373 })
374 .await
375 .map_err(|e| {
376 log::error!("Failed to execute persistence task: {e}");
377 crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
378 });
379 }
380
381 Ok(())
382 }
383
384 /// Sends multiple messages to the queue in a batch.
385 ///
386 /// # Parameters
387 ///
388 /// - `messages`: A vector of messages to send
389 ///
390 /// # Returns
391 ///
392 /// A `DMSCResult<()>` indicating success or failure
393 async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
394 let mut state = self.state.write().await;
395 for message in messages {
396 state.messages.push_back(message);
397 }
398
399 // Save to disk if persistence is enabled
400 if let Some(path) = &self.persistence_path {
401 let messages_clone = state.messages.clone();
402 let path_clone = path.clone();
403
404 let _ = spawn_blocking(move || {
405 let content = serde_json::to_string(&messages_clone)
406 .map_err(|e| {
407 log::error!("Failed to serialize messages for persistence: {e}");
408 crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
409 })?;
410 let mut file = OpenOptions::new()
411 .write(true)
412 .create(true)
413 .truncate(true)
414 .open(path_clone)
415 .map_err(|e| {
416 log::error!("Failed to open persistence file: {e}");
417 crate::core::DMSCError::Io(format!("File open failed: {e}"))
418 })?;
419 file.write_all(content.as_bytes())
420 .map_err(|e| {
421 log::error!("Failed to write persistence file: {e}");
422 crate::core::DMSCError::Io(format!("File write failed: {e}"))
423 })?;
424 Ok::<(), crate::core::DMSCError>(())
425 })
426 .await
427 .map_err(|e| {
428 log::error!("Failed to execute persistence task: {e}");
429 crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
430 });
431 }
432
433 Ok(())
434 }
435}
436
437/// Consumer implementation for the in-memory queue.
438///
439/// This struct implements the DMSCQueueConsumer trait for receiving messages from the in-memory queue.
440struct MemoryQueueConsumer {
441 /// Shared queue state
442 state: Arc<RwLock<MemoryQueueState>>,
443 /// Name of the consumer group
444 consumer_group: String,
445 /// Flag indicating if the consumer is paused
446 paused: Arc<Mutex<bool>>,
447 /// Optional path for disk persistence
448 persistence_path: Option<String>,
449}
450
451#[async_trait]
452impl DMSCQueueConsumer for MemoryQueueConsumer {
453 /// Receives a message from the queue.
454 ///
455 /// # Returns
456 ///
457 /// A `DMSCResult<Option<DMSCQueueMessage>>` containing the message if available, or None if no message is available
458 async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
459 let paused = *self.paused.lock().await;
460 if paused {
461 return Ok(None);
462 }
463
464 let mut state = self.state.write().await;
465
466 // If consumer queue exists and has messages, return one
467 if let Some(consumer_queue) = state.consumers.get_mut(&self.consumer_group) {
468 if let Some(message) = consumer_queue.pop_front() {
469 return Ok(Some(message));
470 }
471 }
472
473 // If main queue has messages, move one to consumer queue
474 if let Some(message) = state.messages.pop_front() {
475 let mut consumer_queue = VecDeque::new();
476 consumer_queue.push_back(message.clone());
477 state
478 .consumers
479 .insert(self.consumer_group.clone(), consumer_queue);
480
481 // Save to disk if persistence is enabled (since main queue changed)
482 if let Some(path) = &self.persistence_path {
483 let messages_clone = state.messages.clone();
484 let path_clone = path.clone();
485
486 let _ = spawn_blocking(move || {
487 let content = serde_json::to_string(&messages_clone)
488 .map_err(|e| {
489 log::error!("Failed to serialize messages for persistence: {e}");
490 crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
491 })?;
492 let mut file = OpenOptions::new()
493 .write(true)
494 .create(true)
495 .truncate(true)
496 .open(path_clone)
497 .map_err(|e| {
498 log::error!("Failed to open persistence file: {e}");
499 crate::core::DMSCError::Io(format!("File open failed: {e}"))
500 })?;
501 file.write_all(content.as_bytes())
502 .map_err(|e| {
503 log::error!("Failed to write persistence file: {e}");
504 crate::core::DMSCError::Io(format!("File write failed: {e}"))
505 })?;
506 Ok::<(), crate::core::DMSCError>(())
507 })
508 .await
509 .map_err(|e| {
510 log::error!("Failed to execute persistence task: {e}");
511 crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
512 });
513 }
514
515 Ok(Some(message))
516 } else {
517 Ok(None)
518 }
519 }
520
521 /// Acknowledges a message, indicating it has been successfully processed.
522 ///
523 /// For in-memory queues, acknowledgment is implicit when the message is received.
524 ///
525 /// # Parameters
526 ///
527 /// - `_message_id`: The ID of the message to acknowledge (not used for in-memory queues)
528 ///
529 /// # Returns
530 ///
531 /// A `DMSCResult<()>` indicating success or failure
532 async fn ack(&self, _message_id: &str) -> DMSCResult<()> {
533 // In memory queue, acknowledgment is implicit when message is received
534 Ok(())
535 }
536
537 /// Negatively acknowledges a message, indicating it failed to process and should be retried.
538 ///
539 /// # Parameters
540 ///
541 /// - `message_id`: The ID of the message to negatively acknowledge
542 ///
543 /// # Returns
544 ///
545 /// A `DMSCResult<()>` indicating success or failure
546 async fn nack(&self, message_id: &str) -> DMSCResult<()> {
547 // Find the message in consumer queue and put it back in main queue
548 let mut state = self.state.write().await;
549
550 if let Some(consumer_queue) = state.consumers.get_mut(&self.consumer_group) {
551 // Find the message by ID
552 let mut message_to_requeue: Option<DMSCQueueMessage> = None;
553
554 // Iterate through consumer queue to find the message
555 let mut index = 0;
556 for (i, message) in consumer_queue.iter().enumerate() {
557 if message.id == message_id {
558 message_to_requeue = Some(message.clone());
559 index = i;
560 break;
561 }
562 }
563
564 // If found, remove from consumer queue and put back in main queue
565 if let Some(mut message) = message_to_requeue {
566 consumer_queue.remove(index);
567 message.increment_retry();
568 state.messages.push_back(message);
569
570 // Save to disk if persistence is enabled
571 if let Some(path) = &self.persistence_path {
572 let messages_clone = state.messages.clone();
573 let path_clone = path.clone();
574
575 let _ = spawn_blocking(move || {
576 let content = serde_json::to_string(&messages_clone)
577 .map_err(|e| {
578 log::error!("Failed to serialize messages for persistence: {e}");
579 crate::core::DMSCError::Serde(format!("Serialization failed: {e}"))
580 })?;
581 let mut file = OpenOptions::new()
582 .write(true)
583 .create(true)
584 .truncate(true)
585 .open(path_clone)
586 .map_err(|e| {
587 log::error!("Failed to open persistence file: {e}");
588 crate::core::DMSCError::Io(format!("File open failed: {e}"))
589 })?;
590 file.write_all(content.as_bytes())
591 .map_err(|e| {
592 log::error!("Failed to write persistence file: {e}");
593 crate::core::DMSCError::Io(format!("File write failed: {e}"))
594 })?;
595 Ok::<(), crate::core::DMSCError>(())
596 })
597 .await
598 .map_err(|e| {
599 log::error!("Failed to execute persistence task: {e}");
600 crate::core::DMSCError::Other(format!("Persistence task failed: {e}"))
601 });
602 }
603 }
604 }
605
606 Ok(())
607 }
608
609 /// Pauses message consumption.
610 ///
611 /// # Returns
612 ///
613 /// A `DMSCResult<()>` indicating success or failure
614 async fn pause(&self) -> DMSCResult<()> {
615 let mut paused = self.paused.lock().await;
616 *paused = true;
617 Ok(())
618 }
619
620 /// Resumes message consumption after pausing.
621 ///
622 /// # Returns
623 ///
624 /// A `DMSCResult<()>` indicating success or failure
625 async fn resume(&self) -> DMSCResult<()> {
626 let mut paused = self.paused.lock().await;
627 *paused = false;
628 Ok(())
629 }
630}