dmsc/queue/
core.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Queue Core Implementation
21//! 
22//! This file defines the core queueing interfaces and message structures for the DMSC queue system.
23//! It provides the fundamental building blocks for implementing various queue backends.
24//! 
25//! ## Key Components
26//! 
27//! - **DMSCQueueMessage**: Message structure for queue operations
28//! - **QueueStats**: Statistics for queue monitoring
29//! - **DMSCQueueProducer**: Trait for producing messages to queues
30//! - **DMSCQueueConsumer**: Trait for consuming messages from queues
31//! - **DMSCQueue**: Main queue trait defining queue operations
32//! 
33//! ## Design Principles
34//! 
35//! 1. **Async-First**: All queue operations are asynchronous
36//! 2. **Type Safety**: Strongly typed message structures
37//! 3. **Retry Mechanism**: Built-in support for message retry with configurable maximum attempts
38//! 4. **Header Support**: Allows adding custom headers to messages
39//! 5. **Statistics**: Comprehensive queue statistics for monitoring
40//! 6. **Extensible**: Easy to implement new queue backends
41//! 7. **Thread-safe**: All traits are Send + Sync for safe concurrent use
42//! 
43//! ## Usage
44//! 
45//! ```rust
46//! use dmsc::queue::{DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueue};
47//! use dmsc::core::DMSCResult;
48//! use serde_json::json;
49//! 
50//! async fn example(queue: &dyn DMSCQueue) -> DMSCResult<()> {
51//!     // Create a producer
52//!     let producer = queue.create_producer().await?;
53//!     
54//!     // Create a message
55//!     let payload = json!({ "key": "value" }).to_string().into_bytes();
56//!     let message = DMSCQueueMessage::new(payload)
57//!         .with_max_retries(5);
58//!     
59//!     // Send the message
60//!     producer.send(message).await?;
61//!     
62//!     // Create a consumer
63//!     let consumer = queue.create_consumer("consumer_group_1").await?;
64//!     
65//!     // Receive a message
66//!     if let Some(message) = consumer.receive().await? {
67//!         // Process the message
68//!         let payload = String::from_utf8_lossy(&message.payload);
69//!         println!("Received message: {}", payload);
70//!         
71//!         // Acknowledge the message
72//!         consumer.ack(&message.id).await?;
73//!     }
74//!     
75//!     Ok(())
76//! }
77//! ```
78
79use async_trait::async_trait;
80use serde::{Serialize, Deserialize};
81use std::collections::HashMap;
82use std::time::SystemTime;
83use crate::core::DMSCResult;
84
85/// Error types for queue operations.
86#[derive(Debug, Clone)]
87pub enum DMSCQueueError {
88    /// Backend-specific error with descriptive message
89    BackendError(String),
90    /// Configuration error
91    ConfigError(String),
92    /// Connection error
93    ConnectionError(String),
94    /// Message not found
95    MessageNotFound(String),
96    /// Consumer group error
97    ConsumerGroupError(String),
98    /// Serialization error
99    SerializationError(String),
100}
101
102impl std::fmt::Display for DMSCQueueError {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match self {
105            DMSCQueueError::BackendError(msg) => write!(f, "Queue backend error: {}", msg),
106            DMSCQueueError::ConfigError(msg) => write!(f, "Queue configuration error: {}", msg),
107            DMSCQueueError::ConnectionError(msg) => write!(f, "Queue connection error: {}", msg),
108            DMSCQueueError::MessageNotFound(msg) => write!(f, "Message not found: {}", msg),
109            DMSCQueueError::ConsumerGroupError(msg) => write!(f, "Consumer group error: {}", msg),
110            DMSCQueueError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
111        }
112    }
113}
114
115impl std::error::Error for DMSCQueueError {}
116
117impl From<DMSCQueueError> for crate::core::DMSCError {
118    fn from(error: DMSCQueueError) -> Self {
119        crate::core::DMSCError::Queue(error.to_string())
120    }
121}
122
123/// Message structure for queue operations.
124/// 
125/// This struct represents a message that can be sent to and received from queues. It includes
126/// a unique ID, payload, headers, timestamp, and retry information.
127#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct DMSCQueueMessage {
130    /// Unique message ID
131    pub id: String,
132    /// Message payload as bytes
133    pub payload: Vec<u8>,
134    /// Custom headers for the message
135    pub headers: HashMap<String, String>,
136    /// Timestamp when the message was created
137    pub timestamp: SystemTime,
138    /// Number of times this message has been retried
139    pub retry_count: u32,
140    /// Maximum number of retry attempts allowed
141    pub max_retries: u32,
142}
143
144impl DMSCQueueMessage {
145    /// Creates a new message with the given payload.
146    /// 
147    /// # Parameters
148    /// 
149    /// - `payload`: The message payload as bytes
150    /// 
151    /// # Returns
152    /// 
153    /// A new `DMSCQueueMessage` instance
154    pub fn new(payload: Vec<u8>) -> Self {
155        Self {
156            id: uuid::Uuid::new_v4().to_string(),
157            payload,
158            headers: HashMap::new(),
159            timestamp: SystemTime::now(),
160            retry_count: 0,
161            max_retries: 3,
162        }
163    }
164
165    /// Adds custom headers to the message.
166    /// 
167    /// # Parameters
168    /// 
169    /// - `headers`: A HashMap of custom headers
170    /// 
171    /// # Returns
172    /// 
173    /// The updated `DMSCQueueMessage` instance
174    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
175        self.headers = headers;
176        self
177    }
178
179    /// Sets the maximum number of retry attempts for this message.
180    /// 
181    /// # Parameters
182    /// 
183    /// - `max_retries`: The maximum number of retry attempts
184    /// 
185    /// # Returns
186    /// 
187    /// The updated `DMSCQueueMessage` instance
188    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
189        self.max_retries = max_retries;
190        self
191    }
192
193    /// Increments the retry count for this message.
194    pub fn increment_retry(&mut self) {
195        self.retry_count += 1;
196    }
197
198    /// Checks if this message can be retried.
199    /// 
200    /// # Returns
201    /// 
202    /// `true` if the message can be retried, `false` otherwise
203    pub fn can_retry(&self) -> bool {
204        self.retry_count < self.max_retries
205    }
206}
207
208#[cfg(feature = "pyo3")]
209/// Python bindings for DMSCQueueMessage
210#[pyo3::prelude::pymethods]
211impl DMSCQueueMessage {
212    #[new]
213    fn py_new(payload: Vec<u8>) -> Self {
214        Self::new(payload)
215    }
216    
217    #[staticmethod]
218    fn py_new_with_string(payload: String) -> Self {
219        Self::new(payload.into_bytes())
220    }
221    
222    fn get_payload_string(&self) -> String {
223        String::from_utf8_lossy(&self.payload).to_string()
224    }
225    
226    fn get_id(&self) -> String {
227        self.id.clone()
228    }
229}
230
231/// Statistics for queue monitoring.
232/// 
233/// This struct contains comprehensive statistics about a queue's performance and usage.
234#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
235#[derive(Debug, Clone)]
236pub struct DMSCQueueStats {
237    /// Name of the queue
238    pub queue_name: String,
239    /// Current number of messages in the queue
240    pub message_count: u64,
241    /// Number of active consumers
242    pub consumer_count: u32,
243    /// Number of active producers
244    pub producer_count: u32,
245    /// Total number of processed messages
246    pub processed_messages: u64,
247    /// Total number of failed messages
248    pub failed_messages: u64,
249    /// Average processing time in milliseconds
250    pub avg_processing_time_ms: f64,
251    /// Total bytes sent
252    pub total_bytes_sent: u64,
253    /// Total bytes received
254    pub total_bytes_received: u64,
255    /// Timestamp of last message (milliseconds since start)
256    pub last_message_time: u64,
257}
258
259#[cfg(feature = "pyo3")]
260/// Python bindings for DMSCQueueStats
261#[pyo3::prelude::pymethods]
262impl DMSCQueueStats {
263    #[new]
264    fn py_new(queue_name: String) -> Self {
265        Self {
266            queue_name,
267            message_count: 0,
268            consumer_count: 0,
269            producer_count: 0,
270            processed_messages: 0,
271            failed_messages: 0,
272            avg_processing_time_ms: 0.0,
273            total_bytes_sent: 0,
274            total_bytes_received: 0,
275            last_message_time: 0,
276        }
277    }
278}
279
280/// Trait for producing messages to queues.
281/// 
282/// This trait defines the interface for sending messages to queues, including single message
283/// sends and batch sends.
284#[async_trait]
285pub trait DMSCQueueProducer: Send + Sync {
286    async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()>;
287    
288    async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()>;
289
290    async fn send_multi(&self, messages: &[DMSCQueueMessage]) -> DMSCResult<()> {
291        for message in messages {
292            self.send(message.clone()).await?;
293        }
294        Ok(())
295    }
296}
297
298/// Trait for consuming messages from queues.
299/// 
300/// This trait defines the interface for receiving and acknowledging messages from queues.
301#[async_trait]
302pub trait DMSCQueueConsumer: Send + Sync {
303    /// Receives a message from the queue.
304    /// 
305    /// # Returns
306    /// 
307    /// A `DMSCResult<Option<DMSCQueueMessage>>` containing the message if available, or None if no message is available
308    async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>>;
309    
310    /// Acknowledges a message, indicating it has been successfully processed.
311    /// 
312    /// # Parameters
313    /// 
314    /// - `message_id`: The ID of the message to acknowledge
315    /// 
316    /// # Returns
317    /// 
318    /// A `DMSCResult<()>` indicating success or failure
319    async fn ack(&self, message_id: &str) -> DMSCResult<()>;
320    
321    /// Negatively acknowledges a message, indicating it failed to process and should be retried.
322    /// 
323    /// # Parameters
324    /// 
325    /// - `message_id`: The ID of the message to negatively acknowledge
326    /// 
327    /// # Returns
328    /// 
329    /// A `DMSCResult<()>` indicating success or failure
330    async fn nack(&self, message_id: &str) -> DMSCResult<()>;
331    
332    /// Pauses message consumption.
333    /// 
334    /// # Returns
335    /// 
336    /// A `DMSCResult<()>` indicating success or failure
337    async fn pause(&self) -> DMSCResult<()>;
338    
339    /// Resumes message consumption after pausing.
340    /// 
341    /// # Returns
342    /// 
343    /// A `DMSCResult<()>` indicating success or failure
344    async fn resume(&self) -> DMSCResult<()>;
345
346    async fn receive_multi(&self, count: usize) -> DMSCResult<Vec<Option<DMSCQueueMessage>>> {
347        let mut messages = Vec::with_capacity(count);
348        for _ in 0..count {
349            messages.push(self.receive().await?);
350        }
351        Ok(messages)
352    }
353
354    async fn ack_multi(&self, message_ids: &[String]) -> DMSCResult<()> {
355        for id in message_ids {
356            self.ack(id).await?;
357        }
358        Ok(())
359    }
360}
361
362/// Main queue trait defining queue operations.
363/// 
364/// This trait defines the core operations for queues, including creating producers and consumers,
365/// getting statistics, purging queues, and deleting queues.
366#[async_trait]
367pub trait DMSCQueue: Send + Sync {
368    /// Creates a new producer for this queue.
369    /// 
370    /// # Returns
371    /// 
372    /// A `DMSCResult<Box<dyn DMSCQueueProducer>>` containing the producer
373    async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>>;
374    
375    /// Creates a new consumer for this queue with the given consumer group.
376    /// 
377    /// # Parameters
378    /// 
379    /// - `consumer_group`: The name of the consumer group
380    /// 
381    /// # Returns
382    /// 
383    /// A `DMSCResult<Box<dyn DMSCQueueConsumer>>` containing the consumer
384    async fn create_consumer(&self, consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>>;
385    
386    /// Gets statistics for this queue.
387    /// 
388    /// # Returns
389    /// 
390    /// A `DMSCResult<DMSCQueueStats>` containing the queue statistics
391    async fn get_stats(&self) -> DMSCResult<DMSCQueueStats>;
392    
393    /// Purges all messages from this queue.
394    /// 
395    /// # Returns
396    /// 
397    /// A `DMSCResult<()>` indicating success or failure
398    async fn purge(&self) -> DMSCResult<()>;
399    
400    /// Deletes this queue.
401    /// 
402    /// # Returns
403    /// 
404    /// A `DMSCResult<()>` indicating success or failure
405    async fn delete(&self) -> DMSCResult<()>;
406}