Skip to main content

ri/queue/
core.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of Ri.
4//! The Ri project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Queue Core Implementation
21//! 
22//! This file defines the core queueing interfaces and message structures for the Ri queue system.
23//! It provides the fundamental building blocks for implementing various queue backends.
24//! 
25//! ## Key Components
26//! 
27//! - **RiQueueMessage**: Message structure for queue operations
//! - **RiQueueStats**: Statistics for queue monitoring
29//! - **RiQueueProducer**: Trait for producing messages to queues
30//! - **RiQueueConsumer**: Trait for consuming messages from queues
31//! - **RiQueue**: Main queue trait defining queue operations
32//! 
33//! ## Design Principles
34//! 
35//! 1. **Async-First**: All queue operations are asynchronous
36//! 2. **Type Safety**: Strongly typed message structures
37//! 3. **Retry Mechanism**: Built-in support for message retry with configurable maximum attempts
38//! 4. **Header Support**: Allows adding custom headers to messages
39//! 5. **Statistics**: Comprehensive queue statistics for monitoring
40//! 6. **Extensible**: Easy to implement new queue backends
41//! 7. **Thread-safe**: All traits are Send + Sync for safe concurrent use
42//! 
43//! ## Usage
44//! 
45//! ```rust
46//! use ri::queue::{RiQueueMessage, RiQueueProducer, RiQueueConsumer, RiQueue};
47//! use ri::core::RiResult;
48//! use serde_json::json;
49//! 
50//! async fn example(queue: &dyn RiQueue) -> RiResult<()> {
51//!     // Create a producer
52//!     let producer = queue.create_producer().await?;
53//!     
54//!     // Create a message
55//!     let payload = json!({ "key": "value" }).to_string().into_bytes();
56//!     let message = RiQueueMessage::new(payload)
57//!         .with_max_retries(5);
58//!     
59//!     // Send the message
60//!     producer.send(message).await?;
61//!     
62//!     // Create a consumer
63//!     let consumer = queue.create_consumer("consumer_group_1").await?;
64//!     
65//!     // Receive a message
66//!     if let Some(message) = consumer.receive().await? {
67//!         // Process the message
68//!         let payload = String::from_utf8_lossy(&message.payload);
69//!         println!("Received message: {}", payload);
70//!         
71//!         // Acknowledge the message
72//!         consumer.ack(&message.id).await?;
73//!     }
74//!     
75//!     Ok(())
76//! }
77//! ```
78
79use async_trait::async_trait;
80use serde::{Serialize, Deserialize};
81use std::collections::HashMap;
82use std::time::SystemTime;
83use crate::core::RiResult;
84
/// Error types for queue operations.
///
/// Every variant carries a human-readable description of the failure. The
/// `From<RiQueueError>` impl in this file converts a value into
/// `crate::core::RiError::Queue` by storing its `Display` text.
///
/// `PartialEq`/`Eq` are derived so that errors can be compared directly
/// (for example with `assert_eq!` in tests); two errors are equal when they
/// are the same variant and carry the same message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RiQueueError {
    /// Backend-specific error with descriptive message
    BackendError(String),
    /// Configuration error
    ConfigError(String),
    /// Connection error
    ConnectionError(String),
    /// Message not found
    MessageNotFound(String),
    /// Consumer group error
    ConsumerGroupError(String),
    /// Serialization error
    SerializationError(String),
}
101
102impl std::fmt::Display for RiQueueError {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match self {
105            RiQueueError::BackendError(msg) => write!(f, "Queue backend error: {}", msg),
106            RiQueueError::ConfigError(msg) => write!(f, "Queue configuration error: {}", msg),
107            RiQueueError::ConnectionError(msg) => write!(f, "Queue connection error: {}", msg),
108            RiQueueError::MessageNotFound(msg) => write!(f, "Message not found: {}", msg),
109            RiQueueError::ConsumerGroupError(msg) => write!(f, "Consumer group error: {}", msg),
110            RiQueueError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
111        }
112    }
113}
114
115impl std::error::Error for RiQueueError {}
116
117impl From<RiQueueError> for crate::core::RiError {
118    fn from(error: RiQueueError) -> Self {
119        crate::core::RiError::Queue(error.to_string())
120    }
121}
122
/// Message structure for queue operations.
///
/// This struct represents a message that can be sent to and received from queues. It includes
/// a unique ID, payload, headers, timestamp, and retry information.
///
/// Instances are normally built with `RiQueueMessage::new`, which generates the ID, stamps the
/// creation time and applies the default retry budget; `with_headers` and `with_max_retries`
/// customise the result. The type derives `Serialize`/`Deserialize` and is additionally marked
/// as a Python class when the `pyo3` feature is enabled.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RiQueueMessage {
    /// Unique message ID (`new` assigns a random version-4 UUID rendered as a string)
    pub id: String,
    /// Message payload as bytes
    pub payload: Vec<u8>,
    /// Custom headers for the message (empty when created through `new`)
    pub headers: HashMap<String, String>,
    /// Timestamp when the message was created (`new` uses `SystemTime::now()`)
    pub timestamp: SystemTime,
    /// Number of times this message has been retried (starts at 0)
    pub retry_count: u32,
    /// Maximum number of retry attempts allowed (`new` defaults this to 3)
    pub max_retries: u32,
}
143
144impl RiQueueMessage {
145    /// Creates a new message with the given payload.
146    /// 
147    /// # Parameters
148    /// 
149    /// - `payload`: The message payload as bytes
150    /// 
151    /// # Returns
152    /// 
153    /// A new `RiQueueMessage` instance
154    pub fn new(payload: Vec<u8>) -> Self {
155        Self {
156            id: uuid::Uuid::new_v4().to_string(),
157            payload,
158            headers: HashMap::new(),
159            timestamp: SystemTime::now(),
160            retry_count: 0,
161            max_retries: 3,
162        }
163    }
164
165    /// Adds custom headers to the message.
166    /// 
167    /// # Parameters
168    /// 
169    /// - `headers`: A HashMap of custom headers
170    /// 
171    /// # Returns
172    /// 
173    /// The updated `RiQueueMessage` instance
174    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
175        self.headers = headers;
176        self
177    }
178
179    /// Sets the maximum number of retry attempts for this message.
180    /// 
181    /// # Parameters
182    /// 
183    /// - `max_retries`: The maximum number of retry attempts
184    /// 
185    /// # Returns
186    /// 
187    /// The updated `RiQueueMessage` instance
188    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
189        self.max_retries = max_retries;
190        self
191    }
192
193    /// Increments the retry count for this message.
194    pub fn increment_retry(&mut self) {
195        self.retry_count += 1;
196    }
197
198    /// Checks if this message can be retried.
199    /// 
200    /// # Returns
201    /// 
202    /// `true` if the message can be retried, `false` otherwise
203    pub fn can_retry(&self) -> bool {
204        self.retry_count < self.max_retries
205    }
206}
207
#[cfg(feature = "pyo3")]
/// Python bindings for RiQueueMessage
#[pyo3::prelude::pymethods]
impl RiQueueMessage {
    /// Python constructor: builds a message from raw payload bytes by
    /// delegating to `RiQueueMessage::new`.
    #[new]
    fn py_new(payload: Vec<u8>) -> Self {
        RiQueueMessage::new(payload)
    }

