Expand description
Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
This file is part of DMSC. The DMSC project belongs to the Dunimd Team.
Licensed under the Apache License, Version 2.0 (the “License”); You may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
§Queue Module C API
This module provides C language bindings for DMSC’s message queue infrastructure. The queue module delivers high-performance asynchronous message processing with reliable delivery guarantees, multiple queue semantics, and comprehensive routing capabilities. This C API enables C/C++ applications to leverage DMSC’s messaging functionality for building event-driven architectures, task distribution systems, and distributed processing pipelines.
§Module Architecture
The queue module comprises three primary components that together provide complete messaging functionality:
-
DMSCQueueConfig: Configuration container for queue parameters including queue type selection, delivery guarantees, persistence settings, and consumer group configuration. The configuration object controls queue behavior, resource allocation, and operational characteristics.
-
DMSCQueueManager: Central manager for queue lifecycle, message routing, and subscription management. The manager handles the complete messaging workflow including message production, consumption, acknowledgment, and dead-letter handling.
-
DMSCQueueMessage: Message abstraction representing individual messages in the queue system. Messages encapsulate payload data, metadata, headers, delivery properties, and routing information.
§Queue Types
The queue system supports multiple queue semantics for different use cases:
-
FIFO (First-In-First-Out) Queues: Standard message ordering where messages are delivered in the exact order they were produced. Essential for sequential processing requirements.
-
Priority Queues: Messages are delivered based on priority levels rather than arrival order. High-priority messages skip ahead of lower-priority messages in the delivery sequence.
-
Work Queues (Task Queues): Multiple workers compete for messages, with each message processed by exactly one worker. Enables horizontal scaling of processing capacity.
-
Publish-Subscribe Queues: One message published to the queue is delivered to all subscribed consumers. Enables broadcast patterns for event distribution.
-
Delay Queues: Messages are held invisible for a configurable delay period before becoming available for consumption. Useful for retry logic and scheduled processing.
-
Dead Letter Queues: Messages that fail processing after multiple attempts are moved to a separate queue for later inspection and manual handling.
§Delivery Guarantees
The messaging system provides configurable delivery semantics:
-
At-Most-Once Delivery: Messages are delivered zero or one time. No duplication possible, but messages may be lost. Highest performance, lowest reliability.
-
At-Least-Once Delivery: Messages are guaranteed to be delivered at least once. Duplicates possible, but no messages lost. Requires idempotent message handlers.
-
Exactly-Once Delivery: Messages are delivered exactly one time. No duplication, no loss. Most complex and highest overhead. Achieved through deduplication and coordination.
-
Transactional Delivery: Messages are produced and consumed within database transactions. Ensures atomicity across message and data operations.
§Message Properties
Each message carries comprehensive metadata:
-
Payload: The actual message content, stored as bytes. Can be JSON, binary, protobuf, or any custom format the application requires.
-
Message ID: Unique identifier for deduplication and tracking. Generated by the system or optionally specified by the producer.
-
Correlation ID: Application-defined identifier for relating messages to each other. Useful for request-response correlation and tracing.
-
Timestamp: When the message was published. Used for ordering and TTL calculations.
-
Priority: Message priority level (if supported by queue type). Affects delivery order.
-
Delay: Configurable delay before message becomes visible. Supports retry and scheduling.
-
TTL (Time-To-Live): Maximum time message can remain in queue. Expired messages are removed or moved to dead letter queue.
-
Headers: Key-value metadata pairs for routing and processing hints. Similar to HTTP headers in purpose.
§Consumer Groups
The queue system supports sophisticated consumer patterns:
-
Shared Consumption: Multiple consumers share messages from a queue, each message processed by one consumer. Enables load balancing across consumers.
-
Exclusive Consumption: One consumer receives all messages from a queue. Other consumers are blocked. Useful when ordering or stateful processing is required.
-
Fan-Out: Messages are replicated to multiple queues for independent consumption. Enables parallel processing pipelines from a single source.
-
Consumer Lag Tracking: Monitor how far behind consumers are from the message production rate. Used for capacity planning and alerting.
§Acknowledgment Patterns
Message acknowledgment controls delivery guarantees:
-
Auto-Acknowledge: Messages are considered delivered immediately upon receipt. Simplest pattern but risks message loss on consumer failure.
-
Manual Acknowledge: Consumer explicitly acknowledges successful processing. Messages are only removed after successful ack. Supports reliable processing.
-
Negative Acknowledge (Nack): Consumer signals processing failure, returning message to the queue for redelivery. Can optionally increase retry count.
-
Multiple Acknowledge: Batch multiple messages with a single acknowledgment call. Improves throughput for high-volume scenarios.
§Reliability Features
The messaging system implements comprehensive reliability mechanisms:
-
Message Persistence: Messages written to durable storage survive broker restarts. Configurable durability levels balance performance and reliability.
-
Replication: Messages copied to multiple brokers for fault tolerance. Configurable replication factor determines fault tolerance level.
-
Checkpointing: Consumers periodically checkpoint their progress. On restart, consumers resume from the last checkpoint rather than the beginning.
-
Idempotent Producers: Duplicate message detection using sequence numbers and deduplication windows. Ensures exactly-once semantics despite retries.
§Performance Characteristics
Queue operations are optimized for high throughput:
- Message Production: O(1) for single message, O(n) for batching
- Message Consumption: O(1) for retrieval with proper indexing
- Queue Creation: O(1) for standard queues
- Throughput: Millions of messages per second on modern hardware
- Latency: Sub-millisecond end-to-end latency for local queues
§Memory Management
All C API objects use opaque pointers with manual memory management:
- Constructor functions allocate new instances on the heap
- Destructor functions must be called to release memory
- Message payloads must be freed appropriately
- Queue managers coordinate resource cleanup
§Thread Safety
The underlying implementations are thread-safe:
- Concurrent message production from multiple threads supported
- Multiple consumers can process messages concurrently
- Queue operations use internal synchronization
- Message handling should be idempotent for concurrent processing
§Usage Example
// Create queue configuration
DMSCQueueConfig* config = dmsc_queue_config_new();
if (config == NULL) {
fprintf(stderr, "Failed to create queue config\n");
return ERROR_INIT;
}
// Configure queue settings
dmsc_queue_config_set_queue_type(config, QUEUE_TYPE_FIFO);
dmsc_queue_config_set_delivery_guarantee(config, DELIVERY_AT_LEAST_ONCE);
dmsc_queue_config_set_persistence_enabled(config, true);
dmsc_queue_config_set_consumer_count(config, 4);
// Create queue manager
DMSCQueueManager* manager = dmsc_queue_manager_new(config);
if (manager == NULL) {
fprintf(stderr, "Failed to create queue manager\n");
dmsc_queue_config_free(config);
return ERROR_INIT;
}
// Create a message
DMSCQueueMessage* message = dmsc_queue_message_new();
if (message == NULL) {
fprintf(stderr, "Failed to create message\n");
dmsc_queue_manager_free(manager);
dmsc_queue_config_free(config);
return ERROR_INIT;
}
// Configure message
const char* payload = "{\"event\": \"user_login\", \"user_id\": 12345}";
dmsc_queue_message_set_payload(message, payload, strlen(payload));
dmsc_queue_message_set_correlation_id(message, "login-2024-001");
dmsc_queue_message_set_priority(message, 5);
// Set headers
dmsc_queue_message_set_header(message, "source", "auth-service");
dmsc_queue_message_set_header(message, "version", "1.0");
// Publish message to queue
int result = dmsc_queue_manager_publish(manager, "user-events", message);
if (result != 0) {
fprintf(stderr, "Failed to publish message: %d\n", result);
dmsc_queue_message_free(message);
dmsc_queue_manager_free(manager);
dmsc_queue_config_free(config);
return ERROR_PUBLISH;
}
printf("Message published successfully\n");
// Consume messages (blocking)
DMSCQueueMessage* consumed = NULL;
result = dmsc_queue_manager_consume(manager, "user-events", &consumed, 10000);
if (result == 0 && consumed != NULL) {
// Process message
const char* received_payload = dmsc_queue_message_get_payload(consumed);
size_t payload_size = dmsc_queue_message_get_payload_size(consumed);
printf("Received: %.*s\n", (int)payload_size, received_payload);
// Get message metadata
const char* msg_id = dmsc_queue_message_get_id(consumed);
const char* corr_id = dmsc_queue_message_get_correlation_id(consumed);
uint64_t timestamp = dmsc_queue_message_get_timestamp(consumed);
// Process message...
// Acknowledge successful processing
dmsc_queue_manager_ack(manager, consumed);
dmsc_queue_message_free(consumed);
} else if (result == TIMEOUT) {
printf("No messages available within timeout\n");
} else {
fprintf(stderr, "Consume error: %d\n", result);
}
// Subscribe to a queue for continuous consumption
DMSCConsumerHandle* consumer = dmsc_queue_manager_subscribe(
manager,
"user-events",
message_handler_callback,
NULL // user data
);
if (consumer != NULL) {
// Consumer runs in background
printf("Consumer started, processing messages...\n");
// Application continues running...
// Stop consumer when done
dmsc_queue_manager_unsubscribe(consumer);
}
// Cleanup
dmsc_queue_message_free(message);
dmsc_queue_manager_free(manager);
dmsc_queue_config_free(config);§Message Handler Callback
Message handlers must conform to the following signature:
typedef int (*DMSQueueMessageHandler)(
DMSCQueueManager* manager,
DMSCQueueMessage* message,
void* user_data
);Return values:
- 0: Success, message will be acknowledged
- Positive: Success with value, message acknowledged
- Negative: Error, message will be nacked and retried
§Dependencies
This module depends on the following DMSC components:
crate::queue: Rust queue module implementationcrate::prelude: Common types and traits- Async runtime for non-blocking operations
§Feature Flags
The queue module is enabled by the “queue” feature flag. Disable this feature to reduce binary size when messaging is not required.
Additional features:
queue-persistence: Enable message persistence to diskqueue-rabbitmq: Enable RabbitMQ backend supportqueue-kafka: Enable Apache Kafka backend supportqueue-sqs: Enable AWS SQS backend support