Skip to main content

ri/gateway/
mod.rs

1//! Copyright © 2025-2026 Wenze Wei. All Rights Reserved.
2//!
3//! This file is part of Ri.
4//! The Ri project belongs to the Dunimd Team.
5//!
6//! Licensed under the Apache License, Version 2.0 (the "License");
7//! You may not use this file except in compliance with the License.
8//! You may obtain a copy of the License at
9//!
10//!     http://www.apache.org/licenses/LICENSE-2.0
11//!
12//! Unless required by applicable law or agreed to in writing, software
13//! distributed under the License is distributed on an "AS IS" BASIS,
14//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//! See the License for the specific language governing permissions and
16//! limitations under the License.
17
18#![allow(non_snake_case)]
19
20//! # Gateway Module
21//! 
22//! This module provides a comprehensive API gateway functionality for Ri, offering routing, middleware support,
23//! load balancing, rate limiting, and circuit breaking capabilities.
24//! 
25//! ## Key Components
26//! 
27//! - **RiGateway**: Main gateway struct implementing the RiModule trait
28//! - **RiGatewayConfig**: Configuration for gateway behavior
29//! - **RiGatewayRequest**: Request structure for gateway operations
30//! - **RiGatewayResponse**: Response structure for gateway operations
31//! - **RiRoute**: Route definition for API endpoints
32//! - **RiRouter**: Router for handling request routing
33//! - **RiMiddleware**: Middleware interface for request processing
34//! - **RiMiddlewareChain**: Chain of middleware for sequential execution
35//! - **RiLoadBalancer**: Load balancing for distributing requests across multiple services
36//! - **RiLoadBalancerStrategy**: Load balancing strategies (RoundRobin, LeastConnections, etc.)
37//! - **RiRateLimiter**: Rate limiting for controlling request rates
38//! - **RiRateLimitConfig**: Configuration for rate limiting
39//! - **RiCircuitBreaker**: Circuit breaker for preventing cascading failures
40//! - **RiCircuitBreakerConfig**: Configuration for circuit breakers
41//! 
42//! ## Design Principles
43//! 
44//! 1. **Modular Design**: Separate components for routing, middleware, load balancing, rate limiting, and circuit breaking
45//! 2. **Async-First**: All gateway operations are asynchronous
46//! 3. **Configurable**: Highly configurable gateway behavior through RiGatewayConfig
47//! 4. **Middleware Support**: Extensible middleware system for request processing
48//! 5. **Resilience**: Built-in circuit breaker and rate limiting for service resilience
49//! 6. **Load Balancing**: Support for distributing requests across multiple service instances
50//! 7. **CORS Support**: Built-in CORS configuration for cross-origin requests
51//! 8. **Logging**: Comprehensive logging support
52//! 9. **Service Integration**: Implements RiModule trait for seamless integration into Ri
53//! 10. **Thread-safe**: Uses Arc and RwLock for safe concurrent access
54//! 
55//! ## Usage
56//! 
57//! ```rust
58//! use ri::prelude::*;
59//! use ri::gateway::{RiGateway, RiGatewayConfig, RiRoute};
60//! use std::collections::HashMap;
61//! 
62//! async fn example() -> RiResult<()> {
63//!     // Create gateway configuration
64//!     let gateway_config = RiGatewayConfig {
65//!         listen_address: "0.0.0.0".to_string(),
66//!         listen_port: 8080,
67//!         max_connections: 10000,
68//!         request_timeout_seconds: 30,
69//!         enable_rate_limiting: true,
70//!         enable_circuit_breaker: true,
71//!         enable_load_balancing: true,
72//!         cors_enabled: true,
73//!         cors_origins: vec!["*".to_string()],
74//!         cors_methods: vec!["GET".to_string(), "POST".to_string()],
75//!         cors_headers: vec!["Content-Type".to_string(), "Authorization".to_string()],
76//!         enable_logging: true,
77//!         log_level: "info".to_string(),
78//!     };
79//!     
80//!     // Create gateway instance
81//!     let gateway = RiGateway::new();
82//!     
83//!     // Get router and add routes
84//!     let router = gateway.router();
85//!     
86//!     // Add a simple GET route
87//!     router.add_route(RiRoute {
88//!         path: "/api/v1/health".to_string(),
89//!         method: "GET".to_string(),
90//!         handler: Arc::new(|req| Box::pin(async move {
91//!             Ok(RiGatewayResponse::json(200, &serde_json::json!({ "status": "ok" }), req.id.clone())?)
92//!         })),
93//!         ..Default::default()
94//!     }).await?;
95//!     
96//!     // Add middleware
97//!     let middleware_chain = gateway.middleware_chain();
98//!     middleware_chain.add_middleware(Arc::new(|req, next| Box::pin(async move {
99//!         // Log request
100//!         println!("Request: {} {}", req.method, req.path);
101//!         next(req).await
102//!     }))).await;
103//!     
104//!     // Handle a sample request
105//!     let sample_request = RiGatewayRequest::new(
106//!         "GET".to_string(),
107//!         "/api/v1/health".to_string(),
108//!         HashMap::new(),
109//!         HashMap::new(),
110//!         None,
111//!         "127.0.0.1:12345".to_string(),
112//!     );
113//!     
114//!     let response = gateway.handle_request(sample_request).await;
115//!     println!("Response: {} {}", response.status_code, String::from_utf8_lossy(&response.body));
116//!     
117//!     Ok(())
118//! }
119//! ```
120
121use crate::core::{RiModule, RiServiceContext};
122use log;
123use serde::{Deserialize, Serialize};
124use std::collections::HashMap;
125use std::sync::Arc;
126use tokio::sync::RwLock;
127
128pub mod middleware;
129pub mod routing;
130pub mod radix_tree;
131pub mod circuit_breaker;
132pub mod load_balancer;
133pub mod rate_limiter;
134pub mod server;
135
136pub use routing::{RiRoute, RiRouter};
137pub use radix_tree::{RiRadixTree, RadixNode, PathSegment, SegmentType, RouteMatch};
138pub use middleware::{RiMiddleware, RiMiddlewareChain};
139pub use load_balancer::{RiLoadBalancer, RiLoadBalancerStrategy};
140pub use rate_limiter::{RiRateLimiter, RiRateLimitConfig};
141pub use circuit_breaker::{RiCircuitBreaker, RiCircuitBreakerConfig};
142
143#[cfg(feature = "gateway")]
144pub use server::{RiGatewayServer, load_tls_config};
145
146/// Configuration for the Ri Gateway.
147/// 
148/// This struct defines the configuration options for the API gateway, including network settings,
149/// feature toggles, and CORS configuration.
150#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass(get_all, set_all))]
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct RiGatewayConfig {
153    /// Address to listen on
154    pub listen_address: String,
155    /// Port to listen on
156    pub listen_port: u16,
157    /// Maximum number of concurrent connections
158    pub max_connections: usize,
159    /// Request timeout in seconds
160    pub request_timeout_seconds: u64,
161    /// Whether to enable rate limiting
162    pub enable_rate_limiting: bool,
163    /// Whether to enable circuit breaker
164    pub enable_circuit_breaker: bool,
165    /// Whether to enable load balancing
166    pub enable_load_balancing: bool,
167    /// Whether to enable CORS
168    pub cors_enabled: bool,
169    /// Allowed CORS origins
170    pub cors_origins: Vec<String>,
171    /// Allowed CORS methods
172    pub cors_methods: Vec<String>,
173    /// Allowed CORS headers
174    pub cors_headers: Vec<String>,
175    /// Whether to enable logging
176    pub enable_logging: bool,
177    /// Log level for gateway operations
178    pub log_level: String,
179}
180
181#[cfg(feature = "pyo3")]
182/// Python bindings for RiGatewayConfig
183#[pyo3::prelude::pymethods]
184impl RiGatewayConfig {
185    #[new]
186    fn py_new() -> Self {
187        Self::default()
188    }
189    
190    #[staticmethod]
191    fn py_new_with_address(listen_address: String, listen_port: u16) -> Self {
192        Self {
193            listen_address,
194            listen_port,
195            ..Self::default()
196        }
197    }
198}
199
200impl RiGatewayConfig {
201    /// Creates a new RiGatewayConfig with default values.
202    pub fn new() -> Self {
203        Self::default()
204    }
205}
206
207impl Default for RiGatewayConfig {
208    /// Returns the default configuration for the gateway.
209    /// 
210    /// Default values:
211    /// - listen_address: "0.0.0.0"
212    /// - listen_port: 8080
213    /// - max_connections: 10000
214    /// - request_timeout_seconds: 30
215    /// - enable_rate_limiting: true
216    /// - enable_circuit_breaker: true
217    /// - enable_load_balancing: true
218    /// - cors_enabled: true
219    /// - cors_origins: ["*"]
220    /// - cors_methods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
221    /// - cors_headers: ["Content-Type", "Authorization", "X-Requested-With"]
222    /// - enable_logging: true
223    /// - log_level: "info"
224    fn default() -> Self {
225        Self {
226            listen_address: "0.0.0.0".to_string(),
227            listen_port: 8080,
228            max_connections: 10000,
229            request_timeout_seconds: 30,
230            enable_rate_limiting: true,
231            enable_circuit_breaker: true,
232            enable_load_balancing: true,
233            cors_enabled: true,
234            cors_origins: vec!["*".to_string()],
235            cors_methods: vec!["GET".to_string(), "POST".to_string(), "PUT".to_string(), "DELETE".to_string(), "OPTIONS".to_string()],
236            cors_headers: vec!["Content-Type".to_string(), "Authorization".to_string(), "X-Requested-With".to_string()],
237            enable_logging: true,
238            log_level: "info".to_string(),
239        }
240    }
241}
242
243/// Request structure for gateway operations.
244/// 
245/// This struct represents an HTTP request received by the gateway, including method, path, headers,
246/// query parameters, body, and remote address.
247#[derive(Debug, Clone)]
248#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
249pub struct RiGatewayRequest {
250    /// Unique request ID
251    pub id: String,
252    /// HTTP method (GET, POST, etc.)
253    pub method: String,
254    /// Request path
255    pub path: String,
256    /// HTTP headers
257    pub headers: HashMap<String, String>,
258    /// Query parameters
259    pub query_params: HashMap<String, String>,
260    /// Request body (if any)
261    pub body: Option<Vec<u8>>,
262    /// Remote address of the client
263    pub remote_addr: String,
264    /// Timestamp when the request was created
265    pub timestamp: std::time::Instant,
266}
267
268impl RiGatewayRequest {
269    /// Creates a new gateway request.
270    /// 
271    /// # Parameters
272    /// 
273    /// - `method`: HTTP method
274    /// - `path`: Request path
275    /// - `headers`: HTTP headers
276    /// - `query_params`: Query parameters
277    /// - `body`: Request body (optional)
278    /// - `remote_addr`: Remote address of the client
279    /// 
280    /// # Returns
281    /// 
282    /// A new `RiGatewayRequest` instance
283    pub fn new(
284        method: String,
285        path: String,
286        headers: HashMap<String, String>,
287        query_params: HashMap<String, String>,
288        body: Option<Vec<u8>>,
289        remote_addr: String,
290    ) -> Self {
291        Self {
292            id: uuid::Uuid::new_v4().to_string(),
293            method,
294            path,
295            headers,
296            query_params,
297            body,
298            remote_addr,
299            timestamp: std::time::Instant::now(),
300        }
301    }
302}
303
304/// Response structure for gateway operations.
305/// 
306/// This struct represents an HTTP response returned by the gateway, including status code,
307/// headers, body, and request ID.
308#[derive(Debug, Clone)]
309#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
310pub struct RiGatewayResponse {
311    /// HTTP status code
312    pub status_code: u16,
313    /// HTTP headers
314    pub headers: HashMap<String, String>,
315    /// Response body
316    pub body: Vec<u8>,
317    /// Request ID associated with this response
318    pub request_id: String,
319}
320
321impl RiGatewayResponse {
322    /// Creates a new gateway response.
323    /// 
324    /// # Parameters
325    /// 
326    /// - `status_code`: HTTP status code
327    /// - `body`: Response body
328    /// - `request_id`: Request ID associated with this response
329    /// 
330    /// # Returns
331    /// 
332    /// A new `RiGatewayResponse` instance
333    pub fn new(status_code: u16, body: Vec<u8>, request_id: String) -> Self {
334        let mut headers = HashMap::new();
335        headers.insert("Content-Type".to_string(), "application/json".to_string());
336        headers.insert("X-Request-ID".to_string(), request_id.clone());
337        
338        Self {
339            status_code,
340            headers,
341            body,
342            request_id,
343        }
344    }
345
346    /// Adds a header to the response.
347    /// 
348    /// # Parameters
349    /// 
350    /// - `key`: Header name
351    /// - `value`: Header value
352    /// 
353    /// # Returns
354    /// 
355    /// The updated `RiGatewayResponse` instance
356    pub fn with_header(mut self, key: String, value: String) -> Self {
357        self.headers.insert(key, value);
358        self
359    }
360
361    /// Creates a JSON response.
362    /// 
363    /// # Parameters
364    /// 
365    /// - `status_code`: HTTP status code
366    /// - `data`: Data to serialize as JSON
367    /// - `request_id`: Request ID associated with this response
368    /// 
369    /// # Returns
370    /// 
371    /// A `RiResult<Self>` containing the JSON response
372    pub fn json<T: serde::Serialize>(status_code: u16, data: &T, request_id: String) -> crate::core::RiResult<Self> {
373        let body = serde_json::to_vec(data)?;
374        Ok(Self::new(status_code, body, request_id))
375    }
376
377    /// Creates an error response.
378    /// 
379    /// # Parameters
380    /// 
381    /// - `status_code`: HTTP status code
382    /// - `message`: Error message
383    /// - `request_id`: Request ID associated with this response
384    /// 
385    /// # Returns
386    /// 
387    /// A new `RiGatewayResponse` instance with error information
388    pub fn error(status_code: u16, message: String, request_id: String) -> Self {
389        let error_body = serde_json::json!({
390            "error": message,
391            "request_id": request_id
392        });
393        
394        let body = serde_json::to_vec(&error_body).unwrap_or_else(|_| b"{}".to_vec());
395        Self::new(status_code, body, request_id)
396    }
397}
398
399/// Main gateway struct implementing the RiModule trait.
400/// 
401/// This struct provides the core gateway functionality, including request handling,
402/// routing, middleware execution, rate limiting, and circuit breaking.
403#[cfg_attr(feature = "pyo3", pyo3::prelude::pyclass)]
404pub struct RiGateway {
405    /// Gateway configuration, protected by a RwLock for thread-safe access
406    config: RwLock<RiGatewayConfig>,
407    /// Router for handling request routing
408    router: Arc<RiRouter>,
409    /// Middleware chain for request processing
410    middleware_chain: Arc<RiMiddlewareChain>,
411    /// Rate limiter for controlling request rates
412    rate_limiter: Option<Arc<RiRateLimiter>>,
413    /// Circuit breaker for preventing cascading failures
414    circuit_breaker: Option<Arc<RiCircuitBreaker>>,
415}
416
417impl Default for RiGateway {
418    fn default() -> Self {
419        Self::new()
420    }
421}
422
423impl RiGateway {
424    /// Creates a new gateway instance with default configuration.
425    /// 
426    /// # Returns
427    /// 
428    /// A new `RiGateway` instance
429    pub fn new() -> Self {
430        let config = RiGatewayConfig::default();
431        let router = Arc::new(RiRouter::new());
432        let middleware_chain = Arc::new(RiMiddlewareChain::new());
433        
434        let rate_limiter = if config.enable_rate_limiting {
435            Some(Arc::new(RiRateLimiter::new(RiRateLimitConfig::default())))
436        } else {
437            None
438        };
439        
440        let circuit_breaker = if config.enable_circuit_breaker {
441            Some(Arc::new(RiCircuitBreaker::new(RiCircuitBreakerConfig::default())))
442        } else {
443            None
444        };
445
446        Self {
447            config: RwLock::new(config),
448            router,
449            middleware_chain,
450            rate_limiter,
451            circuit_breaker,
452        }
453    }
454
455    /// Returns a reference to the router.
456    /// 
457    /// # Returns
458    /// 
459    /// An Arc<RiRouter> providing thread-safe access to the router
460    pub fn router(&self) -> Arc<RiRouter> {
461        self.router.clone()
462    }
463
464    /// Returns a reference to the middleware chain.
465    /// 
466    /// # Returns
467    /// 
468    /// An Arc<RiMiddlewareChain> providing thread-safe access to the middleware chain
469    pub fn middleware_chain(&self) -> Arc<RiMiddlewareChain> {
470        self.middleware_chain.clone()
471    }
472
473    /// Handles a gateway request.
474    /// 
475    /// This method processes a request through the gateway pipeline, including:
476    /// 1. Rate limiting
477    /// 2. Circuit breaker check
478    /// 3. Middleware chain execution
479    /// 4. Request routing
480    /// 5. Route handler execution
481    /// 
482    /// # Parameters
483    /// 
484    /// - `request`: The request to handle
485    /// 
486    /// # Returns
487    /// 
488    /// A `RiGatewayResponse` containing the response to the request
489    pub async fn handle_request(&self, request: RiGatewayRequest) -> RiGatewayResponse {
490        let request_id = request.id.clone();
491        
492        // Apply rate limiting
493        if let Some(rate_limiter) = &self.rate_limiter {
494            if !rate_limiter.check_request(&request).await {
495                return RiGatewayResponse::new(429, "Rate limit exceeded".to_string().into_bytes(), request_id);
496            }
497        }
498
499        // Apply circuit breaker
500        if let Some(circuit_breaker) = &self.circuit_breaker {
501            if !circuit_breaker.allow_request() {
502                return RiGatewayResponse::new(503, "Service temporarily unavailable".to_string().into_bytes(), request_id);
503            }
504        }
505
506        // Apply middleware chain
507        let mut request = request;
508        match self.middleware_chain.execute(&mut request).await {
509            Ok(()) => {
510                // Route the request
511                match self.router.route(&request).await {
512                    Ok(route_handler) => {
513                        // Execute the route handler
514                        match route_handler(request).await {
515                            Ok(response) => response,
516                            Err(e) => {
517                                RiGatewayResponse::new(500, format!("Internal server error: {e}").into_bytes(), request_id)
518                            }
519                        }
520                    },
521                    Err(e) => {
522                        RiGatewayResponse::new(404, format!("Route not found: {e}").into_bytes(), request_id)
523                    }
524                }
525            },
526            Err(e) => {
527                RiGatewayResponse::new(403, format!("Middleware error: {e}").into_bytes(), request_id)
528            }
529        }
530    }
531}
532
533#[async_trait::async_trait]
534impl RiModule for RiGateway {
535    /// Returns the name of the gateway module.
536    /// 
537    /// # Returns
538    /// 
539    /// The module name as a string
540    fn name(&self) -> &str {
541        "Ri.Gateway"
542    }
543
544    /// Initializes the gateway module.
545    /// 
546    /// # Parameters
547    /// 
548    /// - `ctx`: Service context containing configuration and other services
549    /// 
550    /// # Returns
551    /// 
552    /// A `RiResult<()>` indicating success or failure
553    async fn init(&mut self, ctx: &mut RiServiceContext) -> crate::core::RiResult<()> {
554        let logger = ctx.logger();
555        logger.info("Ri.Gateway", "Initializing API gateway module")?;
556
557        let config = self.config.read().await;
558        logger.info(
559            "Ri.Gateway",
560            format!("Gateway will listen on {}:{}", config.listen_address, config.listen_port)
561        )?;
562
563        logger.info("Ri.Gateway", "API gateway module initialized successfully")?;
564        Ok(())
565    }
566
567    /// Performs cleanup after the gateway has shut down.
568    /// 
569    /// This method ensures proper resource cleanup during gateway shutdown:
570    /// - Clears all rate limiter buckets
571    /// - Resets the circuit breaker to closed state
572    /// - Clears all registered routes
573    /// 
574    /// # Parameters
575    /// 
576    /// - `_ctx`: Service context (not used in this implementation)
577    /// 
578    /// # Returns
579    /// 
580    /// A `RiResult<()>` indicating success or failure
581    /// 
582    /// # Logs
583    /// 
584    /// Logs cleanup progress at INFO level for debugging purposes
585    async fn after_shutdown(&mut self, _ctx: &mut RiServiceContext) -> crate::core::RiResult<()> {
586        log::info!("Cleaning up Ri Gateway Module");
587        
588        if let Some(rate_limiter) = &self.rate_limiter {
589            rate_limiter.clear_all_buckets();
590            log::info!("Rate limiter cleanup completed");
591        }
592        
593        if let Some(circuit_breaker) = &self.circuit_breaker {
594            circuit_breaker.reset();
595            log::info!("Circuit breaker reset completed");
596        }
597        
598        self.router.clear_routes();
599        log::info!("Router cleanup completed");
600        
601        log::info!("Ri Gateway Module cleanup completed");
602        Ok(())
603    }
604}
605
606#[cfg(feature = "pyo3")]
607/// Python bindings for RiGateway
608#[pyo3::prelude::pymethods]
609impl RiGateway {
610    #[new]
611    fn py_new() -> Self {
612        Self::new()
613    }
614}