    /// Static factory that builds a message whose payload is the UTF-8
    /// bytes of the given string.
    #[staticmethod]
    fn py_new_with_string(payload: String) -> Self {
        let bytes = payload.into_bytes();
        RiQueueMessage::new(bytes)
    }

    /// Returns the payload decoded as UTF-8 text; invalid byte sequences are
    /// replaced with U+FFFD rather than raising an error.
    fn get_payload_string(&self) -> String {
        let decoded = String::from_utf8_lossy(&self.payload);
        decoded.into_owned()
    }

    /// Returns a copy of the message ID.
    fn get_id(&self) -> String {
        self.id.to_owned()
    }
}
230
/// Statistics for queue monitoring.
///
/// This struct contains comprehensive statistics about a queue's performance and usage.
/// It is the value returned by `RiQueue::get_stats`. Nothing in this file updates the
/// counters; the values are whatever the producing backend fills in.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone)]
pub struct RiQueueStats {
    /// Name of the queue
    pub queue_name: String,
    /// Current number of messages in the queue
    pub message_count: u64,
    /// Number of active consumers
    pub consumer_count: u32,
    /// Number of active producers
    pub producer_count: u32,
    /// Total number of processed messages
    pub processed_messages: u64,
    /// Total number of failed messages
    pub failed_messages: u64,
    /// Average processing time in milliseconds
    pub avg_processing_time_ms: f64,
    /// Total bytes sent
    pub total_bytes_sent: u64,
    /// Total bytes received
    pub total_bytes_received: u64,
    /// Timestamp of last message (milliseconds since start)
    ///
    /// NOTE(review): the reference point for "start" is not defined in this file —
    /// confirm against the backend implementations.
    pub last_message_time: u64,
}
258
#[cfg(feature = "pyo3")]
/// Python bindings for RiQueueStats
#[pyo3::prelude::pymethods]
impl RiQueueStats {
    /// Python constructor: creates statistics for the named queue with every
    /// counter, gauge and timestamp set to zero.
    #[new]
    fn py_new(queue_name: String) -> Self {
        RiQueueStats {
            queue_name,
            // Current gauges
            message_count: 0,
            consumer_count: 0,
            producer_count: 0,
            // Cumulative counters
            processed_messages: 0,
            failed_messages: 0,
            total_bytes_sent: 0,
            total_bytes_received: 0,
            // Timing
            avg_processing_time_ms: 0.0,
            last_message_time: 0,
        }
    }
}
279
280/// Trait for producing messages to queues.
281/// 
282/// This trait defines the interface for sending messages to queues, including single message
283/// sends and batch sends.
284#[async_trait]
285pub trait RiQueueProducer: Send + Sync {
286    async fn send(&self, message: RiQueueMessage) -> RiResult<()>;
287    
288    async fn send_batch(&self, messages: Vec<RiQueueMessage>) -> RiResult<()>;
289
290    async fn send_multi(&self, messages: &[RiQueueMessage]) -> RiResult<()> {
291        for message in messages {
292            self.send(message.clone()).await?;
293        }
294        Ok(())
295    }
296}
297
298/// Trait for consuming messages from queues.
299/// 
300/// This trait defines the interface for receiving and acknowledging messages from queues.
301#[async_trait]
302pub trait RiQueueConsumer: Send + Sync {
303    /// Receives a message from the queue.
304    /// 
305    /// # Returns
306    /// 
307    /// A `RiResult<Option<RiQueueMessage>>` containing the message if available, or None if no message is available
308    async fn receive(&self) -> RiResult<Option<RiQueueMessage>>;
309    
310    /// Acknowledges a message, indicating it has been successfully processed.
311    /// 
312    /// # Parameters
313    /// 
314    /// - `message_id`: The ID of the message to acknowledge
315    /// 
316    /// # Returns
317    /// 
318    /// A `RiResult<()>` indicating success or failure
319    async fn ack(&self, message_id: &str) -> RiResult<()>;
320    
321    /// Negatively acknowledges a message, indicating it failed to process and should be retried.
322    /// 
323    /// # Parameters
324    /// 
325    /// - `message_id`: The ID of the message to negatively acknowledge
326    /// 
327    /// # Returns
328    /// 
329    /// A `RiResult<()>` indicating success or failure
330    async fn nack(&self, message_id: &str) -> RiResult<()>;
331    
332    /// Pauses message consumption.
333    /// 
334    /// # Returns
335    /// 
336    /// A `RiResult<()>` indicating success or failure
337    async fn pause(&self) -> RiResult<()>;
338    
339    /// Resumes message consumption after pausing.
340    /// 
341    /// # Returns
342    /// 
343    /// A `RiResult<()>` indicating success or failure
344    async fn resume(&self) -> RiResult<()>;
345
346    async fn receive_multi(&self, count: usize) -> RiResult<Vec<Option<RiQueueMessage>>> {
347        let mut messages = Vec::with_capacity(count);
348        for _ in 0..count {
349            messages.push(self.receive().await?);
350        }
351        Ok(messages)
352    }
353
354    async fn ack_multi(&self, message_ids: &[String]) -> RiResult<()> {
355        for id in message_ids {
356            self.ack(id).await?;
357        }
358        Ok(())
359    }
360}
361
/// Main queue trait defining queue operations.
///
/// This trait defines the core operations for queues, including creating producers and consumers,
/// getting statistics, purging queues, and deleting queues.
///
/// Every method is a required method; this file provides no default implementations.
/// Producers and consumers are returned as boxed trait objects, so callers can work with
/// any backend through `&dyn RiQueue`.
#[async_trait]
pub trait RiQueue: Send + Sync {
    /// Creates a new producer for this queue.
    ///
    /// # Returns
    ///
    /// A `RiResult<Box<dyn RiQueueProducer>>` containing the producer
    async fn create_producer(&self) -> RiResult<Box<dyn RiQueueProducer>>;
    
    /// Creates a new consumer for this queue with the given consumer group.
    ///
    /// # Parameters
    ///
    /// - `consumer_group`: The name of the consumer group
    ///
    /// # Returns
    ///
    /// A `RiResult<Box<dyn RiQueueConsumer>>` containing the consumer
    async fn create_consumer(&self, consumer_group: &str) -> RiResult<Box<dyn RiQueueConsumer>>;
    
    /// Gets statistics for this queue.
    ///
    /// # Returns
    ///
    /// A `RiResult<RiQueueStats>` containing the queue statistics
    async fn get_stats(&self) -> RiResult<RiQueueStats>;
    
    /// Purges all messages from this queue.
    ///
    /// # Returns
    ///
    /// A `RiResult<()>` indicating success or failure
    async fn purge(&self) -> RiResult<()>;
    
    /// Deletes this queue.
    ///
    /// NOTE(review): what happens to producers and consumers that were created
    /// before the call is not specified here — confirm per backend.
    ///
    /// # Returns
    ///
    /// A `RiResult<()>` indicating success or failure
    async fn delete(&self) -> RiResult<()>;
}