dmsc/queue/core.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Queue Core Implementation
21//!
22//! This file defines the core queueing interfaces and message structures for the DMSC queue system.
23//! It provides the fundamental building blocks for implementing various queue backends.
24//!
25//! ## Key Components
26//!
27//! - **DMSCQueueMessage**: Message structure for queue operations
//! - **DMSCQueueStats**: Statistics for queue monitoring
29//! - **DMSCQueueProducer**: Trait for producing messages to queues
30//! - **DMSCQueueConsumer**: Trait for consuming messages from queues
31//! - **DMSCQueue**: Main queue trait defining queue operations
32//!
33//! ## Design Principles
34//!
35//! 1. **Async-First**: All queue operations are asynchronous
36//! 2. **Type Safety**: Strongly typed message structures
37//! 3. **Retry Mechanism**: Built-in support for message retry with configurable maximum attempts
38//! 4. **Header Support**: Allows adding custom headers to messages
39//! 5. **Statistics**: Comprehensive queue statistics for monitoring
40//! 6. **Extensible**: Easy to implement new queue backends
41//! 7. **Thread-safe**: All traits are Send + Sync for safe concurrent use
42//!
43//! ## Usage
44//!
45//! ```rust
46//! use dmsc::queue::{DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueue};
47//! use dmsc::core::DMSCResult;
48//! use serde_json::json;
49//!
50//! async fn example(queue: &dyn DMSCQueue) -> DMSCResult<()> {
51//! // Create a producer
52//! let producer = queue.create_producer().await?;
53//!
54//! // Create a message
55//! let payload = json!({ "key": "value" }).to_string().into_bytes();
56//! let message = DMSCQueueMessage::new(payload)
57//! .with_max_retries(5);
58//!
59//! // Send the message
60//! producer.send(message).await?;
61//!
62//! // Create a consumer
63//! let consumer = queue.create_consumer("consumer_group_1").await?;
64//!
65//! // Receive a message
66//! if let Some(message) = consumer.receive().await? {
67//! // Process the message
68//! let payload = String::from_utf8_lossy(&message.payload);
69//! println!("Received message: {}", payload);
70//!
71//! // Acknowledge the message
72//! consumer.ack(&message.id).await?;
73//! }
74//!
75//! Ok(())
76//! }
77//! ```
78
79use async_trait::async_trait;
80use serde::{Serialize, Deserialize};
81use std::collections::HashMap;
82use std::time::SystemTime;
83use crate::core::DMSCResult;
84
/// Errors reported by queue operations.
///
/// Every variant carries a free-form, human-readable description. The
/// `Display` implementation prefixes that description with the error
/// category.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare errors
/// directly (for example with `assert_eq!`); two errors are equal when both
/// the variant and the message match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DMSCQueueError {
    /// Backend-specific error with a descriptive message.
    BackendError(String),
    /// Configuration error.
    ConfigError(String),
    /// Connection error.
    ConnectionError(String),
    /// Message not found.
    MessageNotFound(String),
    /// Consumer group error.
    ConsumerGroupError(String),
    /// Serialization error.
    SerializationError(String),
}
101
102impl std::fmt::Display for DMSCQueueError {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match self {
105 DMSCQueueError::BackendError(msg) => write!(f, "Queue backend error: {}", msg),
106 DMSCQueueError::ConfigError(msg) => write!(f, "Queue configuration error: {}", msg),
107 DMSCQueueError::ConnectionError(msg) => write!(f, "Queue connection error: {}", msg),
108 DMSCQueueError::MessageNotFound(msg) => write!(f, "Message not found: {}", msg),
109 DMSCQueueError::ConsumerGroupError(msg) => write!(f, "Consumer group error: {}", msg),
110 DMSCQueueError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
111 }
112 }
113}
114
// Marker impl: all `std::error::Error` methods keep their defaults, so this
// error exposes no `source()` chain; its text comes from `Display`/`Debug`.
impl std::error::Error for DMSCQueueError {}
116
117impl From<DMSCQueueError> for crate::core::DMSCError {
118 fn from(error: DMSCQueueError) -> Self {
119 crate::core::DMSCError::Queue(error.to_string())
120 }
121}
122
/// A single message that can be sent to and received from a queue.
///
/// Carries an opaque byte payload plus the bookkeeping used by the queue
/// layer: an identifier, string headers, a creation timestamp and retry
/// counters. Instances built with [`DMSCQueueMessage::new`] start with a
/// generated UUID v4 `id`, no headers, `retry_count == 0` and
/// `max_retries == 3`.
///
/// The struct derives `Serialize`/`Deserialize`, and is annotated as a Python
/// class when the `pyo3` feature is enabled.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DMSCQueueMessage {
    /// Unique message ID (a UUID v4 string when created through `new`).
    pub id: String,
    /// Message payload as raw bytes; no encoding is imposed by this type.
    pub payload: Vec<u8>,
    /// Custom string key/value headers attached to the message.
    pub headers: HashMap<String, String>,
    /// Time at which the message was created (`SystemTime::now()` in `new`).
    pub timestamp: SystemTime,
    /// Number of times this message has been retried so far.
    pub retry_count: u32,
    /// Maximum number of retry attempts allowed; retries are permitted while
    /// `retry_count < max_retries`.
    pub max_retries: u32,
}
143
144impl DMSCQueueMessage {
145 /// Creates a new message with the given payload.
146 ///
147 /// # Parameters
148 ///
149 /// - `payload`: The message payload as bytes
150 ///
151 /// # Returns
152 ///
153 /// A new `DMSCQueueMessage` instance
154 pub fn new(payload: Vec<u8>) -> Self {
155 Self {
156 id: uuid::Uuid::new_v4().to_string(),
157 payload,
158 headers: HashMap::new(),
159 timestamp: SystemTime::now(),
160 retry_count: 0,
161 max_retries: 3,
162 }
163 }
164
165 /// Adds custom headers to the message.
166 ///
167 /// # Parameters
168 ///
169 /// - `headers`: A HashMap of custom headers
170 ///
171 /// # Returns
172 ///
173 /// The updated `DMSCQueueMessage` instance
174 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
175 self.headers = headers;
176 self
177 }
178
179 /// Sets the maximum number of retry attempts for this message.
180 ///
181 /// # Parameters
182 ///
183 /// - `max_retries`: The maximum number of retry attempts
184 ///
185 /// # Returns
186 ///
187 /// The updated `DMSCQueueMessage` instance
188 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
189 self.max_retries = max_retries;
190 self
191 }
192
193 /// Increments the retry count for this message.
194 pub fn increment_retry(&mut self) {
195 self.retry_count += 1;
196 }
197
198 /// Checks if this message can be retried.
199 ///
200 /// # Returns
201 ///
202 /// `true` if the message can be retried, `false` otherwise
203 pub fn can_retry(&self) -> bool {
204 self.retry_count < self.max_retries
205 }
206}
207
#[cfg(feature = "pyo3")]
/// Python-facing methods for `DMSCQueueMessage`, compiled only when the
/// `pyo3` feature is enabled.
#[pyo3::prelude::pymethods]
impl DMSCQueueMessage {
    /// Python constructor: builds a message from raw payload bytes.
    #[new]
    fn py_new(payload: Vec<u8>) -> Self {
        DMSCQueueMessage::new(payload)
    }

