//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
//!
//! This file is part of DMSC.
//! The DMSC project belongs to the Dunimd Team.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! You may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.

//! # Queue Module C API
//!
//! This module provides C language bindings for DMSC's message queue infrastructure. The queue module
//! delivers high-performance asynchronous message processing with reliable delivery guarantees, multiple
//! queue semantics, and comprehensive routing capabilities. This C API enables C/C++ applications to
//! leverage DMSC's messaging functionality for building event-driven architectures, task distribution
//! systems, and distributed processing pipelines.
//!
//! ## Module Architecture
//!
//! The queue module comprises three primary components that together provide complete messaging
//! functionality:
//!
//! - **DMSCQueueConfig**: Configuration container for queue parameters including queue type selection,
//! delivery guarantees, persistence settings, and consumer group configuration. The configuration object
//! controls queue behavior, resource allocation, and operational characteristics.
//!
//! - **DMSCQueueManager**: Central manager for queue lifecycle, message routing, and subscription
//! management. The manager handles the complete messaging workflow including message production,
//! consumption, acknowledgment, and dead-letter handling.
//!
//! - **DMSCQueueMessage**: Message abstraction representing individual messages in the queue system.
//! Messages encapsulate payload data, metadata, headers, delivery properties, and routing information.
//!
//! ## Queue Types
//!
//! The queue system supports multiple queue semantics for different use cases:
//!
//! - **FIFO (First-In-First-Out) Queues**: Standard message ordering where messages are delivered
//! in the exact order they were produced. Essential for sequential processing requirements.
//!
//! - **Priority Queues**: Messages are delivered based on priority levels rather than arrival order.
//! High-priority messages skip ahead of lower-priority messages in the delivery sequence.
//!
//! - **Work Queues (Task Queues)**: Multiple workers compete for messages, with each message processed
//! by exactly one worker. Enables horizontal scaling of processing capacity.
//!
//! - **Publish-Subscribe Queues**: One message published to the queue is delivered to all subscribed
//! consumers. Enables broadcast patterns for event distribution.
//!
//! - **Delay Queues**: Messages are held invisible for a configurable delay period before becoming
//! available for consumption. Useful for retry logic and scheduled processing.
//!
//! - **Dead Letter Queues**: Messages that fail processing after multiple attempts are moved to
//! a separate queue for later inspection and manual handling.
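//!
//! The difference between FIFO and priority ordering can be sketched with standard-library
//! collections. This is an illustration only, not DMSC's implementation: `drain_fifo` and
//! `drain_priority` are hypothetical names, and the sketch assumes that a larger number means
//! a higher priority and that arrival order breaks ties.
//!
//! ```rust
//! use std::cmp::Reverse;
//! use std::collections::{BinaryHeap, VecDeque};
//!
//! /// Drains a FIFO queue: bodies come out in arrival order, priority is ignored.
//! fn drain_fifo(items: &[(u8, &'static str)]) -> Vec<&'static str> {
//!     let mut queue: VecDeque<(u8, &'static str)> = items.iter().copied().collect();
//!     let mut out = Vec::new();
//!     while let Some((_priority, body)) = queue.pop_front() {
//!         out.push(body);
//!     }
//!     out
//! }
//!
//! /// Drains a priority queue: highest priority first (assumption: larger is higher).
//! fn drain_priority(items: &[(u8, &'static str)]) -> Vec<&'static str> {
//!     let mut heap = BinaryHeap::new();
//!     for (seq, (priority, body)) in items.iter().copied().enumerate() {
//!         // Reverse(seq) lets the earlier arrival win among equal priorities.
//!         heap.push((priority, Reverse(seq), body));
//!     }
//!     let mut out = Vec::new();
//!     while let Some((_priority, _seq, body)) = heap.pop() {
//!         out.push(body);
//!     }
//!     out
//! }
//! ```
//!
//! Given the same input, `drain_fifo` preserves arrival order while `drain_priority` moves
//! higher-priority bodies to the front.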
//!
//! ## Delivery Guarantees
//!
//! The messaging system provides configurable delivery semantics:
//!
//! - **At-Most-Once Delivery**: Messages are delivered zero or one time. No duplication possible,
//! but messages may be lost. Highest performance, lowest reliability.
//!
//! - **At-Least-Once Delivery**: Messages are guaranteed to be delivered at least once. Duplicates
//! possible, but no messages lost. Requires idempotent message handlers.
//!
//! - **Exactly-Once Delivery**: Messages are delivered exactly one time. No duplication, no loss.
//! Most complex and highest overhead. Achieved through deduplication and coordination.
//!
//! - **Transactional Delivery**: Messages are produced and consumed within database transactions.
//! Ensures atomicity across message and data operations.
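//!
//! At-least-once delivery requires idempotent handlers because the same message may arrive
//! twice. One common way to get idempotence is to remember processed message IDs, sketched
//! below. `IdempotentHandler` and its fields are hypothetical names used for illustration
//! and are not part of the DMSC API.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical consumer-side state: IDs of messages already processed.
//! #[derive(Default)]
//! struct IdempotentHandler {
//!     seen: HashSet<String>,
//!     applied: u32,
//! }
//!
//! impl IdempotentHandler {
//!     /// Applies the message's effect once; returns false for a redelivered duplicate.
//!     fn handle(&mut self, message_id: &str) -> bool {
//!         // `insert` returns false when the ID was already present.
//!         if !self.seen.insert(message_id.to_owned()) {
//!             return false;
//!         }
//!         self.applied += 1; // stand-in for the real side effect
//!         true
//!     }
//! }
//! ```
//!
//! A production handler would also bound the size of `seen`, for example with a time window.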
//!
//! ## Message Properties
//!
//! Each message carries comprehensive metadata:
//!
//! - **Payload**: The actual message content, stored as bytes. Can be JSON, binary, protobuf,
//! or any custom format the application requires.
//!
//! - **Message ID**: Unique identifier for deduplication and tracking. Generated by the system
//! or optionally specified by the producer.
//!
//! - **Correlation ID**: Application-defined identifier for relating messages to each other.
//! Useful for request-response correlation and tracing.
//!
//! - **Timestamp**: When the message was published. Used for ordering and TTL calculations.
//!
//! - **Priority**: Message priority level (if supported by queue type). Affects delivery order.
//!
//! - **Delay**: Configurable delay before message becomes visible. Supports retry and scheduling.
//!
//! - **TTL (Time-To-Live)**: Maximum time message can remain in queue. Expired messages are
//! removed or moved to dead letter queue.
//!
//! - **Headers**: Key-value metadata pairs for routing and processing hints. Similar to HTTP
//! headers in purpose.
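//!
//! How the timestamp, delay, and TTL properties could interact is sketched below. The
//! `Timing` struct, the `Visibility` enum, and the choice to measure both delay and TTL from
//! the publish timestamp in milliseconds are assumptions made for illustration; the actual
//! DMSC rules may differ.
//!
//! ```rust
//! /// Hypothetical timing fields, all in milliseconds.
//! struct Timing {
//!     published_at_ms: u64,
//!     delay_ms: u64,
//!     ttl_ms: Option<u64>,
//! }
//!
//! #[derive(Debug, PartialEq)]
//! enum Visibility {
//!     Delayed,
//!     Visible,
//!     Expired,
//! }
//!
//! /// Classifies a message at time `now_ms`; expiry is checked before the delay.
//! fn visibility(t: &Timing, now_ms: u64) -> Visibility {
//!     let age = now_ms.saturating_sub(t.published_at_ms);
//!     if let Some(ttl) = t.ttl_ms {
//!         if age >= ttl {
//!             return Visibility::Expired;
//!         }
//!     }
//!     if age < t.delay_ms {
//!         Visibility::Delayed
//!     } else {
//!         Visibility::Visible
//!     }
//! }
//! ```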
//!
//! ## Consumer Groups
//!
//! The queue system supports sophisticated consumer patterns:
//!
//! - **Shared Consumption**: Multiple consumers share messages from a queue, each message processed
//! by one consumer. Enables load balancing across consumers.
//!
//! - **Exclusive Consumption**: One consumer receives all messages from a queue. Other consumers
//! are blocked. Useful when ordering or stateful processing is required.
//!
//! - **Fan-Out**: Messages are replicated to multiple queues for independent consumption.
//! Enables parallel processing pipelines from a single source.
//!
//! - **Consumer Lag Tracking**: Measures how far each consumer trails the most recently produced
//! message. Used for capacity planning and alerting.
//!
//! ## Acknowledgment Patterns
//!
//! Message acknowledgment controls delivery guarantees:
//!
//! - **Auto-Acknowledge**: Messages are considered delivered immediately upon receipt. Simplest
//! pattern but risks message loss on consumer failure.
//!
//! - **Manual Acknowledge**: Consumer explicitly acknowledges successful processing. Messages
//! are only removed after successful ack. Supports reliable processing.
//!
//! - **Negative Acknowledge (Nack)**: Consumer signals processing failure, returning message
//! to the queue for redelivery. Can optionally increase retry count.
//!
//! - **Multiple Acknowledge**: Batch multiple messages with a single acknowledgment call.
//! Improves throughput for high-volume scenarios.
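//!
//! The manual ack and nack flow, including the hand-off to a dead letter queue, can be
//! sketched as a small in-memory state machine. `AckQueue`, `Pending`, and `max_attempts`
//! are hypothetical names, and counting an attempt on every receive is an assumption; this
//! is not DMSC's implementation.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! struct Pending {
//!     id: u32,
//!     attempts: u32,
//! }
//!
//! struct AckQueue {
//!     ready: VecDeque<Pending>,
//!     in_flight: Vec<Pending>,
//!     dead_letters: Vec<u32>,
//!     max_attempts: u32,
//! }
//!
//! impl AckQueue {
//!     fn new(ids: &[u32], max_attempts: u32) -> Self {
//!         AckQueue {
//!             ready: ids.iter().map(|&id| Pending { id, attempts: 0 }).collect(),
//!             in_flight: Vec::new(),
//!             dead_letters: Vec::new(),
//!             max_attempts,
//!         }
//!     }
//!
//!     /// Hands out the next message; it stays in flight until acked or nacked.
//!     fn receive(&mut self) -> Option<u32> {
//!         let mut msg = self.ready.pop_front()?;
//!         msg.attempts += 1;
//!         let id = msg.id;
//!         self.in_flight.push(msg);
//!         Some(id)
//!     }
//!
//!     fn take_in_flight(&mut self, id: u32) -> Option<Pending> {
//!         let pos = self.in_flight.iter().position(|m| m.id == id)?;
//!         Some(self.in_flight.swap_remove(pos))
//!     }
//!
//!     /// Ack: the message is removed for good.
//!     fn ack(&mut self, id: u32) -> bool {
//!         self.take_in_flight(id).is_some()
//!     }
//!
//!     /// Nack: requeue for redelivery, or dead-letter once attempts are exhausted.
//!     fn nack(&mut self, id: u32) -> bool {
//!         match self.take_in_flight(id) {
//!             Some(msg) if msg.attempts >= self.max_attempts => {
//!                 self.dead_letters.push(msg.id);
//!                 true
//!             }
//!             Some(msg) => {
//!                 self.ready.push_back(msg);
//!                 true
//!             }
//!             None => false,
//!         }
//!     }
//! }
//! ```
//!
//! Auto-acknowledge corresponds to calling `ack` inside `receive`, which is why a consumer
//! crash after receipt loses the message in that mode.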
//!
//! ## Reliability Features
//!
//! The messaging system implements comprehensive reliability mechanisms:
//!
//! - **Message Persistence**: Messages written to durable storage survive broker restarts.
//! Configurable durability levels balance performance and reliability.
//!
//! - **Replication**: Messages copied to multiple brokers for fault tolerance. Configurable
//! replication factor determines fault tolerance level.
//!
//! - **Checkpointing**: Consumers periodically checkpoint their progress. On restart, consumers
//! resume from the last checkpoint rather than the beginning.
//!
//! - **Idempotent Producers**: Duplicate message detection using sequence numbers and deduplication
//! windows. Supports exactly-once semantics by preventing duplicate writes when a producer retries.
//!
//! ## Performance Characteristics
//!
//! Queue operations are optimized for high throughput:
//!
//! - **Message Production**: O(1) per message, O(n) for a batch of n messages
//! - **Message Consumption**: O(1) for retrieval with proper indexing
//! - **Queue Creation**: O(1) for standard queues
//! - **Throughput**: Millions of messages per second on modern hardware
//! - **Latency**: Sub-millisecond end-to-end latency for local queues
//!
//! ## Memory Management
//!
//! All C API objects use opaque pointers with manual memory management:
//!
//! - Constructor functions allocate new instances on the heap
//! - Destructor functions must be called to release memory
//! - Message payloads must be freed appropriately
//! - Queue managers coordinate resource cleanup
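//!
//! The opaque-pointer pattern described above is commonly written in Rust with
//! `Box::into_raw` and `Box::from_raw`. The sketch below shows that general pattern only.
//! What `c_constructor!` and `c_destructor!` actually expand to is not shown in this file, so
//! `Config`, `example_config_new`, and `example_config_free` are hypothetical stand-ins, and a
//! real export would also need a no-mangle attribute.
//!
//! ```rust
//! /// Hypothetical stand-in for a wrapped Rust type.
//! #[derive(Default)]
//! pub struct Config {
//!     pub consumer_count: u32,
//! }
//!
//! /// Constructor: moves the value to the heap and hands ownership to the caller.
//! pub extern "C" fn example_config_new() -> *mut Config {
//!     Box::into_raw(Box::new(Config::default()))
//! }
//!
//! /// Destructor: takes ownership back and drops the value. NULL is ignored.
//! ///
//! /// Safety: `ptr` must be NULL or a pointer returned by `example_config_new`
//! /// that has not already been freed.
//! pub unsafe extern "C" fn example_config_free(ptr: *mut Config) {
//!     if !ptr.is_null() {
//!         // Rebuilding the Box lets Rust run the destructor and free the allocation.
//!         unsafe { drop(Box::from_raw(ptr)) };
//!     }
//! }
//! ```
//!
//! Every pointer obtained from a constructor should be passed to the matching destructor
//! exactly once; freeing it twice or with C's `free` is undefined behavior under this pattern.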
//!
//! ## Thread Safety
//!
//! The underlying implementations are thread-safe:
//!
//! - Concurrent message production from multiple threads supported
//! - Multiple consumers can process messages concurrently
//! - Queue operations use internal synchronization
//! - Message handling should be idempotent for concurrent processing
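//!
//! Internal synchronization of the kind listed above can be sketched with a mutex-protected
//! queue shared between producer threads. This is a generic illustration under the
//! assumption of a single lock around the queue; `produce_concurrently` is a hypothetical
//! name and DMSC's actual synchronization strategy is not shown here.
//!
//! ```rust
//! use std::collections::VecDeque;
//! use std::sync::{Arc, Mutex};
//! use std::thread;
//!
//! /// Spawns `producers` threads that each enqueue `per_producer` items,
//! /// then returns the final queue length.
//! fn produce_concurrently(producers: usize, per_producer: usize) -> usize {
//!     let queue = Arc::new(Mutex::new(VecDeque::new()));
//!     let handles: Vec<_> = (0..producers)
//!         .map(|producer| {
//!             let queue = Arc::clone(&queue);
//!             thread::spawn(move || {
//!                 for n in 0..per_producer {
//!                     // The lock is held only for the duration of one push.
//!                     queue.lock().unwrap().push_back((producer, n));
//!                 }
//!             })
//!         })
//!         .collect();
//!     for handle in handles {
//!         handle.join().unwrap();
//!     }
//!     let len = queue.lock().unwrap().len();
//!     len
//! }
//! ```
//!
//! No item is lost regardless of how the threads interleave, although the relative order of
//! items from different producers is not deterministic.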
//!
//! ## Usage Example
//!
//! ```c
//! // Create queue configuration
//! DMSCQueueConfig* config = dmsc_queue_config_new();
//! if (config == NULL) {
//!     fprintf(stderr, "Failed to create queue config\n");
//!     return ERROR_INIT;
//! }
//!
//! // Configure queue settings
//! dmsc_queue_config_set_queue_type(config, QUEUE_TYPE_FIFO);
//! dmsc_queue_config_set_delivery_guarantee(config, DELIVERY_AT_LEAST_ONCE);
//! dmsc_queue_config_set_persistence_enabled(config, true);
//! dmsc_queue_config_set_consumer_count(config, 4);
//!
//! // Create queue manager
//! DMSCQueueManager* manager = dmsc_queue_manager_new(config);
//! if (manager == NULL) {
//!     fprintf(stderr, "Failed to create queue manager\n");
//!     dmsc_queue_config_free(config);
//!     return ERROR_INIT;
//! }
//!
//! // Create a message
//! DMSCQueueMessage* message = dmsc_queue_message_new();
//! if (message == NULL) {
//!     fprintf(stderr, "Failed to create message\n");
//!     dmsc_queue_manager_free(manager);
//!     dmsc_queue_config_free(config);
//!     return ERROR_INIT;
//! }
//!
//! // Configure message
//! const char* payload = "{\"event\": \"user_login\", \"user_id\": 12345}";
//! dmsc_queue_message_set_payload(message, payload, strlen(payload));
//! dmsc_queue_message_set_correlation_id(message, "login-2024-001");
//! dmsc_queue_message_set_priority(message, 5);
//!
//! // Set headers
//! dmsc_queue_message_set_header(message, "source", "auth-service");
//! dmsc_queue_message_set_header(message, "version", "1.0");
//!
//! // Publish message to queue
//! int result = dmsc_queue_manager_publish(manager, "user-events", message);
//! if (result != 0) {
//!     fprintf(stderr, "Failed to publish message: %d\n", result);
//!     dmsc_queue_message_free(message);
//!     dmsc_queue_manager_free(manager);
//!     dmsc_queue_config_free(config);
//!     return ERROR_PUBLISH;
//! }
//!
//! printf("Message published successfully\n");
//!
//! // Consume messages (blocking)
//! DMSCQueueMessage* consumed = NULL;
//! result = dmsc_queue_manager_consume(manager, "user-events", &consumed, 10000);
//!
//! if (result == 0 && consumed != NULL) {
//!     // Process message
//!     const char* received_payload = dmsc_queue_message_get_payload(consumed);
//!     size_t payload_size = dmsc_queue_message_get_payload_size(consumed);
//!
//!     printf("Received: %.*s\n", (int)payload_size, received_payload);
//!
//!     // Get message metadata
//!     const char* msg_id = dmsc_queue_message_get_id(consumed);
//!     const char* corr_id = dmsc_queue_message_get_correlation_id(consumed);
//!     uint64_t timestamp = dmsc_queue_message_get_timestamp(consumed);
//!
//!     // Process message...
//!
//!     // Acknowledge successful processing
//!     dmsc_queue_manager_ack(manager, consumed);
//!
//!     dmsc_queue_message_free(consumed);
//! } else if (result == TIMEOUT) {
//!     printf("No messages available within timeout\n");
//! } else {
//!     fprintf(stderr, "Consume error: %d\n", result);
//! }
//!
//! // Subscribe to a queue for continuous consumption
//! DMSCConsumerHandle* consumer = dmsc_queue_manager_subscribe(
//!     manager,
//!     "user-events",
//!     message_handler_callback,
//!     NULL  // user data
//! );
//!
//! if (consumer != NULL) {
//!     // Consumer runs in background
//!     printf("Consumer started, processing messages...\n");
//!
//!     // Application continues running...
//!
//!     // Stop consumer when done
//!     dmsc_queue_manager_unsubscribe(consumer);
//! }
//!
//! // Cleanup
//! dmsc_queue_message_free(message);
//! dmsc_queue_manager_free(manager);
//! dmsc_queue_config_free(config);
//! ```
//!
//! ## Message Handler Callback
//!
//! Message handlers must conform to the following signature:
//!
//! ```c
//! typedef int (*DMSCQueueMessageHandler)(
//!     DMSCQueueManager* manager,
//!     DMSCQueueMessage* message,
//!     void* user_data
//! );
//! ```
//!
//! Return values:
//!
//! - 0: Success, message will be acknowledged
//! - Positive: Success with value, message acknowledged
//! - Negative: Error, message will be nacked and retried
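//!
//! The return-value convention above amounts to a sign check on the handler's result. A
//! minimal sketch of that mapping follows; `Disposition` and `disposition` are hypothetical
//! names for illustration and do not appear in the DMSC API.
//!
//! ```rust
//! use std::os::raw::c_int;
//!
//! #[derive(Debug, PartialEq)]
//! enum Disposition {
//!     Ack,
//!     Nack,
//! }
//!
//! /// Maps a handler's C return code to the action taken on the message:
//! /// zero or positive acknowledges, negative nacks for redelivery.
//! fn disposition(handler_result: c_int) -> Disposition {
//!     if handler_result >= 0 {
//!         Disposition::Ack
//!     } else {
//!         Disposition::Nack
//!     }
//! }
//! ```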
//!
//! ## Dependencies
//!
//! This module depends on the following DMSC components:
//!
//! - `crate::queue`: Rust queue module implementation
//! - `crate::prelude`: Common types and traits
//! - Async runtime for non-blocking operations
//!
//! ## Feature Flags
//!
//! The queue module is enabled by the "queue" feature flag.
//! Disable this feature to reduce binary size when messaging is not required.
//!
//! Additional features:
//!
//! - `queue-persistence`: Enable message persistence to disk
//! - `queue-rabbitmq`: Enable RabbitMQ backend support
//! - `queue-kafka`: Enable Apache Kafka backend support
//! - `queue-sqs`: Enable AWS SQS backend support

use crate::queue::{DMSCQueueConfig, DMSCQueueManager, DMSCQueueMessage};

c_wrapper!(CDMSCQueueConfig, DMSCQueueConfig);
c_wrapper!(CDMSCQueueManager, DMSCQueueManager);
c_wrapper!(CDMSCQueueMessage, DMSCQueueMessage);

// DMSCQueueConfig constructors and destructors
c_constructor!(
    dmsc_queue_config_new,
    CDMSCQueueConfig,
    DMSCQueueConfig,
    DMSCQueueConfig::default()
);
c_destructor!(dmsc_queue_config_free, CDMSCQueueConfig);