1use crate::core::RiResult;
19use crate::database::{RiDatabase, RiDatabaseConfig, RiDBResult, RiDBRow};
20use dashmap::DashMap;
21use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
22use std::sync::Arc;
23use tokio::sync::Semaphore;
24use tokio::time::{Duration, Instant};
25use std::sync::RwLock;
26
/// Snapshot of the pool counters, as returned by `RiDatabasePool::metrics`.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone, Default)]
pub struct DatabaseMetrics {
    /// Value of the pool's active-connection counter.
    pub active_connections: u64,
    /// Value of the pool's idle-connection counter.
    pub idle_connections: u64,
    /// Value of the pool's total-connection counter.
    pub total_connections: u64,
    /// Value of the pool's executed-query counter.
    pub queries_executed: u64,
    /// Query duration in milliseconds; `RiDatabasePool::metrics` currently
    /// always fills this with `0.0`.
    pub query_duration_ms: f64,
    /// Value of the pool's error counter.
    pub errors: u64,
    /// `active_connections / total_connections`, or `0.0` when the total is 0.
    pub utilization_rate: f64,
}
38
/// Tunables that drive the pool's automatic scale-up / scale-down decisions.
#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
#[derive(Debug, Clone)]
pub struct DynamicPoolConfig {
    /// Master switch; when `false`, `check_and_scale` returns immediately.
    pub enable_dynamic_scaling: bool,
    /// Utilization (active / total) above which connections are added.
    pub scale_up_threshold: f64,
    /// Utilization below which the pool becomes a candidate for shrinking.
    pub scale_down_threshold: f64,
    /// Seconds utilization must stay below `scale_down_threshold` before any
    /// connection is removed.
    pub scale_down_cooldown_secs: u64,
    /// Automatic scale-down never takes the total below this count.
    pub min_connections: u32,
    /// Automatic scale-up never takes the total above this count.
    pub max_connections: u32,
    /// Maximum number of connections added per scale-up pass.
    pub scale_up_step: u32,
    /// Maximum number of connections removed per scale-down pass.
    pub scale_down_step: u32,
}
51
52impl Default for DynamicPoolConfig {
53 fn default() -> Self {
54 Self {
55 enable_dynamic_scaling: true,
56 scale_up_threshold: 0.8,
57 scale_down_threshold: 0.3,
58 scale_down_cooldown_secs: 300,
59 min_connections: 2,
60 max_connections: 50,
61 scale_up_step: 2,
62 scale_down_step: 1,
63 }
64 }
65}
66
/// Python-facing constructor and explicit getter/setter pairs for every
/// field of `DynamicPoolConfig`. Only compiled with the `pyo3` feature.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl DynamicPoolConfig {
    /// Python constructor; yields the default configuration.
    #[new]
    fn py_new() -> Self {
        Default::default()
    }

    /// Static factory; also yields the default configuration.
    #[staticmethod]
    fn create() -> Self {
        Default::default()
    }

    // --- enable_dynamic_scaling ---

    /// Returns whether dynamic scaling is enabled.
    fn get_enable_dynamic_scaling(&self) -> bool {
        self.enable_dynamic_scaling
    }

    /// Enables or disables dynamic scaling.
    fn set_enable_dynamic_scaling(&mut self, value: bool) {
        self.enable_dynamic_scaling = value;
    }

    // --- scale_up_threshold ---

    /// Returns the scale-up utilization threshold.
    fn get_scale_up_threshold(&self) -> f64 {
        self.scale_up_threshold
    }

    /// Replaces the scale-up utilization threshold (stored as given).
    fn set_scale_up_threshold(&mut self, value: f64) {
        self.scale_up_threshold = value;
    }

    // --- scale_down_threshold ---

    /// Returns the scale-down utilization threshold.
    fn get_scale_down_threshold(&self) -> f64 {
        self.scale_down_threshold
    }

    /// Replaces the scale-down utilization threshold (stored as given).
    fn set_scale_down_threshold(&mut self, value: f64) {
        self.scale_down_threshold = value;
    }

    // --- scale_down_cooldown_secs ---

    /// Returns the scale-down cooldown in seconds.
    fn get_scale_down_cooldown_secs(&self) -> u64 {
        self.scale_down_cooldown_secs
    }

    /// Replaces the scale-down cooldown in seconds.
    fn set_scale_down_cooldown_secs(&mut self, value: u64) {
        self.scale_down_cooldown_secs = value;
    }

    // --- min_connections ---

    /// Returns the lower bound on pool size.
    fn get_min_connections(&self) -> u32 {
        self.min_connections
    }

    /// Replaces the lower bound on pool size.
    fn set_min_connections(&mut self, value: u32) {
        self.min_connections = value;
    }

    // --- max_connections ---

    /// Returns the upper bound on pool size.
    fn get_max_connections(&self) -> u32 {
        self.max_connections
    }

    /// Replaces the upper bound on pool size.
    fn set_max_connections(&mut self, value: u32) {
        self.max_connections = value;
    }

    // --- scale_up_step ---

    /// Returns how many connections one scale-up pass may add.
    fn get_scale_up_step(&self) -> u32 {
        self.scale_up_step
    }

    /// Replaces how many connections one scale-up pass may add.
    fn set_scale_up_step(&mut self, value: u32) {
        self.scale_up_step = value;
    }

    // --- scale_down_step ---

    /// Returns how many connections one scale-down pass may remove.
    fn get_scale_down_step(&self) -> u32 {
        self.scale_down_step
    }

    /// Replaces how many connections one scale-down pass may remove.
    fn set_scale_down_step(&mut self, value: u32) {
        self.scale_down_step = value;
    }
}
144
145#[derive(Clone)]
146pub struct PooledDatabase {
147 id: u32,
148 inner: Arc<dyn RiDatabase>,
149 pool: Arc<RiDatabasePool>,
150}
151
152impl PooledDatabase {
153 pub fn new(id: u32, inner: Arc<dyn RiDatabase>, pool: Arc<RiDatabasePool>) -> Self {
154 Self { id, inner, pool }
155 }
156
157 pub fn id(&self) -> u32 {
158 self.id
159 }
160
161 pub async fn execute(&self, sql: &str) -> RiResult<u64> {
162 self.inner.execute(sql).await
163 }
164
165 pub async fn query(&self, sql: &str) -> RiResult<RiDBResult> {
166 self.inner.query(sql).await
167 }
168
169 pub async fn query_one(&self, sql: &str) -> RiResult<Option<RiDBRow>> {
170 self.inner.query_one(sql).await
171 }
172
173 pub async fn ping(&self) -> RiResult<bool> {
174 self.inner.ping().await
175 }
176
177 pub fn is_connected(&self) -> bool {
178 self.inner.is_connected()
179 }
180
181 pub fn pool_metrics(&self) -> DatabaseMetrics {
182 self.pool.metrics()
183 }
184}
185
186#[async_trait::async_trait]
187impl RiDatabase for PooledDatabase {
188 fn database_type(&self) -> crate::database::DatabaseType {
189 self.inner.database_type()
190 }
191
192 async fn execute(&self, sql: &str) -> RiResult<u64> {
193 self.inner.execute(sql).await
194 }
195
196 async fn query(&self, sql: &str) -> RiResult<RiDBResult> {
197 self.inner.query(sql).await
198 }
199
200 async fn query_one(&self, sql: &str) -> RiResult<Option<RiDBRow>> {
201 self.inner.query_one(sql).await
202 }
203
204 async fn ping(&self) -> RiResult<bool> {
205 self.inner.ping().await
206 }
207
208 fn is_connected(&self) -> bool {
209 self.inner.is_connected()
210 }
211
212 async fn close(&self) -> RiResult<()> {
213 self.pool.close().await
214 }
215
216 async fn batch_execute(&self, sql: &str, params: &[Vec<serde_json::Value>]) -> RiResult<Vec<u64>> {
217 self.inner.batch_execute(sql, params).await
218 }
219
220 async fn batch_query(&self, sql: &str, params: &[Vec<serde_json::Value>]) -> RiResult<Vec<RiDBResult>> {
221 self.inner.batch_query(sql, params).await
222 }
223
224 async fn execute_with_params(&self, sql: &str, params: &[serde_json::Value]) -> RiResult<u64> {
225 self.inner.execute_with_params(sql, params).await
226 }
227
228 async fn query_with_params(&self, sql: &str, params: &[serde_json::Value]) -> RiResult<RiDBResult> {
229 self.inner.query_with_params(sql, params).await
230 }
231
232 async fn transaction(&self) -> RiResult<Box<dyn crate::database::RiDatabaseTransaction>> {
233 self.inner.transaction().await
234 }
235}
236
237struct PoolConnection {
238 db: Arc<dyn RiDatabase>,
239 acquired_at: Instant,
240 created_at: Instant,
241}
242
/// Remembers since when utilization has been continuously below the
/// scale-down threshold.
struct LowUtilizationTracker {
    /// Start of the current below-threshold streak, if one is in progress.
    below_threshold_since: Option<Instant>,
    /// Whether the most recent observation was below the threshold.
    was_below_threshold: bool,
}
247
248impl LowUtilizationTracker {
249 fn new() -> Self {
250 Self {
251 below_threshold_since: None,
252 was_below_threshold: false,
253 }
254 }
255
256 fn update(&mut self, is_below_threshold: bool) {
257 if is_below_threshold && !self.was_below_threshold {
258 self.below_threshold_since = Some(Instant::now());
259 self.was_below_threshold = true;
260 } else if !is_below_threshold {
261 self.below_threshold_since = None;
262 self.was_below_threshold = false;
263 }
264 }
265
266 fn duration_below_threshold(&self) -> Option<Duration> {
267 self.below_threshold_since.map(|since| since.elapsed())
268 }
269}
270
271#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
272pub struct RiDatabasePool {
273 config: RiDatabaseConfig,
274 connections: Arc<DashMap<u32, PoolConnection>>,
275 available: Arc<DashMap<u32, PoolConnection>>,
276 connection_ids: Arc<AtomicU64>,
277 semaphore: Arc<Semaphore>,
278 max_idle_time: Duration,
279 max_lifetime: Duration,
280 idle_connections: Arc<AtomicU64>,
281 active_connections: Arc<AtomicU64>,
282 total_connections: Arc<AtomicU64>,
283 queries_executed: Arc<AtomicU64>,
284 errors: Arc<AtomicU64>,
285 dynamic_config: Arc<RwLock<DynamicPoolConfig>>,
286 low_utilization_tracker: Arc<RwLock<LowUtilizationTracker>>,
287 scaling_in_progress: Arc<AtomicBool>,
288 current_max_connections: Arc<AtomicU64>,
289}
290
291impl RiDatabasePool {
292 pub async fn new(config: RiDatabaseConfig) -> RiResult<Self> {
293 let dynamic_config = DynamicPoolConfig {
294 enable_dynamic_scaling: true,
295 scale_up_threshold: 0.8,
296 scale_down_threshold: 0.3,
297 scale_down_cooldown_secs: 300,
298 min_connections: config.min_idle_connections,
299 max_connections: config.max_connections,
300 scale_up_step: 2,
301 scale_down_step: 1,
302 };
303
304 let pool = Self {
305 config: config.clone(),
306 connections: Arc::new(DashMap::new()),
307 available: Arc::new(DashMap::new()),
308 connection_ids: Arc::new(AtomicU64::new(0)),
309 semaphore: Arc::new(Semaphore::new(config.max_connections as usize)),
310 max_idle_time: Duration::from_secs(config.idle_timeout_secs),
311 max_lifetime: Duration::from_secs(config.max_lifetime_secs),
312 idle_connections: Arc::new(AtomicU64::new(0)),
313 active_connections: Arc::new(AtomicU64::new(0)),
314 total_connections: Arc::new(AtomicU64::new(0)),
315 queries_executed: Arc::new(AtomicU64::new(0)),
316 errors: Arc::new(AtomicU64::new(0)),
317 dynamic_config: Arc::new(RwLock::new(dynamic_config)),
318 low_utilization_tracker: Arc::new(RwLock::new(LowUtilizationTracker::new())),
319 scaling_in_progress: Arc::new(AtomicBool::new(false)),
320 current_max_connections: Arc::new(AtomicU64::new(config.max_connections as u64)),
321 };
322
323 for _ in 0..config.min_idle_connections {
324 if let Ok(conn) = pool.create_connection().await {
325 let id = pool.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
326 let now = Instant::now();
327 pool.available.insert(id, PoolConnection {
328 db: conn,
329 acquired_at: now,
330 created_at: now,
331 });
332 pool.idle_connections.fetch_add(1, Ordering::SeqCst);
333 pool.total_connections.fetch_add(1, Ordering::SeqCst);
334 }
335 }
336
337 Ok(pool)
338 }
339
340 async fn create_connection(&self) -> RiResult<Arc<dyn RiDatabase>> {
341 match self.config.database_type {
342 #[cfg(feature = "postgres")]
343 crate::database::DatabaseType::Postgres => {
344 let connection_string = self.config.connection_string();
345 let db = crate::database::postgres::PostgresDatabase::new(&connection_string, self.config.clone()).await
346 .map_err(|e| crate::core::RiError::Config(e.to_string()))?;
347 Ok(Arc::new(db) as Arc<dyn RiDatabase>)
348 }
349 #[cfg(feature = "mysql")]
350 crate::database::DatabaseType::MySQL => {
351 let connection_string = self.config.connection_string();
352 let db = crate::database::mysql::MySQLDatabase::new(&connection_string, self.config.clone()).await
353 .map_err(|e| crate::core::RiError::Config(e.to_string()))?;
354 Ok(Arc::new(db) as Arc<dyn RiDatabase>)
355 }
356 #[cfg(feature = "sqlite")]
357 crate::database::DatabaseType::SQLite => {
358 let url = format!("sqlite:{}", self.config.database);
359 let db = tokio::runtime::Handle::current().block_on(
360 crate::database::sqlite::SQLiteDatabase::new(&url, self.config.clone())
361 );
362 match db {
363 Ok(db) => Ok(Arc::new(db) as Arc<dyn RiDatabase>),
364 Err(e) => Err(crate::core::RiError::Config(e.to_string())),
365 }
366 }
367 _ => Err(crate::core::RiError::Config("Unsupported database type".to_string())),
368 }
369 }
370
371 pub fn utilization_rate(&self) -> f64 {
372 let active = self.active_connections.load(Ordering::SeqCst);
373 let total = self.total_connections.load(Ordering::SeqCst);
374
375 if total == 0 {
376 return 0.0;
377 }
378
379 (active as f64) / (total as f64)
380 }
381
382 pub async fn check_and_scale(&self) -> RiResult<()> {
383 let dynamic_config = self.dynamic_config.read().unwrap().clone();
384
385 if !dynamic_config.enable_dynamic_scaling {
386 return Ok(());
387 }
388
389 if self.scaling_in_progress.compare_exchange(
390 false,
391 true,
392 Ordering::SeqCst,
393 Ordering::SeqCst,
394 ).is_err() {
395 return Ok(());
396 }
397
398 let result = self.do_scaling(&dynamic_config).await;
399
400 self.scaling_in_progress.store(false, Ordering::SeqCst);
401
402 result
403 }
404
405 async fn do_scaling(&self, dynamic_config: &DynamicPoolConfig) -> RiResult<()> {
406 let utilization = self.utilization_rate();
407 let total = self.total_connections.load(Ordering::SeqCst) as u32;
408 let _active = self.active_connections.load(Ordering::SeqCst) as u32;
409
410 if utilization > dynamic_config.scale_up_threshold {
411 {
412 let mut tracker = self.low_utilization_tracker.write().unwrap();
413 tracker.update(false);
414 }
415
416 if total < dynamic_config.max_connections {
417 let to_add = std::cmp::min(
418 dynamic_config.scale_up_step,
419 dynamic_config.max_connections - total,
420 );
421
422 for _ in 0..to_add {
423 match self.create_connection().await {
424 Ok(conn) => {
425 let id = self.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
426 let now = Instant::now();
427 self.available.insert(id, PoolConnection {
428 db: conn,
429 acquired_at: now,
430 created_at: now,
431 });
432 self.idle_connections.fetch_add(1, Ordering::SeqCst);
433 self.total_connections.fetch_add(1, Ordering::SeqCst);
434 }
435 Err(e) => {
436 self.errors.fetch_add(1, Ordering::SeqCst);
437 return Err(e);
438 }
439 }
440 }
441
442 let new_total = self.total_connections.load(Ordering::SeqCst);
443 self.current_max_connections.store(new_total, Ordering::SeqCst);
444 }
445 } else if utilization < dynamic_config.scale_down_threshold {
446 let should_scale_down = {
447 let mut tracker = self.low_utilization_tracker.write().unwrap();
448 tracker.update(true);
449
450 if let Some(duration) = tracker.duration_below_threshold() {
451 duration >= Duration::from_secs(dynamic_config.scale_down_cooldown_secs)
452 } else {
453 false
454 }
455 };
456
457 if should_scale_down && total > dynamic_config.min_connections {
458 let idle = self.idle_connections.load(Ordering::SeqCst) as u32;
459 let to_remove = std::cmp::min(
460 std::cmp::min(dynamic_config.scale_down_step, idle),
461 total - dynamic_config.min_connections,
462 );
463
464 if to_remove > 0 {
465 self.remove_idle_connections(to_remove).await;
466 }
467 }
468 } else {
469 let mut tracker = self.low_utilization_tracker.write().unwrap();
470 tracker.update(false);
471 }
472
473 Ok(())
474 }
475
476 async fn remove_idle_connections(&self, count: u32) {
477 let mut removed = 0u32;
478 let now = Instant::now();
479
480 let to_remove: Vec<u32> = self.available
481 .iter()
482 .filter(|entry| {
483 let conn = entry.value();
484 now.duration_since(conn.acquired_at) > self.max_idle_time
485 })
486 .take(count as usize)
487 .map(|entry| *entry.key())
488 .collect();
489
490 for id in to_remove {
491 if removed >= count {
492 break;
493 }
494
495 if let Some((_, conn)) = self.available.remove(&id) {
496 let _ = conn.db.close().await;
497 self.idle_connections.fetch_sub(1, Ordering::SeqCst);
498 self.total_connections.fetch_sub(1, Ordering::SeqCst);
499 removed += 1;
500 }
501 }
502
503 if removed < count {
504 let additional_remove: Vec<u32> = self.available
505 .iter()
506 .take((count - removed) as usize)
507 .map(|entry| *entry.key())
508 .collect();
509
510 for id in additional_remove {
511 if removed >= count {
512 break;
513 }
514
515 if let Some((_, conn)) = self.available.remove(&id) {
516 let _ = conn.db.close().await;
517 self.idle_connections.fetch_sub(1, Ordering::SeqCst);
518 self.total_connections.fetch_sub(1, Ordering::SeqCst);
519 removed += 1;
520 }
521 }
522 }
523
524 let new_total = self.total_connections.load(Ordering::SeqCst);
525 self.current_max_connections.store(new_total, Ordering::SeqCst);
526 }
527
528 pub async fn get(&self) -> RiResult<PooledDatabase> {
529 let _permit = self.semaphore.acquire().await.map_err(|e| crate::core::RiError::Config(e.to_string()))?;
530
531 let mut reused_db = None;
532 let mut reused_id = None;
533
534 let now = Instant::now();
535
536 for entry in self.available.iter() {
537 let id = *entry.key();
538 let conn = entry.value();
539 if now.duration_since(conn.acquired_at) > self.max_idle_time || now.duration_since(conn.created_at) > self.max_lifetime {
540 self.available.remove(&id);
541 let _ = conn.db.close().await;
542 self.idle_connections.fetch_sub(1, Ordering::SeqCst);
543 self.total_connections.fetch_sub(1, Ordering::SeqCst);
544 } else {
545 reused_db = Some(conn.db.clone());
546 reused_id = Some(id);
547 self.available.remove(&id);
548 self.idle_connections.fetch_sub(1, Ordering::SeqCst);
549 self.active_connections.fetch_add(1, Ordering::SeqCst);
550 break;
551 }
552 }
553
554 let (db, id) = if let Some((existing_db, existing_id)) = reused_db.zip(reused_id) {
555 (existing_db, existing_id)
556 } else {
557 match self.create_connection().await {
558 Ok(new_conn) => {
559 let id = self.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
560 self.total_connections.fetch_add(1, Ordering::SeqCst);
561 self.active_connections.fetch_add(1, Ordering::SeqCst);
562 (new_conn, id)
563 }
564 Err(e) => {
565 self.errors.fetch_add(1, Ordering::SeqCst);
566 return Err(e);
567 }
568 }
569 };
570
571 let _ = self.check_and_scale().await;
572
573 Ok(PooledDatabase::new(id, db, Arc::new(self.clone())))
574 }
575
576 pub async fn release(&self, db: PooledDatabase) {
577 self.active_connections.fetch_sub(1, Ordering::SeqCst);
578 self.idle_connections.fetch_add(1, Ordering::SeqCst);
579
580 self.available.insert(db.id(), PoolConnection {
581 db: db.inner.clone(),
582 acquired_at: Instant::now(),
583 created_at: Instant::now(),
584 });
585
586 let _ = self.check_and_scale().await;
587 }
588
589 pub async fn close(&self) -> RiResult<()> {
590 self.semaphore.close();
591 for entry in self.connections.iter() {
592 let _ = entry.value().db.close().await;
593 }
594 self.connections.clear();
595 self.available.clear();
596 Ok(())
597 }
598
599 pub fn metrics(&self) -> DatabaseMetrics {
600 let active = self.active_connections.load(Ordering::SeqCst);
601 let total = self.total_connections.load(Ordering::SeqCst);
602
603 DatabaseMetrics {
604 active_connections: active,
605 idle_connections: self.idle_connections.load(Ordering::SeqCst),
606 total_connections: total,
607 queries_executed: self.queries_executed.load(Ordering::SeqCst),
608 query_duration_ms: 0.0,
609 errors: self.errors.load(Ordering::SeqCst),
610 utilization_rate: if total > 0 { (active as f64) / (total as f64) } else { 0.0 },
611 }
612 }
613
614 pub fn get_dynamic_config(&self) -> DynamicPoolConfig {
615 self.dynamic_config.read().unwrap().clone()
616 }
617
618 pub fn set_dynamic_config(&self, config: DynamicPoolConfig) {
619 let mut current = self.dynamic_config.write().unwrap();
620 *current = config;
621 }
622
623 pub fn update_dynamic_config<F>(&self, f: F)
624 where
625 F: FnOnce(&mut DynamicPoolConfig),
626 {
627 let mut config = self.dynamic_config.write().unwrap();
628 f(&mut config);
629 }
630
631 pub fn is_scaling_in_progress(&self) -> bool {
632 self.scaling_in_progress.load(Ordering::SeqCst)
633 }
634
635 pub fn get_current_max_connections(&self) -> u32 {
636 self.current_max_connections.load(Ordering::SeqCst) as u32
637 }
638
639 pub async fn force_scale_up(&self, count: u32) -> RiResult<()> {
640 let dynamic_config = self.dynamic_config.read().unwrap().clone();
641 let total = self.total_connections.load(Ordering::SeqCst) as u32;
642
643 let to_add = std::cmp::min(count, dynamic_config.max_connections.saturating_sub(total));
644
645 for _ in 0..to_add {
646 match self.create_connection().await {
647 Ok(conn) => {
648 let id = self.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
649 let now = Instant::now();
650 self.available.insert(id, PoolConnection {
651 db: conn,
652 acquired_at: now,
653 created_at: now,
654 });
655 self.idle_connections.fetch_add(1, Ordering::SeqCst);
656 self.total_connections.fetch_add(1, Ordering::SeqCst);
657 }
658 Err(e) => {
659 self.errors.fetch_add(1, Ordering::SeqCst);
660 return Err(e);
661 }
662 }
663 }
664
665 let new_total = self.total_connections.load(Ordering::SeqCst);
666 self.current_max_connections.store(new_total, Ordering::SeqCst);
667
668 Ok(())
669 }
670
671 pub async fn force_scale_down(&self, count: u32) -> RiResult<()> {
672 let dynamic_config = self.dynamic_config.read().unwrap().clone();
673 let total = self.total_connections.load(Ordering::SeqCst) as u32;
674
675 let to_remove = std::cmp::min(
676 count,
677 total.saturating_sub(dynamic_config.min_connections)
678 );
679
680 if to_remove > 0 {
681 self.remove_idle_connections(to_remove).await;
682 }
683
684 Ok(())
685 }
686}
687
/// Python-facing API of the pool. Only compiled with the `pyo3` feature.
#[cfg(feature = "pyo3")]
#[pyo3::prelude::pymethods]
impl RiDatabasePool {
    /// Python constructor: builds an empty pool from `config`.
    ///
    /// No connection is opened here; every counter starts at zero.
    #[new]
    fn py_new(config: RiDatabaseConfig) -> Self {
        let max_connections = config.max_connections;

        let dynamic_config = DynamicPoolConfig {
            min_connections: config.min_idle_connections,
            max_connections,
            ..DynamicPoolConfig::default()
        };

        Self {
            connections: Arc::new(DashMap::new()),
            available: Arc::new(DashMap::new()),
            connection_ids: Arc::new(AtomicU64::new(0)),
            semaphore: Arc::new(Semaphore::new(max_connections as usize)),
            max_idle_time: Duration::from_secs(config.idle_timeout_secs),
            max_lifetime: Duration::from_secs(config.max_lifetime_secs),
            idle_connections: Arc::new(AtomicU64::new(0)),
            active_connections: Arc::new(AtomicU64::new(0)),
            total_connections: Arc::new(AtomicU64::new(0)),
            queries_executed: Arc::new(AtomicU64::new(0)),
            errors: Arc::new(AtomicU64::new(0)),
            dynamic_config: Arc::new(RwLock::new(dynamic_config)),
            low_utilization_tracker: Arc::new(RwLock::new(LowUtilizationTracker::new())),
            scaling_in_progress: Arc::new(AtomicBool::new(false)),
            current_max_connections: Arc::new(AtomicU64::new(max_connections as u64)),
            // Moved in last so the scalar settings above can be read first.
            config,
        }
    }

