dmsc/queue/backends/
kafka_backend.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! you may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20use async_trait::async_trait;
21use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
22use rdkafka::client::DefaultClientContext;
23use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
24use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
25use rdkafka::message::{BorrowedHeaders, Headers, Message};
26use rdkafka::producer::{FutureProducer, FutureRecord};
27use rdkafka::topic_partition_list::TopicPartitionList;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::sync::Mutex;
32use crate::core::{DMSCError, DMSCResult};
33use crate::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueueStats};
34
/// Alias for the streaming consumer type used throughout this backend.
type KafkaConsumer = StreamConsumer<DefaultConsumerContext>;
36
/// Kafka-backed implementation of the DMSC queue abstraction.
///
/// Holds shared (`Arc`-wrapped) rdkafka clients so the struct stays cheap to
/// clone: a producer for publishing, a streaming consumer used for receiving
/// and metadata lookups, and an admin client for topic management.
#[allow(dead_code)]
#[derive(Clone)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCKafkaQueue {
    // Comma-separated bootstrap broker list, e.g. "host1:9092,host2:9092".
    brokers: String,
    // Topic this queue instance publishes to and consumes from.
    topic: String,
    // Shared producer handle (rdkafka producers are thread-safe).
    producer: Arc<FutureProducer>,
    // Shared stream consumer; also used for cluster metadata queries.
    consumer: Arc<KafkaConsumer>,
    // Admin client used to create/delete the backing topic.
    admin_client: Arc<AdminClient<DefaultClientContext>>,
}
47
48impl DMSCKafkaQueue {
49    pub async fn new(brokers: &str, topic: &str) -> DMSCResult<Self> {
50        let mut config = ClientConfig::new();
51        config.set("bootstrap.servers", brokers);
52        config.set("message.timeout.ms", "30000");
53        config.set("request.timeout.ms", "10000");
54        config.set("session.timeout.ms", "30000");
55        config.set("enable.auto.commit", "false");
56        config.set("auto.offset.reset", "earliest");
57        config.set_log_level(RDKafkaLogLevel::Warning);
58
59        let producer: FutureProducer = config
60            .create::<FutureProducer>()
61            .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka producer: {}", e)))?;
62
63        let consumer: KafkaConsumer = config
64            .create::<KafkaConsumer>()
65            .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka consumer: {}", e)))?;
66
67        let admin_client: AdminClient<DefaultClientContext> = config
68            .create::<AdminClient<DefaultClientContext>>()
69            .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka admin client: {}", e)))?;
70
71        let queue = Self {
72            brokers: brokers.to_string(),
73            topic: topic.to_string(),
74            producer: Arc::new(producer),
75            consumer: Arc::new(consumer),
76            admin_client: Arc::new(admin_client),
77        };
78
79        queue.ensure_topic_exists().await?;
80
81        Ok(queue)
82    }
83
84    async fn ensure_topic_exists(&self) -> DMSCResult<()> {
85        let metadata = self.consumer.fetch_metadata(None, Duration::from_secs(5))
86            .map_err(|e| DMSCError::Queue(format!("Failed to get Kafka metadata: {}", e)))?;
87
88        let topic_exists = metadata.topics().iter().any(|t| t.name() == self.topic);
89
90        if !topic_exists {
91            let new_topic = NewTopic::new(&self.topic, 1, TopicReplication::Fixed(1));
92            let admin_options = AdminOptions::new();
93
94            self.admin_client.create_topics(std::slice::from_ref(&new_topic), &admin_options).await
95                .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka topic: {}", e)))?;
96
97            tokio::time::sleep(Duration::from_secs(1)).await;
98        }
99
100        Ok(())
101    }
102
103    async fn get_topic_metadata(&self) -> DMSCResult<i32> {
104        let metadata = self.consumer.fetch_metadata(Some(&self.topic), Duration::from_secs(5))
105            .map_err(|e| DMSCError::Queue(format!("Failed to get Kafka metadata: {}", e)))?;
106
107        if let Some(topic_meta) = metadata.topics().first() {
108            Ok(topic_meta.partitions().len() as i32)
109        } else {
110            Ok(0)
111        }
112    }
113}
114
115#[async_trait]
116impl DMSCQueue for DMSCKafkaQueue {
117    async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
118        Ok(Box::new(KafkaQueueProducer {
119            producer: self.producer.clone(),
120            topic: self.topic.clone(),
121        }))
122    }
123
124    async fn create_consumer(&self, consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
125        let consumer = self.consumer.clone();
126
127        let mut partition_list = TopicPartitionList::new();
128        let partition_count = self.get_topic_metadata().await?;
129
130        for i in 0..partition_count.max(1) {
131            partition_list.add_partition(&self.topic, i);
132        }
133
134        consumer.assign(&partition_list)
135            .map_err(|e| DMSCError::Queue(format!("Failed to assign partitions: {}", e)))?;
136
137        Ok(Box::new(KafkaQueueConsumer {
138            consumer,
139            topic: self.topic.clone(),
140            consumer_group: consumer_group.to_string(),
141            paused: Arc::new(Mutex::new(false)),
142        }))
143    }
144
145    async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
146        let _partition_count = self.get_topic_metadata().await?;
147        let topic = self.topic.clone();
148
149        Ok(DMSCQueueStats {
150            queue_name: topic.clone(),
151            message_count: 0,
152            consumer_count: 1,
153            producer_count: 1,
154            processed_messages: 0,
155            failed_messages: 0,
156            avg_processing_time_ms: 0.0,
157            total_bytes_sent: 0,
158            total_bytes_received: 0,
159            last_message_time: 0,
160        })
161    }
162
163    async fn purge(&self) -> DMSCResult<()> {
164        let admin_options = AdminOptions::new();
165        self.admin_client.delete_topics(std::slice::from_ref(&self.topic.as_str()), &admin_options).await
166            .map_err(|e| DMSCError::Queue(format!("Failed to purge Kafka topic: {}", e)))?;
167
168        tokio::time::sleep(Duration::from_secs(1)).await;
169        self.ensure_topic_exists().await?;
170
171        Ok(())
172    }
173
174    async fn delete(&self) -> DMSCResult<()> {
175        self.purge().await
176    }
177}
178
/// Producer handle returned by [`DMSCKafkaQueue::create_producer`]; publishes
/// messages to a single fixed topic via a shared rdkafka producer.
pub struct KafkaQueueProducer {
    // Shared rdkafka producer (cloned from the owning queue).
    producer: Arc<FutureProducer>,
    // Destination topic for every message sent through this handle.
    topic: String,
}
183
184#[async_trait]
185impl DMSCQueueProducer for KafkaQueueProducer {
186    async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
187        let payload = if message.payload.is_empty() {
188            vec![]
189        } else {
190            message.payload
191        };
192
193        let key = message.id.as_bytes();
194
195        let future_record = FutureRecord::to(&self.topic)
196            .key(key)
197            .payload(&payload);
198
199        self.producer.send(future_record, Duration::from_secs(10)).await
200            .map_err(|(e, _)| DMSCError::Queue(format!("Failed to send message to Kafka: {}", e)))?;
201
202        Ok(())
203    }
204
205    async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
206        for message in messages {
207            self.send(message).await?;
208        }
209        Ok(())
210    }
211}
212
/// Consumer handle returned by [`DMSCKafkaQueue::create_consumer`].
#[allow(dead_code)]
pub struct KafkaQueueConsumer {
    // Underlying rdkafka stream consumer.
    consumer: Arc<KafkaConsumer>,
    // Topic this consumer reads from.
    topic: String,
    // Consumer group name this handle was created for.
    consumer_group: String,
    // Soft pause flag checked by `receive`; does not pause the Kafka fetch.
    paused: Arc<Mutex<bool>>,
}
220
221#[async_trait]
222impl DMSCQueueConsumer for KafkaQueueConsumer {
223    async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
224        let paused = *self.paused.lock().await;
225        if paused {
226            return Ok(None);
227        }
228
229        let message = tokio::time::timeout(Duration::from_secs(5), self.consumer.recv()).await;
230
231        match message {
232            Ok(Ok(msg)) => {
233                let payload = msg.payload().unwrap_or(&[]).to_vec();
234                let key = msg.key().map(|k| String::from_utf8_lossy(k).to_string()).unwrap_or_default();
235                let timestamp = msg.timestamp().to_millis().unwrap_or(0) as u64;
236
237                let headers: HashMap<String, String> = msg.headers()
238                    .map(|h: &BorrowedHeaders| {
239                        h.iter().filter_map(|header| {
240                            header.value.map(|v| (header.key.to_string(), String::from_utf8_lossy(v).to_string()))
241                        }).collect()
242                    })
243                    .unwrap_or_default();
244
245                let message = DMSCQueueMessage {
246                    id: key,
247                    payload,
248                    headers,
249                    timestamp: std::time::UNIX_EPOCH + Duration::from_millis(timestamp),
250                    retry_count: 0,
251                    max_retries: 3,
252                };
253
254                Ok(Some(message))
255            }
256            Ok(Err(e)) => Err(DMSCError::Queue(format!("Kafka receive error: {}", e))),
257            Err(_) => Ok(None),
258        }
259    }
260
261    async fn ack(&self, _message_id: &str) -> DMSCResult<()> {
262        self.consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync)
263            .map_err(|e| DMSCError::Queue(format!("Failed to commit offset: {}", e)))?;
264        Ok(())
265    }
266
267    async fn nack(&self, message_id: &str) -> DMSCResult<()> {
268        log::info!("Message negatively acknowledged: {}", message_id);
269        Ok(())
270    }
271
272    async fn pause(&self) -> DMSCResult<()> {
273        let mut paused = self.paused.lock().await;
274        *paused = true;
275        Ok(())
276    }
277
278    async fn resume(&self) -> DMSCResult<()> {
279        let mut paused = self.paused.lock().await;
280        *paused = false;
281        Ok(())
282    }
283}