    /// Static factory: builds a message whose payload is the UTF-8 bytes of
    /// `payload`.
    #[staticmethod]
    fn py_new_with_string(payload: String) -> Self {
        let bytes = payload.into_bytes();
        DMSCQueueMessage::new(bytes)
    }

    /// Returns the payload decoded as UTF-8; invalid byte sequences are
    /// replaced with U+FFFD rather than raising an error.
    fn get_payload_string(&self) -> String {
        let decoded = String::from_utf8_lossy(&self.payload);
        decoded.into_owned()
    }

    /// Returns a copy of the message ID.
    fn get_id(&self) -> String {
        self.id.to_owned()
    }
}
230
/// Statistics for queue monitoring.
///
/// A plain snapshot of counters and gauges describing one queue. The values
/// are produced by whichever backend implements `DMSCQueue::get_stats`; this
/// type only defines their shape. It is annotated as a Python class when the
/// `pyo3` feature is enabled.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone)]
pub struct DMSCQueueStats {
    /// Name of the queue these statistics describe.
    pub queue_name: String,
    /// Current number of messages in the queue.
    pub message_count: u64,
    /// Number of active consumers.
    pub consumer_count: u32,
    /// Number of active producers.
    pub producer_count: u32,
    /// Total number of processed messages.
    pub processed_messages: u64,
    /// Total number of failed messages.
    pub failed_messages: u64,
    /// Average processing time in milliseconds.
    pub avg_processing_time_ms: f64,
    /// Total bytes sent.
    pub total_bytes_sent: u64,
    /// Total bytes received.
    pub total_bytes_received: u64,
    /// Timestamp of last message (milliseconds since start).
    // NOTE(review): which "start" this is measured from (process, queue
    // creation, epoch) is not visible in this file; confirm with the backends.
    pub last_message_time: u64,
}
258
#[cfg(feature = "pyo3")]
/// Python-facing methods for `DMSCQueueStats`, compiled only when the `pyo3`
/// feature is enabled.
#[pyo3::prelude::pymethods]
impl DMSCQueueStats {
    /// Python constructor: creates an all-zero statistics record for the
    /// queue called `queue_name`.
    #[new]
    fn py_new(queue_name: String) -> Self {
        DMSCQueueStats {
            queue_name,
            // Participant gauges.
            consumer_count: 0_u32,
            producer_count: 0_u32,
            // Message counters.
            message_count: 0_u64,
            processed_messages: 0_u64,
            failed_messages: 0_u64,
            // Throughput and timing.
            total_bytes_sent: 0_u64,
            total_bytes_received: 0_u64,
            avg_processing_time_ms: 0.0_f64,
            last_message_time: 0_u64,
        }
    }
}
279
280/// Trait for producing messages to queues.
281///
282/// This trait defines the interface for sending messages to queues, including single message
283/// sends and batch sends.
284#[async_trait]
285pub trait DMSCQueueProducer: Send + Sync {
286 async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()>;
287
288 async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()>;
289
290 async fn send_multi(&self, messages: &[DMSCQueueMessage]) -> DMSCResult<()> {
291 for message in messages {
292 self.send(message.clone()).await?;
293 }
294 Ok(())
295 }
296}
297
298/// Trait for consuming messages from queues.
299///
300/// This trait defines the interface for receiving and acknowledging messages from queues.
301#[async_trait]
302pub trait DMSCQueueConsumer: Send + Sync {
303 /// Receives a message from the queue.
304 ///
305 /// # Returns
306 ///
307 /// A `DMSCResult<Option<DMSCQueueMessage>>` containing the message if available, or None if no message is available
308 async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>>;
309
310 /// Acknowledges a message, indicating it has been successfully processed.
311 ///
312 /// # Parameters
313 ///
314 /// - `message_id`: The ID of the message to acknowledge
315 ///
316 /// # Returns
317 ///
318 /// A `DMSCResult<()>` indicating success or failure
319 async fn ack(&self, message_id: &str) -> DMSCResult<()>;
320
321 /// Negatively acknowledges a message, indicating it failed to process and should be retried.
322 ///
323 /// # Parameters
324 ///
325 /// - `message_id`: The ID of the message to negatively acknowledge
326 ///
327 /// # Returns
328 ///
329 /// A `DMSCResult<()>` indicating success or failure
330 async fn nack(&self, message_id: &str) -> DMSCResult<()>;
331
332 /// Pauses message consumption.
333 ///
334 /// # Returns
335 ///
336 /// A `DMSCResult<()>` indicating success or failure
337 async fn pause(&self) -> DMSCResult<()>;
338
339 /// Resumes message consumption after pausing.
340 ///
341 /// # Returns
342 ///
343 /// A `DMSCResult<()>` indicating success or failure
344 async fn resume(&self) -> DMSCResult<()>;
345
346 async fn receive_multi(&self, count: usize) -> DMSCResult<Vec<Option<DMSCQueueMessage>>> {
347 let mut messages = Vec::with_capacity(count);
348 for _ in 0..count {
349 messages.push(self.receive().await?);
350 }
351 Ok(messages)
352 }
353
354 async fn ack_multi(&self, message_ids: &[String]) -> DMSCResult<()> {
355 for id in message_ids {
356 self.ack(id).await?;
357 }
358 Ok(())
359 }
360}
361
/// Main queue trait defining queue operations.
///
/// A `DMSCQueue` is the entry point to one queue: it hands out boxed
/// producers and consumers, reports statistics, and supports purging and
/// deleting the queue. All operations are asynchronous and implementations
/// must be `Send + Sync`.
#[async_trait]
pub trait DMSCQueue: Send + Sync {
    /// Creates a new producer for this queue.
    ///
    /// # Returns
    ///
    /// A `DMSCResult<Box<dyn DMSCQueueProducer>>` containing the producer
    async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>>;

    /// Creates a new consumer for this queue with the given consumer group.
    ///
    /// # Parameters
    ///
    /// - `consumer_group`: The name of the consumer group the consumer joins
    ///
    /// # Returns
    ///
    /// A `DMSCResult<Box<dyn DMSCQueueConsumer>>` containing the consumer
    async fn create_consumer(&self, consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>>;

    /// Gets statistics for this queue.
    ///
    /// # Returns
    ///
    /// A `DMSCResult<DMSCQueueStats>` containing the queue statistics
    async fn get_stats(&self) -> DMSCResult<DMSCQueueStats>;

    /// Purges all messages from this queue.
    ///
    /// # Returns
    ///
    /// A `DMSCResult<()>` indicating success or failure
    async fn purge(&self) -> DMSCResult<()>;

    /// Deletes this queue.
    ///
    /// # Returns
    ///
    /// A `DMSCResult<()>` indicating success or failure
    async fn delete(&self) -> DMSCResult<()>;
}