dmsc/core/app_runtime.rs
1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of DMSC.
4//! The DMSC project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! you may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//! http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Application Runtime
21//!
22//! This module provides the application runtime for DMSC applications.
23//! The `DMSCAppRuntime` manages the application lifecycle, including module initialization,
24//! startup, and shutdown. It also handles the execution of both synchronous and asynchronous modules.
25
26use crate::core::{DMSCResult, DMSCServiceContext};
27use crate::hooks::{DMSCHookKind, DMSCModulePhase};
28use super::module_types::{ModuleSlot, ModuleType};
29use tokio::sync::RwLock as AsyncRwLock;
30use std::sync::Arc;
31#[cfg(feature = "pyo3")]
32use pyo3::prelude::*;
33
34/// Public-facing application runtime.
35///
36/// The `DMSCAppRuntime` manages the application lifecycle, including module initialization,
37/// startup, and shutdown. It also handles the execution of both synchronous and asynchronous modules.
38///
39/// ## Usage
40///
41/// ```rust
42/// use dmsc::prelude::*;
43///
44/// #[tokio::main]
45/// async fn main() -> DMSCResult<()> {
46/// let app = DMSCAppBuilder::new()
47/// .with_config("config.yaml")?
48/// .build()?;
49///
50/// app.run(|ctx| async move {
51/// ctx.logger().info("service", "DMSC service started")?;
52/// Ok(())
53/// }).await
54/// }
55/// ```
56
57#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
58#[derive(Clone)]
59pub struct DMSCAppRuntime {
60 /// Service context providing access to core functionalities
61 ctx: DMSCServiceContext,
62 /// Vector of modules with their state, protected by an async RwLock
63 modules: Arc<AsyncRwLock<Vec<ModuleSlot>>>,
64}
65
66impl DMSCAppRuntime {
67 /// Create a new application runtime with the given context and modules.
68 ///
69 /// This method is typically called by the `DMSCAppBuilder` during the build process.
70 ///
71 /// # Parameters
72 ///
73 /// - `ctx`: Service context with core functionalities
74 /// - `modules`: Vector of modules with their initial state
75 ///
76 /// # Returns
77 ///
78 /// A new `DMSCAppRuntime` instance.
79 pub fn new(ctx: DMSCServiceContext, modules: Vec<ModuleSlot>) -> Self {
80 Self {
81 ctx,
82 modules: Arc::new(AsyncRwLock::new(modules)),
83 }
84 }
85
86 /// Run the application lifecycle.
87 ///
88 /// This method executes the complete application lifecycle, including:
89 /// 1. Emitting startup hooks
90 /// 2. Initializing synchronous modules
91 /// 3. Starting synchronous modules
92 /// 4. Initializing and starting asynchronous modules
93 /// 5. Running the application business logic via the provided closure
94 /// 6. Shutting down asynchronous modules
95 /// 7. Shutting down synchronous modules
96 /// 8. Emitting shutdown hooks
97 ///
98 /// # Parameters
99 ///
100 /// - `f`: A closure that takes a `DMSCServiceContext` and returns a `DMSCResult<()>`.
101 /// This closure contains the application's business logic and is executed after all
102 /// modules have been initialized and started, but before any modules are shut down.
103 ///
104 /// # Returns
105 ///
106 /// A `DMSCResult` indicating success or failure.
107 ///
108 /// # Errors
109 ///
110 /// Returns an error if:
111 /// - A critical module fails during execution
112 /// - The provided closure returns an error
113 pub async fn run<F, Fut>(mut self, f: F) -> DMSCResult<()>
114 where
115 F: FnOnce(&DMSCServiceContext) -> Fut,
116 Fut: std::future::Future<Output = DMSCResult<()>>,
117 {
118 // Emit startup hook
119 self.ctx.hooks().emit_with(&DMSCHookKind::Startup, &self.ctx, None, None)?;
120
121 // Emit before modules init hook
122 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesInit, &self.ctx, None, None)?;
123
124 // Get module count
125 let modules_guard = self.modules.read().await;
126 let module_len = modules_guard.len();
127 drop(modules_guard); // Release lock early
128
129 // Collect module states first to avoid repeated lock acquisitions
130 let mut module_states = Vec::new();
131 {
132 let modules_guard = self.modules.read().await;
133 for idx in 0..module_len {
134 if idx < modules_guard.len() {
135 let slot = &modules_guard[idx];
136 module_states.push((
137 idx,
138 !slot.failed,
139 if !slot.failed {
140 slot.module.name().to_string()
141 } else {
142 String::new()
143 },
144 if !slot.failed {
145 slot.module.is_critical()
146 } else {
147 false
148 },
149 ));
150 } else {
151 module_states.push((idx, false, String::new(), false));
152 }
153 }
154 }
155
156 // Initialize synchronous modules
157 for (idx, skip, module_name, critical) in module_states.iter().cloned() {
158 if !skip {
159 // Emit before module init hook
160 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesInit, &self.ctx, Some(&module_name), Some(DMSCModulePhase::Init))?;
161
162 // Initialize module with single write lock acquisition
163 let mut error = None;
164 {
165 let mut modules_guard = self.modules.write().await;
166 if idx < modules_guard.len() {
167 match &mut modules_guard[idx].module {
168 ModuleType::Sync(_module) => {
169 if let Err(err) = _module.init(&mut self.ctx) {
170 error = Some(err);
171 }
172 }
173 ModuleType::Async(_module) => {
174 // Async modules are handled separately in the async phase
175 }
176 }
177 }
178 }
179
180 // Handle module initialization error
181 if let Some(err) = error {
182 self.log_module_error("init", &module_name, &err);
183 if critical {
184 return Err(err);
185 } else {
186 // Mark module as failed with single write lock acquisition
187 let mut modules_guard = self.modules.write().await;
188 if idx < modules_guard.len() {
189 modules_guard[idx].failed = true;
190 }
191 }
192 }
193 }
194 }
195
196 // Emit after modules init hook
197 self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesInit, &self.ctx, None, None)?;
198
199 // Emit before modules start hook
200 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, None, None)?;
201
202 // Start synchronous modules with optimized locking
203 for (idx, skip, module_name, critical) in module_states.iter().cloned() {
204 if !skip {
205 let mut err_phase = "start";
206
207 // Execute all sync module phases with single write lock acquisition
208 let mut error = None;
209 {
210 let mut modules_guard = self.modules.write().await;
211 if idx < modules_guard.len() {
212 match &mut modules_guard[idx].module {
213 ModuleType::Sync(_module) => {
214 // Execute before_start phase
215 if let Err(err) = _module.before_start(&mut self.ctx) {
216 err_phase = "before_start";
217 error = Some(err);
218 }
219
220 // Execute start phase if no error
221 if error.is_none() {
222 if let Err(err) = _module.start(&mut self.ctx) {
223 err_phase = "start";
224 error = Some(err);
225 }
226 }
227
228 // Execute after_start phase if no error
229 if error.is_none() {
230 if let Err(err) = _module.after_start(&mut self.ctx) {
231 err_phase = "after_start";
232 error = Some(err);
233 }
234 }
235 }
236 ModuleType::Async(_module) => {
237 // Async modules are handled separately in the async phase
238 }
239 }
240 }
241 }
242
243 // Emit hooks outside of lock to avoid potential deadlocks
244 if error.is_none() {
245 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::BeforeStart))?;
246 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::Start))?;
247 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AfterStart))?;
248 }
249
250 // Handle module start error
251 if let Some(err) = error {
252 self.log_module_error(err_phase, &module_name, &err);
253 if critical {
254 return Err(err);
255 } else {
256 // Mark module as failed with single write lock acquisition
257 let mut modules_guard = self.modules.write().await;
258 if idx < modules_guard.len() {
259 modules_guard[idx].failed = true;
260 }
261 }
262 }
263 }
264 }
265
266 // Emit after modules start hook
267 self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesStart, &self.ctx, None, None)?;
268
269 // Initialize and start asynchronous modules with optimized locking
270 for idx in 0..module_len {
271 let mut err_phase = "async_start";
272
273 // Check if this is an async module and get its state
274 let (skip, module_name, critical) = {
275 let modules_guard = self.modules.read().await;
276 if idx < modules_guard.len() {
277 let slot = &modules_guard[idx];
278 if !slot.failed {
279 match &slot.module {
280 ModuleType::Async(module) => (
281 false,
282 module.name().to_string(),
283 module.is_critical(),
284 ),
285 ModuleType::Sync(_) => (true, String::new(), false),
286 }
287 } else {
288 (true, String::new(), false)
289 }
290 } else {
291 (true, String::new(), false)
292 }
293 };
294
295 if !skip {
296 // Execute all async module phases with single write lock acquisition
297 let mut error = None;
298 {
299 let mut modules_guard = self.modules.write().await;
300 if idx < modules_guard.len() {
301 if let ModuleType::Async(_module) = &mut modules_guard[idx].module {
302 // Execute async init phase
303 if let Err(err) = _module.init(&mut self.ctx).await {
304 err_phase = "async_init";
305 error = Some(err);
306 }
307
308 // Execute async before_start phase if no error
309 if error.is_none() {
310 if let Err(err) = _module.before_start(&mut self.ctx).await {
311 err_phase = "async_before_start";
312 error = Some(err);
313 }
314 }
315
316 // Execute async start phase if no error
317 if error.is_none() {
318 if let Err(err) = _module.start(&mut self.ctx).await {
319 err_phase = "async_start";
320 error = Some(err);
321 }
322 }
323
324 // Execute async after_start phase if no error
325 if error.is_none() {
326 if let Err(err) = _module.after_start(&mut self.ctx).await {
327 err_phase = "async_after_start";
328 error = Some(err);
329 }
330 }
331 }
332 }
333 }
334
335 // Emit hooks outside of lock to avoid potential deadlocks
336 if error.is_none() {
337 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncInit))?;
338 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncBeforeStart))?;
339 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncStart))?;
340 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesStart, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncAfterStart))?;
341 }
342
343 // Handle async module error
344 if let Some(err) = error {
345 self.log_module_error(err_phase, &module_name, &err);
346 if critical {
347 return Err(err);
348 } else {
349 // Mark module as failed with single write lock acquisition
350 let mut modules_guard = self.modules.write().await;
351 if idx < modules_guard.len() {
352 modules_guard[idx].failed = true;
353 }
354 }
355 }
356 }
357 }
358
359 // Emit after async modules start hook
360 self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesStart, &self.ctx, None, None)?;
361
362 // Run the application business logic (provided closure)
363 let result = f(&self.ctx).await;
364
365 // Emit before modules shutdown hook
366 // Note: We're using a new context here since we've moved the original to the closure
367 let _ = self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, None, None);
368
369 // Shutdown synchronous modules in reverse order with optimized locking
370 for idx in (0..module_len).rev() {
371 // Check if this is a sync module and get its state
372 let (skip, module_name, critical) = {
373 let modules_guard = self.modules.read().await;
374 if idx < modules_guard.len() {
375 let slot = &modules_guard[idx];
376 if !slot.failed {
377 match &slot.module {
378 ModuleType::Sync(module) => (
379 false,
380 module.name().to_string(),
381 module.is_critical(),
382 ),
383 ModuleType::Async(_) => (true, String::new(), false),
384 }
385 } else {
386 (true, String::new(), false)
387 }
388 } else {
389 (true, String::new(), false)
390 }
391 };
392
393 if !skip {
394 // Execute all sync module shutdown phases with single write lock acquisition
395 let mut err_phase = "shutdown";
396 let mut error = None;
397 {
398 let mut modules_guard = self.modules.write().await;
399 if idx < modules_guard.len() {
400 if let ModuleType::Sync(_module) = &mut modules_guard[idx].module {
401 // Execute before_shutdown phase
402 if let Err(err) = _module.before_shutdown(&mut self.ctx) {
403 err_phase = "before_shutdown";
404 error = Some(err);
405 }
406
407 // Execute shutdown phase if no error
408 if error.is_none() {
409 if let Err(err) = _module.shutdown(&mut self.ctx) {
410 err_phase = "shutdown";
411 error = Some(err);
412 }
413 }
414
415 // Execute after_shutdown phase if no error
416 if error.is_none() {
417 if let Err(err) = _module.after_shutdown(&mut self.ctx) {
418 err_phase = "after_shutdown";
419 error = Some(err);
420 }
421 }
422 }
423 }
424 }
425
426 // Emit hooks outside of lock to avoid potential deadlocks
427 if error.is_none() {
428 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::BeforeShutdown))?;
429 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::Shutdown))?;
430 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AfterShutdown))?;
431 }
432
433 // Handle module shutdown error
434 if let Some(err) = error {
435 self.log_module_error(err_phase, &module_name, &err);
436 if critical {
437 return Err(err);
438 } else {
439 // Mark module as failed
440 let mut modules_guard = self.modules.write().await;
441 if idx < modules_guard.len() {
442 modules_guard[idx].failed = true;
443 }
444 }
445 }
446 }
447 }
448
449 // Emit after modules shutdown hook
450 self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesShutdown, &self.ctx, None, None)?;
451
452 // Shutdown asynchronous modules in reverse order with optimized locking
453 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, None, None)?;
454
455 for idx in (0..module_len).rev() {
456 // Check if this is an async module and get its state
457 let (skip, module_name, critical) = {
458 let modules_guard = self.modules.read().await;
459 if idx < modules_guard.len() {
460 let slot = &modules_guard[idx];
461 if !slot.failed {
462 match &slot.module {
463 ModuleType::Async(module) => (
464 false,
465 module.name().to_string(),
466 module.is_critical(),
467 ),
468 ModuleType::Sync(_) => (true, String::new(), false),
469 }
470 } else {
471 (true, String::new(), false)
472 }
473 } else {
474 (true, String::new(), false)
475 }
476 };
477
478 if !skip {
479 // Execute all async module shutdown phases with single write lock acquisition
480 let mut err_phase = "async_shutdown";
481 let mut error = None;
482 {
483 let mut modules_guard = self.modules.write().await;
484 if idx < modules_guard.len() {
485 if let ModuleType::Async(_module) = &mut modules_guard[idx].module {
486 // Execute async before_shutdown phase
487 if let Err(err) = _module.before_shutdown(&mut self.ctx).await {
488 err_phase = "async_before_shutdown";
489 error = Some(err);
490 }
491
492 // Execute async shutdown phase if no error
493 if error.is_none() {
494 if let Err(err) = _module.shutdown(&mut self.ctx).await {
495 err_phase = "async_shutdown";
496 error = Some(err);
497 }
498 }
499
500 // Execute async after_shutdown phase if no error
501 if error.is_none() {
502 if let Err(err) = _module.after_shutdown(&mut self.ctx).await {
503 err_phase = "async_after_shutdown";
504 error = Some(err);
505 }
506 }
507 }
508 }
509 }
510
511 // Emit hooks outside of lock to avoid potential deadlocks
512 if error.is_none() {
513 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncBeforeShutdown))?;
514 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncShutdown))?;
515 self.ctx.hooks().emit_with(&DMSCHookKind::BeforeModulesShutdown, &self.ctx, Some(&module_name), Some(DMSCModulePhase::AsyncAfterShutdown))?;
516 }
517
518 // Handle async module shutdown error
519 if let Some(err) = error {
520 self.log_module_error(err_phase, &module_name, &err);
521 if critical {
522 return Err(err);
523 } else {
524 // Mark module as failed with single write lock acquisition
525 let mut modules_guard = self.modules.write().await;
526 if idx < modules_guard.len() {
527 modules_guard[idx].failed = true;
528 }
529 }
530 }
531 }
532 }
533
534 // Emit after async modules shutdown hook
535 self.ctx.hooks().emit_with(&DMSCHookKind::AfterModulesShutdown, &self.ctx, None, None)?;
536
537 // Emit shutdown hook
538 self.ctx.hooks().emit_with(&DMSCHookKind::Shutdown, &self.ctx, None, None)?;
539
540 // Return the result of the closure execution
541 result
542 }
543
544 /// Log a module error.
545 ///
546 /// This method logs an error that occurred during module execution, including
547 /// the module name, phase, and error message.
548 ///
549 /// # Parameters
550 ///
551 /// - `phase`: The lifecycle phase during which the error occurred
552 /// - `module_name`: The name of the module that failed
553 /// - `err`: The error that occurred
554 fn log_module_error(&self, phase: &str, module_name: &str, err: &crate::core::DMSCError) {
555 let logger = self.ctx.logger();
556 let message = format!("module={module_name} phase={phase} error={err}");
557 let _ = logger.error("DMSC.Runtime", message);
558 }
559}
560
561#[cfg(feature = "pyo3")]
562#[pyo3::prelude::pymethods]
563impl DMSCAppRuntime {
564 fn py_run(&self, callback: Py<pyo3::PyAny>) -> PyResult<()> {
565 let rt = tokio::runtime::Builder::new_current_thread()
566 .enable_all()
567 .build()
568 .map_err::<pyo3::PyErr, _>(|e| e.into())?;
569
570 let runtime = self.clone();
571 pyo3::Python::attach(|py| {
572 rt.block_on(async move {
573 let _ = callback.call0(py);
574 runtime.run(|_ctx| async move { Ok(()) }).await
575 }).map_err(|e| e.into())
576 })
577 }
578
579 fn get_context(&self) -> PyResult<DMSCServiceContext> {
580 Ok(self.ctx.clone())
581 }
582
583 #[pyo3(name = "logger")]
584 fn logger_py(&self) -> crate::log::DMSCLogger {
585 self.ctx.logger().clone()
586 }
587}