dmsc/device/discovery_scheduler.rs
1//! Copyright © 2025 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18use std::time::{Instant, Duration};
19use std::collections::{HashMap, VecDeque};
20use serde::{Serialize, Deserialize};
21use crate::device::{DMSCDevice, DMSCDeviceType, DMSCDeviceCapabilities};
22
23/// # Device Discovery and Resource Scheduling
24///
25/// This file implements advanced device discovery and resource scheduling algorithms for DMSC.
26/// It provides two main components:
27///
28/// 1. **DMSCDeviceDiscoveryEngine**: Advanced device discovery using machine learning and heuristics
29/// 2. **DMSCResourceScheduler**: Resource scheduling algorithm using performance history and policies
30///
31/// ## Design Principles
32///
33/// 1. **Machine Learning Integration**: Uses pattern recognition and confidence scoring for device identification
34/// 2. **Heuristic Optimization**: Implements intelligent resource allocation based on device capabilities and load
35/// 3. **Scalability**: Designed to handle large numbers of devices and requests
36/// 4. **Flexibility**: Supports custom scheduling policies and device fingerprints
37/// 5. **Performance Focus**: Optimizes for low latency and high throughput
38/// 6. **Adaptability**: Learns from discovery and performance history
39///
40/// ## Usage Examples
41///
42/// ```rust
43/// use dmsc::device::{DMSCDeviceDiscoveryEngine, DMSCResourceScheduler, ResourceRequest, DeviceScanResult};
44///
45/// fn example() {
46/// // Create discovery engine
47/// let mut discovery_engine = DMSCDeviceDiscoveryEngine::new();
48///
49/// // Create scan results
50/// let scan_results = vec![
51/// DeviceScanResult {
52/// device_id: "device-123".to_string(),
53/// device_name: "NVIDIA RTX 3090".to_string(),
54/// device_info: HashMap::from([
55/// ("device_name".to_string(), "NVIDIA RTX 3090".to_string()),
56/// ("driver".to_string(), "CUDA 12.0".to_string())
57/// ])
58/// }
59/// ];
60///
61/// // Discover devices
62/// let discovered_devices = discovery_engine.discover_devices(scan_results);
63///
64/// // Create resource scheduler
65/// let mut scheduler = DMSCResourceScheduler::new();
66///
67/// // Create resource request
68/// let request = ResourceRequest {
69/// request_id: "req-456".to_string(),
70/// required_memory_gb: Some(16.0),
71/// required_compute_units: Some(512),
72/// required_bandwidth_gbps: Some(900.0),
73/// required_custom_capabilities: HashMap::from([
74/// ("cuda_support".to_string(), "true".to_string())
75/// ]),
76/// priority: 5,
77/// deadline: None
78/// };
79///
80/// // Schedule resource
81/// let assigned_device = scheduler.schedule_resource(&request, &discovered_devices);
82/// }
83/// ```
84/// Advanced device discovery algorithm using machine learning and heuristics
85///
86/// This engine uses pattern recognition, confidence scoring, and historical data to identify
87/// devices with high accuracy. It maintains a database of device fingerprints and uses
88/// them to match discovered devices based on identification patterns.
89#[derive(Debug, Clone)]
90#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
91pub struct DMSCDeviceDiscoveryEngine {
92 /// Device fingerprint database mapping fingerprint IDs to their details
93 fingerprints: HashMap<String, DeviceFingerprint>,
94 /// Discovery history for pattern recognition and learning
95 discovery_history: VecDeque<DiscoveryRecord>,
96 /// Confidence threshold for device identification (0.0 to 1.0)
97 confidence_threshold: f64,
98}
99
100/// Device fingerprint containing identification patterns and capabilities
101///
102/// This struct represents a device fingerprint used for identifying devices during discovery.
103/// It contains identification patterns with weights that are used to calculate confidence scores.
104#[derive(Debug, Clone, Serialize, Deserialize)]
105struct DeviceFingerprint {
106 /// Type of device this fingerprint identifies
107 device_type: DMSCDeviceType,
108 /// Capabilities associated with this device type
109 capabilities: DMSCDeviceCapabilities,
110 /// Identification patterns used to match this device type
111 identification_patterns: Vec<IdentificationPattern>,
112 /// Base confidence score for this fingerprint
113 confidence_score: f64,
114}
115
116/// Identification pattern with field, pattern, and weight
117///
118/// This struct defines a single identification pattern used in device fingerprinting.
119/// Each pattern has a field to match against, a pattern string, and a weight that determines
120/// how important this pattern is for identification.
121#[derive(Debug, Clone, Serialize, Deserialize)]
122struct IdentificationPattern {
123 /// Device information field to match against (e.g., "device_name", "driver")
124 field: String,
125 /// Pattern string to match (e.g., "nvidia", "cuda")
126 pattern: String,
127 /// Weight of this pattern in the overall confidence calculation (0.0 to 1.0)
128 weight: f64,
129}
130
131/// Record of a device discovery attempt
132///
133/// This struct records information about a device discovery attempt, including the device
134/// information, identified type (if any), and confidence score. These records are used for
135/// pattern learning and improving future discovery accuracy.
136#[derive(Debug, Clone, Serialize, Deserialize)]
137struct DiscoveryRecord {
138 /// Timestamp of the discovery attempt (UNIX seconds)
139 timestamp: u64, // Changed from Instant to u64 for serialization
140 /// Unique identifier of the discovered device
141 device_id: String,
142 /// Device information collected during discovery
143 device_info: HashMap<String, String>,
144 /// Identified device type (if confidence threshold was met)
145 identified_type: Option<DMSCDeviceType>,
146 /// Confidence score for the identification
147 confidence: f64,
148}
149
150impl Default for DMSCDeviceDiscoveryEngine {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156impl DMSCDeviceDiscoveryEngine {
157 #[cfg(not(feature = "pyo3"))]
158 pub fn new() -> Self {
159 let mut engine = Self {
160 fingerprints: HashMap::new(),
161 discovery_history: VecDeque::with_capacity(1000),
162 confidence_threshold: 0.7,
163 };
164
165 // Initialize with default fingerprints
166 engine.initialize_default_fingerprints();
167 engine
168 }
169
170 #[cfg(not(feature = "pyo3"))]
171 pub fn discover_devices(&mut self, scan_results: Vec<DeviceScanResult>) -> Vec<DMSCDevice> {
172 let mut discovered_devices = Vec::new();
173
174 for scan_result in scan_results {
175 if let Some(device) = self.identify_device(scan_result) {
176 discovered_devices.push(device);
177 }
178 }
179
180 discovered_devices
181 }
182
183 /// Identify a single device from scan results using fingerprint matching.
184 ///
185 /// This method attempts to identify a device by matching its information against known
186 /// device fingerprints. It calculates confidence scores for each fingerprint match and
187 /// returns the best match if it meets the confidence threshold.
188 ///
189 /// # Parameters
190 ///
191 /// - `scan_result`: Single device scan result to identify
192 ///
193 /// # Returns
194 ///
195 /// An `Option<DMSCDevice>` containing the identified device if a match was found with
196 /// sufficient confidence, or `None` if no match was found.
197 fn identify_device(&mut self, scan_result: DeviceScanResult) -> Option<DMSCDevice> {
198 let device_info = scan_result.device_info;
199
200 // Try to match against known fingerprints
201 let mut best_match: Option<(String, f64)> = None;
202
203 for (fingerprint_id, fingerprint) in &self.fingerprints {
204 let confidence = self.calculate_match_confidence(&device_info, fingerprint);
205
206 if confidence > self.confidence_threshold {
207 match best_match {
208 None => best_match = Some((fingerprint_id.clone(), confidence)),
209 Some((_, best_confidence)) if confidence > best_confidence => {
210 best_match = Some((fingerprint_id.clone(), confidence));
211 }
212 _ => {}
213 }
214 }
215 }
216
217 if let Some((fingerprint_id, confidence)) = best_match {
218 let fingerprint = match self.fingerprints.get(&fingerprint_id) {
219 Some(f) => f,
220 None => return None,
221 };
222
223 let device = DMSCDevice::new(
224 scan_result.device_name.clone(),
225 fingerprint.device_type,
226 ).with_capabilities(fingerprint.capabilities.clone());
227
228 // Record discovery for future learning
229 self.record_discovery(
230 scan_result.device_id,
231 device_info,
232 Some(fingerprint.device_type),
233 confidence,
234 );
235
236 Some(device)
237 } else {
238 // Record failed discovery for analysis
239 self.record_discovery(
240 scan_result.device_id,
241 device_info,
242 None,
243 0.0,
244 );
245
246 None
247 }
248 }
249
250 /// Calculate confidence score for device identification.
251 ///
252 /// This method calculates a confidence score by matching device information against
253 /// the identification patterns in a fingerprint. The score is a weighted sum of matching
254 /// patterns divided by the total weight of all patterns.
255 ///
256 /// # Parameters
257 ///
258 /// - `device_info`: Device information to match
259 /// - `fingerprint`: Fingerprint containing identification patterns
260 ///
261 /// # Returns
262 ///
263 /// A confidence score between 0.0 and 1.0, where higher scores indicate a stronger match.
264 fn calculate_match_confidence(&self, device_info: &HashMap<String, String>, fingerprint: &DeviceFingerprint) -> f64 {
265 let mut total_weight = 0.0;
266 let mut matched_weight = 0.0;
267
268 for pattern in &fingerprint.identification_patterns {
269 total_weight += pattern.weight;
270
271 if let Some(value) = device_info.get(&pattern.field) {
272 if self.matches_pattern(value, &pattern.pattern) {
273 matched_weight += pattern.weight;
274 }
275 }
276 }
277
278 if total_weight > 0.0 {
279 matched_weight / total_weight
280 } else {
281 0.0
282 }
283 }
284
285 /// Check if a value matches a pattern using simple string matching.
286 ///
287 /// This method performs a case-insensitive substring match to determine if a value
288 /// matches a pattern. It can be enhanced with regex support in future versions.
289 ///
290 /// # Parameters
291 ///
292 /// - `value`: Device information value to check
293 /// - `pattern`: Pattern string to match against
294 ///
295 /// # Returns
296 ///
297 /// `true` if the value matches the pattern, `false` otherwise.
298 fn matches_pattern(&self, value: &str, pattern: &str) -> bool {
299 // Simple pattern matching - can be enhanced with regex
300 value.to_lowercase().contains(&pattern.to_lowercase())
301 }
302
303 /// Record discovery for pattern learning and analysis.
304 ///
305 /// This method records a discovery attempt in the history, including the device information,
306 /// identified type (if any), and confidence score. This history is used for pattern recognition
307 /// and improving future discovery accuracy.
308 ///
309 /// # Parameters
310 ///
311 /// - `device_id`: Unique identifier of the discovered device
312 /// - `device_info`: Device information collected during discovery
313 /// - `identified_type`: Identified device type (if any)
314 /// - `confidence`: Confidence score for the identification
315 fn record_discovery(&mut self, device_id: String, device_info: HashMap<String, String>, identified_type: Option<DMSCDeviceType>, confidence: f64) {
316 let record = DiscoveryRecord {
317 timestamp: std::time::SystemTime::now()
318 .duration_since(std::time::UNIX_EPOCH)
319 .unwrap_or(Duration::from_secs(0))
320 .as_secs(),
321 device_id,
322 device_info,
323 identified_type,
324 confidence,
325 };
326
327 self.discovery_history.push_back(record);
328
329 // Keep only recent history
330 if self.discovery_history.len() > 1000 {
331 self.discovery_history.pop_front();
332 }
333 }
334
335 /// Initialize default device fingerprints for common device types.
336 ///
337 /// This method populates the fingerprint database with default fingerprints for common
338 /// device types, including GPUs and TPUs. These fingerprints are used as a starting point
339 /// for device identification.
340 fn initialize_default_fingerprints(&mut self) {
341 // GPU Device Fingerprint
342 let gpu_fingerprint = DeviceFingerprint {
343 device_type: DMSCDeviceType::GPU,
344 capabilities: DMSCDeviceCapabilities {
345 memory_gb: Some(16.0),
346 compute_units: Some(512),
347 storage_gb: Some(500.0),
348 bandwidth_gbps: Some(900.0),
349 custom_capabilities: vec![("cuda_support".to_string(), "true".to_string()), ("tensor_cores".to_string(), "true".to_string())].into_iter().collect(),
350 },
351 identification_patterns: vec![
352 IdentificationPattern {
353 field: "device_name".to_string(),
354 pattern: "nvidia".to_string(),
355 weight: 0.4,
356 },
357 IdentificationPattern {
358 field: "driver".to_string(),
359 pattern: "cuda".to_string(),
360 weight: 0.6,
361 },
362 ],
363 confidence_score: 0.9,
364 };
365
366 // TPU Device Fingerprint
367 let tpu_fingerprint = DeviceFingerprint {
368 device_type: DMSCDeviceType::Custom, // Using Custom for TPU until we have a proper TPU type
369 capabilities: DMSCDeviceCapabilities {
370 memory_gb: Some(32.0),
371 compute_units: Some(128),
372 storage_gb: Some(1000.0),
373 bandwidth_gbps: Some(1200.0),
374 custom_capabilities: vec![("tpu_support".to_string(), "true".to_string()), ("ml_accelerator".to_string(), "true".to_string())].into_iter().collect(),
375 },
376 identification_patterns: vec![
377 IdentificationPattern {
378 field: "device_name".to_string(),
379 pattern: "tpu".to_string(),
380 weight: 0.8,
381 },
382 IdentificationPattern {
383 field: "vendor".to_string(),
384 pattern: "google".to_string(),
385 weight: 0.2,
386 },
387 ],
388 confidence_score: 0.95,
389 };
390
391 self.fingerprints.insert("gpu".to_string(), gpu_fingerprint);
392 self.fingerprints.insert("tpu".to_string(), tpu_fingerprint); // Using proper key for TPU
393 }
394
395 /// Get discovery statistics and performance metrics.
396 ///
397 /// This method calculates and returns statistics about the discovery process, including
398 /// total attempts, successful identifications, success rate, and average confidence score.
399 ///
400 /// # Returns
401 ///
402 /// A `DiscoveryStats` struct containing the discovery statistics.
403 #[cfg(not(feature = "pyo3"))]
404 pub fn get_discovery_stats(&self) -> DiscoveryStats {
405 let total_attempts = self.discovery_history.len();
406 let successful_identifications = self.discovery_history
407 .iter()
408 .filter(|record| record.identified_type.is_some())
409 .count();
410
411 let avg_confidence = if total_attempts > 0 {
412 self.discovery_history
413 .iter()
414 .map(|record| record.confidence)
415 .sum::<f64>() / total_attempts as f64
416 } else {
417 0.0
418 };
419
420 DiscoveryStats {
421 total_attempts,
422 successful_identifications,
423 success_rate: if total_attempts > 0 {
424 successful_identifications as f64 / total_attempts as f64
425 } else {
426 0.0
427 },
428 avg_confidence,
429 }
430 }
431}
432
433#[cfg(feature = "pyo3")]
434#[pyo3::prelude::pymethods]
435impl DMSCDeviceDiscoveryEngine {
436 #[new]
437 pub fn new() -> Self {
438 let mut engine = Self {
439 fingerprints: HashMap::new(),
440 discovery_history: VecDeque::with_capacity(1000),
441 confidence_threshold: 0.7,
442 };
443 engine.initialize_default_fingerprints();
444 engine
445 }
446
447 pub fn discover_devices(&mut self, scan_results: Vec<DeviceScanResult>) -> Vec<DMSCDevice> {
448 let mut discovered_devices = Vec::new();
449 for scan_result in scan_results {
450 if let Some(device) = self.identify_device(scan_result) {
451 discovered_devices.push(device);
452 }
453 }
454 discovered_devices
455 }
456
457 pub fn get_confidence_threshold(&self) -> f64 {
458 self.confidence_threshold
459 }
460
461 pub fn set_confidence_threshold(&mut self, threshold: f64) {
462 self.confidence_threshold = threshold;
463 }
464
465 pub fn get_discovery_stats(&self) -> DiscoveryStats {
466 let total_attempts = self.discovery_history.len();
467 let successful_identifications = self.discovery_history
468 .iter()
469 .filter(|r| r.identified_type.is_some())
470 .count();
471 let avg_confidence = if total_attempts > 0 {
472 self.discovery_history
473 .iter()
474 .map(|record| record.confidence)
475 .sum::<f64>() / total_attempts as f64
476 } else {
477 0.0
478 };
479
480 DiscoveryStats {
481 total_attempts,
482 successful_identifications,
483 success_rate: if total_attempts > 0 {
484 successful_identifications as f64 / total_attempts as f64
485 } else {
486 0.0
487 },
488 avg_confidence,
489 }
490 }
491}
492
/// Raw information about one device as reported by a hardware scan.
///
/// The discovery engine consumes these values: `device_info` is matched against the
/// fingerprint patterns, `device_name` names the resulting device, and `device_id` is
/// stored in the discovery history.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DeviceScanResult {
    /// Unique identifier of the scanned device
    pub device_id: String,
    /// Human-readable device name
    pub device_name: String,
    /// Key/value details collected during the scan (e.g., "device_name", "driver")
    pub device_info: HashMap<String, String>,
}
507
508/// Discovery statistics and performance metrics
509///
510/// This struct contains statistics about the device discovery process, including
511/// success rates, confidence scores, and total attempts.
512#[derive(Debug, Clone, Serialize, Deserialize)]
513#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
514pub struct DiscoveryStats {
515 /// Total number of device discovery attempts
516 pub total_attempts: usize,
517 /// Number of successful device identifications
518 pub successful_identifications: usize,
519 /// Success rate for device identification (0.0 to 1.0)
520 pub success_rate: f64,
521 /// Average confidence score for successful identifications
522 pub avg_confidence: f64,
523}
524
525/// Resource scheduling algorithm using performance history and policies
526///
527/// This scheduler uses device performance history, current load, and custom policies
528/// to intelligently allocate resources to the most suitable devices.
529#[derive(Debug, Clone)]
530pub struct DMSCResourceScheduler {
531 /// Device performance history mapping device IDs to performance records
532 performance_history: HashMap<String, Vec<PerformanceRecord>>,
533 /// Current device loads (0.0 to 1.0)
534 device_loads: HashMap<String, f64>,
535 /// Custom scheduling policies sorted by priority
536 policies: Vec<SchedulingPolicy>,
537}
538
539/// Performance record for a device
540///
541/// This struct records performance metrics for a device at a specific point in time,
542/// including latency, throughput, error rate, and utilization.
543#[derive(Debug, Clone, Serialize, Deserialize)]
544struct PerformanceRecord {
545 /// Timestamp of the performance measurement (UNIX seconds)
546 timestamp: u64, // Changed from Instant to u64 for serialization
547 /// Latency in milliseconds
548 latency_ms: f64,
549 /// Throughput in operations per second
550 throughput: f64,
551 /// Error rate as a fraction (0.0 to 1.0)
552 error_rate: f64,
553 /// Resource utilization as a fraction (0.0 to 1.0)
554 utilization: f64,
555}
556
557/// Scheduling policy for resource allocation
558///
559/// This struct defines a custom scheduling policy that can be applied to resource requests.
560/// Policies consist of conditions and actions, and are evaluated in priority order.
561#[derive(Debug, Clone, Serialize, Deserialize)]
562pub struct SchedulingPolicy {
563 /// Name of the policy
564 name: String,
565 /// Priority of the policy (higher numbers = higher priority)
566 priority: u8,
567 /// Conditions that must be met for the policy to apply
568 conditions: Vec<PolicyCondition>,
569 /// Action to take if conditions are met
570 action: PolicyAction,
571}
572
573/// Policy condition for scheduling decisions
574///
575/// This struct defines a single condition that must be met for a scheduling policy to apply.
576/// Conditions compare metrics against thresholds using specified operators.
577#[derive(Debug, Clone, Serialize, Deserialize)]
578struct PolicyCondition {
579 /// Metric to evaluate (e.g., "device_type", "request_priority", "time_of_day")
580 metric: String,
581 /// Comparison operator (>, >=, <, <=, ==)
582 operator: String,
583 /// Threshold value for the comparison
584 threshold: f64,
585}
586
587/// Policy action for scheduling decisions
588///
589/// This enum defines the actions that can be taken when a scheduling policy's conditions are met.
590#[derive(Debug, Clone, Serialize, Deserialize)]
591enum PolicyAction {
592 /// Prefer a specific device for resource allocation
593 PreferDevice(String),
594 /// Avoid a specific device for resource allocation
595 AvoidDevice(String),
596 /// Use load balancing for resource allocation
597 LoadBalance,
598 /// Use priority-based scheduling
599 PriorityBased,
600}
601
602impl Default for DMSCResourceScheduler {
603 fn default() -> Self {
604 Self::new()
605 }
606}
607
608impl DMSCResourceScheduler {
609 /// Create a new resource scheduler with default settings.
610 ///
611 /// This method initializes the scheduler with empty performance history, device loads,
612 /// and scheduling policies.
613 ///
614 /// # Returns
615 ///
616 /// A new `DMSCResourceScheduler` instance with default settings.
617 pub fn new() -> Self {
618 Self {
619 performance_history: HashMap::new(),
620 device_loads: HashMap::new(),
621 policies: Vec::new(),
622 }
623 }
624
625 /// Schedule a resource request to the most suitable device based on capabilities, load, and policies.
626 ///
627 /// This method evaluates available devices against the resource request requirements,
628 /// calculates a score for each suitable device, and returns the ID of the best device.
629 ///
630 /// # Parameters
631 ///
632 /// - `request`: Resource request with requirements and priority
633 /// - `available_devices`: List of available devices to consider for scheduling
634 ///
635 /// # Returns
636 ///
637 /// The ID of the most suitable device for the request, or `None` if no suitable device is found.
638 pub fn schedule_resource(
639 &mut self,
640 request: &ResourceRequest,
641 available_devices: &[DMSCDevice],
642 ) -> Option<String> {
643 // Filter devices that meet requirements
644 let suitable_devices: Vec<&DMSCDevice> = available_devices
645 .iter()
646 .filter(|device| self.meets_requirements(device, request))
647 .collect();
648
649 if suitable_devices.is_empty() {
650 return None;
651 }
652
653 // Score each suitable device
654 let mut device_scores: Vec<(String, f64)> = suitable_devices
655 .iter()
656 .map(|device| {
657 let score = self.calculate_device_score(device, request);
658 (device.id().to_string(), score)
659 })
660 .collect();
661
662 // Sort by score (highest first)
663 device_scores.sort_by(|a, b| {
664 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
665 });
666
667 // Return the best device
668 device_scores.first().map(|(device_id, _)| device_id.clone())
669 }
670
671 /// Check if a device meets the requirements of a resource request.
672 ///
673 /// This method verifies that a device has the necessary capabilities, memory, compute units,
674 /// bandwidth, and custom capabilities to handle a resource request.
675 ///
676 /// # Parameters
677 ///
678 /// - `device`: Device to check against requirements
679 /// - `request`: Resource request with requirements
680 ///
681 /// # Returns
682 ///
683 /// `true` if the device meets all requirements, `false` otherwise.
684 fn meets_requirements(&self, device: &DMSCDevice, request: &ResourceRequest) -> bool {
685 let capabilities = device.capabilities();
686
687 // Check memory requirement
688 if let Some(required_memory) = request.required_memory_gb {
689 if let Some(available_memory) = capabilities.memory_gb {
690 if available_memory < required_memory {
691 return false;
692 }
693 } else {
694 return false; // No memory available
695 }
696 }
697
698 // Check compute units requirement
699 if let Some(required_compute) = request.required_compute_units {
700 if let Some(available_compute) = capabilities.compute_units {
701 if available_compute < required_compute {
702 return false;
703 }
704 } else {
705 return false; // No compute units available
706 }
707 }
708
709 // Check bandwidth requirement
710 if let Some(required_bandwidth) = request.required_bandwidth_gbps {
711 if let Some(available_bandwidth) = capabilities.bandwidth_gbps {
712 if available_bandwidth < required_bandwidth {
713 return false;
714 }
715 } else {
716 return false; // No bandwidth available
717 }
718 }
719
720 // Check custom capabilities
721 for (required_key, required_value) in &request.required_custom_capabilities {
722 if let Some(available_value) = capabilities.custom_capabilities.get(required_key) {
723 if available_value != required_value {
724 return false;
725 }
726 } else {
727 return false; // Required capability not found
728 }
729 }
730
731 true
732 }
733
734 /// Calculate a score for a device based on its capabilities, load, performance history, and policies.
735 ///
736 /// This method calculates a score for a device by considering:
737 /// 1. Base score (100.0)
738 /// 2. Penalty for current load
739 /// 3. Bonus for good performance history (latency, throughput, error rate, utilization)
740 /// 4. Health score adjustment
741 /// 5. Responsiveness adjustment
742 /// 6. Policy-based adjustments
743 ///
744 /// # Parameters
745 ///
746 /// - `device`: Device to score
747 /// - `request`: Resource request being scheduled
748 ///
749 /// # Returns
750 ///
751 /// A score between 0.0 and potentially over 100.0, where higher scores indicate better suitability.
752 fn calculate_device_score(&self, device: &DMSCDevice, request: &ResourceRequest) -> f64 {
753 let device_id = device.id();
754 let base_score = 100.0;
755 let mut score = base_score;
756
757 // Penalize based on current load (exponential penalty for high load)
758 if let Some(&load) = self.device_loads.get(device_id) {
759 // Exponential penalty: 0% load = 0 penalty, 100% load = 70 point penalty
760 score -= load.powi(2) * 70.0;
761 }
762
763 // Bonus for device performance history with weighted factors
764 if let Some(performance_records) = self.performance_history.get(device_id) {
765 if !performance_records.is_empty() {
766 // Only consider recent performance records (last 10)
767 let recent_records = &performance_records[performance_records.len().saturating_sub(10)..];
768 let record_count = recent_records.len() as f64;
769
770 let avg_latency = recent_records
771 .iter()
772 .map(|record| record.latency_ms)
773 .sum::<f64>() / record_count;
774
775 let avg_throughput = recent_records
776 .iter()
777 .map(|record| record.throughput)
778 .sum::<f64>() / record_count;
779
780 let avg_error_rate = recent_records
781 .iter()
782 .map(|record| record.error_rate)
783 .sum::<f64>() / record_count;
784
785 let avg_utilization = recent_records
786 .iter()
787 .map(|record| record.utilization)
788 .sum::<f64>() / record_count;
789
790 // Bonus for low latency (up to 20 points)
791 score += (100.0 - avg_latency.min(100.0)) * 0.2;
792
793 // Bonus for high throughput (up to 20 points)
794 score += (avg_throughput.min(100.0)) * 0.2;
795
796 // Penalty for high error rate (up to 20 points)
797 score -= (avg_error_rate.min(1.0)) * 20.0;
798
799 // Bonus for optimal utilization (60-80% is ideal, up to 10 points)
800 let utilization_bonus = if (0.6..=0.8).contains(&avg_utilization) {
801 10.0
802 } else if (0.4..=0.9).contains(&avg_utilization) {
803 5.0
804 } else {
805 0.0
806 };
807 score += utilization_bonus;
808 }
809 }
810
811 // Add health score adjustment (up to 20 points)
812 let health_score = device.health_score() as f64;
813 score += (health_score / 100.0) * 20.0;
814
815 // Add responsiveness adjustment (10 points if responsive, 0 otherwise)
816 if device.is_responsive(300) { // 5 minutes timeout
817 score += 10.0;
818 }
819
820 // Apply policy-based adjustments
821 for policy in &self.policies {
822 score = self.apply_policy_score_adjustment(device, request, policy, score);
823 }
824
825 // Ensure score is within reasonable bounds
826 score.clamp(0.0, 200.0)
827 }
828
829 /// Apply policy-based score adjustments to a device's score.
830 ///
831 /// This method evaluates a scheduling policy's conditions and applies the appropriate
832 /// score adjustment if the conditions are met.
833 ///
834 /// # Parameters
835 ///
836 /// - `device`: Device being scored
837 /// - `request`: Resource request being scheduled
838 /// - `policy`: Scheduling policy to apply
839 /// - `current_score`: Current score of the device
840 ///
841 /// # Returns
842 ///
843 /// The updated score after applying the policy adjustment.
844 fn apply_policy_score_adjustment(
845 &self,
846 device: &DMSCDevice,
847 request: &ResourceRequest,
848 policy: &SchedulingPolicy,
849 current_score: f64,
850 ) -> f64 {
851 // Check if policy conditions are met
852 let conditions_met = policy.conditions.iter().all(|condition| {
853 self.evaluate_condition(device, request, condition)
854 });
855
856 if conditions_met {
857 match &policy.action {
858 PolicyAction::PreferDevice(preferred_device) => {
859 if device.id() == preferred_device {
860 current_score + 30.0 // Bonus for preferred device
861 } else {
862 current_score
863 }
864 }
865 PolicyAction::AvoidDevice(avoided_device) => {
866 if device.id() == avoided_device {
867 current_score - 30.0 // Penalty for avoided device
868 } else {
869 current_score
870 }
871 }
872 PolicyAction::LoadBalance => {
873 // Penalty based on current load for load balancing
874 let load_penalty = self.device_loads.get(device.id()).unwrap_or(&0.0) * 20.0;
875 current_score - load_penalty
876 }
877 PolicyAction::PriorityBased => {
878 // Bonus based on request priority
879 let priority_bonus = (request.priority as f64 / 10.0) * 15.0;
880 current_score + priority_bonus
881 }
882 }
883 } else {
884 current_score
885 }
886 }
887
888 /// Evaluate a policy condition against a device and request.
889 ///
890 /// This method evaluates a single policy condition by comparing the appropriate metric
891 /// against the threshold using the specified operator.
892 ///
893 /// # Parameters
894 ///
895 /// - `device`: Device to evaluate
896 /// - `request`: Resource request to evaluate
897 /// - `condition`: Policy condition to evaluate
898 ///
899 /// # Returns
900 ///
901 /// `true` if the condition is met, `false` otherwise.
902 fn evaluate_condition(
903 &self,
904 device: &DMSCDevice,
905 request: &ResourceRequest,
906 condition: &PolicyCondition,
907 ) -> bool {
908 let value = match condition.metric.as_str() {
909 "device_type" => {
910 // Convert device type to numeric value for comparison
911 match device.device_type() {
912 DMSCDeviceType::GPU => 1.0,
913 DMSCDeviceType::Memory => 2.0,
914 DMSCDeviceType::CPU => 3.0,
915 DMSCDeviceType::Storage => 4.0,
916 DMSCDeviceType::Network => 5.0,
917 DMSCDeviceType::Sensor => 6.0,
918 DMSCDeviceType::Actuator => 7.0,
919 DMSCDeviceType::Custom => 8.0,
920 }
921 }
922 "request_priority" => request.priority as f64,
923 "time_of_day" => {
924 // Simple time-based condition (0-24)
925 let now = std::time::SystemTime::now()
926 .duration_since(std::time::UNIX_EPOCH)
927 .unwrap_or_default()
928 .as_secs();
929 (now % 86400) as f64 / 3600.0
930 }
931 _ => 0.0,
932 };
933
934 match condition.operator.as_str() {
935 ">" => value > condition.threshold,
936 ">=" => value >= condition.threshold,
937 "<" => value < condition.threshold,
938 "<=" => value <= condition.threshold,
939 "==" => (value - condition.threshold).abs() < 0.001,
940 _ => false,
941 }
942 }
943
944 /// Record device performance after task completion.
945 ///
946 /// This method records performance metrics for a device, including latency, throughput,
947 /// error rate, and utilization. These records are used for future scheduling decisions.
948 ///
949 /// # Parameters
950 ///
951 /// - `device_id`: ID of the device whose performance is being recorded
952 /// - `latency_ms`: Latency in milliseconds
953 /// - `throughput`: Throughput in operations per second
954 /// - `error_rate`: Error rate as a fraction (0.0 to 1.0)
955 /// - `utilization`: Resource utilization as a fraction (0.0 to 1.0)
956 pub fn record_performance(
957 &mut self,
958 device_id: &str,
959 latency_ms: f64,
960 throughput: f64,
961 error_rate: f64,
962 utilization: f64,
963 ) {
964 let record = PerformanceRecord {
965 timestamp: std::time::SystemTime::now()
966 .duration_since(std::time::UNIX_EPOCH)
967 .unwrap_or_default()
968 .as_secs(),
969 latency_ms,
970 throughput,
971 error_rate,
972 utilization,
973 };
974
975 self.performance_history
976 .entry(device_id.to_string())
977 .or_default()
978 .push(record);
979
980 // Keep only recent performance records (last 100)
981 if let Some(history) = self.performance_history.get_mut(device_id) {
982 if history.len() > 100 {
983 history.remove(0);
984 }
985 }
986 }
987
988 /// Update the current load of a device.
989 ///
990 /// This method updates the load of a device, clamping the value between 0.0 and 1.0.
991 /// The load is used in scheduling decisions to avoid overloading devices.
992 ///
993 /// # Parameters
994 ///
995 /// - `device_id`: ID of the device whose load is being updated
996 /// - `load`: New load value (0.0 to 1.0)
997 pub fn update_device_load(&mut self, device_id: &str, load: f64) {
998 self.device_loads.insert(device_id.to_string(), load.clamp(0.0, 1.0));
999 }
1000
1001 /// Add a scheduling policy to the scheduler.
1002 ///
1003 /// This method adds a scheduling policy to the scheduler and sorts the policies
1004 /// by priority (highest first) to ensure they are evaluated in the correct order.
1005 ///
1006 /// # Parameters
1007 ///
1008 /// - `policy`: Scheduling policy to add
1009 pub fn add_policy(&mut self, policy: SchedulingPolicy) {
1010 self.policies.push(policy);
1011 // Sort by priority (highest first)
1012 self.policies.sort_by(|a, b| b.priority.cmp(&a.priority));
1013 }
1014}
1015
/// Description of the resources a task needs from a device.
///
/// Numeric requirements are optional; a requirement left as `None` is not checked when
/// devices are filtered. Custom capabilities are matched by exact key and value.
#[derive(Clone, Debug)]
pub struct ResourceRequest {
    /// Unique identifier of the request
    pub request_id: String,
    /// Minimum memory in gigabytes, if required
    pub required_memory_gb: Option<f64>,
    /// Minimum number of compute units, if required
    pub required_compute_units: Option<usize>,
    /// Minimum bandwidth in Gbps, if required
    pub required_bandwidth_gbps: Option<f64>,
    /// Custom capabilities the device must provide, as key-value pairs
    pub required_custom_capabilities: HashMap<String, String>,
    /// Request priority (0-255, higher = higher priority)
    pub priority: u8,
    /// Optional deadline for the request
    pub deadline: Option<Instant>,
}