    /// Formats the counters and the utilization percentage as one line.
    fn status(&self) -> String {
        let active = self.active_connections.load(Ordering::SeqCst);
        let idle = self.idle_connections.load(Ordering::SeqCst);
        let total = self.total_connections.load(Ordering::SeqCst);
        let queries = self.queries_executed.load(Ordering::SeqCst);
        let errors = self.errors.load(Ordering::SeqCst);
        let utilization_percent = self.utilization_rate() * 100.0;

        format!(
            "Pool status - Active: {}, Idle: {}, Total: {}, Queries: {}, Errors: {}, Utilization: {:.2}%",
            active, idle, total, queries, errors, utilization_percent
        )
    }

    /// Returns a copy of the database configuration.
    fn get_config(&self) -> RiDatabaseConfig {
        self.config.clone()
    }

    /// Returns the current utilization rate.
    fn get_utilization_rate(&self) -> f64 {
        self.utilization_rate()
    }

    /// Returns a metrics snapshot.
    fn get_metrics(&self) -> DatabaseMetrics {
        self.metrics()
    }

    /// Returns a copy of the scaling policy.
    fn get_dynamic_pool_config(&self) -> DynamicPoolConfig {
        self.get_dynamic_config()
    }

    /// Replaces the scaling policy.
    fn set_dynamic_pool_config(&self, config: DynamicPoolConfig) {
        self.set_dynamic_config(config);
    }

