dmsc/core/
app_runtime.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//! 
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//! 
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! you may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//! 
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//! 
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Application Runtime
21//! 
22//! This module provides the application runtime for DMSC applications.
23//! The `DMSCAppRuntime` manages the application lifecycle, including module initialization,
24//! startup, and shutdown. It also handles the execution of both synchronous and asynchronous modules.
25
26use crate::core::{DMSCResult, DMSCServiceContext};
27use crate::hooks::{DMSCHookKind, DMSCModulePhase};
28use super::module_types::{ModuleSlot, ModuleType};
29use tokio::sync::RwLock as AsyncRwLock;
30use std::sync::Arc;
31#[cfg(feature = "pyo3")]
32use pyo3::prelude::*;
33
34/// Public-facing application runtime.
35/// 
36/// The `DMSCAppRuntime` manages the application lifecycle, including module initialization,
37/// startup, and shutdown. It also handles the execution of both synchronous and asynchronous modules.
38/// 
39/// ## Usage
40/// 
41/// ```rust
42/// use dmsc::prelude::*;
43/// 
44/// #[tokio::main]
45/// async fn main() -> DMSCResult<()> {
46///     let app = DMSCAppBuilder::new()
47///         .with_config("config.yaml")?
48///         .build()?;
49///     
50///     app.run(|ctx| async move {
51///         ctx.logger().info("service", "DMSC service started")?;
52///         Ok(())
53///     }).await
54/// }
55/// ```
56
57#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
58#[derive(Clone)]
59pub struct DMSCAppRuntime {
60    /// Service context providing access to core functionalities
61    ctx: DMSCServiceContext,
62    /// Vector of modules with their state, protected by an async RwLock
63    modules: Arc<AsyncRwLock<Vec<ModuleSlot>>>,
64}
65
66impl DMSCAppRuntime {
67    /// Create a new application runtime with the given context and modules.
68    /// 
69    /// This method is typically called by the `DMSCAppBuilder` during the build process.
70    /// 
71    /// # Parameters
72    /// 
73    /// - `ctx`: Service context with core functionalities
74    /// - `modules`: Vector of modules with their initial state
75    /// 
76    /// # Returns
77    /// 
78    /// A new `DMSCAppRuntime` instance.
79    pub fn new(ctx: DMSCServiceContext, modules: Vec<ModuleSlot>) -> Self {
80        Self {
81            ctx,
82            modules: Arc::new(AsyncRwLock::new(modules)),
83        }
84    }
85    
86    /// Run the application lifecycle.
87    /// 
88    /// This method executes the complete application lifecycle, including:
89    /// 1. Emitting startup hooks
90    /// 2. Initializing synchronous modules
91    /// 3. Starting synchronous modules
92    /// 4. Initializing and starting asynchronous modules
93    /// 5. Running the application business logic via the provided closure
94    /// 6. Shutting down asynchronous modules
95    /// 7. Shutting down synchronous modules
96    /// 8. Emitting shutdown hooks
97    /// 
98    /// # Parameters
99    /// 
100    /// - `f`: A closure that takes a `DMSCServiceContext` and returns a `DMSCResult<()>`. 
101    ///   This closure contains the application's business logic and is executed after all
102    ///   modules have been initialized and started, but before any modules are shut down.
103    /// 
104    /// # Returns
105    /// 
106    /// A `DMSCResult` indicating success or failure.
107    /// 
108    /// # Errors
109    /// 
110    /// Returns an error if:
111    /// - A critical module fails during execution
112    /// - The provided closure returns an error
113    pub async fn run<F, Fut>(mut self, f: F) -> DMSCResult<()>
114    where
115        F: FnOnce(&DMSCServiceContext) -> Fut,
116        Fut: std::future::Future<Output = DMSCResult<()>>,
117    {
118        // Emit startup hook
119        self.ctx.hooks().emit_with(&DMSCHookKind::Startup, &self.ctx, None, None)?;
120
121        // Emit before modules init hook
122        self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesInit, &self.ctx, None, None)?;
123        
124        // Get module count
125        let modules_guard = self.modules.read().await;
126        let module_len = modules_guard.len();
127        drop(modules_guard); // Release lock early
128        
129        // Collect module states first to avoid repeated lock acquisitions
130        let mut module_states = Vec::new();
131        {
132            let modules_guard = self.modules.read().await;
133            for idx in 0..module_len {
134                if idx < modules_guard.len() {
135                    let slot = &modules_guard[idx];
136                    module_states.push((
137                        idx,
138                        !slot.failed,
139                        if !slot.failed {
140                            slot.module.name().to_string()
141                        } else {
142                            String::new()
143                        },
144                        if !slot.failed {
145                            slot.module.is_critical()
146                        } else {
147                            false
148                        },
149                    ));
150                } else {
151                    module_states.push((idx, false, String::new(), false));
152                }
153            }
154        }
155
156        // Initialize synchronous modules
157        for (idx, skip, module_name, critical) in module_states.iter().cloned() {
158            if !skip {
159                // Emit before module init hook
160                self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesInit, &self.ctx, Some(&module_name), Some(DMSCModulePhase::Init))?;
161                
162                // Initialize module with single write lock acquisition
163                let mut error = None;
164                {
165                    let mut modules_guard = self.modules.write().await;
166                    if idx < modules_guard.len() {
167                        match &mut modules_guard[idx].module {
168                            ModuleType::Sync(_module) => {
169                                if let Err(err) = _module.init(&mut self.ctx) {
170                                    error = Some(err);
171                                }
172                            }
173                            ModuleType::Async(_module) => {
174                                // Async modules are handled separately in the async phase
175                            }
176                        }
177                    }
178                }
179                
180                // Handle module initialization error
181                if let Some(err) = error {
182                    self.log_module_error("init", &module_name, &err);
183                    if critical {
184                        return Err(err);
185                    } else {
186                        // Mark module as failed with single write lock acquisition
187                        let mut modules_guard = self.modules.write().await;
188                        if idx < modules_guard.len() {
189                            modules_guard[idx].failed = true;
190                        }
191                    }
192                }
193            }
194        }
195        
196        // Emit after modules init hook
197        self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesInit, &self.ctx, None, None)?;
198
199        // Emit before modules start hook
200        self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, None, None)?;
201        
202        // Start synchronous modules with optimized locking
203        for (idx, skip, module_name, critical) in module_states.iter().cloned() {
204            if !skip {
205                let mut err_phase = "start";
206                
207                // Execute all sync module phases with single write lock acquisition
208                let mut error = None;
209                {
210                    let mut modules_guard = self.modules.write().await;
211                    if idx < modules_guard.len() {
212                        match &mut modules_guard[idx].module {
213                            ModuleType::Sync(_module) => {
214                                // Execute before_start phase
215                                if let Err(err) = _module.before_start(&mut self.ctx) {
216                                    err_phase = "before_start";
217                                    error = Some(err);
218                                }
219                                
220                                // Execute start phase if no error
221                                if error.is_none() {
222                                    if let Err(err) = _module.start(&mut self.ctx) {
223                                        err_phase = "start";
224                                        error = Some(err);
225                                    }
226                                }
227                                
228                                // Execute after_start phase if no error
229                                if error.is_none() {
230                                    if let Err(err) = _module.after_start(&mut self.ctx) {
231                                        err_phase = "after_start";
232                                        error = Some(err);
233                                    }
234                                }
235                            }
236                            ModuleType::Async(_module) => {
237                                // Async modules are handled separately in the async phase
238                            }
239                        }
240                    }
241                }
242                
243                // Emit hooks outside of lock to avoid potential deadlocks
244                if error.is_none() {
245                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::BeforeStart))?;
246                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::Start))?;
247                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AfterStart))?;
248                }
249                
250                // Handle module start error
251                if let Some(err) = error {
252                    self.log_module_error(err_phase, &module_name, &err);
253                    if critical {
254                        return Err(err);
255                    } else {
256                        // Mark module as failed with single write lock acquisition
257                        let mut modules_guard = self.modules.write().await;
258                        if idx < modules_guard.len() {
259                            modules_guard[idx].failed = true;
260                        }
261                    }
262                }
263            }
264        }
265        
266        // Emit after modules start hook
267        self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesStart, &self.ctx, None, None)?;
268
269        // Initialize and start asynchronous modules with optimized locking
270        for idx in 0..module_len {
271            let mut err_phase = "async_start";
272            
273            // Check if this is an async module and get its state
274            let (skip, module_name, critical) = {
275                let modules_guard = self.modules.read().await;
276                if idx < modules_guard.len() {
277                    let slot = &modules_guard[idx];
278                    if !slot.failed {
279                        match &slot.module {
280                            ModuleType::Async(module) => (
281                                false,
282                                module.name().to_string(),
283                                module.is_critical(),
284                            ),
285                            ModuleType::Sync(_) => (true, String::new(), false),
286                        }
287                    } else {
288                        (true, String::new(), false)
289                    }
290                } else {
291                    (true, String::new(), false)
292                }
293            };
294            
295            if !skip {
296                // Execute all async module phases with single write lock acquisition
297                let mut error = None;
298                {
299                    let mut modules_guard = self.modules.write().await;
300                    if idx < modules_guard.len() {
301                        if let ModuleType::Async(_module) = &mut modules_guard[idx].module {
302                            // Execute async init phase
303                            if let Err(err) = _module.init(&mut self.ctx).await {
304                                err_phase = "async_init";
305                                error = Some(err);
306                            }
307                            
308                            // Execute async before_start phase if no error
309                            if error.is_none() {
310                                if let Err(err) = _module.before_start(&mut self.ctx).await {
311                                    err_phase = "async_before_start";
312                                    error = Some(err);
313                                }
314                            }
315                            
316                            // Execute async start phase if no error
317                            if error.is_none() {
318                                if let Err(err) = _module.start(&mut self.ctx).await {
319                                    err_phase = "async_start";
320                                    error = Some(err);
321                                }
322                            }
323                            
324                            // Execute async after_start phase if no error
325                            if error.is_none() {
326                                if let Err(err) = _module.after_start(&mut self.ctx).await {
327                                    err_phase = "async_after_start";
328                                    error = Some(err);
329                                }
330                            }
331                        }
332                    }
333                }
334                
335                // Emit hooks outside of lock to avoid potential deadlocks
336                if error.is_none() {
337                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncInit))?;
338                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncBeforeStart))?;
339                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncStart))?;
340                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncAfterStart))?;
341                }
342                
343                // Handle async module error
344                if let Some(err) = error {
345                    self.log_module_error(err_phase, &module_name, &err);
346                    if critical {
347                        return Err(err);
348                    } else {
349                        // Mark module as failed with single write lock acquisition
350                        let mut modules_guard = self.modules.write().await;
351                        if idx < modules_guard.len() {
352                            modules_guard[idx].failed = true;
353                        }
354                    }
355                }
356            }
357        }
358        
359        // Emit after async modules start hook
360        self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesStart, &self.ctx, None, None)?;
361        
362        // Run the application business logic (provided closure)
363        let result = f(&self.ctx).await;
364        
365        // Emit before modules shutdown hook
366        // Note: We're using a new context here since we've moved the original to the closure
367        let _ = self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, None, None);
368        
369        // Shutdown synchronous modules in reverse order with optimized locking
370        for idx in (0..module_len).rev() {
371            // Check if this is a sync module and get its state
372            let (skip, module_name, critical) = {
373                let modules_guard = self.modules.read().await;
374                if idx < modules_guard.len() {
375                    let slot = &modules_guard[idx];
376                    if !slot.failed {
377                        match &slot.module {
378                            ModuleType::Sync(module) => (
379                                false,
380                                module.name().to_string(),
381                                module.is_critical(),
382                            ),
383                            ModuleType::Async(_) => (true, String::new(), false),
384                        }
385                    } else {
386                        (true, String::new(), false)
387                    }
388                } else {
389                    (true, String::new(), false)
390                }
391            };
392            
393            if !skip {
394                // Execute all sync module shutdown phases with single write lock acquisition
395                let mut err_phase = "shutdown";
396                let mut error = None;
397                {
398                    let mut modules_guard = self.modules.write().await;
399                    if idx < modules_guard.len() {
400                        if let ModuleType::Sync(_module) = &mut modules_guard[idx].module {
401                            // Execute before_shutdown phase
402                            if let Err(err) = _module.before_shutdown(&mut self.ctx) {
403                                err_phase = "before_shutdown";
404                                error = Some(err);
405                            }
406                            
407                            // Execute shutdown phase if no error
408                            if error.is_none() {
409                                if let Err(err) = _module.shutdown(&mut self.ctx) {
410                                    err_phase = "shutdown";
411                                    error = Some(err);
412                                }
413                            }
414                            
415                            // Execute after_shutdown phase if no error
416                            if error.is_none() {
417                                if let Err(err) = _module.after_shutdown(&mut self.ctx) {
418                                    err_phase = "after_shutdown";
419                                    error = Some(err);
420                                }
421                            }
422                        }
423                    }
424                }
425                
426                // Emit hooks outside of lock to avoid potential deadlocks
427                if error.is_none() {
428                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::BeforeShutdown))?;
429                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::Shutdown))?;
430                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AfterShutdown))?;
431                }
432                
433                // Handle module shutdown error
434                if let Some(err) = error {
435                    self.log_module_error(err_phase, &module_name, &err);
436                    if critical {
437                        return Err(err);
438                    } else {
439                        // Mark module as failed
440                        let mut modules_guard = self.modules.write().await;
441                        if idx < modules_guard.len() {
442                            modules_guard[idx].failed = true;
443                        }
444                    }
445                }
446            }
447        }
448        
449        // Emit after modules shutdown hook
450        self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesShutdown, &self.ctx, None, None)?;
451
452        // Shutdown asynchronous modules in reverse order with optimized locking
453        self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, None, None)?;
454        
455        for idx in (0..module_len).rev() {
456            // Check if this is an async module and get its state
457            let (skip, module_name, critical) = {
458                let modules_guard = self.modules.read().await;
459                if idx < modules_guard.len() {
460                    let slot = &modules_guard[idx];
461                    if !slot.failed {
462                        match &slot.module {
463                            ModuleType::Async(module) => (
464                                false,
465                                module.name().to_string(),
466                                module.is_critical(),
467                            ),
468                            ModuleType::Sync(_) => (true, String::new(), false),
469                        }
470                    } else {
471                        (true, String::new(), false)
472                    }
473                } else {
474                    (true, String::new(), false)
475                }
476            };
477            
478            if !skip {
479                // Execute all async module shutdown phases with single write lock acquisition
480                let mut err_phase = "async_shutdown";
481                let mut error = None;
482                {
483                    let mut modules_guard = self.modules.write().await;
484                    if idx < modules_guard.len() {
485                        if let ModuleType::Async(_module) = &mut modules_guard[idx].module {
486                            // Execute async before_shutdown phase
487                            if let Err(err) = _module.before_shutdown(&mut self.ctx).await {
488                                err_phase = "async_before_shutdown";
489                                error = Some(err);
490                            }
491                            
492                            // Execute async shutdown phase if no error
493                            if error.is_none() {
494                                if let Err(err) = _module.shutdown(&mut self.ctx).await {
495                                    err_phase = "async_shutdown";
496                                    error = Some(err);
497                                }
498                            }
499                            
500                            // Execute async after_shutdown phase if no error
501                            if error.is_none() {
502                                if let Err(err) = _module.after_shutdown(&mut self.ctx).await {
503                                    err_phase = "async_after_shutdown";
504                                    error = Some(err);
505                                }
506                            }
507                        }
508                    }
509                }
510                
511                // Emit hooks outside of lock to avoid potential deadlocks
512                if error.is_none() {
513                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncBeforeShutdown))?;
514                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncShutdown))?;
515                    self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncAfterShutdown))?;
516                }
517                
518                // Handle async module shutdown error
519                if let Some(err) = error {
520                    self.log_module_error(err_phase, &module_name, &err);
521                    if critical {
522                        return Err(err);
523                    } else {
524                        // Mark module as failed with single write lock acquisition
525                        let mut modules_guard = self.modules.write().await;
526                        if idx < modules_guard.len() {
527                            modules_guard[idx].failed = true;
528                        }
529                    }
530                }
531            }
532        }
533        
534        // Emit after async modules shutdown hook
535        self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesShutdown, &self.ctx, None, None)?;
536
537        // Emit shutdown hook
538        self.ctx.hooks().emit_with(&DMSCHookKind::Shutdown, &self.ctx, None, None)?;
539
540        // Return the result of the closure execution
541        result
542    }
543
544    /// Log a module error.
545    /// 
546    /// This method logs an error that occurred during module execution, including
547    /// the module name, phase, and error message.
548    /// 
549    /// # Parameters
550    /// 
551    /// - `phase`: The lifecycle phase during which the error occurred
552    /// - `module_name`: The name of the module that failed
553    /// - `err`: The error that occurred
554    fn log_module_error(&self, phase: &str, module_name: &str, err: &crate::core::DMSCError) {
555        let logger = self.ctx.logger();
556        let message = format!("module={module_name} phase={phase} error={err}");
557        let _ = logger.error("DMSC.Runtime", message);
558    }
559}
560
561#[cfg(feature = "pyo3")]
562#[pyo3::prelude::pymethods]
563impl DMSCAppRuntime {
564    fn py_run(&self, callback: Py<pyo3::PyAny>) -> PyResult<()> {
565        let rt = tokio::runtime::Builder::new_current_thread()
566            .enable_all()
567            .build()
568            .map_err::<pyo3::PyErr, _>(|e| e.into())?;
569        
570        let runtime = self.clone();
571        pyo3::Python::attach(|py| {
572            rt.block_on(async move {
573                let _ = callback.call0(py);
574                runtime.run(|_ctx| async move { Ok(()) }).await
575            }).map_err(|e| e.into())
576        })
577    }
578
579    fn get_context(&self) -> PyResult<DMSCServiceContext> {
580        Ok(self.ctx.clone())
581    }
582
583    #[pyo3(name = "logger")]
584    fn logger_py(&self) -> crate::log::DMSCLogger {
585        self.ctx.logger().clone()
586    }
587}