1#![allow(non_snake_case)]
19
20use async_trait::async_trait;
21use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
22use rdkafka::client::DefaultClientContext;
23use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
24use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
25use rdkafka::message::{BorrowedHeaders, Headers, Message};
26use rdkafka::producer::{FutureProducer, FutureRecord};
27use rdkafka::topic_partition_list::TopicPartitionList;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::sync::Mutex;
32use crate::core::{DMSCError, DMSCResult};
33use crate::queue::{DMSCQueue, DMSCQueueMessage, DMSCQueueProducer, DMSCQueueConsumer, DMSCQueueStats};
34
/// Stream consumer specialised with the default (no-op) consumer context.
type KafkaConsumer = StreamConsumer<DefaultConsumerContext>;
36
/// Kafka-backed queue bound to a single topic.
///
/// Holds `Arc`-shared handles for a producer, a stream consumer and an
/// admin client, all created from one broker configuration in `new`, so
/// `Clone` is cheap and clones share the same underlying connections.
#[allow(dead_code)]
#[derive(Clone)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCKafkaQueue {
    // Comma-separated bootstrap broker list, e.g. "host1:9092,host2:9092".
    brokers: String,
    // Topic this queue produces to and consumes from.
    topic: String,
    producer: Arc<FutureProducer>,
    consumer: Arc<KafkaConsumer>,
    // Used to create the topic on startup and to delete it in `purge`.
    admin_client: Arc<AdminClient<DefaultClientContext>>,
}
47
48impl DMSCKafkaQueue {
49 pub async fn new(brokers: &str, topic: &str) -> DMSCResult<Self> {
50 let mut config = ClientConfig::new();
51 config.set("bootstrap.servers", brokers);
52 config.set("message.timeout.ms", "30000");
53 config.set("request.timeout.ms", "10000");
54 config.set("session.timeout.ms", "30000");
55 config.set("enable.auto.commit", "false");
56 config.set("auto.offset.reset", "earliest");
57 config.set_log_level(RDKafkaLogLevel::Warning);
58
59 let producer: FutureProducer = config
60 .create::<FutureProducer>()
61 .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka producer: {}", e)))?;
62
63 let consumer: KafkaConsumer = config
64 .create::<KafkaConsumer>()
65 .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka consumer: {}", e)))?;
66
67 let admin_client: AdminClient<DefaultClientContext> = config
68 .create::<AdminClient<DefaultClientContext>>()
69 .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka admin client: {}", e)))?;
70
71 let queue = Self {
72 brokers: brokers.to_string(),
73 topic: topic.to_string(),
74 producer: Arc::new(producer),
75 consumer: Arc::new(consumer),
76 admin_client: Arc::new(admin_client),
77 };
78
79 queue.ensure_topic_exists().await?;
80
81 Ok(queue)
82 }
83
84 async fn ensure_topic_exists(&self) -> DMSCResult<()> {
85 let metadata = self.consumer.fetch_metadata(None, Duration::from_secs(5))
86 .map_err(|e| DMSCError::Queue(format!("Failed to get Kafka metadata: {}", e)))?;
87
88 let topic_exists = metadata.topics().iter().any(|t| t.name() == self.topic);
89
90 if !topic_exists {
91 let new_topic = NewTopic::new(&self.topic, 1, TopicReplication::Fixed(1));
92 let admin_options = AdminOptions::new();
93
94 self.admin_client.create_topics(std::slice::from_ref(&new_topic), &admin_options).await
95 .map_err(|e| DMSCError::Queue(format!("Failed to create Kafka topic: {}", e)))?;
96
97 tokio::time::sleep(Duration::from_secs(1)).await;
98 }
99
100 Ok(())
101 }
102
103 async fn get_topic_metadata(&self) -> DMSCResult<i32> {
104 let metadata = self.consumer.fetch_metadata(Some(&self.topic), Duration::from_secs(5))
105 .map_err(|e| DMSCError::Queue(format!("Failed to get Kafka metadata: {}", e)))?;
106
107 if let Some(topic_meta) = metadata.topics().first() {
108 Ok(topic_meta.partitions().len() as i32)
109 } else {
110 Ok(0)
111 }
112 }
113}
114
115#[async_trait]
116impl DMSCQueue for DMSCKafkaQueue {
117 async fn create_producer(&self) -> DMSCResult<Box<dyn DMSCQueueProducer>> {
118 Ok(Box::new(KafkaQueueProducer {
119 producer: self.producer.clone(),
120 topic: self.topic.clone(),
121 }))
122 }
123
124 async fn create_consumer(&self, consumer_group: &str) -> DMSCResult<Box<dyn DMSCQueueConsumer>> {
125 let consumer = self.consumer.clone();
126
127 let mut partition_list = TopicPartitionList::new();
128 let partition_count = self.get_topic_metadata().await?;
129
130 for i in 0..partition_count.max(1) {
131 partition_list.add_partition(&self.topic, i);
132 }
133
134 consumer.assign(&partition_list)
135 .map_err(|e| DMSCError::Queue(format!("Failed to assign partitions: {}", e)))?;
136
137 Ok(Box::new(KafkaQueueConsumer {
138 consumer,
139 topic: self.topic.clone(),
140 consumer_group: consumer_group.to_string(),
141 paused: Arc::new(Mutex::new(false)),
142 }))
143 }
144
145 async fn get_stats(&self) -> DMSCResult<DMSCQueueStats> {
146 let _partition_count = self.get_topic_metadata().await?;
147 let topic = self.topic.clone();
148
149 Ok(DMSCQueueStats {
150 queue_name: topic.clone(),
151 message_count: 0,
152 consumer_count: 1,
153 producer_count: 1,
154 processed_messages: 0,
155 failed_messages: 0,
156 avg_processing_time_ms: 0.0,
157 total_bytes_sent: 0,
158 total_bytes_received: 0,
159 last_message_time: 0,
160 })
161 }
162
163 async fn purge(&self) -> DMSCResult<()> {
164 let admin_options = AdminOptions::new();
165 self.admin_client.delete_topics(std::slice::from_ref(&self.topic.as_str()), &admin_options).await
166 .map_err(|e| DMSCError::Queue(format!("Failed to purge Kafka topic: {}", e)))?;
167
168 tokio::time::sleep(Duration::from_secs(1)).await;
169 self.ensure_topic_exists().await?;
170
171 Ok(())
172 }
173
174 async fn delete(&self) -> DMSCResult<()> {
175 self.purge().await
176 }
177}
178
/// Producer handle returned by `DMSCKafkaQueue::create_producer`; shares
/// the queue's `FutureProducer` and always publishes to `topic`.
pub struct KafkaQueueProducer {
    producer: Arc<FutureProducer>,
    topic: String,
}
183
184#[async_trait]
185impl DMSCQueueProducer for KafkaQueueProducer {
186 async fn send(&self, message: DMSCQueueMessage) -> DMSCResult<()> {
187 let payload = if message.payload.is_empty() {
188 vec![]
189 } else {
190 message.payload
191 };
192
193 let key = message.id.as_bytes();
194
195 let future_record = FutureRecord::to(&self.topic)
196 .key(key)
197 .payload(&payload);
198
199 self.producer.send(future_record, Duration::from_secs(10)).await
200 .map_err(|(e, _)| DMSCError::Queue(format!("Failed to send message to Kafka: {}", e)))?;
201
202 Ok(())
203 }
204
205 async fn send_batch(&self, messages: Vec<DMSCQueueMessage>) -> DMSCResult<()> {
206 for message in messages {
207 self.send(message).await?;
208 }
209 Ok(())
210 }
211}
212
/// Consumer handle returned by `DMSCKafkaQueue::create_consumer`; wraps the
/// queue's shared stream consumer plus a soft pause flag.
#[allow(dead_code)]
pub struct KafkaQueueConsumer {
    consumer: Arc<KafkaConsumer>,
    topic: String,
    // Stored for bookkeeping only; not applied as a Kafka `group.id` here.
    consumer_group: String,
    // When true, `receive` short-circuits to `Ok(None)` without polling.
    paused: Arc<Mutex<bool>>,
}
220
221#[async_trait]
222impl DMSCQueueConsumer for KafkaQueueConsumer {
223 async fn receive(&self) -> DMSCResult<Option<DMSCQueueMessage>> {
224 let paused = *self.paused.lock().await;
225 if paused {
226 return Ok(None);
227 }
228
229 let message = tokio::time::timeout(Duration::from_secs(5), self.consumer.recv()).await;
230
231 match message {
232 Ok(Ok(msg)) => {
233 let payload = msg.payload().unwrap_or(&[]).to_vec();
234 let key = msg.key().map(|k| String::from_utf8_lossy(k).to_string()).unwrap_or_default();
235 let timestamp = msg.timestamp().to_millis().unwrap_or(0) as u64;
236
237 let headers: HashMap<String, String> = msg.headers()
238 .map(|h: &BorrowedHeaders| {
239 h.iter().filter_map(|header| {
240 header.value.map(|v| (header.key.to_string(), String::from_utf8_lossy(v).to_string()))
241 }).collect()
242 })
243 .unwrap_or_default();
244
245 let message = DMSCQueueMessage {
246 id: key,
247 payload,
248 headers,
249 timestamp: std::time::UNIX_EPOCH + Duration::from_millis(timestamp),
250 retry_count: 0,
251 max_retries: 3,
252 };
253
254 Ok(Some(message))
255 }
256 Ok(Err(e)) => Err(DMSCError::Queue(format!("Kafka receive error: {}", e))),
257 Err(_) => Ok(None),
258 }
259 }
260
261 async fn ack(&self, _message_id: &str) -> DMSCResult<()> {
262 self.consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync)
263 .map_err(|e| DMSCError::Queue(format!("Failed to commit offset: {}", e)))?;
264 Ok(())
265 }
266
267 async fn nack(&self, message_id: &str) -> DMSCResult<()> {
268 log::info!("Message negatively acknowledged: {}", message_id);
269 Ok(())
270 }
271
272 async fn pause(&self) -> DMSCResult<()> {
273 let mut paused = self.paused.lock().await;
274 *paused = true;
275 Ok(())
276 }
277
278 async fn resume(&self) -> DMSCResult<()> {
279 let mut paused = self.paused.lock().await;
280 *paused = false;
281 Ok(())
282 }
283}