    /// Enables or disables dynamic scaling.
    fn set_enable_dynamic_scaling(&self, enable: bool) {
        self.update_dynamic_config(|policy| {
            policy.enable_dynamic_scaling = enable;
        });
    }

    /// Sets the scale-up utilization threshold.
    fn set_scale_up_threshold(&self, threshold: f64) {
        self.update_dynamic_config(|policy| {
            policy.scale_up_threshold = threshold;
        });
    }

    /// Sets the scale-down utilization threshold.
    fn set_scale_down_threshold(&self, threshold: f64) {
        self.update_dynamic_config(|policy| {
            policy.scale_down_threshold = threshold;
        });
    }

    /// Sets the scale-down cooldown in seconds.
    fn set_scale_down_cooldown_secs(&self, secs: u64) {
        self.update_dynamic_config(|policy| {
            policy.scale_down_cooldown_secs = secs;
        });
    }

    /// Sets the lower bound used by scaling.
    fn set_min_connections(&self, min: u32) {
        self.update_dynamic_config(|policy| {
            policy.min_connections = min;
        });
    }

    /// Sets the upper bound used by scaling.
    fn set_max_connections(&self, max: u32) {
        self.update_dynamic_config(|policy| {
            policy.max_connections = max;
        });
    }

    /// Sets how many connections one scale-up pass may add.
    fn set_scale_up_step(&self, step: u32) {
        self.update_dynamic_config(|policy| {
            policy.scale_up_step = step;
        });
    }

