dmsc/device/scheduler.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20use serde::{Serialize, Deserialize};
21use std::collections::HashMap;
22use chrono::{DateTime, Utc};
23use crate::device::DMSCResourceAllocation;
24use tokio::sync::RwLock;
25
26use super::core::{DMSCDevice, DMSCDeviceType, DMSCDeviceCapabilities};
27use super::pool::DMSCResourcePoolManager;
28use std::sync::Arc;
29
/// Resource scheduler for device management
///
/// Holds a map of active resource allocations (keyed by `String`) and an allocation
/// history, each behind an `Arc<tokio::sync::RwLock<_>>` so they can be shared across tasks.
///
/// NOTE(review): the only constructor in this file is the Python `#[new]` constructor, which
/// is compiled only with the `pyo3` feature, and no field reads are visible here. That is
/// presumably why `dead_code` is allowed; confirm before removing the attribute.
#[allow(dead_code)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCResourceScheduler {
    /// Active allocations (presumably keyed by allocation id; confirm against callers)
    allocations: Arc<RwLock<HashMap<String, DMSCResourceAllocation>>>,
    /// Allocation history for analytics
    allocation_history: Arc<RwLock<Vec<DMSCResourceAllocation>>>,
}
39
40#[cfg_attr(feature = "pyo3", pyo3::prelude::pymethods)]
41impl DMSCResourceScheduler {
42 #[cfg(feature = "pyo3")]
43 #[new]
44 fn new() -> Self {
45 Self {
46 allocations: Arc::new(RwLock::new(HashMap::new())),
47 allocation_history: Arc::new(RwLock::new(Vec::new())),
48 }
49 }
50}
51
52/// # Device Scheduler
53///
54/// This file implements a comprehensive device scheduler for DMSC, responsible for:
55/// 1. Managing device resource allocation and scheduling
56/// 2. Implementing multiple scheduling algorithms
57/// 3. Recording allocation history and statistics
58/// 4. Providing scheduling recommendations based on historical data
59///
60/// ## Design Principles
61///
62/// 1. **Multiple Scheduling Algorithms**: Supports FirstFit, BestFit, WorstFit, RoundRobin, PriorityBased, and LoadBalanced policies
63/// 2. **Policy Per Device Type**: Different device types can have different scheduling policies
64/// 3. **Allocation History**: Maintains detailed records of all allocations
65/// 4. **Statistics and Recommendations**: Provides insights into scheduling effectiveness and recommendations for optimization
66/// 5. **Resource Pool Integration**: Works closely with the resource pool manager
67/// 6. **Thread Safety**: Uses Arc and Mutex for thread-safe operations
68/// 7. **Scalability**: Designed to handle large numbers of devices and allocations
69///
70/// ## Usage Examples
71///
72/// ```rust
73/// use dmsc::device::{DMSCDeviceScheduler, DMSCAllocationRequest, DMSCDeviceType, DMSCDeviceCapabilities};
74/// use dmsc::device::pool::DMSCResourcePoolManager;
75/// use std::sync::{Arc, Mutex};
76///
77/// fn example() {
78/// // Create resource pool manager
79/// let pool_manager = Arc::new(Mutex::new(DMSCResourcePoolManager::new()));
80///
81/// // Create device scheduler
82/// let mut scheduler = DMSCDeviceScheduler::new(pool_manager);
83///
84/// // Set scheduling policy for GPUs
85/// scheduler.set_policy(DMSCDeviceType::GPU, dmsc::device::DMSCSchedulingPolicy::PriorityBased);
86///
87/// // Create allocation request
88/// let request = DMSCAllocationRequest {
89/// device_type: DMSCDeviceType::GPU,
90/// capabilities: DMSCDeviceCapabilities {
91/// memory_gb: Some(16.0),
92/// compute_units: Some(512),
93/// storage_gb: None,
94/// bandwidth_gbps: None,
95/// custom_capabilities: HashMap::new(),
96/// },
97/// priority: 5,
98/// timeout_secs: 30,
99/// };
100///
101/// // Allocate device
102/// if let Some(allocation_id) = scheduler.allocate(&request) {
103/// println!("Allocated device with ID: {}", allocation_id);
104///
105/// // Record release when done
106/// scheduler.record_release(&allocation_id);
107/// }
108///
109/// // Get statistics
110/// let stats = scheduler.get_statistics();
111/// println!("Total allocations: {}", stats.total_allocations);
112/// }
113/// ```
114/// Device scheduler - manages device resource allocation and scheduling
115///
116/// This struct implements a comprehensive device scheduling system that manages
117/// resource allocation using various scheduling algorithms. It works closely with
118/// the resource pool manager to access available devices and implements multiple
119/// scheduling policies per device type.
120#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
121pub struct DMSCDeviceScheduler {
122 scheduling_policies: HashMap<DMSCDeviceType, DMSCSchedulingPolicy>,
123 allocation_history: Arc<RwLock<Vec<DMSCAllocationRecord>>>,
124 resource_pool_manager: Arc<RwLock<DMSCResourcePoolManager>>,
125 round_robin_counters: Arc<RwLock<HashMap<DMSCDeviceType, usize>>>,
126}
127
128/// Scheduling policy enum - defines different algorithms for device selection
129///
130/// This enum defines the available scheduling policies that can be applied to different
131/// device types. Each policy implements a different algorithm for selecting devices.
132#[derive(Debug, Clone, Serialize, Deserialize)]
133#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
134pub enum DMSCSchedulingPolicy {
135 /// FirstFit: Select the first device that meets requirements
136 FirstFit,
137 /// BestFit: Select the device that best matches the requirements
138 BestFit,
139 /// WorstFit: Select the device with the most remaining capacity
140 WorstFit,
141 /// RoundRobin: Select devices in rotation
142 RoundRobin,
143 /// PriorityBased: Select device based on request priority and device health
144 PriorityBased,
145 /// LoadBalanced: Select device with lowest current load
146 LoadBalanced,
147}
148
/// Allocation record - details of a device allocation
///
/// This struct records detailed information about each device allocation, including
/// the device used, capabilities required, allocation and release times, and success status.
/// The scheduler appends one record per allocation (`record_allocation`) and updates it in
/// place when the allocation is released (`record_release`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCAllocationRecord {
    /// Unique allocation identifier (a UUID v4 string when created through `allocate`)
    pub allocation_id: String,
    /// ID of the allocated device
    pub device_id: String,
    /// Type of the allocated device
    pub device_type: DMSCDeviceType,
    /// Time when the device was allocated (UTC)
    pub allocated_at: DateTime<Utc>,
    /// Time when the device was released, or `None` while the allocation is still active
    pub released_at: Option<DateTime<Utc>>,
    /// Duration of the allocation in seconds; set on release only when the release time
    /// is not earlier than `allocated_at`
    pub duration_seconds: Option<f64>,
    /// Whether the allocation was successful.
    /// NOTE(review): `record_allocation` in this file always stores `true`.
    pub success: bool,
    /// Capabilities required for this allocation
    pub capabilities_required: DMSCDeviceCapabilities,
}
173
/// Allocation request - request for device resources
///
/// This struct represents a request for device resources, including the device type,
/// required capabilities, priority, timeout, and additional scheduling hints such as
/// SLA class, resource weights, and affinity rules. The hint fields are optional:
/// - `affinity.required_labels` and `anti_affinity.forbidden_labels` are hard filters;
/// - `affinity.preferred_labels`, `sla_class` and `resource_weights` only affect the
///   candidate scoring order.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
pub struct DMSCAllocationRequest {
    /// Type of device requested
    pub device_type: DMSCDeviceType,
    /// Capabilities required from the device
    pub capabilities: DMSCDeviceCapabilities,
    /// Priority of this request (intended range 0-255, higher = higher priority).
    /// NOTE(review): the field is `u32` and the range is not enforced by this type.
    pub priority: u32,
    /// Timeout in seconds for this request.
    /// NOTE(review): not read by the scheduling code visible in this file.
    pub timeout_secs: u64,
    /// Optional SLA class propagated from the external resource request
    pub sla_class: Option<super::DMSCRequestSlaClass>,
    /// Optional resource weights propagated from the external resource request
    pub resource_weights: Option<super::DMSCResourceWeights>,
    /// Optional affinity rules propagated from the external resource request
    pub affinity: Option<super::DMSCAffinityRules>,
    /// Optional anti-affinity rules propagated from the external resource request
    pub anti_affinity: Option<super::DMSCAffinityRules>,
}
200
201impl DMSCDeviceScheduler {
202 /// Create a new device scheduler with default policies and empty allocation history.
203 ///
204 /// This method initializes the scheduler with default scheduling policies for each device type
205 /// and connects it to the provided resource pool manager.
206 ///
207 /// # Parameters
208 ///
209 /// - `resource_pool_manager`: Resource pool manager for accessing available devices
210 ///
211 /// # Returns
212 ///
213 /// A new `DMSCDeviceScheduler` instance with default policies and settings.
214 pub fn new(resource_pool_manager: Arc<RwLock<DMSCResourcePoolManager>>) -> Self {
215 let mut scheduling_policies = HashMap::new();
216
217 // Set default policies for different device types
218 scheduling_policies.insert(DMSCDeviceType::CPU, DMSCSchedulingPolicy::LoadBalanced);
219 scheduling_policies.insert(DMSCDeviceType::GPU, DMSCSchedulingPolicy::PriorityBased);
220 scheduling_policies.insert(DMSCDeviceType::Memory, DMSCSchedulingPolicy::BestFit);
221 scheduling_policies.insert(DMSCDeviceType::Storage, DMSCSchedulingPolicy::FirstFit);
222 scheduling_policies.insert(DMSCDeviceType::Network, DMSCSchedulingPolicy::RoundRobin);
223 scheduling_policies.insert(DMSCDeviceType::Sensor, DMSCSchedulingPolicy::FirstFit);
224 scheduling_policies.insert(DMSCDeviceType::Actuator, DMSCSchedulingPolicy::PriorityBased);
225 scheduling_policies.insert(DMSCDeviceType::Custom, DMSCSchedulingPolicy::LoadBalanced);
226
227 Self {
228 scheduling_policies,
229 allocation_history: Arc::new(RwLock::new(Vec::new())),
230 resource_pool_manager,
231 round_robin_counters: Arc::new(RwLock::new(HashMap::new())),
232 }
233 }
234
235 /// Get the scheduling policy for a specific device type.
236 ///
237 /// This method returns the scheduling policy configured for the specified device type.
238 /// If no policy is configured, it returns the default FirstFit policy.
239 ///
240 /// # Parameters
241 ///
242 /// - `device_type`: Device type to get the policy for
243 ///
244 /// # Returns
245 ///
246 /// A reference to the `DMSCSchedulingPolicy` for the specified device type.
247 pub fn get_policy(&self, device_type: &DMSCDeviceType) -> &DMSCSchedulingPolicy {
248 self.scheduling_policies.get(device_type).unwrap_or(&DMSCSchedulingPolicy::FirstFit)
249 }
250
251 /// Set the scheduling policy for a specific device type.
252 ///
253 /// This method configures the scheduling policy for the specified device type.
254 ///
255 /// # Parameters
256 ///
257 /// - `device_type`: Device type to set the policy for
258 /// - `policy`: Scheduling policy to use for this device type
259 pub fn set_policy(&mut self, device_type: DMSCDeviceType, policy: DMSCSchedulingPolicy) {
260 self.scheduling_policies.insert(device_type, policy);
261 }
262
263 /// Select a device based on the scheduling policy for the requested device type.
264 ///
265 /// This method selects the best device for the allocation request by:
266 /// 1. Getting the appropriate scheduling policy for the device type
267 /// 2. Collecting available devices from the resource pool manager
268 /// 3. Filtering devices that meet the requirements
269 /// 4. Applying the scheduling policy to select the best device
270 ///
271 /// # Parameters
272 ///
273 /// - `request`: Allocation request with device type, capabilities, priority, and timeout
274 ///
275 /// # Returns
276 ///
277 /// An `Arc<DMSCDevice>` if a suitable device was found, or `None` if no device meets the requirements.
278 pub async fn select_device(&self, request: &DMSCAllocationRequest) -> Option<Arc<DMSCDevice>> {
279 let policy = self.get_policy(&request.device_type);
280
281 let available_devices = {
282 let pool_manager = self.resource_pool_manager.read().await;
283 let pools = pool_manager.get_pools_by_type(request.device_type);
284
285 if pools.is_empty() {
286 return None;
287 }
288
289 let mut devices: Vec<Arc<DMSCDevice>> = Vec::new();
290
291 for pool in &pools {
292 let pool_devices = pool.get_available_devices();
293 devices.extend(pool_devices.into_iter());
294 }
295
296 devices
297 };
298
299 // Stage 1: filter candidates according to basic requirements.
300 // This currently replicates the original capabilities-based filtering
301 // and will be extended with SLA / affinity rules in future iterations.
302 let filtered = self.filter_candidates(available_devices, request);
303 if filtered.is_empty() {
304 return None;
305 }
306
307 // Stage 2: scoring hook. For now this is effectively a no-op that
308 // preserves the original behavior. Later we will use this hook to
309 // incorporate SLA, resource weights and affinity into scoring.
310 let scored = self.score_candidates(&filtered, request);
311
312 // Stage 3: apply scheduling policy using the (optionally) scored list.
313 match policy {
314 DMSCSchedulingPolicy::FirstFit => self.first_fit(&scored),
315 DMSCSchedulingPolicy::BestFit => self.best_fit(&scored, &request.capabilities),
316 DMSCSchedulingPolicy::WorstFit => self.worst_fit(&scored, &request.capabilities),
317 DMSCSchedulingPolicy::RoundRobin => self.round_robin(&scored, request.device_type).await,
318 DMSCSchedulingPolicy::PriorityBased => self.priority_based(&scored, request.priority),
319 DMSCSchedulingPolicy::LoadBalanced => self.load_balanced(&scored),
320 }
321 }
322
323 /// Filters raw available devices into scheduling candidates.
324 ///
325 /// Currently this only applies basic capability checks to preserve the
326 /// original behavior. In future iterations, SLA and affinity rules from
327 /// the allocation request can be incorporated here.
328 fn filter_candidates(
329 &self,
330 devices: Vec<Arc<DMSCDevice>>,
331 request: &DMSCAllocationRequest,
332 ) -> Vec<Arc<DMSCDevice>> {
333 devices
334 .into_iter()
335 .filter(|device| device.capabilities().meets_requirements(&request.capabilities))
336 .filter(|device| {
337 // Apply hard affinity / anti-affinity rules when present
338 if let Some(rules) = &request.affinity {
339 // required_labels: all must match
340 for (key, val) in &rules.required_labels {
341 match device.metadata().get(key) {
342 Some(v) if v == val => {}
343 _ => return false,
344 }
345 }
346 }
347
348 if let Some(rules) = &request.anti_affinity {
349 // forbidden_labels: none may match
350 for (key, val) in &rules.forbidden_labels {
351 if let Some(v) = device.metadata().get(key) {
352 if v == val {
353 return false;
354 }
355 }
356 }
357 }
358
359 true
360 })
361 .collect()
362 }
363
364 /// Scores candidates for advanced scheduling.
365 ///
366 /// This function computes a composite score per device based on:
367 /// - resource fitness (how well the device matches requested capabilities)
368 /// - remaining capacity
369 /// - device health
370 /// - optional multi-dimensional resource weights
371 /// - optional SLA class
372 ///
373 /// The list is then sorted in descending order of score so that subsequent
374 /// policy functions (FirstFit/BestFit/etc.) operate on a preference-ordered
375 /// candidate set.
376 fn score_candidates(
377 &self,
378 candidates: &[Arc<DMSCDevice>],
379 request: &DMSCAllocationRequest,
380 ) -> Vec<Arc<DMSCDevice>> {
381 if candidates.is_empty() {
382 return Vec::new();
383 }
384
385 // Derive SLA multiplier
386 let sla_multiplier: f64 = match request.sla_class {
387 Some(super::DMSCRequestSlaClass::Critical) => 1.5,
388 Some(super::DMSCRequestSlaClass::High) => 1.2,
389 Some(super::DMSCRequestSlaClass::Medium) => 1.0,
390 Some(super::DMSCRequestSlaClass::Low) => 0.8,
391 None => 1.0,
392 };
393
394 // Resource dimension weights (fallback to 1.0 if not provided)
395 let (compute_weight, memory_weight, storage_weight, bandwidth_weight) =
396 match &request.resource_weights {
397 Some(w) => (w.compute_weight, w.memory_weight, w.storage_weight, w.bandwidth_weight),
398 None => (1.0, 1.0, 1.0, 1.0),
399 };
400
401 let mut scored: Vec<(Arc<DMSCDevice>, f64)> = candidates
402 .iter()
403 .cloned()
404 .map(|device| {
405 let caps = device.capabilities();
406
407 // Base fitness: lower is better; convert to [0,1] where 1 is best.
408 let fitness = self.calculate_fitness_score(caps, &request.capabilities);
409 let fitness_score = 1.0 / (1.0 + fitness.max(0.0));
410
411 // Remaining capacity score: already higher-is-better.
412 let remaining = self.calculate_remaining_capacity_score(caps);
413
414 // Health score normalized to [0,1].
415 let health = device.health_score() as f64 / 100.0;
416
417 // Dimension-specific ratios for weighting.
418 let mut dim_score = 0.0;
419
420 if let (Some(req), Some(avail)) = (request.capabilities.compute_units, caps.compute_units) {
421 if avail > 0 {
422 let ratio = req as f64 / avail as f64;
423 dim_score += compute_weight * (1.0 / (1.0 + ratio.max(0.0)));
424 }
425 }
426
427 if let (Some(req), Some(avail)) = (request.capabilities.memory_gb, caps.memory_gb) {
428 if avail > 0.0 {
429 let ratio = req / avail;
430 dim_score += memory_weight * (1.0 / (1.0 + ratio.max(0.0)));
431 }
432 }
433
434 if let (Some(req), Some(avail)) = (request.capabilities.storage_gb, caps.storage_gb) {
435 if avail > 0.0 {
436 let ratio = req / avail;
437 dim_score += storage_weight * (1.0 / (1.0 + ratio.max(0.0)));
438 }
439 }
440
441 if let (Some(req), Some(avail)) = (request.capabilities.bandwidth_gbps, caps.bandwidth_gbps) {
442 if avail > 0.0 {
443 let ratio = req / avail;
444 dim_score += bandwidth_weight * (1.0 / (1.0 + ratio.max(0.0)));
445 }
446 }
447
448 // Normalize remaining capacity to a softer influence.
449 let remaining_score = (remaining / (1.0 + remaining.abs())).max(0.0);
450
451 // Affinity preference bonus: reward preferred label matches when defined
452 let mut affinity_bonus = 0.0;
453 if let Some(rules) = &request.affinity {
454 for (key, val) in &rules.preferred_labels {
455 if let Some(v) = device.metadata().get(key) {
456 if v == val {
457 affinity_bonus += 0.05;
458 }
459 }
460 }
461 }
462
463 // Composite score
464 let score = sla_multiplier
465 * (
466 0.4 * fitness_score // how tightly it matches requirements
467 + 0.3 * dim_score // per-dimension weighted match
468 + 0.2 * remaining_score
469 + 0.1 * health
470 + affinity_bonus
471 );
472
473 (device, score)
474 })
475 .collect();
476
477 // Sort descending by score (best first)
478 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
479
480 scored.into_iter().map(|(device, _)| device).collect()
481 }
482
483 /// First Fit algorithm: select the first device that meets requirements.
484 ///
485 /// This algorithm selects the first device in the list that meets the requirements.
486 /// It's simple and fast, but may not be the most efficient in terms of resource utilization.
487 ///
488 /// # Parameters
489 ///
490 /// - `devices`: List of devices that meet the requirements
491 ///
492 /// # Returns
493 ///
494 /// The first device in the list, or `None` if the list is empty.
495 fn first_fit(&self, devices: &[Arc<DMSCDevice>]) -> Option<Arc<DMSCDevice>> {
496 devices.first().cloned()
497 }
498
499 /// Best Fit algorithm: select the device that best matches the requirements.
500 ///
501 /// This algorithm selects the device that best matches the requirements by calculating
502 /// a fitness score based on the ratio of required resources to available resources.
503 /// It aims to minimize wasted resources.
504 ///
505 /// # Parameters
506 ///
507 /// - `devices`: List of devices that meet the requirements
508 /// - `requirements`: Required capabilities for the allocation
509 ///
510 /// # Returns
511 ///
512 /// The device with the best fitness score, or `None` if no devices meet the requirements.
513 fn best_fit(&self, devices: &[Arc<DMSCDevice>], requirements: &DMSCDeviceCapabilities) -> Option<Arc<DMSCDevice>> {
514 devices.iter()
515 .filter(|device| device.capabilities().meets_requirements(requirements))
516 .min_by(|a, b| {
517 // Calculate fitness score based on resource usage ratios
518 let a_fitness = self.calculate_fitness_score(a.capabilities(), requirements);
519 let b_fitness = self.calculate_fitness_score(b.capabilities(), requirements);
520 a_fitness.partial_cmp(&b_fitness).unwrap_or(std::cmp::Ordering::Equal)
521 })
522 .cloned()
523 }
524
525 /// Worst Fit algorithm: select the device with the most remaining capacity.
526 ///
527 /// This algorithm selects the device with the most remaining capacity, which can help
528 /// prevent fragmentation of resources. It's useful when dealing with varying allocation sizes.
529 ///
530 /// # Parameters
531 ///
532 /// - `devices`: List of devices that meet the requirements
533 /// - `requirements`: Required capabilities for the allocation
534 ///
535 /// # Returns
536 ///
537 /// The device with the highest remaining capacity score, or `None` if no devices meet the requirements.
538 fn worst_fit(&self, devices: &[Arc<DMSCDevice>], requirements: &DMSCDeviceCapabilities) -> Option<Arc<DMSCDevice>> {
539 devices.iter()
540 .filter(|device| device.capabilities().meets_requirements(requirements))
541 .max_by(|a, b| {
542 // Calculate remaining capacity score
543 let a_score = self.calculate_remaining_capacity_score(a.capabilities());
544 let b_score = self.calculate_remaining_capacity_score(b.capabilities());
545 a_score.partial_cmp(&b_score).unwrap_or(std::cmp::Ordering::Equal)
546 })
547 .cloned()
548 }
549
550 /// Round Robin algorithm: select devices in rotation.
551 ///
552 /// This algorithm selects devices in rotation, distributing allocations evenly across all
553 /// available devices. It's simple and ensures fair distribution of work.
554 ///
555 /// # Parameters
556 ///
557 /// - `devices`: List of devices that meet the requirements
558 /// - `device_type`: Type of device being allocated
559 ///
560 /// # Returns
561 ///
562 /// The next device in the rotation, or `None` if no devices meet the requirements.
563 async fn round_robin(&self, devices: &[Arc<DMSCDevice>], device_type: DMSCDeviceType) -> Option<Arc<DMSCDevice>> {
564 let mut counters = self.round_robin_counters.write().await;
565 let counter = counters.entry(device_type)
566 .or_insert(0);
567
568 let index = *counter % devices.len();
569 *counter += 1;
570
571 devices.get(index).cloned()
572 }
573
574 /// Priority Based algorithm: select device based on request priority and device health.
575 ///
576 /// This algorithm selects devices based on a combination of request priority and device health.
577 /// Higher priority requests get higher quality devices (higher health scores).
578 ///
579 /// # Parameters
580 ///
581 /// - `devices`: List of devices that meet the requirements
582 /// - `priority`: Priority of the allocation request
583 ///
584 /// # Returns
585 ///
586 /// The device with the highest priority-weighted health score, or `None` if no devices meet the requirements.
587 fn priority_based(&self, devices: &[Arc<DMSCDevice>], priority: u32) -> Option<Arc<DMSCDevice>> {
588 // For higher priority requests, select devices with higher health scores
589 devices.iter()
590 .max_by(|a, b| {
591 let a_score = a.health_score() as u32 * priority;
592 let b_score = b.health_score() as u32 * priority;
593 a_score.cmp(&b_score)
594 })
595 .cloned()
596 }
597
598 /// Load Balanced algorithm: select device with lowest current load.
599 ///
600 /// This algorithm selects the device with the lowest current load, using health score as a proxy for load.
601 /// It aims to distribute work evenly based on device capacity.
602 ///
603 /// # Parameters
604 ///
605 /// - `devices`: List of devices that meet the requirements
606 ///
607 /// # Returns
608 ///
609 /// The device with the lowest load (highest health score), or `None` if no devices meet the requirements.
610 fn load_balanced(&self, devices: &[Arc<DMSCDevice>]) -> Option<Arc<DMSCDevice>> {
611 devices.iter()
612 .min_by(|a, b| {
613 // Use health score as a proxy for load (inverse relationship)
614 a.health_score().cmp(&b.health_score()).reverse()
615 })
616 .cloned()
617 }
618
619 /// Calculate fitness score for Best Fit algorithm.
620 ///
621 /// This method calculates a fitness score based on the ratio of required resources to available resources.
622 /// Lower scores indicate better fit (more efficient resource utilization).
623 ///
624 /// # Parameters
625 ///
626 /// - `device_cap`: Device capabilities
627 /// - `requirements`: Required capabilities
628 ///
629 /// # Returns
630 ///
631 /// A fitness score between 0.0 and potentially over 1.0, where lower scores indicate better fit.
632 fn calculate_fitness_score(&self, device_cap: &DMSCDeviceCapabilities, requirements: &DMSCDeviceCapabilities) -> f64 {
633 let mut score = 0.0;
634
635 // Calculate memory fitness
636 if let (Some(req_mem), Some(avail_mem)) = (requirements.memory_gb, device_cap.memory_gb) {
637 let used_ratio = req_mem / avail_mem;
638 score += used_ratio;
639 }
640
641 // Calculate compute units fitness
642 if let (Some(req_units), Some(avail_units)) = (requirements.compute_units, device_cap.compute_units) {
643 let used_ratio = req_units as f64 / avail_units as f64;
644 score += used_ratio;
645 }
646
647 // Calculate storage fitness
648 if let (Some(req_storage), Some(avail_storage)) = (requirements.storage_gb, device_cap.storage_gb) {
649 let used_ratio = req_storage / avail_storage;
650 score += used_ratio;
651 }
652
653 score
654 }
655
656 /// Calculate remaining capacity score for Worst Fit algorithm.
657 ///
658 /// This method calculates a score based on the remaining capacity of a device.
659 /// Higher scores indicate more remaining capacity.
660 ///
661 /// # Parameters
662 ///
663 /// - `device_cap`: Device capabilities
664 ///
665 /// # Returns
666 ///
667 /// A score representing the remaining capacity, where higher scores indicate more capacity.
668 fn calculate_remaining_capacity_score(&self, device_cap: &DMSCDeviceCapabilities) -> f64 {
669 let mut score = 0.0;
670
671 // Add memory capacity
672 if let Some(mem) = device_cap.memory_gb {
673 score += mem;
674 }
675
676 // Add compute units (weighted by 100 to balance with memory)
677 if let Some(units) = device_cap.compute_units {
678 score += units as f64 * 100.0;
679 }
680
681 // Add storage capacity
682 if let Some(storage) = device_cap.storage_gb {
683 score += storage;
684 }
685
686 score
687 }
688
689 /// Allocate a device based on the scheduling policy.
690 ///
691 /// This method selects a device using the appropriate scheduling policy and records the allocation.
692 ///
693 /// # Parameters
694 ///
695 /// - `request`: Allocation request with device type, capabilities, priority, and timeout
696 ///
697 /// # Returns
698 ///
699 /// An allocation ID if successful, or `None` if no suitable device was found.
700 pub async fn allocate(&self, request: &DMSCAllocationRequest) -> Option<String> {
701 if let Some(device) = self.select_device(request).await {
702 // Generate unique allocation ID
703 let allocation_id = uuid::Uuid::new_v4().to_string();
704
705 // Note: In a real implementation, we'd need to lock the device and update its status
706 // This is simplified for demonstration
707
708 // Record the allocation
709 self.record_allocation(allocation_id.clone(), device.id().to_string(), device.device_type(), request.capabilities.clone()).await;
710
711 Some(allocation_id)
712 } else {
713 None
714 }
715 }
716
717 /// Record an allocation in the history.
718 ///
719 /// This method records a new allocation in the history, including device information,
720 /// capabilities required, and allocation time.
721 ///
722 /// # Parameters
723 ///
724 /// - `allocation_id`: Unique allocation identifier
725 /// - `device_id`: ID of the allocated device
726 /// - `device_type`: Type of the allocated device
727 /// - `capabilities_required`: Capabilities required for this allocation
728 pub async fn record_allocation(&self, allocation_id: String, device_id: String, device_type: DMSCDeviceType, capabilities_required: DMSCDeviceCapabilities) {
729 let record = DMSCAllocationRecord {
730 allocation_id,
731 device_id,
732 device_type,
733 allocated_at: Utc::now(),
734 released_at: None,
735 duration_seconds: None,
736 success: true,
737 capabilities_required,
738 };
739
740 let mut history = self.allocation_history.write().await;
741 history.push(record);
742
743 if history.len() > 1000 {
744 history.remove(0);
745 }
746 }
747
748 /// Record the release of an allocation.
749 ///
750 /// This method updates an allocation record to mark it as released, including the release time
751 /// and duration of the allocation.
752 ///
753 /// # Parameters
754 ///
755 /// - `allocation_id`: ID of the allocation to release
756 pub async fn record_release(&self, allocation_id: &str) {
757 let mut history = self.allocation_history.write().await;
758 if let Some(record) = history.iter_mut().find(|r| r.allocation_id == allocation_id) {
759 record.released_at = Some(Utc::now());
760
761 if let Some(released_at) = record.released_at {
762 if let Ok(duration) = released_at.signed_duration_since(record.allocated_at).to_std() {
763 record.duration_seconds = Some(duration.as_secs_f64());
764 }
765 }
766 }
767 }
768
769 /// Get allocation statistics and metrics.
770 ///
771 /// This method calculates comprehensive statistics about allocations, including:
772 /// - Total and successful allocations
773 /// - Success rate
774 /// - Average allocation duration
775 /// - Statistics by device type
776 ///
777 /// # Returns
778 ///
779 /// A `DMSCAllocationStatistics` struct containing comprehensive allocation statistics.
780 pub async fn get_statistics(&self) -> DMSCAllocationStatistics {
781 let history = self.allocation_history.read().await;
782 let total_allocations = history.len();
783 let successful_allocations = history.iter().filter(|r| r.success).count();
784 let failed_allocations = total_allocations - successful_allocations;
785
786 let completed_allocations: Vec<&DMSCAllocationRecord> = history.iter()
787 .filter(|r| r.released_at.is_some())
788 .collect();
789
790 let total_duration_seconds: f64 = completed_allocations.iter()
791 .filter_map(|r| r.duration_seconds)
792 .sum();
793
794 let average_duration_seconds = if !completed_allocations.is_empty() {
795 total_duration_seconds / completed_allocations.len() as f64
796 } else {
797 0.0
798 };
799
800 let mut by_device_type = HashMap::new();
801 for device_type in [DMSCDeviceType::CPU, DMSCDeviceType::GPU, DMSCDeviceType::Memory,
802 DMSCDeviceType::Storage, DMSCDeviceType::Network, DMSCDeviceType::Sensor,
803 DMSCDeviceType::Actuator, DMSCDeviceType::Custom] {
804 let type_allocations = history.iter()
805 .filter(|r| r.device_type == device_type)
806 .count();
807
808 if type_allocations > 0 {
809 let type_completed = history.iter()
810 .filter(|r| r.device_type == device_type && r.released_at.is_some())
811 .count();
812
813 let type_duration: f64 = history.iter()
814 .filter(|r| r.device_type == device_type)
815 .filter_map(|r| r.duration_seconds)
816 .sum();
817
818 let type_avg_duration = if type_completed > 0 {
819 type_duration / type_completed as f64
820 } else {
821 0.0
822 };
823
824 by_device_type.insert(device_type, DMSCDeviceTypeStatistics {
825 total_allocations: type_allocations,
826 completed_allocations: type_completed,
827 average_duration_seconds: type_avg_duration,
828 });
829 }
830 }
831
832 DMSCAllocationStatistics {
833 total_allocations,
834 successful_allocations,
835 failed_allocations,
836 success_rate: if total_allocations > 0 {
837 (successful_allocations as f64 / total_allocations as f64) * 100.0
838 } else {
839 0.0
840 },
841 average_duration_seconds,
842 by_device_type,
843 }
844 }
845
846 /// Get scheduling recommendations based on historical data.
847 pub async fn get_recommendations(&self, device_type: &DMSCDeviceType) -> Vec<DMSCSchedulingRecommendation> {
848 let mut recommendations = Vec::new();
849
850 let history = self.allocation_history.read().await;
851 let recent_allocations: Vec<&DMSCAllocationRecord> = history.iter()
852 .filter(|r| r.device_type == *device_type)
853 .rev()
854 .take(100)
855 .collect();
856
857 if recent_allocations.is_empty() {
858 recommendations.push(DMSCSchedulingRecommendation {
859 recommendation_type: DMSCSchedulingRecommendationType::UseDefaultPolicy,
860 description: format!("No recent allocation data for {device_type:?}, using default policy"),
861 priority: 1,
862 confidence: 0.5,
863 });
864 return recommendations;
865 }
866
867 let avg_duration = recent_allocations.iter()
868 .filter_map(|r| r.duration_seconds)
869 .sum::<f64>() / recent_allocations.len() as f64;
870
871 let success_rate = recent_allocations.iter().filter(|r| r.success).count() as f64
872 / recent_allocations.len() as f64;
873
874 if success_rate < 0.8 {
875 recommendations.push(DMSCSchedulingRecommendation {
876 recommendation_type: DMSCSchedulingRecommendationType::ConsiderPolicyChange,
877 description: format!("Low success rate ({:.1}%) for {:?}, consider changing scheduling policy",
878 success_rate * 100.0, device_type),
879 priority: 3,
880 confidence: 0.8,
881 });
882 }
883
884 if avg_duration > 300.0 {
885 recommendations.push(DMSCSchedulingRecommendation {
886 recommendation_type: DMSCSchedulingRecommendationType::OptimizeForLongRunning,
887 description: format!("Average allocation duration is {avg_duration:.1} seconds for {device_type:?}, consider load balancing"),
888 priority: 2,
889 confidence: 0.7,
890 });
891 }
892
893 if recent_allocations.len() > 50 && avg_duration < 60.0 {
894 recommendations.push(DMSCSchedulingRecommendation {
895 recommendation_type: DMSCSchedulingRecommendationType::OptimizeForShortRunning,
896 description: format!("High frequency of short allocations for {device_type:?}, consider round-robin scheduling"),
897 priority: 2,
898 confidence: 0.6,
899 });
900 }
901
902 recommendations.push(DMSCSchedulingRecommendation {
903 recommendation_type: DMSCSchedulingRecommendationType::ContinueCurrentPolicy,
904 description: format!("Current scheduling policy appears effective for {device_type:?}"),
905 priority: 1,
906 confidence: 0.9,
907 });
908
909 recommendations.sort_by(|a, b| b.priority.cmp(&a.priority));
910 recommendations
911 }
912}
913
914/// Allocation statistics - comprehensive metrics about device allocations
915///
916/// This struct contains detailed statistics about device allocations, including success rates,
917/// durations, and breakdowns by device type.
918#[derive(Debug, Clone, Serialize, Deserialize)]
919#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
920pub struct DMSCAllocationStatistics {
921 /// Total number of allocations
922 pub total_allocations: usize,
923 /// Number of successful allocations
924 pub successful_allocations: usize,
925 /// Number of failed allocations
926 pub failed_allocations: usize,
927 /// Success rate as a percentage (0.0-100.0)
928 pub success_rate: f64,
929 /// Average duration of completed allocations in seconds
930 pub average_duration_seconds: f64,
931 /// Statistics broken down by device type
932 pub by_device_type: HashMap<DMSCDeviceType, DMSCDeviceTypeStatistics>,
933}
934
935/// Device type statistics - metrics for a specific device type
936///
937/// This struct contains allocation statistics for a specific device type.
938#[derive(Debug, Clone, Serialize, Deserialize)]
939#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
940pub struct DMSCDeviceTypeStatistics {
941 /// Total number of allocations for this device type
942 pub total_allocations: usize,
943 /// Number of completed allocations for this device type
944 pub completed_allocations: usize,
945 /// Average duration of completed allocations for this device type in seconds
946 pub average_duration_seconds: f64,
947}
948
949/// Scheduling recommendation types - categories of scheduling recommendations
950///
951/// This enum defines the different types of scheduling recommendations that can be generated.
952#[derive(Debug, Clone, Serialize, Deserialize)]
953#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
954pub enum DMSCSchedulingRecommendationType {
955 /// Use the default policy for this device type
956 UseDefaultPolicy,
957 /// Continue using the current policy
958 ContinueCurrentPolicy,
959 /// Consider changing the scheduling policy
960 ConsiderPolicyChange,
961 /// Optimize for long-running allocations
962 OptimizeForLongRunning,
963 /// Optimize for short-running allocations
964 OptimizeForShortRunning,
965 /// Use load balancing
966 LoadBalance,
967 /// Use priority-based scheduling
968 Prioritize,
969}
970
971/// Scheduling recommendation - suggestion for optimizing scheduling
972///
973/// This struct represents a recommendation for optimizing scheduling, including the recommendation type,
974/// description, priority, and confidence level.
975#[derive(Debug, Clone, Serialize, Deserialize)]
976#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
977pub struct DMSCSchedulingRecommendation {
978 /// Type of recommendation
979 pub recommendation_type: DMSCSchedulingRecommendationType,
980 /// Human-readable description of the recommendation
981 pub description: String,
982 /// Priority of the recommendation (1-10, higher = more important)
983 pub priority: u8,
984 /// Confidence in the recommendation (0.0-1.0)
985 pub confidence: f64,
986}