Skip to main content

ri/database/
pool.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of Ri.
4//! The Ri project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18use crate::core::RiResult;
19use crate::database::{RiDatabase, RiDatabaseConfig, RiDBResult, RiDBRow};
20use dashmap::DashMap;
21use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
22use std::sync::Arc;
23use tokio::sync::Semaphore;
24use tokio::time::{Duration, Instant};
25use std::sync::RwLock;
26
27#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
28#[derive(Debug, Clone, Default)]
29pub struct DatabaseMetrics {
30    pub active_connections: u64,
31    pub idle_connections: u64,
32    pub total_connections: u64,
33    pub queries_executed: u64,
34    pub query_duration_ms: f64,
35    pub errors: u64,
36    pub utilization_rate: f64,
37}
38
39#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
40#[derive(Debug, Clone)]
41pub struct DynamicPoolConfig {
42    pub enable_dynamic_scaling: bool,
43    pub scale_up_threshold: f64,
44    pub scale_down_threshold: f64,
45    pub scale_down_cooldown_secs: u64,
46    pub min_connections: u32,
47    pub max_connections: u32,
48    pub scale_up_step: u32,
49    pub scale_down_step: u32,
50}
51
52impl Default for DynamicPoolConfig {
53    fn default() -> Self {
54        Self {
55            enable_dynamic_scaling: true,
56            scale_up_threshold: 0.8,
57            scale_down_threshold: 0.3,
58            scale_down_cooldown_secs: 300,
59            min_connections: 2,
60            max_connections: 50,
61            scale_up_step: 2,
62            scale_down_step: 1,
63        }
64    }
65}
66
67#[cfg(feature = "pyo3")]
68#[pyo3::prelude::pymethods]
69impl DynamicPoolConfig {
70    #[new]
71    fn py_new() -> Self {
72        Self::default()
73    }
74
75    #[staticmethod]
76    fn create() -> Self {
77        Self::default()
78    }
79
80    fn get_enable_dynamic_scaling(&self) -> bool {
81        self.enable_dynamic_scaling
82    }
83
84    fn set_enable_dynamic_scaling(&mut self, value: bool) {
85        self.enable_dynamic_scaling = value;
86    }
87
88    fn get_scale_up_threshold(&self) -> f64 {
89        self.scale_up_threshold
90    }
91
92    fn set_scale_up_threshold(&mut self, value: f64) {
93        self.scale_up_threshold = value;
94    }
95
96    fn get_scale_down_threshold(&self) -> f64 {
97        self.scale_down_threshold
98    }
99
100    fn set_scale_down_threshold(&mut self, value: f64) {
101        self.scale_down_threshold = value;
102    }
103
104    fn get_scale_down_cooldown_secs(&self) -> u64 {
105        self.scale_down_cooldown_secs
106    }
107
108    fn set_scale_down_cooldown_secs(&mut self, value: u64) {
109        self.scale_down_cooldown_secs = value;
110    }
111
112    fn get_min_connections(&self) -> u32 {
113        self.min_connections
114    }
115
116    fn set_min_connections(&mut self, value: u32) {
117        self.min_connections = value;
118    }
119
120    fn get_max_connections(&self) -> u32 {
121        self.max_connections
122    }
123
124    fn set_max_connections(&mut self, value: u32) {
125        self.max_connections = value;
126    }
127
128    fn get_scale_up_step(&self) -> u32 {
129        self.scale_up_step
130    }
131
132    fn set_scale_up_step(&mut self, value: u32) {
133        self.scale_up_step = value;
134    }
135
136    fn get_scale_down_step(&self) -> u32 {
137        self.scale_down_step
138    }
139
140    fn set_scale_down_step(&mut self, value: u32) {
141        self.scale_down_step = value;
142    }
143}
144
145#[derive(Clone)]
146pub struct PooledDatabase {
147    id: u32,
148    inner: Arc<dyn RiDatabase>,
149    pool: Arc<RiDatabasePool>,
150}
151
152impl PooledDatabase {
153    pub fn new(id: u32, inner: Arc<dyn RiDatabase>, pool: Arc<RiDatabasePool>) -> Self {
154        Self { id, inner, pool }
155    }
156
157    pub fn id(&self) -> u32 {
158        self.id
159    }
160
161    pub async fn execute(&self, sql: &str) -> RiResult<u64> {
162        self.inner.execute(sql).await
163    }
164
165    pub async fn query(&self, sql: &str) -> RiResult<RiDBResult> {
166        self.inner.query(sql).await
167    }
168
169    pub async fn query_one(&self, sql: &str) -> RiResult<Option<RiDBRow>> {
170        self.inner.query_one(sql).await
171    }
172
173    pub async fn ping(&self) -> RiResult<bool> {
174        self.inner.ping().await
175    }
176
177    pub fn is_connected(&self) -> bool {
178        self.inner.is_connected()
179    }
180
181    pub fn pool_metrics(&self) -> DatabaseMetrics {
182        self.pool.metrics()
183    }
184}
185
186#[async_trait::async_trait]
187impl RiDatabase for PooledDatabase {
188    fn database_type(&self) -> crate::database::DatabaseType {
189        self.inner.database_type()
190    }
191
192    async fn execute(&self, sql: &str) -> RiResult<u64> {
193        self.inner.execute(sql).await
194    }
195
196    async fn query(&self, sql: &str) -> RiResult<RiDBResult> {
197        self.inner.query(sql).await
198    }
199
200    async fn query_one(&self, sql: &str) -> RiResult<Option<RiDBRow>> {
201        self.inner.query_one(sql).await
202    }
203
204    async fn ping(&self) -> RiResult<bool> {
205        self.inner.ping().await
206    }
207
208    fn is_connected(&self) -> bool {
209        self.inner.is_connected()
210    }
211
212    async fn close(&self) -> RiResult<()> {
213        self.pool.close().await
214    }
215
216    async fn batch_execute(&self, sql: &str, params: &[Vec<serde_json::Value>]) -> RiResult<Vec<u64>> {
217        self.inner.batch_execute(sql, params).await
218    }
219
220    async fn batch_query(&self, sql: &str, params: &[Vec<serde_json::Value>]) -> RiResult<Vec<RiDBResult>> {
221        self.inner.batch_query(sql, params).await
222    }
223
224    async fn execute_with_params(&self, sql: &str, params: &[serde_json::Value]) -> RiResult<u64> {
225        self.inner.execute_with_params(sql, params).await
226    }
227
228    async fn query_with_params(&self, sql: &str, params: &[serde_json::Value]) -> RiResult<RiDBResult> {
229        self.inner.query_with_params(sql, params).await
230    }
231
232    async fn transaction(&self) -> RiResult<Box<dyn crate::database::RiDatabaseTransaction>> {
233        self.inner.transaction().await
234    }
235}
236
237struct PoolConnection {
238    db: Arc<dyn RiDatabase>,
239    acquired_at: Instant,
240    created_at: Instant,
241}
242
243struct LowUtilizationTracker {
244    below_threshold_since: Option<Instant>,
245    was_below_threshold: bool,
246}
247
248impl LowUtilizationTracker {
249    fn new() -> Self {
250        Self {
251            below_threshold_since: None,
252            was_below_threshold: false,
253        }
254    }
255
256    fn update(&mut self, is_below_threshold: bool) {
257        if is_below_threshold && !self.was_below_threshold {
258            self.below_threshold_since = Some(Instant::now());
259            self.was_below_threshold = true;
260        } else if !is_below_threshold {
261            self.below_threshold_since = None;
262            self.was_below_threshold = false;
263        }
264    }
265
266    fn duration_below_threshold(&self) -> Option<Duration> {
267        self.below_threshold_since.map(|since| since.elapsed())
268    }
269}
270
271#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
272pub struct RiDatabasePool {
273    config: RiDatabaseConfig,
274    connections: Arc<DashMap<u32, PoolConnection>>,
275    available: Arc<DashMap<u32, PoolConnection>>,
276    connection_ids: Arc<AtomicU64>,
277    semaphore: Arc<Semaphore>,
278    max_idle_time: Duration,
279    max_lifetime: Duration,
280    idle_connections: Arc<AtomicU64>,
281    active_connections: Arc<AtomicU64>,
282    total_connections: Arc<AtomicU64>,
283    queries_executed: Arc<AtomicU64>,
284    errors: Arc<AtomicU64>,
285    dynamic_config: Arc<RwLock<DynamicPoolConfig>>,
286    low_utilization_tracker: Arc<RwLock<LowUtilizationTracker>>,
287    scaling_in_progress: Arc<AtomicBool>,
288    current_max_connections: Arc<AtomicU64>,
289}
290
291impl RiDatabasePool {
292    pub async fn new(config: RiDatabaseConfig) -> RiResult<Self> {
293        let dynamic_config = DynamicPoolConfig {
294            enable_dynamic_scaling: true,
295            scale_up_threshold: 0.8,
296            scale_down_threshold: 0.3,
297            scale_down_cooldown_secs: 300,
298            min_connections: config.min_idle_connections,
299            max_connections: config.max_connections,
300            scale_up_step: 2,
301            scale_down_step: 1,
302        };
303
304        let pool = Self {
305            config: config.clone(),
306            connections: Arc::new(DashMap::new()),
307            available: Arc::new(DashMap::new()),
308            connection_ids: Arc::new(AtomicU64::new(0)),
309            semaphore: Arc::new(Semaphore::new(config.max_connections as usize)),
310            max_idle_time: Duration::from_secs(config.idle_timeout_secs),
311            max_lifetime: Duration::from_secs(config.max_lifetime_secs),
312            idle_connections: Arc::new(AtomicU64::new(0)),
313            active_connections: Arc::new(AtomicU64::new(0)),
314            total_connections: Arc::new(AtomicU64::new(0)),
315            queries_executed: Arc::new(AtomicU64::new(0)),
316            errors: Arc::new(AtomicU64::new(0)),
317            dynamic_config: Arc::new(RwLock::new(dynamic_config)),
318            low_utilization_tracker: Arc::new(RwLock::new(LowUtilizationTracker::new())),
319            scaling_in_progress: Arc::new(AtomicBool::new(false)),
320            current_max_connections: Arc::new(AtomicU64::new(config.max_connections as u64)),
321        };
322
323        for _ in 0..config.min_idle_connections {
324            if let Ok(conn) = pool.create_connection().await {
325                let id = pool.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
326                let now = Instant::now();
327                pool.available.insert(id, PoolConnection { 
328                    db: conn, 
329                    acquired_at: now,
330                    created_at: now,
331                });
332                pool.idle_connections.fetch_add(1, Ordering::SeqCst);
333                pool.total_connections.fetch_add(1, Ordering::SeqCst);
334            }
335        }
336
337        Ok(pool)
338    }
339
340    async fn create_connection(&self) -> RiResult<Arc<dyn RiDatabase>> {
341        match self.config.database_type {
342            #[cfg(feature = "postgres")]
343            crate::database::DatabaseType::Postgres => {
344                let connection_string = self.config.connection_string();
345                let db = crate::database::postgres::PostgresDatabase::new(&connection_string, self.config.clone()).await
346                    .map_err(|e| crate::core::RiError::Config(e.to_string()))?;
347                Ok(Arc::new(db) as Arc<dyn RiDatabase>)
348            }
349            #[cfg(feature = "mysql")]
350            crate::database::DatabaseType::MySQL => {
351                let connection_string = self.config.connection_string();
352                let db = crate::database::mysql::MySQLDatabase::new(&connection_string, self.config.clone()).await
353                    .map_err(|e| crate::core::RiError::Config(e.to_string()))?;
354                Ok(Arc::new(db) as Arc<dyn RiDatabase>)
355            }
356            #[cfg(feature = "sqlite")]
357            crate::database::DatabaseType::SQLite => {
358                let url = format!("sqlite:{}", self.config.database);
359                let db = tokio::runtime::Handle::current().block_on(
360                    crate::database::sqlite::SQLiteDatabase::new(&url, self.config.clone())
361                );
362                match db {
363                    Ok(db) => Ok(Arc::new(db) as Arc<dyn RiDatabase>),
364                    Err(e) => Err(crate::core::RiError::Config(e.to_string())),
365                }
366            }
367            _ => Err(crate::core::RiError::Config("Unsupported database type".to_string())),
368        }
369    }
370
371    pub fn utilization_rate(&self) -> f64 {
372        let active = self.active_connections.load(Ordering::SeqCst);
373        let total = self.total_connections.load(Ordering::SeqCst);
374        
375        if total == 0 {
376            return 0.0;
377        }
378        
379        (active as f64) / (total as f64)
380    }
381
382    pub async fn check_and_scale(&self) -> RiResult<()> {
383        let dynamic_config = self.dynamic_config.read().unwrap().clone();
384        
385        if !dynamic_config.enable_dynamic_scaling {
386            return Ok(());
387        }
388
389        if self.scaling_in_progress.compare_exchange(
390            false,
391            true,
392            Ordering::SeqCst,
393            Ordering::SeqCst,
394        ).is_err() {
395            return Ok(());
396        }
397
398        let result = self.do_scaling(&dynamic_config).await;
399        
400        self.scaling_in_progress.store(false, Ordering::SeqCst);
401        
402        result
403    }
404
405    async fn do_scaling(&self, dynamic_config: &DynamicPoolConfig) -> RiResult<()> {
406        let utilization = self.utilization_rate();
407        let total = self.total_connections.load(Ordering::SeqCst) as u32;
408        let _active = self.active_connections.load(Ordering::SeqCst) as u32;
409        
410        if utilization > dynamic_config.scale_up_threshold {
411            {
412                let mut tracker = self.low_utilization_tracker.write().unwrap();
413                tracker.update(false);
414            }
415            
416            if total < dynamic_config.max_connections {
417                let to_add = std::cmp::min(
418                    dynamic_config.scale_up_step,
419                    dynamic_config.max_connections - total,
420                );
421                
422                for _ in 0..to_add {
423                    match self.create_connection().await {
424                        Ok(conn) => {
425                            let id = self.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
426                            let now = Instant::now();
427                            self.available.insert(id, PoolConnection { 
428                                db: conn, 
429                                acquired_at: now,
430                                created_at: now,
431                            });
432                            self.idle_connections.fetch_add(1, Ordering::SeqCst);
433                            self.total_connections.fetch_add(1, Ordering::SeqCst);
434                        }
435                        Err(e) => {
436                            self.errors.fetch_add(1, Ordering::SeqCst);
437                            return Err(e);
438                        }
439                    }
440                }
441                
442                let new_total = self.total_connections.load(Ordering::SeqCst);
443                self.current_max_connections.store(new_total, Ordering::SeqCst);
444            }
445        } else if utilization < dynamic_config.scale_down_threshold {
446            let should_scale_down = {
447                let mut tracker = self.low_utilization_tracker.write().unwrap();
448                tracker.update(true);
449                
450                if let Some(duration) = tracker.duration_below_threshold() {
451                    duration >= Duration::from_secs(dynamic_config.scale_down_cooldown_secs)
452                } else {
453                    false
454                }
455            };
456            
457            if should_scale_down && total > dynamic_config.min_connections {
458                let idle = self.idle_connections.load(Ordering::SeqCst) as u32;
459                let to_remove = std::cmp::min(
460                    std::cmp::min(dynamic_config.scale_down_step, idle),
461                    total - dynamic_config.min_connections,
462                );
463                
464                if to_remove > 0 {
465                    self.remove_idle_connections(to_remove).await;
466                }
467            }
468        } else {
469            let mut tracker = self.low_utilization_tracker.write().unwrap();
470            tracker.update(false);
471        }
472        
473        Ok(())
474    }
475
476    async fn remove_idle_connections(&self, count: u32) {
477        let mut removed = 0u32;
478        let now = Instant::now();
479        
480        let to_remove: Vec<u32> = self.available
481            .iter()
482            .filter(|entry| {
483                let conn = entry.value();
484                now.duration_since(conn.acquired_at) > self.max_idle_time
485            })
486            .take(count as usize)
487            .map(|entry| *entry.key())
488            .collect();
489        
490        for id in to_remove {
491            if removed >= count {
492                break;
493            }
494            
495            if let Some((_, conn)) = self.available.remove(&id) {
496                let _ = conn.db.close().await;
497                self.idle_connections.fetch_sub(1, Ordering::SeqCst);
498                self.total_connections.fetch_sub(1, Ordering::SeqCst);
499                removed += 1;
500            }
501        }
502        
503        if removed < count {
504            let additional_remove: Vec<u32> = self.available
505                .iter()
506                .take((count - removed) as usize)
507                .map(|entry| *entry.key())
508                .collect();
509            
510            for id in additional_remove {
511                if removed >= count {
512                    break;
513                }
514                
515                if let Some((_, conn)) = self.available.remove(&id) {
516                    let _ = conn.db.close().await;
517                    self.idle_connections.fetch_sub(1, Ordering::SeqCst);
518                    self.total_connections.fetch_sub(1, Ordering::SeqCst);
519                    removed += 1;
520                }
521            }
522        }
523        
524        let new_total = self.total_connections.load(Ordering::SeqCst);
525        self.current_max_connections.store(new_total, Ordering::SeqCst);
526    }
527
528    pub async fn get(&self) -> RiResult<PooledDatabase> {
529        let _permit = self.semaphore.acquire().await.map_err(|e| crate::core::RiError::Config(e.to_string()))?;
530
531        let mut reused_db = None;
532        let mut reused_id = None;
533
534        let now = Instant::now();
535        
536        for entry in self.available.iter() {
537            let id = *entry.key();
538            let conn = entry.value();
539            if now.duration_since(conn.acquired_at) > self.max_idle_time || now.duration_since(conn.created_at) > self.max_lifetime {
540                self.available.remove(&id);
541                let _ = conn.db.close().await;
542                self.idle_connections.fetch_sub(1, Ordering::SeqCst);
543                self.total_connections.fetch_sub(1, Ordering::SeqCst);
544            } else {
545                reused_db = Some(conn.db.clone());
546                reused_id = Some(id);
547                self.available.remove(&id);
548                self.idle_connections.fetch_sub(1, Ordering::SeqCst);
549                self.active_connections.fetch_add(1, Ordering::SeqCst);
550                break;
551            }
552        }
553
554        let (db, id) = if let Some((existing_db, existing_id)) = reused_db.zip(reused_id) {
555            (existing_db, existing_id)
556        } else {
557            match self.create_connection().await {
558                Ok(new_conn) => {
559                    let id = self.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
560                    self.total_connections.fetch_add(1, Ordering::SeqCst);
561                    self.active_connections.fetch_add(1, Ordering::SeqCst);
562                    (new_conn, id)
563                }
564                Err(e) => {
565                    self.errors.fetch_add(1, Ordering::SeqCst);
566                    return Err(e);
567                }
568            }
569        };
570
571        let _ = self.check_and_scale().await;
572
573        Ok(PooledDatabase::new(id, db, Arc::new(self.clone())))
574    }
575
576    pub async fn release(&self, db: PooledDatabase) {
577        self.active_connections.fetch_sub(1, Ordering::SeqCst);
578        self.idle_connections.fetch_add(1, Ordering::SeqCst);
579        
580        self.available.insert(db.id(), PoolConnection { 
581            db: db.inner.clone(),
582            acquired_at: Instant::now(),
583            created_at: Instant::now(),
584        });
585
586        let _ = self.check_and_scale().await;
587    }
588
589    pub async fn close(&self) -> RiResult<()> {
590        self.semaphore.close();
591        for entry in self.connections.iter() {
592            let _ = entry.value().db.close().await;
593        }
594        self.connections.clear();
595        self.available.clear();
596        Ok(())
597    }
598
599    pub fn metrics(&self) -> DatabaseMetrics {
600        let active = self.active_connections.load(Ordering::SeqCst);
601        let total = self.total_connections.load(Ordering::SeqCst);
602        
603        DatabaseMetrics {
604            active_connections: active,
605            idle_connections: self.idle_connections.load(Ordering::SeqCst),
606            total_connections: total,
607            queries_executed: self.queries_executed.load(Ordering::SeqCst),
608            query_duration_ms: 0.0,
609            errors: self.errors.load(Ordering::SeqCst),
610            utilization_rate: if total > 0 { (active as f64) / (total as f64) } else { 0.0 },
611        }
612    }
613
614    pub fn get_dynamic_config(&self) -> DynamicPoolConfig {
615        self.dynamic_config.read().unwrap().clone()
616    }
617
618    pub fn set_dynamic_config(&self, config: DynamicPoolConfig) {
619        let mut current = self.dynamic_config.write().unwrap();
620        *current = config;
621    }
622
623    pub fn update_dynamic_config<F>(&self, f: F) 
624    where
625        F: FnOnce(&mut DynamicPoolConfig),
626    {
627        let mut config = self.dynamic_config.write().unwrap();
628        f(&mut config);
629    }
630
631    pub fn is_scaling_in_progress(&self) -> bool {
632        self.scaling_in_progress.load(Ordering::SeqCst)
633    }
634
635    pub fn get_current_max_connections(&self) -> u32 {
636        self.current_max_connections.load(Ordering::SeqCst) as u32
637    }
638
639    pub async fn force_scale_up(&self, count: u32) -> RiResult<()> {
640        let dynamic_config = self.dynamic_config.read().unwrap().clone();
641        let total = self.total_connections.load(Ordering::SeqCst) as u32;
642        
643        let to_add = std::cmp::min(count, dynamic_config.max_connections.saturating_sub(total));
644        
645        for _ in 0..to_add {
646            match self.create_connection().await {
647                Ok(conn) => {
648                    let id = self.connection_ids.fetch_add(1, Ordering::SeqCst) as u32;
649                    let now = Instant::now();
650                    self.available.insert(id, PoolConnection { 
651                        db: conn, 
652                        acquired_at: now,
653                        created_at: now,
654                    });
655                    self.idle_connections.fetch_add(1, Ordering::SeqCst);
656                    self.total_connections.fetch_add(1, Ordering::SeqCst);
657                }
658                Err(e) => {
659                    self.errors.fetch_add(1, Ordering::SeqCst);
660                    return Err(e);
661                }
662            }
663        }
664        
665        let new_total = self.total_connections.load(Ordering::SeqCst);
666        self.current_max_connections.store(new_total, Ordering::SeqCst);
667        
668        Ok(())
669    }
670
671    pub async fn force_scale_down(&self, count: u32) -> RiResult<()> {
672        let dynamic_config = self.dynamic_config.read().unwrap().clone();
673        let total = self.total_connections.load(Ordering::SeqCst) as u32;
674        
675        let to_remove = std::cmp::min(
676            count,
677            total.saturating_sub(dynamic_config.min_connections)
678        );
679        
680        if to_remove > 0 {
681            self.remove_idle_connections(to_remove).await;
682        }
683        
684        Ok(())
685    }
686}
687
688#[cfg(feature = "pyo3")]
689#[pyo3::prelude::pymethods]
690impl RiDatabasePool {
691    #[new]
692    fn py_new(config: RiDatabaseConfig) -> Self {
693        let dynamic_config = DynamicPoolConfig {
694            enable_dynamic_scaling: true,
695            scale_up_threshold: 0.8,
696            scale_down_threshold: 0.3,
697            scale_down_cooldown_secs: 300,
698            min_connections: config.min_idle_connections,
699            max_connections: config.max_connections,
700            scale_up_step: 2,
701            scale_down_step: 1,
702        };
703
704        Self {
705            config: config.clone(),
706            connections: Arc::new(DashMap::new()),
707            available: Arc::new(DashMap::new()),
708            connection_ids: Arc::new(AtomicU64::new(0)),
709            semaphore: Arc::new(Semaphore::new(config.max_connections as usize)),
710            max_idle_time: Duration::from_secs(config.idle_timeout_secs),
711            max_lifetime: Duration::from_secs(config.max_lifetime_secs),
712            idle_connections: Arc::new(AtomicU64::new(0)),
713            active_connections: Arc::new(AtomicU64::new(0)),
714            total_connections: Arc::new(AtomicU64::new(0)),
715            queries_executed: Arc::new(AtomicU64::new(0)),
716            errors: Arc::new(AtomicU64::new(0)),
717            dynamic_config: Arc::new(RwLock::new(dynamic_config)),
718            low_utilization_tracker: Arc::new(RwLock::new(LowUtilizationTracker::new())),
719            scaling_in_progress: Arc::new(AtomicBool::new(false)),
720            current_max_connections: Arc::new(AtomicU64::new(config.max_connections as u64)),
721        }
722    }
723
724    fn status(&self) -> String {
725        format!(
726            "Pool status - Active: {}, Idle: {}, Total: {}, Queries: {}, Errors: {}, Utilization: {:.2}%",
727            self.active_connections.load(Ordering::SeqCst),
728            self.idle_connections.load(Ordering::SeqCst),
729            self.total_connections.load(Ordering::SeqCst),
730            self.queries_executed.load(Ordering::SeqCst),
731            self.errors.load(Ordering::SeqCst),
732            self.utilization_rate() * 100.0
733        )
734    }
735
736    fn get_config(&self) -> RiDatabaseConfig {
737        self.config.clone()
738    }
739
740    fn get_utilization_rate(&self) -> f64 {
741        self.utilization_rate()
742    }
743
744    fn get_metrics(&self) -> DatabaseMetrics {
745        self.metrics()
746    }
747
748    fn get_dynamic_pool_config(&self) -> DynamicPoolConfig {
749        self.get_dynamic_config()
750    }
751
752    fn set_dynamic_pool_config(&self, config: DynamicPoolConfig) {
753        self.set_dynamic_config(config);
754    }
755
756    fn set_enable_dynamic_scaling(&self, enable: bool) {
757        self.update_dynamic_config(|c| c.enable_dynamic_scaling = enable);
758    }
759
760    fn set_scale_up_threshold(&self, threshold: f64) {
761        self.update_dynamic_config(|c| c.scale_up_threshold = threshold);
762    }
763
764    fn set_scale_down_threshold(&self, threshold: f64) {
765        self.update_dynamic_config(|c| c.scale_down_threshold = threshold);
766    }
767
768    fn set_scale_down_cooldown_secs(&self, secs: u64) {
769        self.update_dynamic_config(|c| c.scale_down_cooldown_secs = secs);
770    }
771
772    fn set_min_connections(&self, min: u32) {
773        self.update_dynamic_config(|c| c.min_connections = min);
774    }
775
776    fn set_max_connections(&self, max: u32) {
777        self.update_dynamic_config(|c| c.max_connections = max);
778    }
779
780    fn set_scale_up_step(&self, step: u32) {
781        self.update_dynamic_config(|c| c.scale_up_step = step);
782    }
783
784    fn set_scale_down_step(&self, step: u32) {
785        self.update_dynamic_config(|c| c.scale_down_step = step);
786    }
787
788    fn is_scaling(&self) -> bool {
789        self.is_scaling_in_progress()
790    }
791
792    fn get_current_pool_size(&self) -> u32 {
793        self.get_current_max_connections()
794    }
795}
796
797impl Clone for RiDatabasePool {
798    fn clone(&self) -> Self {
799        Self {
800            config: self.config.clone(),
801            connections: self.connections.clone(),
802            available: self.available.clone(),
803            connection_ids: self.connection_ids.clone(),
804            semaphore: self.semaphore.clone(),
805            max_idle_time: self.max_idle_time,
806            max_lifetime: self.max_lifetime,
807            idle_connections: self.idle_connections.clone(),
808            active_connections: self.active_connections.clone(),
809            total_connections: self.total_connections.clone(),
810            queries_executed: self.queries_executed.clone(),
811            errors: self.errors.clone(),
812            dynamic_config: self.dynamic_config.clone(),
813            low_utilization_tracker: self.low_utilization_tracker.clone(),
814            scaling_in_progress: self.scaling_in_progress.clone(),
815            current_max_connections: self.current_max_connections.clone(),
816        }
817    }
818}