    /// Sets how many connections one scale-down pass may remove.
    fn set_scale_down_step(&self, step: u32) {
        self.update_dynamic_config(|policy| {
            policy.scale_down_step = step;
        });
    }

    /// Reports whether a scaling pass is currently running.
    fn is_scaling(&self) -> bool {
        self.is_scaling_in_progress()
    }

    /// Returns the value of `get_current_max_connections`.
    fn get_current_pool_size(&self) -> u32 {
        self.get_current_max_connections()
    }
}
796
797impl Clone for RiDatabasePool {
798 fn clone(&self) -> Self {
799 Self {
800 config: self.config.clone(),
801 connections: self.connections.clone(),
802 available: self.available.clone(),
803 connection_ids: self.connection_ids.clone(),
804 semaphore: self.semaphore.clone(),
805 max_idle_time: self.max_idle_time,
806 max_lifetime: self.max_lifetime,
807 idle_connections: self.idle_connections.clone(),
808 active_connections: self.active_connections.clone(),
809 total_connections: self.total_connections.clone(),
810 queries_executed: self.queries_executed.clone(),
811 errors: self.errors.clone(),
812 dynamic_config: self.dynamic_config.clone(),
813 low_utilization_tracker: self.low_utilization_tracker.clone(),
814 scaling_in_progress: self.scaling_in_progress.clone(),
815 current_max_connections: self.current_max_connections.clone(),
816 }
817 }
818}